Skip to content

Commit 70a2ea6

Browse files
committed
Consume failures when tx commit/rollback
This commit makes transactions surface unconsumed query errors when doing commit or rollback. It's achieved by keeping track of all result cursors and polling them for unconsumed failures when doing commit or rollback.
1 parent a334569 commit 70a2ea6

File tree

10 files changed

+530
-107
lines changed

10 files changed

+530
-107
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 88 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,25 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import java.util.ArrayList;
22+
import java.util.List;
2123
import java.util.Map;
2224
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.CompletionException;
2326
import java.util.concurrent.CompletionStage;
2427
import java.util.function.BiConsumer;
28+
import java.util.function.BiFunction;
2529

30+
import org.neo4j.driver.internal.async.InternalStatementResultCursor;
2631
import org.neo4j.driver.internal.async.QueryRunner;
2732
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
2833
import org.neo4j.driver.internal.handlers.CommitTxResponseHandler;
2934
import org.neo4j.driver.internal.handlers.NoOpResponseHandler;
3035
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
3136
import org.neo4j.driver.internal.spi.Connection;
37+
import org.neo4j.driver.internal.spi.ResponseHandler;
3238
import org.neo4j.driver.internal.types.InternalTypeSystem;
39+
import org.neo4j.driver.internal.util.Futures;
3340
import org.neo4j.driver.v1.Record;
3441
import org.neo4j.driver.v1.Session;
3542
import org.neo4j.driver.v1.Statement;
@@ -43,6 +50,7 @@
4350

4451
import static java.util.Collections.emptyMap;
4552
import static java.util.concurrent.CompletableFuture.completedFuture;
53+
import static org.neo4j.driver.internal.util.Futures.completionErrorCause;
4654
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4755
import static org.neo4j.driver.internal.util.Futures.getBlocking;
4856
import static org.neo4j.driver.v1.Values.value;
@@ -86,6 +94,7 @@ private enum State
8694
private final Connection connection;
8795
private final NetworkSession session;
8896

97+
private final List<CompletionStage<InternalStatementResultCursor>> resultCursors = new ArrayList<>();
8998
private volatile Bookmark bookmark = Bookmark.empty();
9099
private volatile State state = State.ACTIVE;
91100

@@ -169,7 +178,9 @@ else if ( state == State.TERMINATED )
169178
}
170179
else
171180
{
172-
return doCommitAsync().whenComplete( transactionClosed( State.COMMITTED ) );
181+
return receiveFailures()
182+
.thenCompose( failure -> doCommitAsync().handle( handleCommitOrRollback( failure ) ) )
183+
.whenComplete( transactionClosed( State.COMMITTED ) );
173184
}
174185
}
175186

@@ -192,38 +203,12 @@ else if ( state == State.TERMINATED )
192203
}
193204
else
194205
{
195-
return doRollbackAsync().whenComplete( transactionClosed( State.ROLLED_BACK ) );
206+
return receiveFailures()
207+
.thenCompose( failure -> doRollbackAsync().handle( handleCommitOrRollback( failure ) ) )
208+
.whenComplete( transactionClosed( State.ROLLED_BACK ) );
196209
}
197210
}
198211

199-
private BiConsumer<Void,Throwable> transactionClosed( State newState )
200-
{
201-
return ( ignore, error ) ->
202-
{
203-
state = newState;
204-
connection.releaseInBackground();
205-
session.setBookmark( bookmark );
206-
};
207-
}
208-
209-
private CompletionStage<Void> doCommitAsync()
210-
{
211-
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
212-
connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE,
213-
new CommitTxResponseHandler( commitFuture, this ) );
214-
215-
return commitFuture.thenRun( () -> state = State.COMMITTED );
216-
}
217-
218-
private CompletionStage<Void> doRollbackAsync()
219-
{
220-
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
221-
connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE,
222-
new RollbackTxResponseHandler( rollbackFuture ) );
223-
224-
return rollbackFuture.thenRun( () -> state = State.ROLLED_BACK );
225-
}
226-
227212
@Override
228213
public StatementResult run( String statementText, Value statementParameters )
229214
{
@@ -280,23 +265,31 @@ public CompletionStage<StatementResultCursor> runAsync( String statementTemplate
280265
@Override
281266
public StatementResult run( Statement statement )
282267
{
283-
ensureCanRunQueries();
284-
StatementResultCursor cursor = getBlocking( QueryRunner.runAsBlocking( connection, statement, this ) );
268+
StatementResultCursor cursor = getBlocking( run( statement, false ) );
285269
return new InternalStatementResult( cursor );
286270
}
287271

288272
@Override
289273
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
290274
{
291-
ensureCanRunQueries();
292275
//noinspection unchecked
293-
return (CompletionStage) QueryRunner.runAsAsync( connection, statement, this );
276+
return (CompletionStage) run( statement, true );
294277
}
295278

296-
@Override
297-
public boolean isOpen()
279+
private CompletionStage<InternalStatementResultCursor> run( Statement statement, boolean asAsync )
298280
{
299-
return state.txOpen;
281+
ensureCanRunQueries();
282+
CompletionStage<InternalStatementResultCursor> result;
283+
if ( asAsync )
284+
{
285+
result = QueryRunner.runAsAsync( connection, statement, this );
286+
}
287+
else
288+
{
289+
result = QueryRunner.runAsBlocking( connection, statement, this );
290+
}
291+
resultCursors.add( result );
292+
return result;
300293
}
301294

302295
private void ensureCanRunQueries()
@@ -324,6 +317,12 @@ else if ( state == State.TERMINATED )
324317
}
325318
}
326319

