Skip to content

Commit f55cbcd

Browse files
authored
Merge pull request #423 from lutovich/1.5-consume-failures-when-tx-close
Propagate not consumed failures when closing session and transaction
2 parents 5ecb109 + cc6e520 commit f55cbcd

15 files changed

+1197
-448
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 93 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,20 @@
2020

2121
import java.util.Map;
2222
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.CompletionException;
2324
import java.util.concurrent.CompletionStage;
2425
import java.util.function.BiConsumer;
26+
import java.util.function.BiFunction;
2527

28+
import org.neo4j.driver.internal.async.InternalStatementResultCursor;
2629
import org.neo4j.driver.internal.async.QueryRunner;
30+
import org.neo4j.driver.internal.async.ResultCursorsHolder;
2731
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
2832
import org.neo4j.driver.internal.handlers.CommitTxResponseHandler;
2933
import org.neo4j.driver.internal.handlers.NoOpResponseHandler;
3034
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
3135
import org.neo4j.driver.internal.spi.Connection;
36+
import org.neo4j.driver.internal.spi.ResponseHandler;
3237
import org.neo4j.driver.internal.types.InternalTypeSystem;
3338
import org.neo4j.driver.v1.Record;
3439
import org.neo4j.driver.v1.Session;
@@ -43,6 +48,7 @@
4348

4449
import static java.util.Collections.emptyMap;
4550
import static java.util.concurrent.CompletableFuture.completedFuture;
51+
import static org.neo4j.driver.internal.util.Futures.completionErrorCause;
4652
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4753
import static org.neo4j.driver.internal.util.Futures.getBlocking;
4854
import static org.neo4j.driver.v1.Values.value;
@@ -56,28 +62,36 @@ public class ExplicitTransaction implements Transaction
5662
private enum State
5763
{
5864
/** The transaction is running with no explicit success or failure marked */
59-
ACTIVE,
65+
ACTIVE( true ),
6066

6167
/** Running, user marked for success, meaning it'll value committed */
62-
MARKED_SUCCESS,
68+
MARKED_SUCCESS( true ),
6369

6470
/** User marked as failed, meaning it'll be rolled back. */
65-
MARKED_FAILED,
71+
MARKED_FAILED( true ),
6672

6773
/**
6874
* This transaction has been explicitly terminated by calling {@link Session#reset()}.
6975
*/
70-
TERMINATED,
76+
TERMINATED( false ),
7177

7278
/** This transaction has successfully committed */
73-
COMMITTED,
79+
COMMITTED( false ),
7480

7581
/** This transaction has been rolled back */
76-
ROLLED_BACK
82+
ROLLED_BACK( false );
83+
84+
final boolean txOpen;
85+
86+
State( boolean txOpen )
87+
{
88+
this.txOpen = txOpen;
89+
}
7790
}
7891

7992
private final Connection connection;
8093
private final NetworkSession session;
94+
private final ResultCursorsHolder resultCursors;
8195

8296
private volatile Bookmark bookmark = Bookmark.empty();
8397
private volatile State state = State.ACTIVE;
@@ -86,6 +100,7 @@ public ExplicitTransaction( Connection connection, NetworkSession session )
86100
{
87101
this.connection = connection;
88102
this.session = session;
103+
this.resultCursors = new ResultCursorsHolder();
89104
}
90105

91106
public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark )
@@ -162,7 +177,9 @@ else if ( state == State.TERMINATED )
162177
}
163178
else
164179
{
165-
return doCommitAsync().whenComplete( transactionClosed( State.COMMITTED ) );
180+
return resultCursors.retrieveNotConsumedError()
181+
.thenCompose( error -> doCommitAsync().handle( handleCommitOrRollback( error ) ) )
182+
.whenComplete( transactionClosed( State.COMMITTED ) );
166183
}
167184
}
168185

@@ -185,38 +202,12 @@ else if ( state == State.TERMINATED )
185202
}
186203
else
187204
{
188-
return doRollbackAsync().whenComplete( transactionClosed( State.ROLLED_BACK ) );
205+
return resultCursors.retrieveNotConsumedError()
206+
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
207+
.whenComplete( transactionClosed( State.ROLLED_BACK ) );
189208
}
190209
}
191210

