diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
index 45ad5e0b8a..527874e44f 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
@@ -19,6 +19,8 @@
package org.neo4j.driver.internal;
import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
import java.io.IOException;
import java.net.URI;
@@ -72,14 +74,18 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
SecurityPlan securityPlan = createSecurityPlan( address, config );
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
- RetryLogic retryLogic = createRetryLogic( retrySettings, config.logging() );
- AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, config );
+ Bootstrap bootstrap = BootstrapFactory.newBootstrap();
+ EventLoopGroup eventLoopGroup = bootstrap.config().group();
+ RetryLogic retryLogic = createRetryLogic( retrySettings, eventLoopGroup, config.logging() );
+
+ AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap,
+ config );
try
{
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
- asyncConnectionPool );
+ asyncConnectionPool, eventLoopGroup );
}
catch ( Throwable driverError )
{
@@ -98,14 +104,13 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
}
private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
- Config config )
+ Bootstrap bootstrap, Config config )
{
Clock clock = createClock();
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( config.logging() );
AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, securityPlan,
activeChannelTracker, config.logging(), clock );
- Bootstrap bootstrap = BootstrapFactory.newBootstrap();
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
config.maxConnectionPoolSize(),
@@ -116,16 +121,18 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
- RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
+ RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup )
{
String scheme = uri.getScheme().toLowerCase();
switch ( scheme )
{
case BOLT_URI_SCHEME:
assertNoRoutingContext( uri, routingSettings );
- return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
+ return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool,
+ eventExecutorGroup );
case BOLT_ROUTING_URI_SCHEME:
- return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
+ return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
+ eventExecutorGroup );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
@@ -137,11 +144,13 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
* This method is protected only for testing
*/
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
- SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
+ SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool,
+ EventExecutorGroup eventExecutorGroup )
{
ConnectionProvider connectionProvider =
new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
- SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
+ SessionFactory sessionFactory =
+ createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
return createDriver( config, securityPlan, sessionFactory );
}
@@ -151,14 +160,16 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
* This method is protected only for testing
*/
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
- Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
+ Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
+ EventExecutorGroup eventExecutorGroup )
{
if ( !securityPlan.isRoutingCompatible() )
{
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
}
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
- SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
+ SessionFactory sessionFactory =
+ createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
return createDriver( config, securityPlan, sessionFactory );
}
@@ -239,10 +250,10 @@ protected Connector createConnector( final ConnectionSettings connectionSettings
*
* This method is protected only for testing
*/
- protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider,
- RetryLogic retryLogic, Config config )
+ protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, RetryLogic retryLogic,
+ EventExecutorGroup eventExecutorGroup, Config config )
{
- return new SessionFactoryImpl( connectionProvider, retryLogic, config );
+ return new SessionFactoryImpl( connectionProvider, retryLogic, eventExecutorGroup, config );
}
/**
@@ -250,9 +261,10 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv
*
* This method is protected only for testing
*/
- protected RetryLogic createRetryLogic( RetrySettings settings, Logging logging )
+ protected RetryLogic createRetryLogic( RetrySettings settings, EventExecutorGroup eventExecutorGroup,
+ Logging logging )
{
- return new ExponentialBackoffRetryLogic( settings, createClock(), logging );
+ return new ExponentialBackoffRetryLogic( settings, eventExecutorGroup, createClock(), logging );
}
private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )
diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
index 8d0ff5a364..7551c88373 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
@@ -33,6 +33,7 @@
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.types.InternalTypeSystem;
+import org.neo4j.driver.internal.util.BiConsumer;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Response;
import org.neo4j.driver.v1.Statement;
@@ -219,7 +220,7 @@ public Response commitAsync()
return internalCommitAsync();
}
- private InternalFuture internalCommitAsync()
+ InternalFuture internalCommitAsync()
{
if ( state == State.COMMITTED )
{
@@ -259,12 +260,12 @@ else if ( state == State.ROLLED_BACK )
}
}
- private Runnable releaseConnectionAndNotifySession()
+ private BiConsumer releaseConnectionAndNotifySession()
{
- return new Runnable()
+ return new BiConsumer()
{
@Override
- public void run()
+ public void accept( Void result, Throwable error )
{
asyncConnection.release();
session.asyncTransactionClosed( ExplicitTransaction.this );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java
index 21833b86fd..5eac6f1e6a 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java
@@ -50,7 +50,7 @@ public class InternalStatementResult implements StatementResult
{
this.statement = statement;
this.connection = connection;
- this.runResponseHandler = new RunResponseHandler( null );
+ this.runResponseHandler = new RunResponseHandler( null, null );
this.pullAllResponseHandler = new RecordsResponseHandler( runResponseHandler );
this.resourcesHandler = resourcesHandler;
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java
index eeecf4f0fe..63420be0e8 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java
@@ -18,6 +18,8 @@
*/
package org.neo4j.driver.internal;
+import io.netty.util.concurrent.EventExecutorGroup;
+
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.v1.AccessMode;
@@ -30,9 +32,9 @@ class LeakLoggingNetworkSession extends NetworkSession
private final String stackTrace;
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
- Logging logging )
+ EventExecutorGroup eventExecutorGroup, Logging logging )
{
- super( connectionProvider, mode, retryLogic, logging );
+ super( connectionProvider, mode, retryLogic, eventExecutorGroup, logging );
this.stackTrace = captureStackTrace();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
index 51881a2c2b..4a39bab241 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
@@ -18,7 +18,9 @@
*/
package org.neo4j.driver.internal;
-import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,6 +42,7 @@
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Response;
+import org.neo4j.driver.v1.ResponseListener;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.StatementResult;
@@ -61,6 +64,7 @@ public class NetworkSession implements Session, SessionResourcesHandler, ResultR
private final ConnectionProvider connectionProvider;
private final AccessMode mode;
private final RetryLogic retryLogic;
+ private final EventExecutorGroup eventExecutorGroup;
protected final Logger logger;
private volatile Bookmark bookmark = Bookmark.empty();
@@ -73,11 +77,12 @@ public class NetworkSession implements Session, SessionResourcesHandler, ResultR
private final AtomicBoolean isOpen = new AtomicBoolean( true );
public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
- Logging logging )
+ EventExecutorGroup eventExecutorGroup, Logging logging )
{
this.connectionProvider = connectionProvider;
this.mode = mode;
this.retryLogic = retryLogic;
+ this.eventExecutorGroup = eventExecutorGroup;
this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
}
@@ -153,7 +158,7 @@ public Response runAsync( final Statement statement )
InternalFuture connectionFuture = acquireAsyncConnection( mode );
- return connectionFuture.thenCombine( new Function>()
+ return connectionFuture.thenCompose( new Function>()
{
@Override
public InternalFuture apply( AsyncConnection connection )
@@ -240,7 +245,7 @@ public Response closeAsync()
{
if ( asyncConnectionFuture != null )
{
- return asyncConnectionFuture.thenCombine( new Function>()
+ return asyncConnectionFuture.thenCompose( new Function>()
{
@Override
public InternalFuture apply( AsyncConnection connection )
@@ -251,7 +256,7 @@ public InternalFuture apply( AsyncConnection connection )
}
else if ( currentAsyncTransactionFuture != null )
{
- return currentAsyncTransactionFuture.thenCombine( new Function>()
+ return currentAsyncTransactionFuture.thenCompose( new Function>()
{
@Override
public InternalFuture apply( ExplicitTransaction tx )
@@ -262,7 +267,7 @@ public InternalFuture apply( ExplicitTransaction tx )
}
else
{
- return new InternalPromise( GlobalEventExecutor.INSTANCE ).setSuccess( null );
+ return new InternalPromise( eventExecutorGroup ).setSuccess( null );
}
}
@@ -283,7 +288,8 @@ public synchronized Transaction beginTransaction( String bookmark )
@Override
public Response beginTransactionAsync()
{
- return beginTransactionAsync( mode );
+ //noinspection unchecked
+ return (Response) beginTransactionAsync( mode );
}
@Override
@@ -292,12 +298,24 @@ public T readTransaction( TransactionWork work )
return transaction( AccessMode.READ, work );
}
+ @Override
+ public Response readTransactionAsync( TransactionWork> work )
+ {
+ return transactionAsync( AccessMode.READ, work );
+ }
+
@Override
public T writeTransaction( TransactionWork work )
{
return transaction( AccessMode.WRITE, work );
}
+ @Override
+ public Response writeTransactionAsync( TransactionWork> work )
+ {
+ return transactionAsync( AccessMode.WRITE, work );
+ }
+
void setBookmark( Bookmark bookmark )
{
if ( bookmark != null && !bookmark.isEmpty() )
@@ -397,6 +415,129 @@ public T get()
} );
}
+ private InternalFuture transactionAsync( final AccessMode mode, final TransactionWork> work )
+ {
+ return retryLogic.retryAsync( new Supplier>()
+ {
+ @Override
+ public InternalFuture get()
+ {
+ final InternalFuture txFuture = beginTransactionAsync( mode );
+ final InternalPromise resultPromise = new InternalPromise<>( txFuture.eventExecutor() );
+
+ txFuture.addListener( new FutureListener()
+ {
+ @Override
+ public void operationComplete( Future future ) throws Exception
+ {
+ if ( future.isCancelled() )
+ {
+ resultPromise.cancel( true );
+ }
+ else if ( future.isSuccess() )
+ {
+ executeWork( resultPromise, future.getNow(), work );
+ }
+ else
+ {
+ resultPromise.setFailure( future.cause() );
+ }
+ }
+ } );
+
+ return resultPromise;
+ }
+ } );
+ }
+
+ private void executeWork( final InternalPromise resultPromise, final ExplicitTransaction tx,
+ TransactionWork> work )
+ {
+ Response workResponse = safeExecuteWork( tx, work );
+ workResponse.addListener( new ResponseListener()
+ {
+ @Override
+ public void operationCompleted( T result, Throwable error )
+ {
+ if ( error != null )
+ {
+ rollbackTxAfterFailedTransactionWork( tx, resultPromise, error );
+ }
+ else
+ {
+ commitTxAfterSucceededTransactionWork( tx, resultPromise, result );
+ }
+ }
+ } );
+ }
+
+ private Response safeExecuteWork( ExplicitTransaction tx, TransactionWork> work )
+ {
+ // given work might fail in both async and sync way
+ // async failure will result in a failed future being returned
+ // sync failure will result in an exception being thrown
+ try
+ {
+ return work.execute( tx );
+ }
+ catch ( Throwable workError )
+ {
+ // work threw an exception, wrap it in a future and proceed
+ return new InternalPromise( eventExecutorGroup ).setFailure( workError );
+ }
+ }
+
+ private void rollbackTxAfterFailedTransactionWork( ExplicitTransaction tx,
+ final InternalPromise resultPromise, final Throwable error )
+ {
+ if ( tx.isOpen() )
+ {
+ tx.rollbackAsync().addListener( new ResponseListener()
+ {
+ @Override
+ public void operationCompleted( Void ignore, Throwable rollbackError )
+ {
+ if ( rollbackError != null )
+ {
+ error.addSuppressed( rollbackError );
+ }
+ resultPromise.setFailure( error );
+ }
+ } );
+ }
+ else
+ {
+ resultPromise.setFailure( error );
+ }
+ }
+
+ private void commitTxAfterSucceededTransactionWork( ExplicitTransaction tx,
+ final InternalPromise resultPromise, final T result )
+ {
+ if ( tx.isOpen() )
+ {
+ tx.commitAsync().addListener( new ResponseListener()
+ {
+ @Override
+ public void operationCompleted( Void ignore, Throwable commitError )
+ {
+ if ( commitError != null )
+ {
+ resultPromise.setFailure( commitError );
+ }
+ else
+ {
+ resultPromise.setSuccess( result );
+ }
+ }
+ } );
+ }
+ else
+ {
+ resultPromise.setSuccess( result );
+ }
+ }
+
private synchronized Transaction beginTransaction( AccessMode mode )
{
ensureSessionIsOpen();
@@ -412,14 +553,14 @@ private synchronized Transaction beginTransaction( AccessMode mode )
return currentTransaction;
}
- private synchronized Response beginTransactionAsync( AccessMode mode )
+ private synchronized InternalFuture beginTransactionAsync( AccessMode mode )
{
ensureSessionIsOpen();
ensureNoOpenTransactionBeforeOpeningTransaction();
InternalFuture connectionFuture = acquireAsyncConnection( mode );
- currentAsyncTransactionFuture = connectionFuture.thenCombine(
+ currentAsyncTransactionFuture = connectionFuture.thenCompose(
new Function>()
{
@Override
@@ -430,8 +571,7 @@ public InternalFuture apply( AsyncConnection connection )
}
} );
- //noinspection unchecked
- return (Response) currentAsyncTransactionFuture;
+ return currentAsyncTransactionFuture;
}
private void ensureNoUnrecoverableError()
@@ -495,7 +635,7 @@ private InternalFuture acquireAsyncConnection( final AccessMode
// memorize in local so same instance is transformed and used in callbacks
final InternalFuture currentAsyncConnectionFuture = asyncConnectionFuture;
- asyncConnectionFuture = currentAsyncConnectionFuture.thenCombine(
+ asyncConnectionFuture = currentAsyncConnectionFuture.thenCompose(
new Function>()
{
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
index 6d633019ec..cc9c7050ac 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
@@ -18,6 +18,8 @@
*/
package org.neo4j.driver.internal;
+import io.netty.util.concurrent.EventExecutorGroup;
+
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.v1.AccessMode;
@@ -29,12 +31,15 @@ public class SessionFactoryImpl implements SessionFactory
{
private final ConnectionProvider connectionProvider;
private final RetryLogic retryLogic;
+ private final EventExecutorGroup eventExecutorGroup;
private final Logging logging;
private final boolean leakedSessionsLoggingEnabled;
- SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic, Config config )
+ SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic,
+ EventExecutorGroup eventExecutorGroup, Config config )
{
this.connectionProvider = connectionProvider;
+ this.eventExecutorGroup = eventExecutorGroup;
this.leakedSessionsLoggingEnabled = config.logLeakedSessions();
this.retryLogic = retryLogic;
this.logging = config.logging();
@@ -51,9 +56,9 @@ public final Session newInstance( AccessMode mode, Bookmark bookmark )
protected NetworkSession createSession( ConnectionProvider connectionProvider, RetryLogic retryLogic,
AccessMode mode, Logging logging )
{
- return leakedSessionsLoggingEnabled ?
- new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, logging ) :
- new NetworkSession( connectionProvider, mode, retryLogic, logging );
+ return leakedSessionsLoggingEnabled
+ ? new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, eventExecutorGroup, logging )
+ : new NetworkSession( connectionProvider, mode, retryLogic, eventExecutorGroup, logging );
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java b/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java
index 8f14c7d182..04b0240eee 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java
@@ -23,6 +23,7 @@
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
+import org.neo4j.driver.internal.util.BiConsumer;
import org.neo4j.driver.v1.util.Function;
public final class Futures
@@ -45,14 +46,14 @@ public static InternalFuture thenApply( Future future, Bootstrap bo
return result;
}
- public static InternalFuture thenCombine( InternalFuture future, Function> fn )
+ public static InternalFuture thenCompose( InternalFuture future, Function> fn )
{
InternalPromise result = new InternalPromise<>( future.eventExecutor() );
- future.addListener( new ThenCombineListener<>( result, fn ) );
+ future.addListener( new ThenComposeListener<>( result, fn ) );
return result;
}
- public static InternalFuture whenComplete( InternalFuture future, Runnable action )
+ public static InternalFuture whenComplete( InternalFuture future, BiConsumer action )
{
InternalPromise result = new InternalPromise<>( future.eventExecutor() );
future.addListener( new CompletionListener<>( result, action ) );
@@ -97,12 +98,12 @@ else if ( future.isSuccess() )
}
}
- private static class ThenCombineListener implements GenericFutureListener>
+ private static class ThenComposeListener implements GenericFutureListener>
{
final Promise result;
final Function> fn;
- ThenCombineListener( Promise result, Function> fn )
+ ThenComposeListener( Promise result, Function> fn )
{
this.result = result;
this.fn = fn;
@@ -168,9 +169,9 @@ else if ( future.isSuccess() )
private static class CompletionListener implements GenericFutureListener>
{
final Promise result;
- final Runnable action;
+ final BiConsumer action;
- CompletionListener( Promise result, Runnable action )
+ CompletionListener( Promise result, BiConsumer action )
{
this.result = result;
this.action = action;
@@ -187,7 +188,7 @@ else if ( future.isSuccess() )
{
try
{
- action.run();
+ action.accept( future.getNow(), null );
result.setSuccess( future.getNow() );
}
catch ( Throwable t )
@@ -200,7 +201,7 @@ else if ( future.isSuccess() )
Throwable error = future.cause();
try
{
- action.run();
+ action.accept( null, error );
}
catch ( Throwable t )
{
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java
index 54245824be..6444384fe3 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java
@@ -21,6 +21,7 @@
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
+import org.neo4j.driver.internal.util.BiConsumer;
import org.neo4j.driver.v1.Response;
import org.neo4j.driver.v1.util.Function;
@@ -30,7 +31,7 @@ public interface InternalFuture extends Future, Response
InternalFuture thenApply( Function fn );
- InternalFuture thenCombine( Function> fn );
+ InternalFuture thenCompose( Function> fn );
- InternalFuture whenComplete( Runnable action );
+ InternalFuture whenComplete( BiConsumer action );
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java
index dd1db10afd..6a48065b4e 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java
@@ -20,6 +20,7 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GenericFutureListener;
@@ -29,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.neo4j.driver.internal.util.BiConsumer;
import org.neo4j.driver.v1.ResponseListener;
import org.neo4j.driver.v1.util.Function;
@@ -39,7 +41,12 @@ public class InternalPromise implements InternalFuture, Promise
public InternalPromise( Bootstrap bootstrap )
{
- this( bootstrap.config().group().next() );
+ this( bootstrap.config().group() );
+ }
+
+ public InternalPromise( EventExecutorGroup eventExecutorGroup )
+ {
+ this( eventExecutorGroup.next() );
}
public InternalPromise( EventExecutor eventExecutor )
@@ -61,13 +68,13 @@ public InternalFuture thenApply( Function fn )
}
@Override
- public InternalFuture thenCombine( Function> fn )
+ public InternalFuture thenCompose( Function> fn )
{
- return Futures.thenCombine( this, fn );
+ return Futures.thenCompose( this, fn );
}
@Override
- public InternalFuture whenComplete( Runnable action )
+ public InternalFuture whenComplete( BiConsumer action )
{
return Futures.whenComplete( this, action );
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java
index ca09fd5c01..bb2cd5ed4b 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java
@@ -18,25 +18,37 @@
*/
package org.neo4j.driver.internal.async;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
+import org.neo4j.driver.internal.util.Consumer;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Response;
import org.neo4j.driver.v1.StatementResultCursor;
import org.neo4j.driver.v1.summary.ResultSummary;
+import static java.util.Objects.requireNonNull;
+
public class InternalStatementResultCursor implements StatementResultCursor
{
+ private final AsyncConnection connection;
private final RunResponseHandler runResponseHandler;
private final PullAllResponseHandler pullAllHandler;
- public InternalStatementResultCursor( RunResponseHandler runResponseHandler, PullAllResponseHandler pullAllHandler )
+ private InternalFuture peekedRecordResponse;
+
+ public InternalStatementResultCursor( AsyncConnection connection, RunResponseHandler runResponseHandler,
+ PullAllResponseHandler pullAllHandler )
{
- this.runResponseHandler = runResponseHandler;
- this.pullAllHandler = pullAllHandler;
+ this.connection = requireNonNull( connection );
+ this.runResponseHandler = requireNonNull( runResponseHandler );
+ this.pullAllHandler = requireNonNull( pullAllHandler );
}
@Override
@@ -53,14 +65,116 @@ public Response summaryAsync()
}
@Override
- public Response fetchAsync()
+ public Response nextAsync()
+ {
+ return internalNextAsync();
+ }
+
+ @Override
+ public Response peekAsync()
+ {
+ if ( peekedRecordResponse == null )
+ {
+ peekedRecordResponse = pullAllHandler.nextAsync();
+ }
+ return peekedRecordResponse;
+ }
+
+ @Override
+ public Response forEachAsync( final Consumer action )
{
- return pullAllHandler.fetchRecordAsync();
+ InternalPromise result = connection.newPromise();
+ internalForEachAsync( action, result );
+ return result;
}
@Override
- public Record current()
+ public Response> listAsync()
+ {
+ InternalPromise> result = connection.newPromise();
+ internalListAsync( new ArrayList(), result );
+ return result;
+ }
+
+ private void internalForEachAsync( final Consumer action, final InternalPromise result )
+ {
+ final InternalFuture recordFuture = internalNextAsync();
+
+ recordFuture.addListener( new FutureListener()
+ {
+ @Override
+ public void operationComplete( Future future )
+ {
+ if ( future.isCancelled() )
+ {
+ result.cancel( true );
+ }
+ else if ( future.isSuccess() )
+ {
+ Record record = future.getNow();
+ if ( record != null )
+ {
+ action.accept( record );
+ internalForEachAsync( action, result );
+ }
+ else
+ {
+ result.setSuccess( null );
+ }
+ }
+ else
+ {
+ result.setFailure( future.cause() );
+ }
+ }
+ } );
+ }
+
+ private void internalListAsync( final List records, final InternalPromise> result )
+ {
+ final InternalFuture recordFuture = internalNextAsync();
+
+ recordFuture.addListener( new FutureListener()
+ {
+ @Override
+ public void operationComplete( Future future )
+ {
+ if ( future.isCancelled() )
+ {
+ result.cancel( true );
+ }
+ else if ( future.isSuccess() )
+ {
+ Record record = future.getNow();
+ if ( record != null )
+ {
+ records.add( record );
+ internalListAsync( records, result );
+ }
+ else
+ {
+ result.setSuccess( records );
+ }
+ }
+ else
+ {
+ result.setFailure( future.cause() );
+ }
+ }
+ } );
+ }
+
+ private InternalFuture internalNextAsync()
{
- return pullAllHandler.currentRecord();
+ if ( peekedRecordResponse != null )
+ {
+ InternalFuture result = peekedRecordResponse;
+ peekedRecordResponse = null;
+ return result;
+ }
+ else
+ {
+ return pullAllHandler.nextAsync();
+ }
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/Main.java b/driver/src/main/java/org/neo4j/driver/internal/async/Main.java
index 2abd234d4c..225b1a8e46 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/Main.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/Main.java
@@ -158,9 +158,9 @@ public void apply( Driver driver, MutableInt recordsRead )
Session session = driver.session();
Response cursorResponse = session.runAsync( QUERY, PARAMS_OBJ );
StatementResultCursor cursor = await( cursorResponse );
- while ( await( cursor.fetchAsync() ) )
+ Record record;
+ while ( (record = await( cursor.nextAsync() )) != null )
{
- Record record = cursor.current();
useRecord( record );
recordsRead.increment();
}
@@ -202,9 +202,9 @@ public void apply( Driver driver, MutableInt recordsRead )
Session session = driver.session();
Transaction tx = await( session.beginTransactionAsync() );
StatementResultCursor cursor = await( tx.runAsync( QUERY, PARAMS_OBJ ) );
- while ( await( cursor.fetchAsync() ) )
+ Record record;
+ while ( (record = await( cursor.nextAsync() )) != null )
{
- Record record = cursor.current();
useRecord( record );
recordsRead.increment();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java b/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java
index 2afae41876..d08e3fc3f2 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java
@@ -43,14 +43,14 @@ public static InternalFuture runAsync( AsyncConnection co
return runAsync( connection, statement, null );
}
- public static InternalFuture runAsync( AsyncConnection connection, Statement statement,
+ public static InternalFuture runAsync( final AsyncConnection connection, Statement statement,
ExplicitTransaction tx )
{
String query = statement.text();
Map params = statement.parameters().asMap( ofValue() );
InternalPromise runCompletedPromise = connection.newPromise();
- final RunResponseHandler runHandler = new RunResponseHandler( runCompletedPromise );
+ final RunResponseHandler runHandler = new RunResponseHandler( runCompletedPromise, tx );
final PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, tx );
connection.run( query, params, runHandler );
@@ -62,7 +62,7 @@ public static InternalFuture runAsync( AsyncConnection co
@Override
public StatementResultCursor apply( Void ignore )
{
- return new InternalStatementResultCursor( runHandler, pullAllHandler );
+ return new InternalStatementResultCursor( connection, runHandler, pullAllHandler );
}
} );
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java
index 3803bec368..6e5e1f2345 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java
@@ -53,15 +53,14 @@ public abstract class PullAllResponseHandler implements ResponseHandler
private final RunResponseHandler runResponseHandler;
protected final AsyncConnection connection;
- private final Queue records;
+ private final Queue records = new LinkedList<>();
+
private boolean succeeded;
private Throwable failure;
-
private ResultSummary summary;
- private volatile Record current;
- private InternalPromise recordAvailablePromise;
- private InternalPromise summaryAvailablePromise;
+ private InternalPromise recordPromise;
+ private InternalPromise summaryPromise;
public PullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler,
AsyncConnection connection )
@@ -69,27 +68,21 @@ public PullAllResponseHandler( Statement statement, RunResponseHandler runRespon
this.statement = requireNonNull( statement );
this.runResponseHandler = requireNonNull( runResponseHandler );
this.connection = requireNonNull( connection );
- this.records = new LinkedList<>();
}
@Override
public synchronized void onSuccess( Map metadata )
{
summary = extractResultSummary( metadata );
- if ( summaryAvailablePromise != null )
+ if ( summaryPromise != null )
{
- summaryAvailablePromise.setSuccess( summary );
- summaryAvailablePromise = null;
+ summaryPromise.setSuccess( summary );
+ summaryPromise = null;
}
succeeded = true;
afterSuccess();
-
- if ( recordAvailablePromise != null )
- {
- recordAvailablePromise.setSuccess( false );
- recordAvailablePromise = null;
- }
+ succeedRecordPromise( null );
}
protected abstract void afterSuccess();
@@ -99,12 +92,7 @@ public synchronized void onFailure( Throwable error )
{
failure = error;
afterFailure( error );
-
- if ( recordAvailablePromise != null )
- {
- recordAvailablePromise.setFailure( error );
- recordAvailablePromise = null;
- }
+ failRecordPromise( error );
}
protected abstract void afterFailure( Throwable error );
@@ -114,11 +102,9 @@ public synchronized void onRecord( Value[] fields )
{
Record record = new InternalRecord( runResponseHandler.statementKeys(), fields );
- if ( recordAvailablePromise != null )
+ if ( recordPromise != null )
{
- current = record;
- recordAvailablePromise.setSuccess( true );
- recordAvailablePromise = null;
+ succeedRecordPromise( record );
}
else
{
@@ -126,42 +112,33 @@ public synchronized void onRecord( Value[] fields )
}
}
- public synchronized InternalFuture fetchRecordAsync()
+ public synchronized InternalFuture nextAsync()
{
Record record = dequeueRecord();
if ( record == null )
{
if ( succeeded )
{
- return connection.newPromise().setSuccess( false );
+ return connection.newPromise().setSuccess( null );
}
if ( failure != null )
{
- return connection.newPromise().setFailure( failure );
+ return connection.newPromise().setFailure( failure );
}
- if ( recordAvailablePromise == null )
+ if ( recordPromise == null )
{
- recordAvailablePromise = connection.newPromise();
+ recordPromise = connection.newPromise();
}
-
- return recordAvailablePromise;
+ return recordPromise;
}
else
{
- current = record;
- return connection.newPromise().setSuccess( true );
+ return connection.newPromise().setSuccess( record );
}
}
- public Record currentRecord()
- {
- Record result = current;
- current = null;
- return result;
- }
-
public synchronized InternalFuture summaryAsync()
{
if ( summary != null )
@@ -170,11 +147,11 @@ public synchronized InternalFuture summaryAsync()
}
else
{
- if ( summaryAvailablePromise == null )
+ if ( summaryPromise == null )
{
- summaryAvailablePromise = connection.newPromise();
+ summaryPromise = connection.newPromise();
}
- return summaryAvailablePromise;
+ return summaryPromise;
}
}
@@ -203,6 +180,26 @@ private Record dequeueRecord()
return record;
}
+ private void succeedRecordPromise( Record record )
+ {
+ if ( recordPromise != null )
+ {
+ InternalPromise promise = recordPromise;
+ recordPromise = null;
+ promise.setSuccess( record );
+ }
+ }
+
+ private void failRecordPromise( Throwable error )
+ {
+ if ( recordPromise != null )
+ {
+ InternalPromise promise = recordPromise;
+ recordPromise = null;
+ promise.setFailure( error );
+ }
+ }
+
private ResultSummary extractResultSummary( Map metadata )
{
return new InternalResultSummary( statement, connection.serverInfo(), extractStatementType( metadata ),
diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java
index 434554e21e..83ba08c75f 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java
@@ -25,19 +25,22 @@
import java.util.List;
import java.util.Map;
+import org.neo4j.driver.internal.ExplicitTransaction;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.v1.Value;
public class RunResponseHandler implements ResponseHandler
{
private final Promise runCompletedPromise;
+ private final ExplicitTransaction tx;
private List statementKeys;
private long resultAvailableAfter;
- public RunResponseHandler( Promise runCompletedPromise )
+ public RunResponseHandler( Promise runCompletedPromise, ExplicitTransaction tx )
{
this.runCompletedPromise = runCompletedPromise;
+ this.tx = tx;
}
@Override
@@ -55,6 +58,10 @@ public void onSuccess( Map metadata )
@Override
public void onFailure( Throwable error )
{
+ if ( tx != null )
+ {
+ tx.resultFailed( error );
+ }
if ( runCompletedPromise != null )
{
runCompletedPromise.setFailure( error );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java
index a30290e42b..a1d675a005 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java
@@ -18,10 +18,18 @@
*/
package org.neo4j.driver.internal.retry;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.neo4j.driver.internal.async.InternalFuture;
+import org.neo4j.driver.internal.async.InternalPromise;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.internal.util.Supplier;
import org.neo4j.driver.v1.Logger;
@@ -41,27 +49,31 @@ public class ExponentialBackoffRetryLogic implements RetryLogic
private static final long INITIAL_RETRY_DELAY_MS = SECONDS.toMillis( 1 );
private static final double RETRY_DELAY_MULTIPLIER = 2.0;
private static final double RETRY_DELAY_JITTER_FACTOR = 0.2;
+ private static final long MAX_RETRY_DELAY = Long.MAX_VALUE / 2;
private final long maxRetryTimeMs;
private final long initialRetryDelayMs;
private final double multiplier;
private final double jitterFactor;
+ private final EventExecutorGroup eventExecutorGroup;
private final Clock clock;
private final Logger log;
- public ExponentialBackoffRetryLogic( RetrySettings settings, Clock clock, Logging logging )
+ public ExponentialBackoffRetryLogic( RetrySettings settings, EventExecutorGroup eventExecutorGroup, Clock clock,
+ Logging logging )
{
this( settings.maxRetryTimeMs(), INITIAL_RETRY_DELAY_MS, RETRY_DELAY_MULTIPLIER, RETRY_DELAY_JITTER_FACTOR,
- clock, logging );
+ eventExecutorGroup, clock, logging );
}
ExponentialBackoffRetryLogic( long maxRetryTimeMs, long initialRetryDelayMs, double multiplier,
- double jitterFactor, Clock clock, Logging logging )
+ double jitterFactor, EventExecutorGroup eventExecutorGroup, Clock clock, Logging logging )
{
this.maxRetryTimeMs = maxRetryTimeMs;
this.initialRetryDelayMs = initialRetryDelayMs;
this.multiplier = multiplier;
this.jitterFactor = jitterFactor;
+ this.eventExecutorGroup = eventExecutorGroup;
this.clock = clock;
this.log = logging.getLog( RETRY_LOGIC_LOG_NAME );
@@ -95,7 +107,7 @@ public T retry( Supplier work )
if ( elapsedTime < maxRetryTimeMs )
{
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
- log.error( "Transaction failed and will be retried in " + delayWithJitterMs + "ms", error );
+ log.warn( "Transaction failed and will be retried in " + delayWithJitterMs + "ms", error );
sleep( delayWithJitterMs );
nextDelayMs = (long) (nextDelayMs * multiplier);
@@ -109,8 +121,124 @@ public T retry( Supplier work )
}
}
+ @Override
+ public InternalFuture retryAsync( Supplier> work )
+ {
+ InternalPromise result = new InternalPromise<>( eventExecutorGroup );
+ executeWorkInEventLoop( result, work );
+ return result;
+ }
+
+ protected boolean canRetryOn( Throwable error )
+ {
+ return error instanceof SessionExpiredException ||
+ error instanceof ServiceUnavailableException ||
+ isTransientError( error );
+ }
+
+ private void executeWorkInEventLoop( final InternalPromise result, final Supplier> work )
+ {
+ // this is the very first time we execute given work
+ EventExecutor eventExecutor = eventExecutorGroup.next();
+
+ eventExecutor.execute( new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ executeWork( result, work, -1, initialRetryDelayMs, null );
+ }
+ } );
+ }
+
+ private void retryWorkInEventLoop( final InternalPromise result, final Supplier> work,
+ final Throwable error, final long startTime, final long delayMs, final List errors )
+ {
+ // work has failed before, we need to schedule retry with the given delay
+ EventExecutor eventExecutor = eventExecutorGroup.next();
+
+ long delayWithJitterMs = computeDelayWithJitter( delayMs );
+ log.warn( "Async transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );
+
+ eventExecutor.schedule( new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ long newRetryDelayMs = (long) (delayMs * multiplier);
+ executeWork( result, work, startTime, newRetryDelayMs, errors );
+ }
+ }, delayWithJitterMs, TimeUnit.MILLISECONDS );
+ }
+
+ private void executeWork( final InternalPromise result, final Supplier> work,
+ final long startTime, final long retryDelayMs, final List errors )
+ {
+ InternalFuture workFuture;
+ try
+ {
+ workFuture = work.get();
+ }
+ catch ( Throwable error )
+ {
+ // work failed in a sync way, attempt to schedule a retry
+ retryOnError( result, work, startTime, retryDelayMs, error, errors );
+ return;
+ }
+
+ workFuture.addListener( new FutureListener()
+ {
+ @Override
+ public void operationComplete( Future future )
+ {
+ if ( future.isCancelled() )
+ {
+ result.cancel( true );
+ }
+ else if ( future.isSuccess() )
+ {
+ result.setSuccess( future.getNow() );
+ }
+ else
+ {
+ // work failed in async way, attempt to schedule a retry
+ retryOnError( result, work, startTime, retryDelayMs, future.cause(), errors );
+ }
+ }
+ } );
+ }
+
+ private void retryOnError( InternalPromise result, Supplier> work, long startTime,
+ long retryDelayMs, Throwable error, List errors )
+ {
+ if ( canRetryOn( error ) )
+ {
+ long currentTime = clock.millis();
+ if ( startTime == -1 )
+ {
+ startTime = currentTime;
+ }
+
+ long elapsedTime = currentTime - startTime;
+ if ( elapsedTime < maxRetryTimeMs )
+ {
+ errors = recordError( error, errors );
+ retryWorkInEventLoop( result, work, error, startTime, retryDelayMs, errors );
+ return;
+ }
+ }
+
+ addSuppressed( error, errors );
+ result.setFailure( error );
+ }
+
private long computeDelayWithJitter( long delayMs )
{
+ if ( delayMs > MAX_RETRY_DELAY )
+ {
+ delayMs = MAX_RETRY_DELAY;
+ }
+
long jitter = (long) (delayMs * jitterFactor);
long min = delayMs - jitter;
long max = delayMs + jitter;
@@ -154,13 +282,6 @@ private void verifyAfterConstruction()
}
}
- private static boolean canRetryOn( Throwable error )
- {
- return error instanceof SessionExpiredException ||
- error instanceof ServiceUnavailableException ||
- isTransientError( error );
- }
-
private static boolean isTransientError( Throwable error )
{
if ( error instanceof TransientException )
diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java
index d37b2b3055..5f6569b87d 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java
@@ -18,9 +18,12 @@
*/
package org.neo4j.driver.internal.retry;
+import org.neo4j.driver.internal.async.InternalFuture;
import org.neo4j.driver.internal.util.Supplier;
public interface RetryLogic
{
T retry( Supplier work );
+
+ InternalFuture retryAsync( Supplier> work );
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/BiConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/util/BiConsumer.java
new file mode 100644
index 0000000000..a9c09139db
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/util/BiConsumer.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.util;
+
+public interface BiConsumer
+{
+ void accept( T t, U u );
+}
diff --git a/driver/src/main/java/org/neo4j/driver/v1/Session.java b/driver/src/main/java/org/neo4j/driver/v1/Session.java
index a9674128cf..a3c9e3e61e 100644
--- a/driver/src/main/java/org/neo4j/driver/v1/Session.java
+++ b/driver/src/main/java/org/neo4j/driver/v1/Session.java
@@ -96,6 +96,8 @@ public interface Session extends Resource, StatementRunner
*/
T readTransaction( TransactionWork work );
+ Response readTransactionAsync( TransactionWork> work );
+
/**
* Execute given unit of work in a {@link AccessMode#WRITE write} transaction.
*
@@ -108,6 +110,8 @@ public interface Session extends Resource, StatementRunner
*/
T writeTransaction( TransactionWork work );
+ Response writeTransactionAsync( TransactionWork> work );
+
/**
* Return the bookmark received following the last completed
* {@linkplain Transaction transaction}. If no bookmark was received
diff --git a/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java
index d44232fff7..528838b63c 100644
--- a/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java
@@ -20,6 +20,7 @@
import java.util.List;
+import org.neo4j.driver.internal.util.Consumer;
import org.neo4j.driver.v1.summary.ResultSummary;
public interface StatementResultCursor
@@ -33,7 +34,11 @@ public interface StatementResultCursor
Response summaryAsync();
- Response fetchAsync();
+ Response nextAsync();
- Record current();
+ Response peekAsync();
+
+ Response forEachAsync( Consumer action );
+
+ Response> listAsync();
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java
index dd7be8391b..7a89922175 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java
@@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal;
+import io.netty.util.concurrent.EventExecutorGroup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -169,7 +170,8 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
@Override
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
- RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
+ RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
+ EventExecutorGroup eventExecutorGroup )
{
throw new UnsupportedOperationException( "Can't create routing driver" );
}
@@ -200,9 +202,10 @@ protected LoadBalancer createLoadBalancer( BoltServerAddress address, Connection
@Override
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider,
- RetryLogic retryLogic, Config config )
+ RetryLogic retryLogic, EventExecutorGroup eventExecutorGroup, Config config )
{
- SessionFactory sessionFactory = super.createSessionFactory( connectionProvider, retryLogic, config );
+ SessionFactory sessionFactory =
+ super.createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
capturedSessionFactory = sessionFactory;
return sessionFactory;
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java
index 1249acb047..6188e4d51b 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java
@@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal;
+import io.netty.util.concurrent.GlobalEventExecutor;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -99,7 +100,7 @@ private static void finalize( Session session ) throws Exception
private static LeakLoggingNetworkSession newSession( Logging logging, boolean openConnection )
{
return new LeakLoggingNetworkSession( connectionProviderMock( openConnection ), READ,
- new FixedRetryLogic( 0 ), logging );
+ new FixedRetryLogic( 0 ), GlobalEventExecutor.INSTANCE, logging );
}
private static ConnectionProvider connectionProviderMock( final boolean openConnection )
diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java
index 132a4a826f..23343fff11 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java
@@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal;
+import io.netty.util.concurrent.GlobalEventExecutor;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -1010,7 +1011,8 @@ private static NetworkSession newSession( ConnectionProvider connectionProvider,
private static NetworkSession newSession( ConnectionProvider connectionProvider, AccessMode mode,
RetryLogic retryLogic, Bookmark bookmark )
{
- NetworkSession session = new NetworkSession( connectionProvider, mode, retryLogic, DEV_NULL_LOGGING );
+ NetworkSession session = new NetworkSession( connectionProvider, mode, retryLogic,
+ GlobalEventExecutor.INSTANCE, DEV_NULL_LOGGING );
session.setBookmark( bookmark );
return session;
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java
index 6212951034..5e6a61800d 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java
@@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal;
+import io.netty.util.concurrent.GlobalEventExecutor;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -439,7 +440,7 @@ private static class NetworkSessionWithAddressFactory extends SessionFactoryImpl
{
NetworkSessionWithAddressFactory( ConnectionProvider connectionProvider, Config config )
{
- super( connectionProvider, new FixedRetryLogic( 0 ), config );
+ super( connectionProvider, new FixedRetryLogic( 0 ), GlobalEventExecutor.INSTANCE, config );
}
@Override
@@ -456,7 +457,7 @@ private static class NetworkSessionWithAddress extends NetworkSession
NetworkSessionWithAddress( ConnectionProvider connectionProvider, AccessMode mode, Logging logging )
{
- super( connectionProvider, mode, new FixedRetryLogic( 0 ), logging );
+ super( connectionProvider, mode, new FixedRetryLogic( 0 ), GlobalEventExecutor.INSTANCE, logging );
try ( PooledConnection connection = connectionProvider.acquireConnection( mode ) )
{
this.address = connection.boltServerAddress();
diff --git a/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java
index fd27004032..fdf07758d2 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java
@@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal;
+import io.netty.util.concurrent.GlobalEventExecutor;
import org.junit.Test;
import org.neo4j.driver.internal.retry.FixedRetryLogic;
@@ -61,6 +62,7 @@ public void createsLeakLoggingNetworkSessions()
private static SessionFactory newSessionFactory( Config config )
{
- return new SessionFactoryImpl( mock( ConnectionProvider.class ), new FixedRetryLogic( 0 ), config );
+ return new SessionFactoryImpl( mock( ConnectionProvider.class ), new FixedRetryLogic( 0 ),
+ GlobalEventExecutor.INSTANCE, config );
}
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java
index 8cec050acc..f80e26c9b3 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java
@@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.cluster.loadbalancing;
+import io.netty.util.concurrent.GlobalEventExecutor;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
@@ -384,8 +385,10 @@ private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConne
private static Session newSession( LoadBalancer loadBalancer )
{
SleeplessClock clock = new SleeplessClock();
- RetryLogic retryLogic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, clock, DEV_NULL_LOGGING );
- return new NetworkSession( loadBalancer, AccessMode.WRITE, retryLogic, DEV_NULL_LOGGING );
+ RetryLogic retryLogic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, GlobalEventExecutor.INSTANCE,
+ clock, DEV_NULL_LOGGING );
+ return new NetworkSession( loadBalancer, AccessMode.WRITE, retryLogic, GlobalEventExecutor.INSTANCE,
+ DEV_NULL_LOGGING );
}
private static PooledConnection newConnectionWithFailingSync( BoltServerAddress address )
diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java
index e6916c474c..be0dd58a27 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java
@@ -23,10 +23,13 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutionException;
-import org.neo4j.driver.internal.logging.DevNullLogging;
+import org.neo4j.driver.internal.async.InternalFuture;
+import org.neo4j.driver.internal.async.InternalPromise;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.internal.util.Supplier;
+import org.neo4j.driver.internal.util.TrackingEventExecutor;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
@@ -51,9 +54,13 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
+import static org.neo4j.driver.v1.util.TestUtil.await;
public class ExponentialBackoffRetryLogicTest
{
+ private final TrackingEventExecutor eventExecutor = new TrackingEventExecutor();
+
@Test
public void throwsForIllegalMaxRetryTime()
{
@@ -155,6 +162,24 @@ public void nextDelayCalculatedAccordingToMultiplier() throws Exception
assertEquals( delaysWithoutJitter( initialDelay, multiplier, retries ), sleepValues( clock, retries ) );
}
+ @Test
+ public void nextDelayCalculatedAccordingToMultiplierAsync() throws Exception
+ {
+ String result = "The Result";
+ int retries = 14;
+ int initialDelay = 1;
+ int multiplier = 2;
+ int noJitter = 0;
+
+ ExponentialBackoffRetryLogic retryLogic = newRetryLogic( MAX_VALUE, initialDelay, multiplier, noJitter,
+ Clock.SYSTEM );
+
+ InternalFuture