Skip to content

Commit 91e0bb4

Browse files
author
Zhen Li
committed
Ensure StatementResult#summary will discard all local or remote records.
Feature left: Nested session runs should buffer all unconsumed records into memory. AutoPull handler does not support auto read depending on local record buffer.
1 parent 47f709d commit 91e0bb4

17 files changed

+81
-119
lines changed

driver/src/main/java/org/neo4j/driver/internal/handlers/pulln/AutoPullResponseHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,13 @@ public synchronized CompletionStage<ResultSummary> summaryAsync()
155155
{
156156
if ( isDone() )
157157
{
158+
records.clear();
158159
return completedWithValueIfNoFailure( summary );
159160
}
160161
else
161162
{
162163
cancel();
164+
records.clear();
163165
if ( summaryFuture == null )
164166
{
165167
summaryFuture = new CompletableFuture<>();

driver/src/test/java/org/neo4j/driver/integration/ConnectionHandlingIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,14 +219,15 @@ void connectionUsedForTransactionReturnedToThePoolWhenTransactionCommitted()
219219
verify( connection1, never() ).release();
220220

221221
StatementResult result = createNodes( 5, tx );
222+
int size = result.list().size();
222223
tx.commit();
223224
tx.close();
224225

225226
Connection connection2 = connectionPool.lastAcquiredConnectionSpy;
226227
assertSame( connection1, connection2 );
227228
verify( connection1 ).release();
228229

229-
assertEquals( 5, result.list().size() );
230+
assertEquals( 5, size );
230231
}
231232

232233
@Test
@@ -240,14 +241,15 @@ void connectionUsedForTransactionReturnedToThePoolWhenTransactionRolledBack()
240241
verify( connection1, never() ).release();
241242

242243
StatementResult result = createNodes( 8, tx );
244+
int size = result.list().size();
243245
tx.rollback();
244246
tx.close();
245247

246248
Connection connection2 = connectionPool.lastAcquiredConnectionSpy;
247249
assertSame( connection1, connection2 );
248250
verify( connection1 ).release();
249251

250-
assertEquals( 8, result.list().size() );
252+
assertEquals( 8, size );
251253
}
252254

253255
@Test

driver/src/test/java/org/neo4j/driver/integration/ResultStreamIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ void shouldBeAbleToAccessSummaryAfterTransactionFailure()
175175
}
176176

177177
@Test
178-
void shouldBufferRecordsAfterSummary()
178+
void shouldNotBufferRecordsAfterSummary()
179179
{
180180
// Given
181181
StatementResult result = session.run("UNWIND [1,2] AS a RETURN a");
@@ -188,8 +188,7 @@ void shouldBufferRecordsAfterSummary()
188188
assertThat( summary.server().address(), equalTo( "localhost:" + session.boltPort() ) );
189189
assertThat( summary.counters().nodesCreated(), equalTo( 0 ) );
190190

191-
assertThat( result.next().get( "a" ).asInt(), equalTo( 1 ) );
192-
assertThat( result.next().get( "a" ).asInt(), equalTo( 2 ) );
191+
assertFalse( result.hasNext() );
193192
}
194193

195194
@Test

driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ void shouldHandleAcquireReadTransaction() throws IOException, InterruptedExcepti
114114
Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) )
115115

116116
{
117-
List<String> result = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ) )
118-
.list( record -> record.get( "n.name" ).asString() );
117+
List<String> result = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" )
118+
.list( record -> record.get( "n.name" ).asString() ) );
119119

120120
assertThat( result, equalTo( asList( "Bob", "Alice", "Tina" ) ) );
121121
}
@@ -960,12 +960,12 @@ void shouldTreatRoutingTableWithSingleRouterAsValid() throws Exception
960960
// read multiple times without additional rediscovery
961961

962962
StatementResult readResult1 = session.run( "MATCH (n) RETURN n.name" );
963-
assertEquals( "127.0.0.1:9003", readResult1.summary().server().address() );
964963
assertEquals( 3, readResult1.list().size() );
964+
assertEquals( "127.0.0.1:9003", readResult1.summary().server().address() );
965965