192-
private BiConsumer<Void,Throwable> transactionClosed( State newState )
193-
{
194-
return ( ignore, error ) ->
195-
{
196-
state = newState;
197-
connection.releaseInBackground();
198-
session.setBookmark( bookmark );
199-
};
200-
}
201-
202-
private CompletionStage<Void> doCommitAsync()
203-
{
204-
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
205-
connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE,
206-
new CommitTxResponseHandler( commitFuture, this ) );
207-
208-
return commitFuture.thenRun( () -> state = State.COMMITTED );
209-
}
210-
211-
private CompletionStage<Void> doRollbackAsync()
212-
{
213-
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
214-
connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE,
215-
new RollbackTxResponseHandler( rollbackFuture ) );
216-
217-
return rollbackFuture.thenRun( () -> state = State.ROLLED_BACK );
218-
}
219-
220211
@Override
221212
public StatementResult run( String statementText, Value statementParameters )
222213
{
@@ -273,23 +264,31 @@ public CompletionStage<StatementResultCursor> runAsync( String statementTemplate
273264
@Override
274265
public StatementResult run( Statement statement )
275266
{
276-
ensureCanRunQueries();
277-
StatementResultCursor cursor = getBlocking( QueryRunner.runAsBlocking( connection, statement, this ) );
267+
StatementResultCursor cursor = getBlocking( run( statement, false ) );
278268
return new InternalStatementResult( cursor );
279269
}
280270

281271
@Override
282272
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
283273
{
284-
ensureCanRunQueries();
285274
//noinspection unchecked
286-
return (CompletionStage) QueryRunner.runAsAsync( connection, statement, this );
275+
return (CompletionStage) run( statement, true );
287276
}
288277

289-
@Override
290-
public boolean isOpen()
278+
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean asAsync )
291279
{
292-
return state != State.COMMITTED && state != State.ROLLED_BACK && state != State.TERMINATED;
280+
ensureCanRunQueries();
281+
CompletionStage<InternalStatementResultCursor> cursorStage;
282+
if ( asAsync )
283+
{
284+
cursorStage = QueryRunner.runAsAsync( connection, statement, this );
285+
}
286+
else
287+
{
288+
cursorStage = QueryRunner.runAsBlocking( connection, statement, this );
289+
}
290+
resultCursors.add( cursorStage );
291+
return cursorStage;
293292
}
294293

295294
private void ensureCanRunQueries()
@@ -317,6 +316,12 @@ else if ( state == State.TERMINATED )
317316
}
318317
}
319318

319+
@Override
320+
public boolean isOpen()
321+
{
322+
return state.txOpen;
323+
}
324+
320325
@Override
321326
public TypeSystem typeSystem()
322327
{
@@ -340,4 +345,49 @@ public void setBookmark( Bookmark bookmark )
340345
this.bookmark = bookmark;
341346
}
342347
}
348+
349+
private CompletionStage<Void> doCommitAsync()
350+
{
351+
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
352+
ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture, this );
353+
connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );
354+
return commitFuture;
355+
}
356+
357+
private CompletionStage<Void> doRollbackAsync()
358+
{
359+
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
360+
ResponseHandler pullAllHandler = new RollbackTxResponseHandler( rollbackFuture );
361+
connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );
362+
return rollbackFuture;
363+
}
364+
365+
private BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursorFailure )
366+
{
367+
return ( ignore, commitOrRollbackError ) ->
368+
{
369+
if ( cursorFailure != null )
370+
{
371+
throw new CompletionException( completionErrorCause( cursorFailure ) );
372+
}
373+
else if ( commitOrRollbackError != null )
374+
{
375+
throw new CompletionException( completionErrorCause( commitOrRollbackError ) );
376+
}
377+
else
378+
{
379+
return null;
380+
}
381+
};
382+
}
383+
384+
private BiConsumer<Object,Throwable> transactionClosed( State newState )
385+
{
386+
return ( ignore, error ) ->
387+
{
388+
state = newState;
389+
connection.releaseInBackground();
390+
session.setBookmark( bookmark );
391+
};
392+
}
343393
}

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import org.neo4j.driver.internal.async.InternalStatementResultCursor;
2828
import org.neo4j.driver.internal.async.QueryRunner;
29+
import org.neo4j.driver.internal.async.ResultCursorsHolder;
2930
import org.neo4j.driver.internal.logging.DelegatingLogger;
3031
import org.neo4j.driver.internal.retry.RetryLogic;
3132
import org.neo4j.driver.internal.spi.Connection;
@@ -59,12 +60,12 @@ public class NetworkSession implements Session
5960
private final ConnectionProvider connectionProvider;
6061
private final AccessMode mode;
6162
private final RetryLogic retryLogic;
63+
private final ResultCursorsHolder resultCursors;
6264
protected final Logger logger;
6365

6466
private volatile Bookmark bookmark = Bookmark.empty();
6567
private volatile CompletionStage<ExplicitTransaction> transactionStage = completedFuture( null );
6668
private volatile CompletionStage<Connection> connectionStage = completedFuture( null );
67-
private volatile CompletionStage<InternalStatementResultCursor> lastResultStage = completedFuture( null );
6869

6970
private final AtomicBoolean open = new AtomicBoolean( true );
7071

