Skip to content

Commit a51a15f

Browse files
committed
Session#closeAsync() waits result to be fetched
This commit aligns behaviour of `Session#close()` and `Session#closeAsync()` so that later one also waits for the latest result to be fully fetched. It makes possible to consume result after session is closed and allows propagation of query errors in `#closeAsync()`. Errors are propagated only if they were not consumed by reading the result cursor.
1 parent d8c0d37 commit a51a15f

File tree

7 files changed

+166
-50
lines changed

7 files changed

+166
-50
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -153,43 +153,42 @@ public boolean isOpen()
153153
@Override
154154
public void close()
155155
{
156-
if ( open.compareAndSet( true, false ) )
157-
{
158-
// todo: should closeAsync() also do this waiting for buffered result?
159-
// todo: unit test result buffering?
160-
getBlocking( lastResultStage
161-
.exceptionally( error -> null )
162-
.thenCompose( this::ensureBuffered )
163-
.thenCompose( error -> releaseResources().thenApply( ignore ->
164-
{
165-
if ( error != null )
166-
{
167-
throw new CompletionException( error );
168-
}
169-
return null;
170-
} ) ) );
171-
}
156+
getBlocking( closeAsync() );
172157
}
173158

174159
@Override
175160
public CompletionStage<Void> closeAsync()
176161
{
177-
// todo: wait for buffered result?
178162
if ( open.compareAndSet( true, false ) )
179163
{
180-
return releaseResources();
164+
return lastResultStage.thenCompose( this::receiveError )
165+
.exceptionally( error -> error ) // connection acquisition or RUN failed, propagate error
166+
.thenCompose( error -> releaseResources().thenApply( connectionReleased ->
167+
{
168+
Throwable queryError = Futures.completionErrorCause( error );
169+
if ( queryError != null && connectionReleased )
170+
{
171+
// connection has been acquired and there is an unconsumed error in result cursor
172+
throw new CompletionException( queryError );
173+
}
174+
else
175+
{
176+
// either connection acquisition failed or
177+
// there are no unconsumed errors in the result cursor
178+
return null;
179+
}
180+
} ) );
181181
}
182182
return completedFuture( null );
183183
}
184184

185-
// todo: test this method
186-
CompletionStage<Throwable> ensureBuffered( InternalStatementResultCursor cursor )
185+
private CompletionStage<Throwable> receiveError( InternalStatementResultCursor cursor )
187186
{
188187
if ( cursor == null )
189188
{
190189
return completedFuture( null );
191190
}
192-
return cursor.resultBuffered();
191+
return cursor.failureAsync();
193192
}
194193

195194
@Override
@@ -477,11 +476,21 @@ private CompletionStage<Connection> acquireConnection( AccessMode mode )
477476
return connectionStage;
478477
}
479478

480-
private CompletionStage<Void> releaseResources()
479+
/**
480+
* Rollback existing transaction and release existing connection.
481+
*
482+
* @return {@link CompletionStage} as returned by {@link #releaseConnectionNow()}.
483+
*/
484+
private CompletionStage<Boolean> releaseResources()
481485
{
482486
return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() );
483487
}
484488

489+
/**
490+
* Rollback existing transaction, if any. Errors will be ignored.
491+
*
492+
* @return {@link CompletionStage} completed with {@code null} when transaction rollback completes or fails.
493+
*/
485494
private CompletionStage<Void> rollbackTransaction()
486495
{
487496
return existingTransactionOrNull().thenCompose( tx ->
@@ -499,15 +508,22 @@ private CompletionStage<Void> rollbackTransaction()
499508
} );
500509
}
501510

502-
private CompletionStage<Void> releaseConnectionNow()
511+
/**
512+
* Release existing connection or do nothing when none has been acquired.
513+
*
514+
* @return {@link CompletionStage} completed with {@code true} when there was a connection and it has been released,
515+
* {@link CompletionStage} completed with {@code false} when connection has not been acquired and nothing has been
516+
* released.
517+
*/
518+
private CompletionStage<Boolean> releaseConnectionNow()
503519
{
504520
return existingConnectionOrNull().thenCompose( connection ->
505521
{
506522
if ( connection != null )
507523
{
508-
return connection.releaseNow();
524+
return connection.releaseNow().thenApply( ignore -> true );
509525
}
510-
return completedFuture( null );
526+
return completedFuture( false );
511527
} );
512528
}
513529

driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.neo4j.driver.v1.util.Function;
3535
import org.neo4j.driver.v1.util.Functions;
3636

37+
// todo: unit tests
3738
public class InternalStatementResultCursor implements StatementResultCursor
3839
{
3940
// todo: maybe smth better than these two string constants?
@@ -142,10 +143,9 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
142143
return resultFuture;
143144
}
144145

145-
// todo: test this method and give it better name
146-
public CompletionStage<Throwable> resultBuffered()
146+
public CompletionStage<Throwable> failureAsync()
147147
{
148-
return pullAllHandler.resultBuffered();
148+
return pullAllHandler.failureAsync();
149149
}
150150

151151
private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )

driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ public static CompletionStage<InternalStatementResultCursor> runAsBlocking( Conn
4848
}
4949

5050
public static CompletionStage<InternalStatementResultCursor> runAsBlocking( Connection connection,
51-
Statement statement,
52-
ExplicitTransaction tx )
51+
Statement statement, ExplicitTransaction tx )
5352
{
5453
return runAsAsync( connection, statement, tx, false );
5554
}
@@ -61,8 +60,7 @@ public static CompletionStage<InternalStatementResultCursor> runAsAsync( Connect
6160
}
6261

6362
public static CompletionStage<InternalStatementResultCursor> runAsAsync( Connection connection,
64-
Statement statement,
65-
ExplicitTransaction tx )
63+
Statement statement, ExplicitTransaction tx )
6664
{
6765
return runAsAsync( connection, statement, tx, true );
6866
}

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import static java.util.concurrent.CompletableFuture.completedFuture;
5050
import static org.neo4j.driver.internal.util.Futures.failedFuture;
5151

52+
// todo: unit tests
5253
public abstract class PullAllResponseHandler implements ResponseHandler
5354
{
5455
private static final boolean TOUCH_AUTO_READ = false;
@@ -59,17 +60,15 @@ public abstract class PullAllResponseHandler implements ResponseHandler
5960

6061
private final Queue<Record> records = new LinkedList<>();
6162

62-
// todo: use presence of summary as a "finished" indicator and remove this field
6363
private boolean finished;
6464
private Throwable failure;
6565
private ResultSummary summary;
6666

6767
private CompletableFuture<Record> recordFuture;
6868
private CompletableFuture<ResultSummary> summaryFuture;
69-
private CompletableFuture<Throwable> resultBufferedFuture;
69+
private CompletableFuture<Throwable> failureFuture;
7070

71-
public PullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler,
72-
Connection connection )
71+
public PullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, Connection connection )
7372
{
7473
this.statement = requireNonNull( statement );
7574
this.runResponseHandler = requireNonNull( runResponseHandler );
@@ -86,7 +85,7 @@ public synchronized void onSuccess( Map<String,Value> metadata )
8685

8786
completeRecordFuture( null );
8887
completeSummaryFuture( summary );
89-
completeResultBufferedFuture( null );
88+
completeFailureFuture( null );
9089
}
9190

9291
protected abstract void afterSuccess();
@@ -104,21 +103,22 @@ public synchronized void onFailure( Throwable error )
104103
{
105104
// error propagated through record future, complete other two
106105
completeSummaryFuture( summary );
107-
completeResultBufferedFuture( null );
106+
completeFailureFuture( null );
108107
}
109108
else
110109
{
111110
boolean failedSummaryFuture = failSummaryFuture( error );
112111
if ( failedSummaryFuture )
113112
{
114113
// error propagated through summary future, complete other one
115-
completeResultBufferedFuture( null );
114+
completeFailureFuture( null );
116115
}
117116
else
118117
{
119-
boolean completedResultBufferedFuture = completeResultBufferedFuture( error );
120-
if ( !completedResultBufferedFuture )
118+
boolean completedFailureFuture = completeFailureFuture( error );
119+
if ( !completedFailureFuture )
121120
{
121+
// error has not been propagated to the user, remember it
122122
failure = error;
123123
}
124124
}
@@ -189,7 +189,7 @@ public synchronized CompletionStage<ResultSummary> summaryAsync()
189189
}
190190
}
191191

192-
public synchronized CompletionStage<Throwable> resultBuffered()
192+
public synchronized CompletionStage<Throwable> failureAsync()
193193
{
194194
if ( failure != null )
195195
{
@@ -203,11 +203,11 @@ else if ( finished )
203203
}
204204
else
205205
{
206-
if ( resultBufferedFuture == null )
206+
if ( failureFuture == null )
207207
{
208-
resultBufferedFuture = new CompletableFuture<>();
208+
failureFuture = new CompletableFuture<>();
209209
}
210-
return resultBufferedFuture;
210+
return failureFuture;
211211
}
212212
}
213213

@@ -280,12 +280,12 @@ private boolean failSummaryFuture( Throwable error )
280280
return false;
281281
}
282282

283-
private boolean completeResultBufferedFuture( Throwable error )
283+
private boolean completeFailureFuture( Throwable error )
284284
{
285-
if ( resultBufferedFuture != null )
285+
if ( failureFuture != null )
286286
{
287-
CompletableFuture<Throwable> future = resultBufferedFuture;
288-
resultBufferedFuture = null;
287+
CompletableFuture<Throwable> future = failureFuture;
288+
failureFuture = null;
289289
future.complete( error );
290290
return true;
291291
}

driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.neo4j.driver.internal.spi.Connection;
3535
import org.neo4j.driver.internal.spi.ConnectionProvider;
3636
import org.neo4j.driver.internal.spi.ResponseHandler;
37+
import org.neo4j.driver.internal.util.ServerVersion;
3738
import org.neo4j.driver.internal.util.Supplier;
3839
import org.neo4j.driver.v1.AccessMode;
3940
import org.neo4j.driver.v1.Session;
@@ -88,6 +89,7 @@ public void setUp()
8889
{
8990
connection = connectionMock();
9091
when( connection.releaseNow() ).thenReturn( completedFuture( null ) );
92+
when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 );
9193
connectionProvider = mock( ConnectionProvider.class );
9294
when( connectionProvider.acquireConnection( any( AccessMode.class ) ) )
9395
.thenReturn( completedFuture( connection ) );
@@ -247,8 +249,15 @@ public void acquiresNewConnectionWhenUnableToUseCurrentOneForRun()
247249
}
248250