966966
StatementResult readResult2 = session.run( "MATCH (n) RETURN n.name" );
967-
assertEquals( "127.0.0.1:9004", readResult2.summary().server().address() );
968967
assertEquals( 3, readResult2.list().size() );
968+
assertEquals( "127.0.0.1:9004", readResult2.summary().server().address() );
969969
}
970970
finally
971971
{
@@ -1076,11 +1076,13 @@ void shouldUseResolverDuringRediscoveryWhenExistingRoutersFail() throws Exceptio
10761076
try ( Session session = driver.session() )
10771077
{
10781078
// run first query against 9001, which should return result and exit
1079-
List<String> names1 = session.run( "MATCH (n) RETURN n.name AS name" ).list( record -> record.get( "name" ).asString() );
1079+
List<String> names1 = session.run( "MATCH (n) RETURN n.name AS name" )
1080+
.list( record -> record.get( "name" ).asString() );
10801081
assertEquals( asList( "Alice", "Bob", "Eve" ), names1 );
10811082

10821083
// run second query with retries, it should rediscover using 9042 returned by the resolver and read from 9005
1083-
List<String> names2 = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ) ).list( record -> record.get( 0 ).asString() );
1084+
List<String> names2 = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" )
1085+
.list( record -> record.get( 0 ).asString() ) );
10841086
assertEquals( asList( "Bob", "Alice", "Tina" ), names2 );
10851087
}
10861088
}
@@ -1140,7 +1142,7 @@ void shouldRevertToInitialRouterIfKnownRouterThrowsProtocolErrors() throws Excep
11401142
{
11411143
try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) )
11421144
{
1143-
List<Record> records = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ) ).list();
1145+
List<Record> records = session.readTransaction( tx -> tx.run( "MATCH (n) RETURN n.name" ).list() );
11441146
assertEquals( 3, records.size() );
11451147
}
11461148
}

driver/src/test/java/org/neo4j/driver/integration/SessionBoltV3IT.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,16 @@
3030
import java.util.Map;
3131
import java.util.concurrent.CompletionStage;
3232

33+
import org.neo4j.driver.Bookmark;
3334
import org.neo4j.driver.Driver;
35+
import org.neo4j.driver.Record;
3436
import org.neo4j.driver.Session;
3537
import org.neo4j.driver.StatementResult;
3638
import org.neo4j.driver.Transaction;
3739
import org.neo4j.driver.TransactionConfig;
3840
import org.neo4j.driver.async.AsyncSession;
3941
import org.neo4j.driver.async.StatementResultCursor;
4042
import org.neo4j.driver.exceptions.TransientException;
41-
import org.neo4j.driver.Bookmark;
4243
import org.neo4j.driver.internal.cluster.RoutingSettings;
4344
import org.neo4j.driver.internal.messaging.Message;
4445
import org.neo4j.driver.internal.messaging.request.GoodbyeMessage;
@@ -329,12 +330,11 @@ private static void testTransactionMetadataWithAsyncTransactionFunctions( boolea
329330
.build();
330331

331332
// call listTransactions procedure that should list itself with the specified metadata
332-
CompletionStage<StatementResultCursor> cursorFuture =
333-
read ? asyncSession.readTransactionAsync( tx -> tx.runAsync( "CALL dbms.listTransactions()" ), config )
334-
: asyncSession.writeTransactionAsync( tx -> tx.runAsync( "CALL dbms.listTransactions()" ), config );
333+
CompletionStage<Record> singleFuture =
334+
read ? asyncSession.readTransactionAsync( tx -> tx.runAsync( "CALL dbms.listTransactions()" ).thenCompose( StatementResultCursor::singleAsync ), config )
335+
: asyncSession.writeTransactionAsync( tx -> tx.runAsync( "CALL dbms.listTransactions()" ).thenCompose( StatementResultCursor::singleAsync ), config );
335336

336-
CompletionStage<Map<String,Object>> metadataFuture = cursorFuture.thenCompose( StatementResultCursor::singleAsync )
337-
.thenApply( record -> record.get( "metaData" ).asMap() );
337+
CompletionStage<Map<String,Object>> metadataFuture = singleFuture.thenApply( record -> record.get( "metaData" ).asMap() );
338338

339339
assertEquals( metadata, await( metadataFuture ) );
340340
}
@@ -352,10 +352,10 @@ private static void testTransactionMetadataWithTransactionFunctions( boolean rea
352352
.build();
353353

354354
// call listTransactions procedure that should list itself with the specified metadata
355-
StatementResult result = read ? session.readTransaction( tx -> tx.run( "CALL dbms.listTransactions()" ), config )
356-
: session.writeTransaction( tx -> tx.run( "CALL dbms.listTransactions()" ), config );
355+
Record single = read ? session.readTransaction( tx -> tx.run( "CALL dbms.listTransactions()" ).single(), config )
356+
: session.writeTransaction( tx -> tx.run( "CALL dbms.listTransactions()" ).single(), config );
357357

358-
Map<String,Object> receivedMetadata = result.single().get( "metaData" ).asMap();
358+
Map<String,Object> receivedMetadata = single.get( "metaData" ).asMap();
359359

360360
assertEquals( metadata, receivedMetadata );
361361
}

