Skip to content

Commit 16b0753

Browse files
committed
Cleanup of CompletionException handling
Exceptions can get wrapped in `CompletionException` when propagated through `CompletionStage#whenComplete()` and `CompletionStage#handle()`. It is also needed to wrap throwable in a `CompletionException` before throwing it from a `CompletionStage` chain because checked exceptions are not allowed there. This commit cleans up helpers to unwrap `CompletionException` and convert throwable to `CompletionException`.
1 parent 5d005dd commit 16b0753

17 files changed

+55
-38
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.Map;
2222
import java.util.concurrent.CompletableFuture;
23-
import java.util.concurrent.CompletionException;
2423
import java.util.concurrent.CompletionStage;
2524
import java.util.function.BiConsumer;
2625
import java.util.function.BiFunction;
@@ -49,7 +48,6 @@
4948
import static java.util.Collections.emptyMap;
5049
import static java.util.concurrent.CompletableFuture.completedFuture;
5150
import static org.neo4j.driver.internal.util.Futures.blockingGet;
52-
import static org.neo4j.driver.internal.util.Futures.completionErrorCause;
5351
import static org.neo4j.driver.internal.util.Futures.failedFuture;
5452
import static org.neo4j.driver.v1.Values.value;
5553

@@ -122,7 +120,7 @@ public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark
122120
{
123121
// release connection if begin failed, transaction can't be started
124122
connection.release();
125-
throw new CompletionException( Futures.completionErrorCause( beginError ) );
123+
throw Futures.asCompletionException( beginError );
126124
}
127125
return tx;
128126
} );
@@ -372,11 +370,11 @@ private BiFunction<Void,Throwable,Void> handleCommitOrRollback( Throwable cursor
372370
{
373371
if ( cursorFailure != null )
374372
{
375-
throw new CompletionException( completionErrorCause( cursorFailure ) );
373+
throw Futures.asCompletionException( cursorFailure );
376374
}
377375
else if ( commitOrRollbackError != null )
378376
{
379-
throw new CompletionException( completionErrorCause( commitOrRollbackError ) );
377+
throw Futures.asCompletionException( commitOrRollbackError );
380378
}
381379
else
382380
{

driver/src/main/java/org/neo4j/driver/internal/InternalStatementResultCursor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ private void internalForEachAsync( Consumer<Record> action, CompletableFuture<Vo
136136
// the caller thread to get StackOverflowError when result is large and buffered
137137
recordFuture.whenCompleteAsync( ( record, completionError ) ->
138138
{
139-
Throwable error = Futures.completionErrorCause( completionError );
139+
Throwable error = Futures.completionExceptionCause( completionError );
140140
if ( error != null )
141141
{
142142
resultFuture.completeExceptionally( error );
@@ -170,7 +170,7 @@ private <T> void internalListAsync( List<T> result, CompletableFuture<List<T>> r
170170
// the caller thread to get StackOverflowError when result is large and buffered
171171
recordFuture.whenCompleteAsync( ( record, completionError ) ->
172172
{
173-
Throwable error = Futures.completionErrorCause( completionError );
173+
Throwable error = Futures.completionExceptionCause( completionError );
174174
if ( error != null )
175175
{
176176
resultFuture.completeExceptionally( error );

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,10 @@ public CompletionStage<Void> closeAsync()
169169
return cursor.failureAsync();
170170
} ).thenCompose( error -> releaseResources().thenApply( ignore ->
171171
{
172-
Throwable queryError = Futures.completionErrorCause( error );
173-
if ( queryError != null )
172+
if ( error != null )
174173
{
175174
// connection has been acquired and there is an unconsumed error in result cursor
176-
throw new CompletionException( queryError );
175+
throw Futures.asCompletionException( error );
177176
}
178177
else
179178
{
@@ -316,7 +315,7 @@ private <T> CompletionStage<T> transactionAsync( AccessMode mode, TransactionWor
316315

317316
txFuture.whenComplete( ( tx, completionError ) ->
318317
{
319-
Throwable error = Futures.completionErrorCause( completionError );
318+
Throwable error = Futures.completionExceptionCause( completionError );
320319
if ( error != null )
321320
{
322321
resultFuture.completeExceptionally( error );
@@ -337,7 +336,7 @@ private <T> void executeWork( CompletableFuture<T> resultFuture, ExplicitTransac
337336
CompletionStage<T> workFuture = safeExecuteWork( tx, work );
338337
workFuture.whenComplete( ( result, completionError ) ->
339338
{
340-
Throwable error = Futures.completionErrorCause( completionError );
339+
Throwable error = Futures.completionExceptionCause( completionError );
341340
if ( error != null )
342341
{
343342
rollbackTxAfterFailedTransactionWork( tx, resultFuture, error );
@@ -393,7 +392,7 @@ private <T> void closeTxAfterSucceededTransactionWork( ExplicitTransaction tx, C
393392
tx.success();
394393
tx.closeAsync().whenComplete( ( ignore, completionError ) ->
395394
{
396-
Throwable commitError = Futures.completionErrorCause( completionError );
395+
Throwable commitError = Futures.completionExceptionCause( completionError );
397396
if ( commitError != null )
398397
{
399398
resultFuture.completeExceptionally( commitError );
@@ -498,7 +497,7 @@ private CompletionStage<Void> rollbackTransaction()
498497
return completedFuture( null );
499498
} ).exceptionally( error ->
500499
{
501-
Throwable cause = Futures.completionErrorCause( error );
500+
Throwable cause = Futures.completionExceptionCause( error );
502501
logger.warn( "Active transaction rolled back with an error", cause );
503502
return null;
504503
} );

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ private EventLoopGroup eventLoopGroup()
180180

181181
private void processAcquisitionError( Throwable error )
182182
{
183-
Throwable cause = Futures.completionErrorCause( error );
183+
Throwable cause = Futures.completionExceptionCause( error );
184184
if ( cause != null )
185185
{
186186
if ( cause instanceof TimeoutException )

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ private void lookupClusterComposition( RoutingTable routingTable, ConnectionPool
9393
{
9494
lookup( routingTable, pool ).whenComplete( ( composition, completionError ) ->
9595
{
96-
Throwable error = Futures.completionErrorCause( completionError );
96+
Throwable error = Futures.completionExceptionCause( completionError );
9797
if ( error != null )
9898
{
9999
result.completeExceptionally( error );
@@ -225,7 +225,7 @@ private CompletionStage<ClusterComposition> lookupOnRouter( BoltServerAddress ro
225225

226226
return provider.getClusterComposition( connectionStage ).handle( ( response, error ) ->
227227
{
228-
Throwable cause = Futures.completionErrorCause( error );
228+
Throwable cause = Futures.completionExceptionCause( error );
229229
if ( cause != null )
230230
{
231231
return handleRoutingProcedureError( cause, routingTable, routerAddress );

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ private CompletionStage<List<Record>> releaseConnection( Connection connection,
9191
private RoutingProcedureResponse processProcedureResponse( Statement procedure, List<Record> records,
9292
Throwable error )
9393
{
94-
Throwable cause = Futures.completionErrorCause( error );
94+
Throwable cause = Futures.completionExceptionCause( error );
9595
if ( cause != null )
9696
{
9797
return handleError( procedure, cause );

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ else if ( routingTable.isStaleFor( mode ) )
146146
rediscovery.lookupClusterComposition( routingTable, connectionPool )
147147
.whenComplete( ( composition, completionError ) ->
148148
{
149-
Throwable error = Futures.completionErrorCause( completionError );
149+
Throwable error = Futures.completionExceptionCause( completionError );
150150
if ( error != null )
151151
{
152152
clusterCompositionLookupFailed( error );
@@ -214,7 +214,7 @@ private void acquire( AccessMode mode, AddressSet addresses, CompletableFuture<C
214214

215215
connectionPool.acquire( address ).whenComplete( ( connection, completionError ) ->
216216
{
217-
Throwable error = Futures.completionErrorCause( completionError );
217+
Throwable error = Futures.completionExceptionCause( completionError );
218218
if ( error != null )
219219
{
220220
if ( error instanceof ServiceUnavailableException )

driver/src/main/java/org/neo4j/driver/internal/handlers/RoutingResponseHandler.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.Map;
2222
import java.util.Objects;
23-
import java.util.concurrent.CompletionException;
2423

2524
import org.neo4j.driver.internal.BoltServerAddress;
2625
import org.neo4j.driver.internal.RoutingErrorHandler;
@@ -72,11 +71,7 @@ public void onRecord( Value[] fields )
7271

7372
private Throwable handledError( Throwable receivedError )
7473
{
75-
Throwable error = Futures.completionErrorCause( receivedError );
76-
if ( error instanceof CompletionException )
77-
{
78-
error = error.getCause();
79-
}
74+
Throwable error = Futures.completionExceptionCause( receivedError );
8075

8176
if ( error instanceof ServiceUnavailableException )
8277
{

driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ private <T> void executeWork( CompletableFuture<T> resultFuture, Supplier<Comple
176176

177177
workStage.whenComplete( ( result, completionError ) ->
178178
{
179-
Throwable error = Futures.completionErrorCause( completionError );
179+
Throwable error = Futures.completionExceptionCause( completionError );
180180
if ( error != null )
181181
{
182182
// work failed in async way, attempt to schedule a retry

driver/src/main/java/org/neo4j/driver/internal/util/Futures.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.concurrent.CompletionStage;
2626
import java.util.concurrent.ExecutionException;
2727
import java.util.concurrent.Future;
28+
import java.util.function.BiConsumer;
29+
import java.util.function.BiFunction;
2830

2931
import org.neo4j.driver.internal.async.EventLoopGroupFactory;
3032

@@ -110,13 +112,36 @@ public static <V> V blockingGet( CompletionStage<V> stage )
110112
}
111113
}
112114

113-
// todo: test all call sites
114-
public static Throwable completionErrorCause( Throwable error )
115+
/**
116+
* Helper method to extract cause of a {@link CompletionException}.
117+
* <p>
118+
* When using {@link CompletionStage#whenComplete(BiConsumer)} and {@link CompletionStage#handle(BiFunction)}
119+
* propagated exceptions might get wrapped in a {@link CompletionException}.
120+
*
121+
* @param error the exception to get cause for.
122+
* @return cause of the given exception if it is a {@link CompletionException}, given exception otherwise.
123+
*/
124+
public static Throwable completionExceptionCause( Throwable error )
115125
{
116126
if ( error instanceof CompletionException )
117127
{
118128
return error.getCause();
119129
}
120130
return error;
121131
}
132+
133+
/**
134+
* Helped method to turn given exception into a {@link CompletionException}.
135+
*
136+
* @param error the exception to convert.
137+
* @return given exception wrapped with {@link CompletionException} if it's not one already.
138+
*/
139+
public static CompletionException asCompletionException( Throwable error )
140+
{
141+
if ( error instanceof CompletionException )
142+
{
143+
return ((CompletionException) error);
144+
}
145+
return new CompletionException( error );
146+
}
122147
}

driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1198,7 +1198,7 @@ private void runNestedQuery( StatementResultCursor inputCursor, Record record,
11981198
{
11991199
if ( error != null )
12001200
{
1201-
resultFuture.completeExceptionally( Futures.completionErrorCause( error ) );
1201+
resultFuture.completeExceptionally( Futures.completionExceptionCause( error ) );
12021202
}
12031203
else
12041204
{
@@ -1300,7 +1300,7 @@ public CompletionStage<Record> execute( Transaction tx )
13001300
CompletableFuture<Record> resultFuture = new CompletableFuture<>();
13011301

13021302
tx.runAsync( query ).whenComplete( ( cursor, error ) ->
1303-
processQueryResult( cursor, Futures.completionErrorCause( error ), resultFuture ) );
1303+
processQueryResult( cursor, Futures.completionExceptionCause( error ), resultFuture ) );
13041304

13051305
return resultFuture;
13061306
}
@@ -1315,7 +1315,7 @@ private void processQueryResult( StatementResultCursor cursor, Throwable error,
13151315
}
13161316

13171317
cursor.nextAsync().whenComplete( ( record, fetchError ) ->
1318-
processFetchResult( record, Futures.completionErrorCause( fetchError ), resultFuture ) );
1318+
processFetchResult( record, Futures.completionExceptionCause( fetchError ), resultFuture ) );
13191319
}
13201320

13211321
private void processFetchResult( Record record, Throwable error, CompletableFuture<Record> resultFuture )

driver/src/test/java/org/neo4j/driver/v1/stress/AsyncFailingQuery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public CompletionStage<Void> execute( C context )
5050
session.closeAsync();
5151

5252
assertNull( records );
53-
Throwable cause = Futures.completionErrorCause( error );
53+
Throwable cause = Futures.completionExceptionCause( error );
5454
assertThat( cause, is( arithmeticError() ) );
5555

5656
return null;

driver/src/test/java/org/neo4j/driver/v1/stress/AsyncFailingQueryInTx.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public CompletionStage<Void> execute( C context )
5050
.handle( ( records, error ) ->
5151
{
5252
assertNull( records );
53-
Throwable cause = Futures.completionErrorCause( error );
53+
Throwable cause = Futures.completionExceptionCause( error );
5454
assertThat( cause, is( arithmeticError() ) );
5555

5656
return tx;

driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWriteQuery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public CompletionStage<Void> execute( C context )
5151

5252
if ( error != null )
5353
{
54-
handleError( Futures.completionErrorCause( error ), context );
54+
handleError( Futures.completionExceptionCause( error ), context );
5555
}
5656
else
5757
{

driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWriteQueryInTx.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public CompletionStage<Void> execute( C context )
5454

5555
if ( error != null )
5656
{
57-
handleError( Futures.completionErrorCause( error ), context );
57+
handleError( Futures.completionExceptionCause( error ), context );
5858
}
5959
else
6060
{

driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQuery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public CompletionStage<Void> execute( C context )
5353
session.closeAsync();
5454
assertNull( record );
5555

56-
Throwable cause = Futures.completionErrorCause( error );
56+
Throwable cause = Futures.completionExceptionCause( error );
5757
assertNotNull( cause );
5858
assertThat( cause, instanceOf( ClientException.class ) );
5959
assertThat( ((Neo4jException) cause).code(), containsString( "SyntaxError" ) );

driver/src/test/java/org/neo4j/driver/v1/stress/AsyncWrongQueryInTx.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public CompletionStage<Void> execute( C context )
5454
{
5555
assertNull( record );
5656

57-
Throwable cause = Futures.completionErrorCause( error );
57+
Throwable cause = Futures.completionExceptionCause( error );
5858
assertNotNull( cause );
5959
assertThat( cause, instanceOf( ClientException.class ) );
6060
assertThat( ((Neo4jException) cause).code(), containsString( "SyntaxError" ) );

0 commit comments

Comments
 (0)