249251
@Test
250-
public void forceReleasesOpenConnectionUsedForRunWhenSessionIsClosed()
252+
public void releasesOpenConnectionUsedForRunWhenSessionIsClosed()
251253
{
254+
doAnswer( invocation ->
255+
{
256+
ResponseHandler pullAllHandler = invocation.getArgumentAt( 3, ResponseHandler.class );
257+
pullAllHandler.onSuccess( emptyMap() );
258+
return null;
259+
} ).when( connection ).runAndFlush( eq( "RETURN 1" ), eq( emptyMap() ), any(), any() );
260+
252261
session.run( "RETURN 1" );
253262

254263
getBlocking( session.closeAsync() );

driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,50 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection()
720720
assertNull( getBlocking( tx.rollbackAsync() ) );
721721
}
722722

723+
@Test
724+
public void shouldPropagateRunFailureWhenClosed()
725+
{
726+
session.runAsync( "RETURN 10 / 0" );
727+
728+
try
729+
{
730+
getBlocking( session.closeAsync() );
731+
fail( "Exception expected" );
732+
}
733+
catch ( ClientException e )
734+
{
735+
assertThat( e.getMessage(), containsString( "/ by zero" ) );
736+
}
737+
}
738+
739+
@Test
740+
public void shouldPropagatePullAllFailureWhenClosed()
741+
{
742+
session.runAsync( "UNWIND range(20000, 0, -1) AS x RETURN 10 / x" );
743+
744+
try
745+
{
746+
getBlocking( session.closeAsync() );
747+
fail( "Exception expected" );
748+
}
749+
catch ( ClientException e )
750+
{
751+
assertThat( e.getMessage(), containsString( "/ by zero" ) );
752+
}
753+
}
754+
755+
@Test
756+
public void shouldBePossibleToConsumeResultAfterSessionIsClosed()
757+
{
758+
CompletionStage<StatementResultCursor> cursorStage = session.runAsync( "UNWIND range(1, 20000) AS x RETURN x" );
759+
760+
getBlocking( session.closeAsync() );
761+
762+
StatementResultCursor cursor = getBlocking( cursorStage );
763+
List<Integer> ints = getBlocking( cursor.listAsync( record -> record.get( 0 ).asInt() ) );
764+
assertEquals( 20000, ints.size() );
765+
}
766+
723767
private Future<List<CompletionStage<Record>>> runNestedQueries( StatementResultCursor inputCursor )
724768
{
725769
CompletableFuture<List<CompletionStage<Record>>> resultFuture = new CompletableFuture<>();

driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1203,6 +1203,55 @@ public Void execute( Transaction tx )
12031203
}
12041204
}
12051205

1206+
@Test
1207+
public void shouldPropagateRunFailureWhenClosed()
1208+
{
1209+
Session session = neo4j.driver().session();
1210+
1211+
session.run( "RETURN 10 / 0" );
1212+
1213+
try
1214+
{
1215+
session.close();
1216+
fail( "Exception expected" );
1217+
}
1218+
catch ( ClientException e )
1219+
{
1220+
assertThat( e.getMessage(), containsString( "/ by zero" ) );
1221+
}
1222+
}
1223+
1224+
@Test
1225+
public void shouldPropagatePullAllFailureWhenClosed()
1226+
{
1227+
Session session = neo4j.driver().session();
1228+
1229+
session.run( "UNWIND range(20000, 0, -1) AS x RETURN 10 / x" );
1230+
1231+
try
1232+
{
1233+
session.close();
1234+
fail( "Exception expected" );
1235+
}
1236+
catch ( ClientException e )
1237+
{
1238+
assertThat( e.getMessage(), containsString( "/ by zero" ) );
1239+
}
1240+
}
1241+
1242+
@Test
1243+
public void shouldBePossibleToConsumeResultAfterSessionIsClosed()
1244+
{
1245+
StatementResult result;
1246+
try ( Session session = neo4j.driver().session() )
1247+
{
1248+
result = session.run( "UNWIND range(1, 20000) AS x RETURN x" );
1249+
}
1250+
1251+
List<Integer> ints = result.list( record -> record.get( 0 ).asInt() );
1252+
assertEquals( 20000, ints.size() );
1253+
}
1254+
12061255
private void assumeServerIs31OrLater()
12071256
{
12081257
ServerVersion serverVersion = ServerVersion.version( neo4j.driver() );

0 commit comments

Comments
 (0)