driver/src/test/java/org/neo4j/driver/integration/SessionIT.java

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -372,8 +372,9 @@ void readTxRolledBackWithTxFailure()
372372
long answer = session.readTransaction( tx ->
373373
{
374374
StatementResult result = tx.run( "RETURN 42" );
375+
long single = result.single().get( 0 ).asLong();
375376
tx.rollback();
376-
return result.single().get( 0 ).asLong();
377+
return single;
377378
} );
378379
assertEquals( 42, answer );
379380

@@ -786,7 +787,7 @@ void shouldPropagatePullAllFailureWhenClosed()
786787
}
787788

788789
@Test
789-
void shouldBePossibleToConsumeResultAfterSessionIsClosed()
790+
void shouldNotBePossibleToConsumeResultAfterSessionIsClosed()
790791
{
791792
StatementResult result;
792793
try ( Session session = neo4j.driver().session() )
@@ -795,7 +796,7 @@ void shouldBePossibleToConsumeResultAfterSessionIsClosed()
795796
}
796797

797798
List<Integer> ints = result.list( record -> record.get( 0 ).asInt() );
798-
assertEquals( 20000, ints.size() );
799+
assertEquals( 0, ints.size() );
799800
}
800801

801802
@Test
@@ -899,7 +900,7 @@ void shouldNotRetryOnConnectionAcquisitionTimeout()
899900
}
900901

901902
@Test
902-
void shouldAllowConsumingRecordsAfterFailureInSessionClose()
903+
void shouldNotAllowConsumingRecordsAfterFailureInSessionClose()
903904
{
904905
Session session = neo4j.driver().session();
905906

@@ -908,17 +909,11 @@ void shouldAllowConsumingRecordsAfterFailureInSessionClose()
908909
ClientException e = assertThrows( ClientException.class, session::close );
909910
assertThat( e, is( arithmeticError() ) );
910911

911-
assertTrue( result.hasNext() );
912-
assertEquals( 16, result.next().get( 0 ).asInt() );
913-
assertTrue( result.hasNext() );
914-
assertEquals( 8, result.next().get( 0 ).asInt() );
915-
assertTrue( result.hasNext() );
916-
assertEquals( 4, result.next().get( 0 ).asInt() );
917912
assertFalse( result.hasNext() );
918913
}
919914

920915
@Test
921-
void shouldAllowAccessingRecordsAfterSummary()
916+
void shouldNotAllowAccessingRecordsAfterSummary()
922917
{
923918
int recordCount = 10_000;
924919
String query = "UNWIND range(1, " + recordCount + ") AS x RETURN x";
@@ -932,17 +927,12 @@ void shouldAllowAccessingRecordsAfterSummary()
932927
assertEquals( StatementType.READ_ONLY, summary.statementType() );
933928

934929
List<Record> records = result.list();
935-
assertEquals( recordCount, records.size() );
936-
for ( int i = 1; i <= recordCount; i++ )
937-
{
938-
Record record = records.get( i - 1 );
939-
assertEquals( i, record.get( 0 ).asInt() );
940-
}
930+
assertEquals( 0, records.size() );
941931
}
942932
}
943933

944934
@Test
945-
void shouldAllowAccessingRecordsAfterSessionClosed()
935+
void shouldNotAllowAccessingRecordsAfterSessionClosed()
946936
{
947937
int recordCount = 11_333;
948938
String query = "UNWIND range(1, " + recordCount + ") AS x RETURN 'Result-' + x";
@@ -954,12 +944,7 @@ void shouldAllowAccessingRecordsAfterSessionClosed()
954944
}
955945