@@ -74,6 +75,7 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R
7475
this.connectionProvider = connectionProvider;
7576
this.mode = mode;
7677
this.retryLogic = retryLogic;
78+
this.resultCursors = new ResultCursorsHolder();
7779
this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
7880
}
7981

@@ -153,45 +155,34 @@ public boolean isOpen()
153155
@Override
154156
public void close()
155157
{
156-
if ( open.compareAndSet( true, false ) )
157-
{
158-
// todo: should closeAsync() also do this waiting for buffered result?
159-
// todo: unit test result buffering?
160-
getBlocking( lastResultStage
161-
.exceptionally( error -> null )
162-
.thenCompose( this::ensureBuffered )
163-
.thenCompose( error -> releaseResources().thenApply( ignore ->
164-
{
165-
if ( error != null )
166-
{
167-
throw new CompletionException( error );
168-
}
169-
return null;
170-
} ) ) );
171-
}
158+
getBlocking( closeAsync() );
172159
}
173160

174161
@Override
175162
public CompletionStage<Void> closeAsync()
176163
{
177-
// todo: wait for buffered result?
178164
if ( open.compareAndSet( true, false ) )
179165
{
180-
return releaseResources();
166+
return resultCursors.retrieveNotConsumedError()
167+
.thenCompose( error -> releaseResources().thenApply( ignore ->
168+
{
169+
Throwable queryError = Futures.completionErrorCause( error );
170+
if ( queryError != null )
171+
{
172+
// connection has been acquired and there is an unconsumed error in result cursor
173+
throw new CompletionException( queryError );
174+
}
175+
else
176+
{
177+
// either connection acquisition failed or
178+
// there are no unconsumed errors in the result cursor
179+
return null;
180+
}
181+
} ) );
181182
}
182183
return completedFuture( null );
183184
}
184185

185-
// todo: test this method
186-
CompletionStage<Throwable> ensureBuffered( InternalStatementResultCursor cursor )
187-
{
188-
if ( cursor == null )
189-
{
190-
return completedFuture( null );
191-
}
192-
return cursor.resultBuffered();
193-
}
194-
195186
@Override
196187
public Transaction beginTransaction()
197188
{
@@ -421,7 +412,7 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
421412
{
422413
ensureSessionIsOpen();
423414

424-
lastResultStage = ensureNoOpenTxBeforeRunningQuery()
415+
CompletionStage<InternalStatementResultCursor> cursorStage = ensureNoOpenTxBeforeRunningQuery()
425416
.thenCompose( ignore -> acquireConnection( mode ) )
426417
.thenCompose( connection ->
427418
{
@@ -435,7 +426,8 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
435426
}
436427
} );
437428

438-
return lastResultStage;
429+
resultCursors.add( cursorStage );
430+
return cursorStage;
439431
}
440432

441433
private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
@@ -496,7 +488,7 @@ private CompletionStage<Void> rollbackTransaction()
496488
} ).exceptionally( error ->
497489
{
498490
Throwable cause = Futures.completionErrorCause( error );
499-
logger.error( "Failed to rollback active transaction", cause );
491+
logger.warn( "Active transaction rolled back with an error", cause );
500492
return null;
501493
} );
502494
}

driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.neo4j.driver.v1.util.Function;
3535
import org.neo4j.driver.v1.util.Functions;
3636

37+
// todo: unit tests
3738
public class InternalStatementResultCursor implements StatementResultCursor
3839
{
3940
// todo: maybe smth better than these two string constants?
@@ -142,10 +143,9 @@ public <T> CompletionStage<List<T>> listAsync( Function<Record,T> mapFunction )
142143
return resultFuture;
143144
}
144145

145-
// todo: test this method and give it better name
146-
public CompletionStage<Throwable> resultBuffered()
146+
public CompletionStage<Throwable> failureAsync()
147147
{
148-
return pullAllHandler.resultBuffered();
148+
return pullAllHandler.failureAsync();
149149
}
150150

151151
private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Void> resultFuture )

driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ public static CompletionStage<InternalStatementResultCursor> runAsBlocking( Conn
4848
}
4949

5050
public static CompletionStage<InternalStatementResultCursor> runAsBlocking( Connection connection,
51-
Statement statement,
52-
ExplicitTransaction tx )
51+
Statement statement, ExplicitTransaction tx )
5352
{
5453
return runAsAsync( connection, statement, tx, false );
5554
}
@@ -61,8 +60,7 @@ public static CompletionStage<InternalStatementResultCursor> runAsAsync( Connect
6160
}
6261

6362
public static CompletionStage<InternalStatementResultCursor> runAsAsync( Connection connection,
64-
Statement statement,
65-
ExplicitTransaction tx )
63+
Statement statement, ExplicitTransaction tx )
6664
{
6765
return runAsAsync( connection, statement, tx, true );
6866
}

0 commit comments

Comments
 (0)