Skip to content

Commit 026e684

Browse files
authored
Merge pull request #439 from lutovich/1.5-auto-read-issue
Fixed auto-read problem with result summary
2 parents 9765378 + 7356f97 commit 026e684

File tree

3 files changed

+109
-2
lines changed

3 files changed

+109
-2
lines changed

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ else if ( summary != null )
170170
{
171171
if ( summaryFuture == null )
172172
{
173+
// neither SUCCESS nor FAILURE message has arrived, register future to be notified when it arrives
174+
// future will be completed with summary on SUCCESS and completed exceptionally on FAILURE
175+
// enable auto-read, otherwise we might not read SUCCESS/FAILURE if records are not consumed
176+
connection.enableAutoRead();
173177
summaryFuture = new CompletableFuture<>();
174178
}
175179
return summaryFuture;
@@ -190,6 +194,10 @@ else if ( finished )
190194
{
191195
if ( failureFuture == null )
192196
{
197+
// neither SUCCESS nor FAILURE message has arrived, register future to be notified when it arrives
198+
// future will be completed with null on SUCCESS and completed with Throwable on FAILURE
199+
// enable auto-read, otherwise we might not read SUCCESS/FAILURE if records are not consumed
200+
connection.enableAutoRead();
193201
failureFuture = new CompletableFuture<>();
194202
}
195203
return failureFuture;

driver/src/test/java/org/neo4j/driver/internal/handlers/PullAllResponseHandlerTest.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,6 @@ public void shouldNotDisableAutoReadWhenSummaryRequested()
708708
handler.onRecord( values( "a", "b" ) );
709709
}
710710

711-
verify( connection, never() ).enableAutoRead();
712711
verify( connection, never() ).disableAutoRead();
713712

714713
handler.onSuccess( emptyMap() );
@@ -745,7 +744,6 @@ public void shouldNotDisableAutoReadWhenFailureRequested()
745744
handler.onRecord( values( 123, 456 ) );
746745
}
747746

748-
verify( connection, never() ).enableAutoRead();
749747
verify( connection, never() ).disableAutoRead();
750748

751749
IllegalStateException error = new IllegalStateException( "Wrong config" );
@@ -766,6 +764,61 @@ public void shouldNotDisableAutoReadWhenFailureRequested()
766764
assertNull( await( handler.nextAsync() ) );
767765
}
768766

767+
@Test
768+
public void shouldEnableAutoReadOnConnectionWhenFailureRequestedButNotAvailable() throws Exception
769+
{
770+
Connection connection = connectionMock();
771+
PullAllResponseHandler handler = newHandler( asList( "key1", "key2" ), connection );
772+
773+
handler.onRecord( values( 1, 2 ) );
774+
handler.onRecord( values( 3, 4 ) );
775+
776+
verify( connection, never() ).enableAutoRead();
777+
verify( connection, never() ).disableAutoRead();
778+
779+
CompletableFuture<Throwable> failureFuture = handler.failureAsync().toCompletableFuture();
780+
assertFalse( failureFuture.isDone() );
781+
782+
verify( connection ).enableAutoRead();
783+
verify( connection, never() ).disableAutoRead();
784+
785+
assertNotNull( await( handler.nextAsync() ) );
786+
assertNotNull( await( handler.nextAsync() ) );
787+
788+
RuntimeException error = new RuntimeException( "Oh my!" );
789+
handler.onFailure( error );
790+
791+
assertTrue( failureFuture.isDone() );
792+
assertEquals( error, failureFuture.get() );
793+
}
794+
795+
@Test
796+
public void shouldEnableAutoReadOnConnectionWhenSummaryRequestedButNotAvailable() throws Exception
797+
{
798+
Connection connection = connectionMock();
799+
PullAllResponseHandler handler = newHandler( asList( "key1", "key2", "key3" ), connection );
800+
801+
handler.onRecord( values( 1, 2, 3 ) );
802+
handler.onRecord( values( 4, 5, 6 ) );
803+
804+
verify( connection, never() ).enableAutoRead();
805+
verify( connection, never() ).disableAutoRead();
806+
807+
CompletableFuture<ResultSummary> summaryFuture = handler.summaryAsync().toCompletableFuture();
808+
assertFalse( summaryFuture.isDone() );
809+
810+
verify( connection ).enableAutoRead();
811+
verify( connection, never() ).disableAutoRead();
812+
813+
assertNotNull( await( handler.nextAsync() ) );
814+
assertNotNull( await( handler.nextAsync() ) );
815+
816+
handler.onSuccess( emptyMap() );
817+
818+
assertTrue( summaryFuture.isDone() );
819+
assertNotNull( summaryFuture.get() );
820+
}
821+
769822
private static PullAllResponseHandler newHandler()
770823
{
771824
return newHandler( new Statement( "RETURN 1" ) );

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,6 +1405,52 @@ public void shouldAllowAccessingRecordsAfterSessionClosed()
14051405
}
14061406
}
14071407

1408+
@Test
1409+
public void shouldAllowToConsumeRecordsSlowlyAndCloseSession() throws InterruptedException
1410+
{
1411+
Session session = neo4j.driver().session();
1412+
1413+
StatementResult result = session.run( "UNWIND range(10000, 0, -1) AS x RETURN 10 / x" );
1414+
1415+
// consume couple records slowly with a sleep in-between
1416+
for ( int i = 0; i < 10; i++ )
1417+
{
1418+
assertTrue( result.hasNext() );
1419+
assertNotNull( result.next() );
1420+
Thread.sleep( 50 );
1421+
}
1422+
1423+
try
1424+
{
1425+
session.close();
1426+
fail( "Exception expected" );
1427+
}
1428+
catch ( ClientException e )
1429+
{
1430+
assertThat( e, is( arithmeticError() ) );
1431+
}
1432+
}
1433+
1434+
@Test
1435+
public void shouldAllowToConsumeRecordsSlowlyAndRetrieveSummary() throws InterruptedException
1436+
{
1437+
try ( Session session = neo4j.driver().session() )
1438+
{
1439+
StatementResult result = session.run( "UNWIND range(8000, 1, -1) AS x RETURN 42 / x" );
1440+
1441+
// consume couple records slowly with a sleep in-between
1442+
for ( int i = 0; i < 12; i++ )
1443+
{
1444+
assertTrue( result.hasNext() );
1445+
assertNotNull( result.next() );
1446+
Thread.sleep( 50 );
1447+
}
1448+
1449+
ResultSummary summary = result.summary();
1450+
assertNotNull( summary );
1451+
}
1452+
}
1453+
14081454
private void assumeServerIs31OrLater()
14091455
{
14101456
ServerVersion serverVersion = ServerVersion.version( neo4j.driver() );

0 commit comments

Comments
 (0)