956946
List<Record> records = result.list();
957-
assertEquals( recordCount, records.size() );
958-
for ( int i = 1; i <= recordCount; i++ )
959-
{
960-
Record record = records.get( i - 1 );
961-
assertEquals( "Result-" + i, record.get( 0 ).asString() );
962-
}
947+
assertEquals( 0, records.size() );
963948
}
964949

965950
@Test
@@ -1402,8 +1387,8 @@ private void testExecuteWriteTx( AccessMode sessionMode )
14021387
String material = session.writeTransaction( tx ->
14031388
{
14041389
StatementResult result = tx.run( "CREATE (s:Shield {material: 'Vibranium'}) RETURN s" );
1405-
tx.commit();
14061390
Record record = result.single();
1391+
tx.commit();
14071392
return record.get( 0 ).asNode().get( "material" ).asString();
14081393
} );
14091394

@@ -1574,8 +1559,9 @@ public Record execute( Transaction tx )
15741559
{
15751560
throw new ServiceUnavailableException( "" );
15761561
}
1562+
Record single = result.single();
15771563
tx.commit();
1578-
return result.single();
1564+
return single;
15791565
}
15801566
}
15811567
}

driver/src/test/java/org/neo4j/driver/integration/async/AsyncSessionIT.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -817,15 +817,15 @@ void shouldCloseCleanlyWhenPullAllErrorConsumed()
817817
}
818818

819819
@Test
820-
void shouldBePossibleToConsumeResultAfterSessionIsClosed()
820+
void shouldNotBePossibleToConsumeResultAfterSessionIsClosed()
821821
{
822822
CompletionStage<StatementResultCursor> cursorStage = session.runAsync( "UNWIND range(1, 20000) AS x RETURN x" );
823823

824824
await( session.closeAsync() );
825825

826826
StatementResultCursor cursor = await( cursorStage );
827827
List<Integer> ints = await( cursor.listAsync( record -> record.get( 0 ).asInt() ) );
828-
assertEquals( 20000, ints.size() );
828+
assertEquals( 0, ints.size() );
829829
}
830830

831831
@Test
@@ -879,7 +879,7 @@ void shouldPropagateFailureFromFirstIllegalQuery()
879879
}
880880

881881
@Test
882-
void shouldAllowAccessingRecordsAfterSummary()
882+
void shouldNotAllowAccessingRecordsAfterSummary()
883883
{
884884
int recordCount = 10_000;
885885
String query = "UNWIND range(1, " + recordCount + ") AS x RETURN 'Hello-' + x";
@@ -899,16 +899,11 @@ void shouldAllowAccessingRecordsAfterSummary()
899899
assertEquals( query, summary.statement().text() );
900900
assertEquals( StatementType.READ_ONLY, summary.statementType() );
901901

902-
assertEquals( recordCount, records.size() );
903-
for ( int i = 1; i <= recordCount; i++ )
904-
{
905-
Record record = records.get( i - 1 );
906-
assertEquals( "Hello-" + i, record.get( 0 ).asString() );
907-
}
902+
assertEquals( 0, records.size() );
908903
}
909904

910905
@Test
911-
void shouldAllowAccessingRecordsAfterSessionClosed()
906+
void shouldNotAllowAccessingRecordsAfterSessionClosed()
912907
{
913908
int recordCount = 7_500;
914909
String query = "UNWIND range(1, " + recordCount + ") AS x RETURN x";
@@ -919,12 +914,7 @@ void shouldAllowAccessingRecordsAfterSessionClosed()
919914

920915
List<Record> records = await( recordsStage );
921916

922-
assertEquals( recordCount, records.size() );
923-
for ( int i = 1; i <= recordCount; i++ )
924-
{
925-
Record record = records.get( i - 1 );
926-
assertEquals( i, record.get( 0 ).asInt() );
927-
}
917+
assertEquals( 0, records.size() );
928918
}
929919

930920
@Test

driver/src/test/java/org/neo4j/driver/integration/async/AsyncTransactionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,7 @@ private void testConsume( String query )
872872
assertEquals( query, summary.statement().text() );
873873
assertEquals( emptyMap(), summary.statement().parameters().asMap() );
874874

875-
// no records should be available, they should all be summaryd
875+
// no records should be available, they should all be consumed
876876
assertNull( await( cursor.nextAsync() ) );
877877
}
878878
}

0 commit comments

Comments
 (0)