Skip to content

Commit af7c2a6

Browse files
committed
Added async retries to RetryLogic
This commit makes retry logic able to retry work that executes async operations and return a `Future`. It's a first step towards supporting async transaction functions. Also renamed future combinator methods to better represent their intent.
1 parent c1f2f84 commit af7c2a6

File tree

14 files changed

+840
-55
lines changed

14 files changed

+840
-55
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal;
2020

2121
import io.netty.bootstrap.Bootstrap;
22+
import io.netty.util.concurrent.EventExecutorGroup;
2223

2324
import java.io.IOException;
2425
import java.net.URI;
@@ -72,9 +73,12 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7273
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
7374
SecurityPlan securityPlan = createSecurityPlan( address, config );
7475
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
75-
RetryLogic retryLogic = createRetryLogic( retrySettings, config.logging() );
7676

77-
AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, config );
77+
Bootstrap bootstrap = BootstrapFactory.newBootstrap();
78+
RetryLogic retryLogic = createRetryLogic( retrySettings, bootstrap.config().group(), config.logging() );
79+
80+
AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap,
81+
config );
7882

7983
try
8084
{
@@ -98,14 +102,13 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
98102
}
99103

100104
private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
101-
Config config )
105+
Bootstrap bootstrap, Config config )
102106
{
103107
Clock clock = createClock();
104108
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
105109
ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( config.logging() );
106110
AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, securityPlan,
107111
activeChannelTracker, config.logging(), clock );
108-
Bootstrap bootstrap = BootstrapFactory.newBootstrap();
109112
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
110113
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
111114
config.maxConnectionPoolSize(),
@@ -250,9 +253,10 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv
250253
* <p>
251254
* <b>This method is protected only for testing</b>
252255
*/
253-
protected RetryLogic createRetryLogic( RetrySettings settings, Logging logging )
256+
protected RetryLogic createRetryLogic( RetrySettings settings, EventExecutorGroup eventExecutorGroup,
257+
Logging logging )
254258
{
255-
return new ExponentialBackoffRetryLogic( settings, createClock(), logging );
259+
return new ExponentialBackoffRetryLogic( settings, eventExecutorGroup, createClock(), logging );
256260
}
257261

258262
private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
3434
import org.neo4j.driver.internal.spi.Connection;
3535
import org.neo4j.driver.internal.types.InternalTypeSystem;
36+
import org.neo4j.driver.internal.util.BiConsumer;
3637
import org.neo4j.driver.v1.Record;
3738
import org.neo4j.driver.v1.Response;
3839
import org.neo4j.driver.v1.Statement;
@@ -219,7 +220,7 @@ public Response<Void> commitAsync()
219220
return internalCommitAsync();
220221
}
221222

222-
private InternalFuture<Void> internalCommitAsync()
223+
InternalFuture<Void> internalCommitAsync()
223224
{
224225
if ( state == State.COMMITTED )
225226
{
@@ -259,12 +260,12 @@ else if ( state == State.ROLLED_BACK )
259260
}
260261
}
261262