320+
@Override
321+
public boolean isOpen()
322+
{
323+
return state.txOpen;
324+
}
325+
327326
@Override
328327
public TypeSystem typeSystem()
329328
{
@@ -347,4 +346,56 @@ public void setBookmark( Bookmark bookmark )
347346
this.bookmark = bookmark;
348347
}
349348
}
349+
350+
private CompletionStage<Void> doCommitAsync()
351+
{
352+
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
353+
ResponseHandler pullAllHandler = new CommitTxResponseHandler( commitFuture, this );
354+
connection.runAndFlush( COMMIT_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );
355+
return commitFuture;
356+
}
357+
358+
private CompletionStage<Void> doRollbackAsync()
359+
{
360+
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
361+
ResponseHandler pullAllHandler = new RollbackTxResponseHandler( rollbackFuture );
362+
connection.runAndFlush( ROLLBACK_QUERY, emptyMap(), NoOpResponseHandler.INSTANCE, pullAllHandler );
363+
return rollbackFuture;
364+
}
365+
366+
private BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursorFailure )
367+
{
368+
return ( ignore, commitOrRollbackError ) ->
369+
{
370+
if ( cursorFailure != null )
371+
{
372+
throw new CompletionException( completionErrorCause( cursorFailure ) );
373+
}
374+
else if ( commitOrRollbackError != null )
375+
{
376+
throw new CompletionException( completionErrorCause( commitOrRollbackError ) );
377+
}
378+
else
379+
{
380+
return null;
381+
}
382+
};
383+
}
384+
385+
private BiConsumer<Object,Throwable> transactionClosed( State newState )
386+
{
387+
return ( ignore, error ) ->
388+
{
389+
state = newState;
390+
connection.releaseInBackground();
391+
session.setBookmark( bookmark );
392+
};
393+
}
394+
395+
private CompletionStage<Throwable> receiveFailures()
396+
{
397+
return resultCursors.stream()
398+
.map( stage -> stage.thenCompose( InternalStatementResultCursor::failureAsync ) )
399+
.reduce( completedFuture( null ), Futures::firstNotNull );
400+
}
350401
}

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,13 @@ public CompletionStage<Void> closeAsync()
161161
{
162162
if ( open.compareAndSet( true, false ) )
163163
{
164-
return lastResultStage.thenCompose( this::receiveError )
165-
.exceptionally( error -> error ) // connection acquisition or RUN failed, propagate error
166-
.thenCompose( error -> releaseResources().thenApply( connectionReleased ->
164+
return lastResultStage
165+
.exceptionally( error -> null ) // ignore connection acquisition failures
166+
.thenCompose( this::receiveFailure )
167+
.thenCompose( error -> releaseResources().thenApply( ignore ->
167168
{
168169
Throwable queryError = Futures.completionErrorCause( error );
169-
if ( queryError != null && connectionReleased )
170+
if ( queryError != null )
170171
{
171172
// connection has been acquired and there is an unconsumed error in result cursor
172173
throw new CompletionException( queryError );
@@ -182,7 +183,7 @@ public CompletionStage<Void> closeAsync()
182183
return completedFuture( null );
183184
}
184185

185-
private CompletionStage<Throwable> receiveError( InternalStatementResultCursor cursor )
186+
private CompletionStage<Throwable> receiveFailure( InternalStatementResultCursor cursor )
186187
{
187188
if ( cursor == null )
188189
{
@@ -476,21 +477,11 @@ private CompletionStage<Connection> acquireConnection( AccessMode mode )
476477
return connectionStage;
477478
}
478479

479-
/**
480-
* Rollback existing transaction and release existing connection.
481-
*
482-
* @return {@link CompletionStage} as returned by {@link #releaseConnectionNow()}.
483-
*/
484-
private CompletionStage<Boolean> releaseResources()
480+
private CompletionStage<Void> releaseResources()
485481
{
486482
return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() );
487483
}
488484

489-
/**
490-
* Rollback existing transaction, if any. Errors will be ignored.
491-
*
492-
* @return {@link CompletionStage} completed with {@code null} when transaction rollback completes or fails.
493-
*/
494485
private CompletionStage<Void> rollbackTransaction()
495486
{
496487
return existingTransactionOrNull().thenCompose( tx ->
@@ -503,27 +494,20 @@ private CompletionStage<Void> rollbackTransaction()
503494
} ).exceptionally( error ->
504495
{
505496
Throwable cause = Futures.completionErrorCause( error );
506-
logger.error( "Failed to rollback active transaction", cause );
497+
logger.warn( "Active transaction rolled back with an error", cause );
507498
return null;
508499
} );
509500
}
510501

511-
/**
512-
* Release existing connection or do nothing when none has been acquired.
513-
*
514-
* @return {@link CompletionStage} completed with {@code true} when there was a connection and it has been released,
515-
* {@link CompletionStage} completed with {@code false} when connection has not been acquired and nothing has been
516-
* released.
517-
*/
518-
private CompletionStage<Boolean> releaseConnectionNow()
502+
private CompletionStage<Void> releaseConnectionNow()
519503
{
520504
return existingConnectionOrNull().thenCompose( connection ->
521505
{
522506
if ( connection != null )
523507
{
524-
return connection.releaseNow().thenApply( ignore -> true );
508+
return connection.releaseNow();
525509
}
526-
return completedFuture( false );
510+
return completedFuture( null );
527511
} );
528512
}
529513

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ public synchronized CompletionStage<Record> nextAsync()
173173
} );
174174
}
175175

176+
// todo: propagate failure from here as well
176177
public synchronized CompletionStage<ResultSummary> summaryAsync()
177178
{
178179
if ( summary != null )

driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ public void onSuccess( Map<String,Value> metadata )
4646
statementKeys = extractKeys( metadata );
4747
resultAvailableAfter = extractResultAvailableAfter( metadata );
4848

49-
runCompletedFuture.complete( null );
49+
completeRunFuture();
5050
}
5151

5252
@Override
5353
public void onFailure( Throwable error )
5454
{
55-
runCompletedFuture.completeExceptionally( error );
55+
completeRunFuture();
5656
}
5757

5858
@Override
@@ -71,6 +71,16 @@ public long resultAvailableAfter()
7171
return resultAvailableAfter;
7272
}
7373

74+
/**
75+
* Complete the given future with {@code null}. Future is never completed exceptionally because callers are only
76+
* interested in when RUN completes and not how. Async API needs to wait for RUN because it needs to access
77+
* statement keys.
78+
*/
79+
private void completeRunFuture()
80+
{
81+
runCompletedFuture.complete( null );
82+
}
83+
7484
private static List<String> extractKeys( Map<String,Value> metadata )
7585
{
7686
Value keysValue = metadata.get( "fields" );

driver/src/main/java/org/neo4j/driver/internal/util/Futures.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.util.concurrent.ExecutionException;
2727
import java.util.concurrent.Future;
2828

29+
import static java.util.concurrent.CompletableFuture.completedFuture;
30+
2931
public final class Futures
3032
{
3133
private Futures()
@@ -116,4 +118,16 @@ public static Throwable completionErrorCause( Throwable error )
116118
}
117119
return error;
118120
}
121+
122+
public static <T> CompletionStage<T> firstNotNull( CompletionStage<T> stage1, CompletionStage<T> stage2 )
123+
{
124+
return stage1.thenCompose( value ->
125+
{
126+
if ( value != null )
127+
{
128+
return completedFuture( value );
129+
}
130+
return stage2;
131+
} );
132+
}
119133
}

0 commit comments

Comments
 (0)