262-
private Runnable releaseConnectionAndNotifySession()
263+
private BiConsumer<Void,Throwable> releaseConnectionAndNotifySession()
263264
{
264-
return new Runnable()
265+
return new BiConsumer<Void,Throwable>()
265266
{
266267
@Override
267-
public void run()
268+
public void accept( Void result, Throwable error )
268269
{
269270
asyncConnection.release();
270271
session.asyncTransactionClosed( ExplicitTransaction.this );

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public Response<StatementResultCursor> runAsync( final Statement statement )
153153

154154
InternalFuture<AsyncConnection> connectionFuture = acquireAsyncConnection( mode );
155155

156-
return connectionFuture.thenCombine( new Function<AsyncConnection,InternalFuture<StatementResultCursor>>()
156+
return connectionFuture.thenCompose( new Function<AsyncConnection,InternalFuture<StatementResultCursor>>()
157157
{
158158
@Override
159159
public InternalFuture<StatementResultCursor> apply( AsyncConnection connection )
@@ -240,7 +240,7 @@ public Response<Void> closeAsync()
240240
{
241241
if ( asyncConnectionFuture != null )
242242
{
243-
return asyncConnectionFuture.thenCombine( new Function<AsyncConnection,InternalFuture<Void>>()
243+
return asyncConnectionFuture.thenCompose( new Function<AsyncConnection,InternalFuture<Void>>()
244244
{
245245
@Override
246246
public InternalFuture<Void> apply( AsyncConnection connection )
@@ -251,7 +251,7 @@ public InternalFuture<Void> apply( AsyncConnection connection )
251251
}
252252
else if ( currentAsyncTransactionFuture != null )
253253
{
254-
return currentAsyncTransactionFuture.thenCombine( new Function<ExplicitTransaction,InternalFuture<Void>>()
254+
return currentAsyncTransactionFuture.thenCompose( new Function<ExplicitTransaction,InternalFuture<Void>>()
255255
{
256256
@Override
257257
public InternalFuture<Void> apply( ExplicitTransaction tx )
@@ -283,7 +283,8 @@ public synchronized Transaction beginTransaction( String bookmark )
283283
@Override
284284
public Response<Transaction> beginTransactionAsync()
285285
{
286-
return beginTransactionAsync( mode );
286+
//noinspection unchecked
287+
return (Response) beginTransactionAsync( mode );
287288
}
288289

289290
@Override
@@ -412,14 +413,14 @@ private synchronized Transaction beginTransaction( AccessMode mode )
412413
return currentTransaction;
413414
}
414415

415-
private synchronized Response<Transaction> beginTransactionAsync( AccessMode mode )
416+
private synchronized InternalFuture<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
416417
{
417418
ensureSessionIsOpen();
418419
ensureNoOpenTransactionBeforeOpeningTransaction();
419420

420421
InternalFuture<AsyncConnection> connectionFuture = acquireAsyncConnection( mode );
421422

422-
currentAsyncTransactionFuture = connectionFuture.thenCombine(
423+
currentAsyncTransactionFuture = connectionFuture.thenCompose(
423424
new Function<AsyncConnection,InternalFuture<ExplicitTransaction>>()
424425
{
425426
@Override
@@ -431,7 +432,7 @@ public InternalFuture<ExplicitTransaction> apply( AsyncConnection connection )
431432
} );
432433

433434
//noinspection unchecked
434-
return (Response) currentAsyncTransactionFuture;
435+
return currentAsyncTransactionFuture;
435436
}
436437

437438
private void ensureNoUnrecoverableError()
@@ -495,7 +496,7 @@ private InternalFuture<AsyncConnection> acquireAsyncConnection( final AccessMode
495496
// memorize in local so same instance is transformed and used in callbacks
496497
final InternalFuture<AsyncConnection> currentAsyncConnectionFuture = asyncConnectionFuture;
497498

498-
asyncConnectionFuture = currentAsyncConnectionFuture.thenCombine(
499+
asyncConnectionFuture = currentAsyncConnectionFuture.thenCompose(
499500
new Function<AsyncConnection,InternalFuture<AsyncConnection>>()
500501
{
501502
@Override

driver/src/main/java/org/neo4j/driver/internal/async/Futures.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.netty.util.concurrent.GenericFutureListener;
2424
import io.netty.util.concurrent.Promise;
2525

26+
import org.neo4j.driver.internal.util.BiConsumer;
2627
import org.neo4j.driver.v1.util.Function;
2728

2829
public final class Futures
@@ -45,14 +46,14 @@ public static <T, U> InternalFuture<U> thenApply( Future<T> future, Bootstrap bo
4546
return result;
4647
}
4748

48-
public static <T, U> InternalFuture<U> thenCombine( InternalFuture<T> future, Function<T,InternalFuture<U>> fn )
49+
public static <T, U> InternalFuture<U> thenCompose( InternalFuture<T> future, Function<T,InternalFuture<U>> fn )
4950
{
5051
InternalPromise<U> result = new InternalPromise<>( future.eventExecutor() );
51-
future.addListener( new ThenCombineListener<>( result, fn ) );
52+
future.addListener( new ThenComposeListener<>( result, fn ) );
5253
return result;
5354
}
5455

55-
public static <T> InternalFuture<T> whenComplete( InternalFuture<T> future, Runnable action )
56+
public static <T> InternalFuture<T> whenComplete( InternalFuture<T> future, BiConsumer<T,Throwable> action )
5657
{
5758
InternalPromise<T> result = new InternalPromise<>( future.eventExecutor() );
5859
future.addListener( new CompletionListener<>( result, action ) );
@@ -97,12 +98,12 @@ else if ( future.isSuccess() )
9798
}
9899
}
99100

100-
private static class ThenCombineListener<T, U> implements GenericFutureListener<Future<T>>
101+
private static class ThenComposeListener<T, U> implements GenericFutureListener<Future<T>>
101102
{
102103
final Promise<U> result;
103104
final Function<T,InternalFuture<U>> fn;
104105

105-
ThenCombineListener( Promise<U> result, Function<T,InternalFuture<U>> fn )
106+
ThenComposeListener( Promise<U> result, Function<T,InternalFuture<U>> fn )
106107
{
107108
this.result = result;
108109
this.fn = fn;
@@ -168,9 +169,9 @@ else if ( future.isSuccess() )
168169
private static class CompletionListener<T> implements GenericFutureListener<Future<T>>
169170
{
170171
final Promise<T> result;
171-
final Runnable action;
172+
final BiConsumer<T,Throwable> action;
172173

173-
CompletionListener( Promise<T> result, Runnable action )
174+
CompletionListener( Promise<T> result, BiConsumer<T,Throwable> action )
174175
{
175176
this.result = result;
176177
this.action = action;
@@ -187,7 +188,7 @@ else if ( future.isSuccess() )
187188
{
188189
try
189190
{
190-
action.run();
191+
action.accept( future.getNow(), null );
191192
result.setSuccess( future.getNow() );
192193
}
193194
catch ( Throwable t )
@@ -200,7 +201,7 @@ else if ( future.isSuccess() )
200201
Throwable error = future.cause();
201202
try
202203
{
203-
action.run();
204+
action.accept( null, error );
204205
}
205206
catch ( Throwable t )
206207
{

driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.util.concurrent.EventExecutor;
2222
import io.netty.util.concurrent.Future;
2323

24+
import org.neo4j.driver.internal.util.BiConsumer;
2425
import org.neo4j.driver.v1.Response;
2526
import org.neo4j.driver.v1.util.Function;
2627

@@ -30,7 +31,7 @@ public interface InternalFuture<T> extends Future<T>, Response<T>
3031

3132
<U> InternalFuture<U> thenApply( Function<T,U> fn );
3233

33-
<U> InternalFuture<U> thenCombine( Function<T,InternalFuture<U>> fn );
34+
<U> InternalFuture<U> thenCompose( Function<T,InternalFuture<U>> fn );
3435

35-
InternalFuture<T> whenComplete( Runnable action );
36+
InternalFuture<T> whenComplete( BiConsumer<T,Throwable> action );
3637
}

driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.TimeUnit;
3030
import java.util.concurrent.TimeoutException;
3131

32+
import org.neo4j.driver.internal.util.BiConsumer;
3233
import org.neo4j.driver.v1.ResponseListener;
3334
import org.neo4j.driver.v1.util.Function;
3435

@@ -61,13 +62,13 @@ public <U> InternalFuture<U> thenApply( Function<T,U> fn )
6162
}
6263

6364
@Override
64-
public <U> InternalFuture<U> thenCombine( Function<T,InternalFuture<U>> fn )
65+
public <U> InternalFuture<U> thenCompose( Function<T,InternalFuture<U>> fn )
6566
{
66-
return Futures.thenCombine( this, fn );
67+
return Futures.thenCompose( this, fn );
6768
}
6869

6970
@Override
70-
public InternalFuture<T> whenComplete( Runnable action )
71+
public InternalFuture<T> whenComplete( BiConsumer<T,Throwable> action )
7172
{
7273
return Futures.whenComplete( this, action );
7374
}

0 commit comments

Comments
 (0)