diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index 45ad5e0b8a..527874e44f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -19,6 +19,8 @@ package org.neo4j.driver.internal; import io.netty.bootstrap.Bootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.EventExecutorGroup; import java.io.IOException; import java.net.URI; @@ -72,14 +74,18 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) ); SecurityPlan securityPlan = createSecurityPlan( address, config ); ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config ); - RetryLogic retryLogic = createRetryLogic( retrySettings, config.logging() ); - AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, config ); + Bootstrap bootstrap = BootstrapFactory.newBootstrap(); + EventLoopGroup eventLoopGroup = bootstrap.config().group(); + RetryLogic retryLogic = createRetryLogic( retrySettings, eventLoopGroup, config.logging() ); + + AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap, + config ); try { return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic, - asyncConnectionPool ); + asyncConnectionPool, eventLoopGroup ); } catch ( Throwable driverError ) { @@ -98,14 +104,13 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r } private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, SecurityPlan securityPlan, - Config config ) + Bootstrap bootstrap, Config config ) { Clock clock = createClock(); ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() ); ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( config.logging() ); AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, securityPlan, activeChannelTracker, config.logging(), clock ); - Bootstrap bootstrap = BootstrapFactory.newBootstrap(); PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(), config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(), config.maxConnectionPoolSize(), @@ -116,16 +121,18 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool, Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, - RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool ) + RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup ) { String scheme = uri.getScheme().toLowerCase(); switch ( scheme ) { case BOLT_URI_SCHEME: assertNoRoutingContext( uri, routingSettings ); - return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool ); + return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool, + eventExecutorGroup ); case BOLT_ROUTING_URI_SCHEME: - return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic ); + return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic, + eventExecutorGroup ); default: throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) ); } @@ -137,11 +144,13 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool * This method is protected only for testing */ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, - SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool ) + SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, + EventExecutorGroup eventExecutorGroup ) { ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool, asyncConnectionPool ); - SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config ); + SessionFactory sessionFactory = + createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config ); return createDriver( config, securityPlan, sessionFactory ); } @@ -151,14 +160,16 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c * This method is protected only for testing */ protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, - Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic ) + Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic, + EventExecutorGroup eventExecutorGroup ) { if ( !securityPlan.isRoutingCompatible() ) { throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" ); } ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings ); - SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config ); + SessionFactory sessionFactory = + createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config ); return createDriver( config, securityPlan, sessionFactory ); } @@ -239,10 +250,10 @@ protected Connector createConnector( final ConnectionSettings connectionSettings *

* This method is protected only for testing */ - protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, - RetryLogic retryLogic, Config config ) + protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, RetryLogic retryLogic, + EventExecutorGroup eventExecutorGroup, Config config ) { - return new SessionFactoryImpl( connectionProvider, retryLogic, config ); + return new SessionFactoryImpl( connectionProvider, retryLogic, eventExecutorGroup, config ); } /** @@ -250,9 +261,10 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv *

* This method is protected only for testing */ - protected RetryLogic createRetryLogic( RetrySettings settings, Logging logging ) + protected RetryLogic createRetryLogic( RetrySettings settings, EventExecutorGroup eventExecutorGroup, + Logging logging ) { - return new ExponentialBackoffRetryLogic( settings, createClock(), logging ); + return new ExponentialBackoffRetryLogic( settings, eventExecutorGroup, createClock(), logging ); } private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 8d0ff5a364..7551c88373 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -33,6 +33,7 @@ import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.types.InternalTypeSystem; +import org.neo4j.driver.internal.util.BiConsumer; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Response; import org.neo4j.driver.v1.Statement; @@ -219,7 +220,7 @@ public Response commitAsync() return internalCommitAsync(); } - private InternalFuture internalCommitAsync() + InternalFuture internalCommitAsync() { if ( state == State.COMMITTED ) { @@ -259,12 +260,12 @@ else if ( state == State.ROLLED_BACK ) } } - private Runnable releaseConnectionAndNotifySession() + private BiConsumer releaseConnectionAndNotifySession() { - return new Runnable() + return new BiConsumer() { @Override - public void run() + public void accept( Void result, Throwable error ) { asyncConnection.release(); session.asyncTransactionClosed( ExplicitTransaction.this ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java index 21833b86fd..5eac6f1e6a 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java @@ -50,7 +50,7 @@ public class InternalStatementResult implements StatementResult { this.statement = statement; this.connection = connection; - this.runResponseHandler = new RunResponseHandler( null ); + this.runResponseHandler = new RunResponseHandler( null, null ); this.pullAllResponseHandler = new RecordsResponseHandler( runResponseHandler ); this.resourcesHandler = resourcesHandler; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java index eeecf4f0fe..63420be0e8 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.EventExecutorGroup; + import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.v1.AccessMode; @@ -30,9 +32,9 @@ class LeakLoggingNetworkSession extends NetworkSession private final String stackTrace; LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic, - Logging logging ) + EventExecutorGroup eventExecutorGroup, Logging logging ) { - super( connectionProvider, mode, retryLogic, logging ); + super( connectionProvider, mode, retryLogic, eventExecutorGroup, logging ); this.stackTrace = captureStackTrace(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index 51881a2c2b..4a39bab241 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -18,7 +18,9 @@ */ package org.neo4j.driver.internal; -import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,6 +42,7 @@ import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Response; +import org.neo4j.driver.v1.ResponseListener; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; import org.neo4j.driver.v1.StatementResult; @@ -61,6 +64,7 @@ public class NetworkSession implements Session, SessionResourcesHandler, ResultR private final ConnectionProvider connectionProvider; private final AccessMode mode; private final RetryLogic retryLogic; + private final EventExecutorGroup eventExecutorGroup; protected final Logger logger; private volatile Bookmark bookmark = Bookmark.empty(); @@ -73,11 +77,12 @@ public class NetworkSession implements Session, SessionResourcesHandler, ResultR private final AtomicBoolean isOpen = new AtomicBoolean( true ); public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic, - Logging logging ) + EventExecutorGroup eventExecutorGroup, Logging logging ) { this.connectionProvider = connectionProvider; this.mode = mode; this.retryLogic = retryLogic; + this.eventExecutorGroup = eventExecutorGroup; this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) ); } @@ -153,7 +158,7 @@ public Response runAsync( final Statement statement ) InternalFuture connectionFuture = acquireAsyncConnection( mode ); - return connectionFuture.thenCombine( new Function>() + return connectionFuture.thenCompose( new Function>() { @Override public InternalFuture apply( AsyncConnection connection ) @@ -240,7 +245,7 @@ public Response closeAsync() { if ( asyncConnectionFuture != null ) { - return asyncConnectionFuture.thenCombine( new Function>() + return asyncConnectionFuture.thenCompose( new Function>() { @Override public InternalFuture apply( AsyncConnection connection ) @@ -251,7 +256,7 @@ public InternalFuture apply( AsyncConnection connection ) } else if ( currentAsyncTransactionFuture != null ) { - return currentAsyncTransactionFuture.thenCombine( new Function>() + return currentAsyncTransactionFuture.thenCompose( new Function>() { @Override public InternalFuture apply( ExplicitTransaction tx ) @@ -262,7 +267,7 @@ public InternalFuture apply( ExplicitTransaction tx ) } else { - return new InternalPromise( GlobalEventExecutor.INSTANCE ).setSuccess( null ); + return new InternalPromise( eventExecutorGroup ).setSuccess( null ); } } @@ -283,7 +288,8 @@ public synchronized Transaction beginTransaction( String bookmark ) @Override public Response beginTransactionAsync() { - return beginTransactionAsync( mode ); + //noinspection unchecked + return (Response) beginTransactionAsync( mode ); } @Override @@ -292,12 +298,24 @@ public T readTransaction( TransactionWork work ) return transaction( AccessMode.READ, work ); } + @Override + public Response readTransactionAsync( TransactionWork> work ) + { + return transactionAsync( AccessMode.READ, work ); + } + @Override public T writeTransaction( TransactionWork work ) { return transaction( AccessMode.WRITE, work ); } + @Override + public Response writeTransactionAsync( TransactionWork> work ) + { + return transactionAsync( AccessMode.WRITE, work ); + } + void setBookmark( Bookmark bookmark ) { if ( bookmark != null && !bookmark.isEmpty() ) @@ -397,6 +415,129 @@ public T get() } ); } + private InternalFuture transactionAsync( final AccessMode mode, final TransactionWork> work ) + { + return retryLogic.retryAsync( new Supplier>() + { + @Override + public InternalFuture get() + { + final InternalFuture txFuture = beginTransactionAsync( mode ); + final InternalPromise resultPromise = new InternalPromise<>( txFuture.eventExecutor() ); + + txFuture.addListener( new FutureListener() + { + @Override + public void operationComplete( Future future ) throws Exception + { + if ( future.isCancelled() ) + { + resultPromise.cancel( true ); + } + else if ( future.isSuccess() ) + { + executeWork( resultPromise, future.getNow(), work ); + } + else + { + resultPromise.setFailure( future.cause() ); + } + } + } ); + + return resultPromise; + } + } ); + } + + private void executeWork( final InternalPromise resultPromise, final ExplicitTransaction tx, + TransactionWork> work ) + { + Response workResponse = safeExecuteWork( tx, work ); + workResponse.addListener( new ResponseListener() + { + @Override + public void operationCompleted( T result, Throwable error ) + { + if ( error != null ) + { + rollbackTxAfterFailedTransactionWork( tx, resultPromise, error ); + } + else + { + commitTxAfterSucceededTransactionWork( tx, resultPromise, result ); + } + } + } ); + } + + private Response safeExecuteWork( ExplicitTransaction tx, TransactionWork> work ) + { + // given work might fail in both async and sync way + // async failure will result in a failed future being returned + // sync failure will result in an exception being thrown + try + { + return work.execute( tx ); + } + catch ( Throwable workError ) + { + // work threw an exception, wrap it in a future and proceed + return new InternalPromise( eventExecutorGroup ).setFailure( workError ); + } + } + + private void rollbackTxAfterFailedTransactionWork( ExplicitTransaction tx, + final InternalPromise resultPromise, final Throwable error ) + { + if ( tx.isOpen() ) + { + tx.rollbackAsync().addListener( new ResponseListener() + { + @Override + public void operationCompleted( Void ignore, Throwable rollbackError ) + { + if ( rollbackError != null ) + { + error.addSuppressed( rollbackError ); + } + resultPromise.setFailure( error ); + } + } ); + } + else + { + resultPromise.setFailure( error ); + } + } + + private void commitTxAfterSucceededTransactionWork( ExplicitTransaction tx, + final InternalPromise resultPromise, final T result ) + { + if ( tx.isOpen() ) + { + tx.commitAsync().addListener( new ResponseListener() + { + @Override + public void operationCompleted( Void ignore, Throwable commitError ) + { + if ( commitError != null ) + { + resultPromise.setFailure( commitError ); + } + else + { + resultPromise.setSuccess( result ); + } + } + } ); + } + else + { + resultPromise.setSuccess( result ); + } + } + private synchronized Transaction beginTransaction( AccessMode mode ) { ensureSessionIsOpen(); @@ -412,14 +553,14 @@ private synchronized Transaction beginTransaction( AccessMode mode ) return currentTransaction; } - private synchronized Response beginTransactionAsync( AccessMode mode ) + private synchronized InternalFuture beginTransactionAsync( AccessMode mode ) { ensureSessionIsOpen(); ensureNoOpenTransactionBeforeOpeningTransaction(); InternalFuture connectionFuture = acquireAsyncConnection( mode ); - currentAsyncTransactionFuture = connectionFuture.thenCombine( + currentAsyncTransactionFuture = connectionFuture.thenCompose( new Function>() { @Override @@ -430,8 +571,7 @@ public InternalFuture apply( AsyncConnection connection ) } } ); - //noinspection unchecked - return (Response) currentAsyncTransactionFuture; + return currentAsyncTransactionFuture; } private void ensureNoUnrecoverableError() @@ -495,7 +635,7 @@ private InternalFuture acquireAsyncConnection( final AccessMode // memorize in local so same instance is transformed and used in callbacks final InternalFuture currentAsyncConnectionFuture = asyncConnectionFuture; - asyncConnectionFuture = currentAsyncConnectionFuture.thenCombine( + asyncConnectionFuture = currentAsyncConnectionFuture.thenCompose( new Function>() { @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java index 6d633019ec..cc9c7050ac 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java +++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.EventExecutorGroup; + import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.v1.AccessMode; @@ -29,12 +31,15 @@ public class SessionFactoryImpl implements SessionFactory { private final ConnectionProvider connectionProvider; private final RetryLogic retryLogic; + private final EventExecutorGroup eventExecutorGroup; private final Logging logging; private final boolean leakedSessionsLoggingEnabled; - SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic, Config config ) + SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic, + EventExecutorGroup eventExecutorGroup, Config config ) { this.connectionProvider = connectionProvider; + this.eventExecutorGroup = eventExecutorGroup; this.leakedSessionsLoggingEnabled = config.logLeakedSessions(); this.retryLogic = retryLogic; this.logging = config.logging(); @@ -51,9 +56,9 @@ public final Session newInstance( AccessMode mode, Bookmark bookmark ) protected NetworkSession createSession( ConnectionProvider connectionProvider, RetryLogic retryLogic, AccessMode mode, Logging logging ) { - return leakedSessionsLoggingEnabled ? - new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, logging ) : - new NetworkSession( connectionProvider, mode, retryLogic, logging ); + return leakedSessionsLoggingEnabled + ? new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, eventExecutorGroup, logging ) + : new NetworkSession( connectionProvider, mode, retryLogic, eventExecutorGroup, logging ); } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java b/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java index 8f14c7d182..04b0240eee 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java @@ -23,6 +23,7 @@ import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; +import org.neo4j.driver.internal.util.BiConsumer; import org.neo4j.driver.v1.util.Function; public final class Futures @@ -45,14 +46,14 @@ public static InternalFuture thenApply( Future future, Bootstrap bo return result; } - public static InternalFuture thenCombine( InternalFuture future, Function> fn ) + public static InternalFuture thenCompose( InternalFuture future, Function> fn ) { InternalPromise result = new InternalPromise<>( future.eventExecutor() ); - future.addListener( new ThenCombineListener<>( result, fn ) ); + future.addListener( new ThenComposeListener<>( result, fn ) ); return result; } - public static InternalFuture whenComplete( InternalFuture future, Runnable action ) + public static InternalFuture whenComplete( InternalFuture future, BiConsumer action ) { InternalPromise result = new InternalPromise<>( future.eventExecutor() ); future.addListener( new CompletionListener<>( result, action ) ); @@ -97,12 +98,12 @@ else if ( future.isSuccess() ) } } - private static class ThenCombineListener implements GenericFutureListener> + private static class ThenComposeListener implements GenericFutureListener> { final Promise result; final Function> fn; - ThenCombineListener( Promise result, Function> fn ) + ThenComposeListener( Promise result, Function> fn ) { this.result = result; this.fn = fn; @@ -168,9 +169,9 @@ else if ( future.isSuccess() ) private static class CompletionListener implements GenericFutureListener> { final Promise result; - final Runnable action; + final BiConsumer action; - CompletionListener( Promise result, Runnable action ) + CompletionListener( Promise result, BiConsumer action ) { this.result = result; this.action = action; @@ -187,7 +188,7 @@ else if ( future.isSuccess() ) { try { - action.run(); + action.accept( future.getNow(), null ); result.setSuccess( future.getNow() ); } catch ( Throwable t ) @@ -200,7 +201,7 @@ else if ( future.isSuccess() ) Throwable error = future.cause(); try { - action.run(); + action.accept( null, error ); } catch ( Throwable t ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java index 54245824be..6444384fe3 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalFuture.java @@ -21,6 +21,7 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; +import org.neo4j.driver.internal.util.BiConsumer; import org.neo4j.driver.v1.Response; import org.neo4j.driver.v1.util.Function; @@ -30,7 +31,7 @@ public interface InternalFuture extends Future, Response InternalFuture thenApply( Function fn ); - InternalFuture thenCombine( Function> fn ); + InternalFuture thenCompose( Function> fn ); - InternalFuture whenComplete( Runnable action ); + InternalFuture whenComplete( BiConsumer action ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java index dd1db10afd..6a48065b4e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java @@ -20,6 +20,7 @@ import io.netty.bootstrap.Bootstrap; import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.GenericFutureListener; @@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.neo4j.driver.internal.util.BiConsumer; import org.neo4j.driver.v1.ResponseListener; import org.neo4j.driver.v1.util.Function; @@ -39,7 +41,12 @@ public class InternalPromise implements InternalFuture, Promise public InternalPromise( Bootstrap bootstrap ) { - this( bootstrap.config().group().next() ); + this( bootstrap.config().group() ); + } + + public InternalPromise( EventExecutorGroup eventExecutorGroup ) + { + this( eventExecutorGroup.next() ); } public InternalPromise( EventExecutor eventExecutor ) @@ -61,13 +68,13 @@ public InternalFuture thenApply( Function fn ) } @Override - public InternalFuture thenCombine( Function> fn ) + public InternalFuture thenCompose( Function> fn ) { - return Futures.thenCombine( this, fn ); + return Futures.thenCompose( this, fn ); } @Override - public InternalFuture whenComplete( Runnable action ) + public InternalFuture whenComplete( BiConsumer action ) { return Futures.whenComplete( this, action ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java index ca09fd5c01..bb2cd5ed4b 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java @@ -18,25 +18,37 @@ */ package org.neo4j.driver.internal.async; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + +import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.neo4j.driver.internal.handlers.PullAllResponseHandler; import org.neo4j.driver.internal.handlers.RunResponseHandler; +import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Response; import org.neo4j.driver.v1.StatementResultCursor; import org.neo4j.driver.v1.summary.ResultSummary; +import static java.util.Objects.requireNonNull; + public class InternalStatementResultCursor implements StatementResultCursor { + private final AsyncConnection connection; private final RunResponseHandler runResponseHandler; private final PullAllResponseHandler pullAllHandler; - public InternalStatementResultCursor( RunResponseHandler runResponseHandler, PullAllResponseHandler pullAllHandler ) + private InternalFuture peekedRecordResponse; + + public InternalStatementResultCursor( AsyncConnection connection, RunResponseHandler runResponseHandler, + PullAllResponseHandler pullAllHandler ) { - this.runResponseHandler = runResponseHandler; - this.pullAllHandler = pullAllHandler; + this.connection = requireNonNull( connection ); + this.runResponseHandler = requireNonNull( runResponseHandler ); + this.pullAllHandler = requireNonNull( pullAllHandler ); } @Override @@ -53,14 +65,116 @@ public Response summaryAsync() } @Override - public Response fetchAsync() + public Response nextAsync() + { + return internalNextAsync(); + } + + @Override + public Response peekAsync() + { + if ( peekedRecordResponse == null ) + { + peekedRecordResponse = pullAllHandler.nextAsync(); + } + return peekedRecordResponse; + } + + @Override + public Response forEachAsync( final Consumer action ) { - return pullAllHandler.fetchRecordAsync(); + InternalPromise result = connection.newPromise(); + internalForEachAsync( action, result ); + return result; } @Override - public Record current() + public Response> listAsync() + { + InternalPromise> result = connection.newPromise(); + internalListAsync( new ArrayList(), result ); + return result; + } + + private void internalForEachAsync( final Consumer action, final InternalPromise result ) + { + final InternalFuture recordFuture = internalNextAsync(); + + recordFuture.addListener( new FutureListener() + { + @Override + public void operationComplete( Future future ) + { + if ( future.isCancelled() ) + { + result.cancel( true ); + } + else if ( future.isSuccess() ) + { + Record record = future.getNow(); + if ( record != null ) + { + action.accept( record ); + internalForEachAsync( action, result ); + } + else + { + result.setSuccess( null ); + } + } + else + { + result.setFailure( future.cause() ); + } + } + } ); + } + + private void internalListAsync( final List records, final InternalPromise> result ) + { + final InternalFuture recordFuture = internalNextAsync(); + + recordFuture.addListener( new FutureListener() + { + @Override + public void operationComplete( Future future ) + { + if ( future.isCancelled() ) + { + result.cancel( true ); + } + else if ( future.isSuccess() ) + { + Record record = future.getNow(); + if ( record != null ) + { + records.add( record ); + internalListAsync( records, result ); + } + else + { + result.setSuccess( records ); + } + } + else + { + result.setFailure( future.cause() ); + } + } + } ); + } + + private InternalFuture internalNextAsync() { - return pullAllHandler.currentRecord(); + if ( peekedRecordResponse != null ) + { + InternalFuture result = peekedRecordResponse; + peekedRecordResponse = null; + return result; + } + else + { + return pullAllHandler.nextAsync(); + } } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/Main.java b/driver/src/main/java/org/neo4j/driver/internal/async/Main.java index 2abd234d4c..225b1a8e46 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/Main.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/Main.java @@ -158,9 +158,9 @@ public void apply( Driver driver, MutableInt recordsRead ) Session session = driver.session(); Response cursorResponse = session.runAsync( QUERY, PARAMS_OBJ ); StatementResultCursor cursor = await( cursorResponse ); - while ( await( cursor.fetchAsync() ) ) + Record record; + while ( (record = await( cursor.nextAsync() )) != null ) { - Record record = cursor.current(); useRecord( record ); recordsRead.increment(); } @@ -202,9 +202,9 @@ public void apply( Driver driver, MutableInt recordsRead ) Session session = driver.session(); Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor = await( tx.runAsync( QUERY, PARAMS_OBJ ) ); - while ( await( cursor.fetchAsync() ) ) + Record record; + while ( (record = await( cursor.nextAsync() )) != null ) { - Record record = cursor.current(); useRecord( record ); recordsRead.increment(); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java b/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java index 2afae41876..d08e3fc3f2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/QueryRunner.java @@ -43,14 +43,14 @@ public static InternalFuture runAsync( AsyncConnection co return runAsync( connection, statement, null ); } - public static InternalFuture runAsync( AsyncConnection connection, Statement statement, + public static InternalFuture runAsync( final AsyncConnection connection, Statement statement, ExplicitTransaction tx ) { String query = statement.text(); Map params = statement.parameters().asMap( ofValue() ); InternalPromise runCompletedPromise = connection.newPromise(); - final RunResponseHandler runHandler = new RunResponseHandler( runCompletedPromise ); + final RunResponseHandler runHandler = new RunResponseHandler( runCompletedPromise, tx ); final PullAllResponseHandler pullAllHandler = newPullAllHandler( statement, runHandler, connection, tx ); connection.run( query, params, runHandler ); @@ -62,7 +62,7 @@ public static InternalFuture runAsync( AsyncConnection co @Override public StatementResultCursor apply( Void ignore ) { - return new InternalStatementResultCursor( runHandler, pullAllHandler ); + return new InternalStatementResultCursor( connection, runHandler, pullAllHandler ); } } ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java index 3803bec368..6e5e1f2345 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java @@ -53,15 +53,14 @@ public abstract class PullAllResponseHandler implements ResponseHandler private final RunResponseHandler runResponseHandler; protected final AsyncConnection connection; - private final Queue records; + private final Queue records = new LinkedList<>(); + private boolean succeeded; private Throwable failure; - private ResultSummary summary; - private volatile Record current; - private InternalPromise recordAvailablePromise; - private InternalPromise summaryAvailablePromise; + private InternalPromise recordPromise; + private InternalPromise summaryPromise; public PullAllResponseHandler( Statement statement, RunResponseHandler runResponseHandler, AsyncConnection connection ) @@ -69,27 +68,21 @@ public PullAllResponseHandler( Statement statement, RunResponseHandler runRespon this.statement = requireNonNull( statement ); this.runResponseHandler = requireNonNull( runResponseHandler ); this.connection = requireNonNull( connection ); - this.records = new LinkedList<>(); } @Override public synchronized void onSuccess( Map metadata ) { summary = extractResultSummary( metadata ); - if ( summaryAvailablePromise != null ) + if ( summaryPromise != null ) { - summaryAvailablePromise.setSuccess( summary ); - summaryAvailablePromise = null; + summaryPromise.setSuccess( summary ); + summaryPromise = null; } succeeded = true; afterSuccess(); - - if ( recordAvailablePromise != null ) - { - recordAvailablePromise.setSuccess( false ); - recordAvailablePromise = null; - } + succeedRecordPromise( null ); } protected abstract void afterSuccess(); @@ -99,12 +92,7 @@ public synchronized void onFailure( Throwable error ) { failure = error; afterFailure( error ); - - if ( recordAvailablePromise != null ) - { - recordAvailablePromise.setFailure( error ); - recordAvailablePromise = null; - } + failRecordPromise( error ); } protected abstract void afterFailure( Throwable error ); @@ -114,11 +102,9 @@ public synchronized void onRecord( Value[] fields ) { Record record = new InternalRecord( runResponseHandler.statementKeys(), fields ); - if ( recordAvailablePromise != null ) + if ( recordPromise != null ) { - current = record; - recordAvailablePromise.setSuccess( true ); - recordAvailablePromise = null; + succeedRecordPromise( record ); } else { @@ -126,42 +112,33 @@ public synchronized void onRecord( Value[] fields ) } } - public synchronized InternalFuture fetchRecordAsync() + public synchronized InternalFuture nextAsync() { Record record = dequeueRecord(); if ( record == null ) { if ( succeeded ) { - return connection.newPromise().setSuccess( false ); + return connection.newPromise().setSuccess( null ); } if ( failure != null ) { - return connection.newPromise().setFailure( failure ); + return connection.newPromise().setFailure( failure ); } - if ( recordAvailablePromise == null ) + if ( recordPromise == null ) { - recordAvailablePromise = connection.newPromise(); + recordPromise = connection.newPromise(); } - - return recordAvailablePromise; + return recordPromise; } else { - current = record; - return connection.newPromise().setSuccess( true ); + return connection.newPromise().setSuccess( record ); } } - public Record currentRecord() - { - Record result = current; - current = null; - return result; - } - public synchronized InternalFuture summaryAsync() { if ( summary != null ) @@ -170,11 +147,11 @@ public synchronized InternalFuture summaryAsync() } else { - if ( summaryAvailablePromise == null ) + if ( summaryPromise == null ) { - summaryAvailablePromise = connection.newPromise(); + summaryPromise = connection.newPromise(); } - return summaryAvailablePromise; + return summaryPromise; } } @@ -203,6 +180,26 @@ private Record dequeueRecord() return record; } + private void succeedRecordPromise( Record record ) + { + if ( recordPromise != null ) + { + InternalPromise promise = recordPromise; + recordPromise = null; + promise.setSuccess( record ); + } + } + + private void failRecordPromise( Throwable error ) + { + if ( recordPromise != null ) + { + InternalPromise promise = recordPromise; + recordPromise = null; + promise.setFailure( error ); + } + } + private ResultSummary extractResultSummary( Map metadata ) { return new InternalResultSummary( statement, connection.serverInfo(), extractStatementType( metadata ), diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java index 434554e21e..83ba08c75f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/RunResponseHandler.java @@ -25,19 +25,22 @@ import java.util.List; import java.util.Map; +import org.neo4j.driver.internal.ExplicitTransaction; import org.neo4j.driver.internal.spi.ResponseHandler; import org.neo4j.driver.v1.Value; public class RunResponseHandler implements ResponseHandler { private final Promise runCompletedPromise; + private final ExplicitTransaction tx; private List statementKeys; private long resultAvailableAfter; - public RunResponseHandler( Promise runCompletedPromise ) + public RunResponseHandler( Promise runCompletedPromise, ExplicitTransaction tx ) { this.runCompletedPromise = runCompletedPromise; + this.tx = tx; } @Override @@ -55,6 +58,10 @@ public void onSuccess( Map metadata ) @Override public void onFailure( Throwable error ) { + if ( tx != null ) + { + tx.resultFailed( error ); + } if ( runCompletedPromise != null ) { runCompletedPromise.setFailure( error ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java index a30290e42b..a1d675a005 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java @@ -18,10 +18,18 @@ */ package org.neo4j.driver.internal.retry; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.neo4j.driver.internal.async.InternalFuture; +import org.neo4j.driver.internal.async.InternalPromise; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Supplier; import org.neo4j.driver.v1.Logger; @@ -41,27 +49,31 @@ public class ExponentialBackoffRetryLogic implements RetryLogic private static final long INITIAL_RETRY_DELAY_MS = SECONDS.toMillis( 1 ); private static final double RETRY_DELAY_MULTIPLIER = 2.0; private static final double RETRY_DELAY_JITTER_FACTOR = 0.2; + private static final long MAX_RETRY_DELAY = Long.MAX_VALUE / 2; private final long maxRetryTimeMs; private final long initialRetryDelayMs; private final double multiplier; private final double jitterFactor; + private final EventExecutorGroup eventExecutorGroup; private final Clock clock; private final Logger log; - public ExponentialBackoffRetryLogic( RetrySettings settings, Clock clock, Logging logging ) + public ExponentialBackoffRetryLogic( RetrySettings settings, EventExecutorGroup eventExecutorGroup, Clock clock, + Logging logging ) { this( settings.maxRetryTimeMs(), INITIAL_RETRY_DELAY_MS, RETRY_DELAY_MULTIPLIER, RETRY_DELAY_JITTER_FACTOR, - clock, logging ); + eventExecutorGroup, clock, logging ); } ExponentialBackoffRetryLogic( long maxRetryTimeMs, long initialRetryDelayMs, double multiplier, - double jitterFactor, Clock clock, Logging logging ) + double jitterFactor, EventExecutorGroup eventExecutorGroup, Clock clock, Logging logging ) { this.maxRetryTimeMs = maxRetryTimeMs; this.initialRetryDelayMs = initialRetryDelayMs; this.multiplier = multiplier; this.jitterFactor = jitterFactor; + this.eventExecutorGroup = eventExecutorGroup; this.clock = clock; this.log = logging.getLog( RETRY_LOGIC_LOG_NAME ); @@ -95,7 +107,7 @@ public T retry( Supplier work ) if ( elapsedTime < maxRetryTimeMs ) { long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); - log.error( "Transaction failed and will be retried in " + delayWithJitterMs + "ms", error ); + log.warn( "Transaction failed and will be retried in " + delayWithJitterMs + "ms", error ); sleep( delayWithJitterMs ); nextDelayMs = (long) (nextDelayMs * multiplier); @@ -109,8 +121,124 @@ public T retry( Supplier work ) } } + @Override + public InternalFuture retryAsync( Supplier> work ) + { + InternalPromise result = new InternalPromise<>( eventExecutorGroup ); + executeWorkInEventLoop( result, work ); + return result; + } + + protected boolean canRetryOn( Throwable error ) + { + return error instanceof SessionExpiredException || + error instanceof ServiceUnavailableException || + isTransientError( error ); + } + + private void executeWorkInEventLoop( final InternalPromise result, final Supplier> work ) + { + // this is the very first time we execute given work + EventExecutor eventExecutor = eventExecutorGroup.next(); + + eventExecutor.execute( new Runnable() + { + @Override + public void run() + { + executeWork( result, work, -1, initialRetryDelayMs, null ); + } + } ); + } + + private void retryWorkInEventLoop( final InternalPromise result, final Supplier> work, + final Throwable error, final long startTime, final long delayMs, final List errors ) + { + // work has failed before, we need to schedule retry with the given delay + EventExecutor eventExecutor = eventExecutorGroup.next(); + + long delayWithJitterMs = computeDelayWithJitter( delayMs ); + log.warn( "Async transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error ); + + eventExecutor.schedule( new Runnable() + { + @Override + public void run() + { + long newRetryDelayMs = (long) (delayMs * multiplier); + executeWork( result, work, startTime, newRetryDelayMs, errors ); + } + }, delayWithJitterMs, TimeUnit.MILLISECONDS ); + } + + private void executeWork( final InternalPromise result, final Supplier> work, + final long startTime, final long retryDelayMs, final List errors ) + { + InternalFuture workFuture; + try + { + workFuture = work.get(); + } + catch ( Throwable error ) + { + // work failed in a sync way, attempt to schedule a retry + retryOnError( result, work, startTime, retryDelayMs, error, errors ); + return; + } + + workFuture.addListener( new FutureListener() + { + @Override + public void operationComplete( Future future ) + { + if ( future.isCancelled() ) + { + result.cancel( true ); + } + else if ( future.isSuccess() ) + { + result.setSuccess( future.getNow() ); + } + else + { + // work failed in async way, attempt to schedule a retry + retryOnError( result, work, startTime, retryDelayMs, future.cause(), errors ); + } + } + } ); + } + + private void retryOnError( InternalPromise result, Supplier> work, long startTime, + long retryDelayMs, Throwable error, List errors ) + { + if ( canRetryOn( error ) ) + { + long currentTime = clock.millis(); + if ( startTime == -1 ) + { + startTime = currentTime; + } + + long elapsedTime = currentTime - startTime; + if ( elapsedTime < maxRetryTimeMs ) + { + errors = recordError( error, errors ); + retryWorkInEventLoop( result, work, error, startTime, retryDelayMs, errors ); + return; + } + } + + addSuppressed( error, errors ); + result.setFailure( error ); + } + private long computeDelayWithJitter( long delayMs ) { + if ( delayMs > MAX_RETRY_DELAY ) + { + delayMs = MAX_RETRY_DELAY; + } + long jitter = (long) (delayMs * jitterFactor); long min = delayMs - jitter; long max = delayMs + jitter; @@ -154,13 +282,6 @@ private void verifyAfterConstruction() } } - private static boolean canRetryOn( Throwable error ) - { - return error instanceof SessionExpiredException || - error instanceof ServiceUnavailableException || - isTransientError( error ); - } - private static boolean isTransientError( Throwable error ) { if ( error instanceof TransientException ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java index d37b2b3055..5f6569b87d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java @@ -18,9 +18,12 @@ */ package org.neo4j.driver.internal.retry; +import org.neo4j.driver.internal.async.InternalFuture; import org.neo4j.driver.internal.util.Supplier; public interface RetryLogic { T retry( Supplier work ); + + InternalFuture retryAsync( Supplier> work ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/BiConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/util/BiConsumer.java new file mode 100644 index 0000000000..a9c09139db --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/util/BiConsumer.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +public interface BiConsumer +{ + void accept( T t, U u ); +} diff --git a/driver/src/main/java/org/neo4j/driver/v1/Session.java b/driver/src/main/java/org/neo4j/driver/v1/Session.java index a9674128cf..a3c9e3e61e 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/Session.java +++ b/driver/src/main/java/org/neo4j/driver/v1/Session.java @@ -96,6 +96,8 @@ public interface Session extends Resource, StatementRunner */ T readTransaction( TransactionWork work ); + Response readTransactionAsync( TransactionWork> work ); + /** * Execute given unit of work in a {@link AccessMode#WRITE write} transaction. *

@@ -108,6 +110,8 @@ public interface Session extends Resource, StatementRunner */ T writeTransaction( TransactionWork work ); + Response writeTransactionAsync( TransactionWork> work ); + /** * Return the bookmark received following the last completed * {@linkplain Transaction transaction}. If no bookmark was received diff --git a/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java index d44232fff7..528838b63c 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java +++ b/driver/src/main/java/org/neo4j/driver/v1/StatementResultCursor.java @@ -20,6 +20,7 @@ import java.util.List; +import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.summary.ResultSummary; public interface StatementResultCursor @@ -33,7 +34,11 @@ public interface StatementResultCursor Response summaryAsync(); - Response fetchAsync(); + Response nextAsync(); - Record current(); + Response peekAsync(); + + Response forEachAsync( Consumer action ); + + Response> listAsync(); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java index dd7be8391b..7a89922175 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.EventExecutorGroup; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -169,7 +170,8 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan, @Override protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, - RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic ) + RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic, + EventExecutorGroup eventExecutorGroup ) { throw new UnsupportedOperationException( "Can't create routing driver" ); } @@ -200,9 +202,10 @@ protected LoadBalancer createLoadBalancer( BoltServerAddress address, Connection @Override protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, - RetryLogic retryLogic, Config config ) + RetryLogic retryLogic, EventExecutorGroup eventExecutorGroup, Config config ) { - SessionFactory sessionFactory = super.createSessionFactory( connectionProvider, retryLogic, config ); + SessionFactory sessionFactory = + super.createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config ); capturedSessionFactory = sessionFactory; return sessionFactory; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java index 1249acb047..6188e4d51b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -99,7 +100,7 @@ private static void finalize( Session session ) throws Exception private static LeakLoggingNetworkSession newSession( Logging logging, boolean openConnection ) { return new LeakLoggingNetworkSession( connectionProviderMock( openConnection ), READ, - new FixedRetryLogic( 0 ), logging ); + new FixedRetryLogic( 0 ), GlobalEventExecutor.INSTANCE, logging ); } private static ConnectionProvider connectionProviderMock( final boolean openConnection ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 132a4a826f..23343fff11 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -1010,7 +1011,8 @@ private static NetworkSession newSession( ConnectionProvider connectionProvider, private static NetworkSession newSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic, Bookmark bookmark ) { - NetworkSession session = new NetworkSession( connectionProvider, mode, retryLogic, DEV_NULL_LOGGING ); + NetworkSession session = new NetworkSession( connectionProvider, mode, retryLogic, + GlobalEventExecutor.INSTANCE, DEV_NULL_LOGGING ); session.setBookmark( bookmark ); return session; } diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java index 6212951034..5e6a61800d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -439,7 +440,7 @@ private static class NetworkSessionWithAddressFactory extends SessionFactoryImpl { NetworkSessionWithAddressFactory( ConnectionProvider connectionProvider, Config config ) { - super( connectionProvider, new FixedRetryLogic( 0 ), config ); + super( connectionProvider, new FixedRetryLogic( 0 ), GlobalEventExecutor.INSTANCE, config ); } @Override @@ -456,7 +457,7 @@ private static class NetworkSessionWithAddress extends NetworkSession NetworkSessionWithAddress( ConnectionProvider connectionProvider, AccessMode mode, Logging logging ) { - super( connectionProvider, mode, new FixedRetryLogic( 0 ), logging ); + super( connectionProvider, mode, new FixedRetryLogic( 0 ), GlobalEventExecutor.INSTANCE, logging ); try ( PooledConnection connection = connectionProvider.acquireConnection( mode ) ) { this.address = connection.boltServerAddress(); diff --git a/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java index fd27004032..fdf07758d2 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal; +import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.Test; import org.neo4j.driver.internal.retry.FixedRetryLogic; @@ -61,6 +62,7 @@ public void createsLeakLoggingNetworkSessions() private static SessionFactory newSessionFactory( Config config ) { - return new SessionFactoryImpl( mock( ConnectionProvider.class ), new FixedRetryLogic( 0 ), config ); + return new SessionFactoryImpl( mock( ConnectionProvider.class ), new FixedRetryLogic( 0 ), + GlobalEventExecutor.INSTANCE, config ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java index 8cec050acc..f80e26c9b3 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal.cluster.loadbalancing; +import io.netty.util.concurrent.GlobalEventExecutor; import org.junit.Test; import org.mockito.InOrder; import org.mockito.invocation.InvocationOnMock; @@ -384,8 +385,10 @@ private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConne private static Session newSession( LoadBalancer loadBalancer ) { SleeplessClock clock = new SleeplessClock(); - RetryLogic retryLogic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, clock, DEV_NULL_LOGGING ); - return new NetworkSession( loadBalancer, AccessMode.WRITE, retryLogic, DEV_NULL_LOGGING ); + RetryLogic retryLogic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, GlobalEventExecutor.INSTANCE, + clock, DEV_NULL_LOGGING ); + return new NetworkSession( loadBalancer, AccessMode.WRITE, retryLogic, GlobalEventExecutor.INSTANCE, + DEV_NULL_LOGGING ); } private static PooledConnection newConnectionWithFailingSync( BoltServerAddress address ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java index e6916c474c..be0dd58a27 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java @@ -23,10 +23,13 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; -import org.neo4j.driver.internal.logging.DevNullLogging; +import org.neo4j.driver.internal.async.InternalFuture; +import org.neo4j.driver.internal.async.InternalPromise; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Supplier; +import org.neo4j.driver.internal.util.TrackingEventExecutor; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; @@ -51,9 +54,13 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.v1.util.TestUtil.await; public class ExponentialBackoffRetryLogicTest { + private final TrackingEventExecutor eventExecutor = new TrackingEventExecutor(); + @Test public void throwsForIllegalMaxRetryTime() { @@ -155,6 +162,24 @@ public void nextDelayCalculatedAccordingToMultiplier() throws Exception assertEquals( delaysWithoutJitter( initialDelay, multiplier, retries ), sleepValues( clock, retries ) ); } + @Test + public void nextDelayCalculatedAccordingToMultiplierAsync() throws Exception + { + String result = "The Result"; + int retries = 14; + int initialDelay = 1; + int multiplier = 2; + int noJitter = 0; + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( MAX_VALUE, initialDelay, multiplier, noJitter, + Clock.SYSTEM ); + + InternalFuture future = retryAsync( retryLogic, retries, result ); + + assertEquals( result, future.get() ); + assertEquals( delaysWithoutJitter( initialDelay, multiplier, retries ), eventExecutor.scheduleDelays() ); + } + @Test public void nextDelayCalculatedAccordingToJitter() throws Exception { @@ -170,16 +195,29 @@ public void nextDelayCalculatedAccordingToJitter() throws Exception List sleepValues = sleepValues( clock, retries ); List delaysWithoutJitter = delaysWithoutJitter( initialDelay, multiplier, retries ); - assertEquals( delaysWithoutJitter.size(), sleepValues.size() ); - for ( int i = 0; i < sleepValues.size(); i++ ) - { - double sleepValue = sleepValues.get( i ).doubleValue(); - long delayWithoutJitter = delaysWithoutJitter.get( i ); - double jitter = delayWithoutJitter * jitterFactor; + assertDelaysApproximatelyEqual( delaysWithoutJitter, sleepValues, jitterFactor ); + } - assertThat( sleepValue, closeTo( delayWithoutJitter, jitter ) ); - } + @Test + public void nextDelayCalculatedAccordingToJitterAsync() throws Exception + { + String result = "The Result"; + int retries = 24; + double jitterFactor = 0.2; + int initialDelay = 1; + int multiplier = 2; + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( MAX_VALUE, initialDelay, multiplier, jitterFactor, + mock( Clock.class ) ); + + InternalFuture future = retryAsync( retryLogic, retries, result ); + assertEquals( result, future.get() ); + + List scheduleDelays = eventExecutor.scheduleDelays(); + List delaysWithoutJitter = delaysWithoutJitter( initialDelay, multiplier, retries ); + + assertDelaysApproximatelyEqual( delaysWithoutJitter, scheduleDelays, jitterFactor ); } @Test @@ -215,6 +253,44 @@ public void doesNotRetryWhenMaxRetryTimeExceeded() throws Exception verify( workMock, times( 3 ) ).get(); } + @Test + public void doesNotRetryWhenMaxRetryTimeExceededAsync() throws Exception + { + long retryStart = Clock.SYSTEM.millis(); + int initialDelay = 100; + int multiplier = 2; + long maxRetryTimeMs = 45; + Clock clock = mock( Clock.class ); + when( clock.millis() ).thenReturn( retryStart ) + .thenReturn( retryStart + maxRetryTimeMs - 5 ) + .thenReturn( retryStart + maxRetryTimeMs + 7 ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( maxRetryTimeMs, initialDelay, multiplier, 0, clock ); + + Supplier> workMock = newWorkMock(); + SessionExpiredException error = sessionExpired(); + when( workMock.get() ).thenReturn( failedFuture( error ) ); + + InternalFuture future = retryLogic.retryAsync( workMock ); + + try + { + await( future ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + + List scheduleDelays = eventExecutor.scheduleDelays(); + assertEquals( 2, scheduleDelays.size() ); + assertEquals( initialDelay, scheduleDelays.get( 0 ).intValue() ); + assertEquals( initialDelay * multiplier, scheduleDelays.get( 1 ).intValue() ); + + verify( workMock, times( 3 ) ).get(); + } + @Test public void sleepsOnServiceUnavailableException() throws Exception { @@ -231,6 +307,26 @@ public void sleepsOnServiceUnavailableException() throws Exception verify( clock ).sleep( 42 ); } + @Test + public void schedulesRetryOnServiceUnavailableException() throws Exception + { + String result = "The Result"; + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( 1, 42, 1, 0, clock ); + + Supplier> workMock = newWorkMock(); + SessionExpiredException error = sessionExpired(); + when( workMock.get() ).thenReturn( failedFuture( error ) ).thenReturn( succeededFuture( result ) ); + + assertEquals( result, await( retryLogic.retryAsync( workMock ) ) ); + + verify( workMock, times( 2 ) ).get(); + List scheduleDelays = eventExecutor.scheduleDelays(); + assertEquals( 1, scheduleDelays.size() ); + assertEquals( 42, scheduleDelays.get( 0 ).intValue() ); + } + @Test public void sleepsOnSessionExpiredException() throws Exception { @@ -247,6 +343,26 @@ public void sleepsOnSessionExpiredException() throws Exception verify( clock ).sleep( 4242 ); } + @Test + public void schedulesRetryOnSessionExpiredException() throws Exception + { + String result = "The Result"; + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( 1, 4242, 1, 0, clock ); + + Supplier> workMock = newWorkMock(); + SessionExpiredException error = sessionExpired(); + when( workMock.get() ).thenReturn( failedFuture( error ) ).thenReturn( succeededFuture( result ) ); + + assertEquals( result, await( retryLogic.retryAsync( workMock ) ) ); + + verify( workMock, times( 2 ) ).get(); + List scheduleDelays = eventExecutor.scheduleDelays(); + assertEquals( 1, scheduleDelays.size() ); + assertEquals( 4242, scheduleDelays.get( 0 ).intValue() ); + } + @Test public void sleepsOnTransientException() throws Exception { @@ -263,6 +379,26 @@ public void sleepsOnTransientException() throws Exception verify( clock ).sleep( 23 ); } + @Test + public void schedulesRetryOnTransientException() throws Exception + { + String result = "The Result"; + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( 1, 23, 1, 0, clock ); + + Supplier> workMock = newWorkMock(); + TransientException error = transientException(); + when( workMock.get() ).thenReturn( failedFuture( error ) ).thenReturn( succeededFuture( result ) ); + + assertEquals( result, await( retryLogic.retryAsync( workMock ) ) ); + + verify( workMock, times( 2 ) ).get(); + List scheduleDelays = eventExecutor.scheduleDelays(); + assertEquals( 1, scheduleDelays.size() ); + assertEquals( 23, scheduleDelays.get( 0 ).intValue() ); + } + @Test public void throwsWhenUnknownError() throws Exception { @@ -287,6 +423,31 @@ public void throwsWhenUnknownError() throws Exception verify( clock, never() ).sleep( anyLong() ); } + @Test + public void doesNotRetryOnUnknownError() + { + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( 1, 1, 1, 1, clock ); + + Supplier> workMock = newWorkMock(); + IllegalStateException error = new IllegalStateException(); + when( workMock.get() ).thenReturn( failedFuture( error ) ); + + try + { + await( retryLogic.retryAsync( workMock ) ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + + verify( workMock ).get(); + assertEquals( 0, eventExecutor.scheduleDelays().size() ); + } + @Test public void throwsWhenTransactionTerminatedError() throws Exception { @@ -311,6 +472,31 @@ public void throwsWhenTransactionTerminatedError() throws Exception verify( clock, never() ).sleep( 13 ); } + @Test + public void doesNotRetryOnTransactionTerminatedError() + { + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( 1, 13, 1, 0, clock ); + + Supplier> workMock = newWorkMock(); + TransientException error = new TransientException( "Neo.TransientError.Transaction.Terminated", "" ); + when( workMock.get() ).thenReturn( failedFuture( error ) ); + + try + { + await( retryLogic.retryAsync( workMock ) ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + + verify( workMock ).get(); + assertEquals( 0, eventExecutor.scheduleDelays().size() ); + } + @Test public void throwsWhenTransactionLockClientStoppedError() throws Exception { @@ -335,6 +521,31 @@ public void throwsWhenTransactionLockClientStoppedError() throws Exception verify( clock, never() ).sleep( 13 ); } + @Test + public void doesNotRetryOnTransactionLockClientStoppedError() + { + Clock clock = mock( Clock.class ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( 1, 15, 1, 0, clock ); + + Supplier> workMock = newWorkMock(); + TransientException error = new TransientException( "Neo.TransientError.Transaction.LockClientStopped", "" ); + when( workMock.get() ).thenReturn( failedFuture( error ) ); + + try + { + await( retryLogic.retryAsync( workMock ) ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertEquals( error, e ); + } + + verify( workMock ).get(); + assertEquals( 0, eventExecutor.scheduleDelays().size() ); + } + @Test public void throwsWhenSleepInterrupted() throws Exception { @@ -402,6 +613,56 @@ public void collectsSuppressedErrors() throws Exception verify( clock ).sleep( initialDelay * multiplier * multiplier ); } + @Test + public void collectsSuppressedErrorsAsync() throws Exception + { + String result = "The Result"; + long maxRetryTime = 20; + int initialDelay = 15; + int multiplier = 2; + Clock clock = mock( Clock.class ); + when( clock.millis() ).thenReturn( 0L ).thenReturn( 10L ).thenReturn( 15L ).thenReturn( 25L ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( maxRetryTime, initialDelay, multiplier, 0, clock ); + + Supplier> workMock = newWorkMock(); + SessionExpiredException error1 = sessionExpired(); + SessionExpiredException error2 = sessionExpired(); + ServiceUnavailableException error3 = serviceUnavailable(); + TransientException error4 = transientException(); + + when( workMock.get() ).thenReturn( failedFuture( error1 ) ) + .thenReturn( failedFuture( error2 ) ) + .thenReturn( failedFuture( error3 ) ) + .thenReturn( failedFuture( error4 ) ) + .thenReturn( succeededFuture( result ) ); + + try + { + retryLogic.retryAsync( workMock ).get(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ExecutionException.class ) ); + Throwable cause = e.getCause(); + assertEquals( error4, cause ); + Throwable[] suppressed = cause.getSuppressed(); + assertEquals( 3, suppressed.length ); + assertEquals( error1, suppressed[0] ); + assertEquals( error2, suppressed[1] ); + assertEquals( error3, suppressed[2] ); + } + + verify( workMock, times( 4 ) ).get(); + + List scheduleDelays = eventExecutor.scheduleDelays(); + assertEquals( 3, scheduleDelays.size() ); + assertEquals( initialDelay, scheduleDelays.get( 0 ).intValue() ); + assertEquals( initialDelay * multiplier, scheduleDelays.get( 1 ).intValue() ); + assertEquals( initialDelay * multiplier * multiplier, scheduleDelays.get( 2 ).intValue() ); + } + @Test public void doesNotCollectSuppressedErrorsWhenSameErrorIsThrown() throws Exception { @@ -434,6 +695,43 @@ public void doesNotCollectSuppressedErrorsWhenSameErrorIsThrown() throws Excepti verify( clock ).sleep( initialDelay * multiplier ); } + @Test + public void doesNotCollectSuppressedErrorsWhenSameErrorIsThrownAsync() throws Exception + { + long maxRetryTime = 20; + int initialDelay = 15; + int multiplier = 2; + Clock clock = mock( Clock.class ); + when( clock.millis() ).thenReturn( 0L ).thenReturn( 10L ).thenReturn( 25L ); + + ExponentialBackoffRetryLogic retryLogic = newRetryLogic( maxRetryTime, initialDelay, multiplier, 0, clock ); + + Supplier> workMock = newWorkMock(); + SessionExpiredException error = sessionExpired(); + when( workMock.get() ).thenReturn( failedFuture( error ) ); + + try + { + retryLogic.retryAsync( workMock ).get(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ExecutionException.class ) ); + Throwable cause = e.getCause(); + + assertEquals( error, cause ); + assertEquals( 0, cause.getSuppressed().length ); + } + + verify( workMock, times( 3 ) ).get(); + + List scheduleDelays = eventExecutor.scheduleDelays(); + assertEquals( 2, scheduleDelays.size() ); + assertEquals( initialDelay, scheduleDelays.get( 0 ).intValue() ); + assertEquals( initialDelay * multiplier, scheduleDelays.get( 1 ).intValue() ); + } + @Test public void eachRetryIsLogged() { @@ -442,16 +740,38 @@ public void eachRetryIsLogged() Logging logging = mock( Logging.class ); Logger logger = mock( Logger.class ); when( logging.getLog( anyString() ) ).thenReturn( logger ); - ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, clock, logging ); + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, + clock, logging ); retry( logic, retries ); - verify( logger, times( retries ) ).error( + verify( logger, times( retries ) ).warn( startsWith( "Transaction failed and will be retried" ), any( ServiceUnavailableException.class ) ); } + @Test + public void eachAsyncRetryIsLogged() + { + String result = "The Result"; + int retries = 9; + Clock clock = mock( Clock.class ); + Logging logging = mock( Logging.class ); + Logger logger = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( logger ); + + ExponentialBackoffRetryLogic logic = new ExponentialBackoffRetryLogic( RetrySettings.DEFAULT, eventExecutor, + clock, logging ); + + assertEquals( result, await( retryAsync( logic, retries, result ) ) ); + + verify( logger, times( retries ) ).warn( + startsWith( "Async transaction failed and is scheduled to retry" ), + any( ServiceUnavailableException.class ) + ); + } + private static void retry( ExponentialBackoffRetryLogic retryLogic, final int times ) { retryLogic.retry( new Supplier() @@ -471,6 +791,26 @@ public Void get() } ); } + private InternalFuture retryAsync( ExponentialBackoffRetryLogic retryLogic, final int times, + final Object result ) + { + return retryLogic.retryAsync( new Supplier>() + { + int invoked; + + @Override + public InternalFuture get() + { + if ( invoked < times ) + { + invoked++; + return failedFuture( serviceUnavailable() ); + } + return succeededFuture( result ); + } + } ); + } + private static List delaysWithoutJitter( long initialDelay, double multiplier, int count ) { List values = new ArrayList<>(); @@ -491,11 +831,21 @@ private static List sleepValues( Clock clockMock, int expectedCount ) thro return captor.getAllValues(); } - private static ExponentialBackoffRetryLogic newRetryLogic( long maxRetryTimeMs, long initialRetryDelayMs, + private ExponentialBackoffRetryLogic newRetryLogic( long maxRetryTimeMs, long initialRetryDelayMs, double multiplier, double jitterFactor, Clock clock ) { - return new ExponentialBackoffRetryLogic( maxRetryTimeMs, initialRetryDelayMs, multiplier, jitterFactor, clock, - DevNullLogging.DEV_NULL_LOGGING ); + return new ExponentialBackoffRetryLogic( maxRetryTimeMs, initialRetryDelayMs, multiplier, jitterFactor, + eventExecutor, clock, DEV_NULL_LOGGING ); + } + + private InternalFuture succeededFuture( Object value ) + { + return new InternalPromise<>( eventExecutor ).setSuccess( value ); + } + + private InternalFuture failedFuture( Throwable error ) + { + return new InternalPromise<>( eventExecutor ).setFailure( error ); } private static ServiceUnavailableException serviceUnavailable() @@ -514,8 +864,22 @@ private static TransientException transientException() } @SuppressWarnings( "unchecked" ) - private static Supplier newWorkMock() + private static Supplier newWorkMock() { return mock( Supplier.class ); } + + private static void assertDelaysApproximatelyEqual( List expectedDelays, List actualDelays, + double delta ) + { + assertEquals( expectedDelays.size(), actualDelays.size() ); + + for ( int i = 0; i < actualDelays.size(); i++ ) + { + double actualValue = actualDelays.get( i ).doubleValue(); + long expectedValue = expectedDelays.get( i ); + + assertThat( actualValue, closeTo( expectedValue, expectedValue * delta ) ); + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/retry/FixedRetryLogic.java b/driver/src/test/java/org/neo4j/driver/internal/retry/FixedRetryLogic.java index 2b6235bf43..a142500e6b 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/retry/FixedRetryLogic.java +++ b/driver/src/test/java/org/neo4j/driver/internal/retry/FixedRetryLogic.java @@ -18,34 +18,33 @@ */ package org.neo4j.driver.internal.retry; -import org.neo4j.driver.internal.util.Supplier; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.GlobalEventExecutor; -public class FixedRetryLogic implements RetryLogic +import org.neo4j.driver.internal.util.SleeplessClock; + +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; + +public class FixedRetryLogic extends ExponentialBackoffRetryLogic { private final int retryCount; - private int invokedWork; + private int invocationCount; public FixedRetryLogic( int retryCount ) { + this( retryCount, GlobalEventExecutor.INSTANCE ); + } + + public FixedRetryLogic( int retryCount, EventExecutorGroup eventExecutorGroup ) + { + super( new RetrySettings( Long.MAX_VALUE ), eventExecutorGroup, new SleeplessClock(), + DEV_NULL_LOGGING ); this.retryCount = retryCount; } @Override - public T retry( Supplier work ) + protected boolean canRetryOn( Throwable error ) { - while ( true ) - { - try - { - return work.get(); - } - catch ( Throwable error ) - { - if ( invokedWork++ >= retryCount ) - { - throw error; - } - } - } + return invocationCount++ < retryCount; } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithFixedRetryLogic.java b/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithFixedRetryLogic.java index 4d8482b00c..65792e3012 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithFixedRetryLogic.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/DriverFactoryWithFixedRetryLogic.java @@ -18,6 +18,8 @@ */ package org.neo4j.driver.internal.util; +import io.netty.util.concurrent.EventExecutorGroup; + import org.neo4j.driver.internal.retry.FixedRetryLogic; import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.retry.RetrySettings; @@ -34,8 +36,9 @@ public DriverFactoryWithFixedRetryLogic( int retryCount ) } @Override - protected RetryLogic createRetryLogic( RetrySettings settings, Logging logging ) + protected RetryLogic createRetryLogic( RetrySettings settings, EventExecutorGroup eventExecutorGroup, + Logging logging ) { - return new FixedRetryLogic( retryCount ); + return new FixedRetryLogic( retryCount, eventExecutorGroup ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/TrackingEventExecutor.java b/driver/src/test/java/org/neo4j/driver/internal/util/TrackingEventExecutor.java new file mode 100644 index 0000000000..b357f67c7d --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/util/TrackingEventExecutor.java @@ -0,0 +1,250 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.util; + +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.concurrent.ProgressivePromise; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.util.Collections.unmodifiableList; +import static org.mockito.Mockito.mock; + +public class TrackingEventExecutor implements EventExecutor +{ + private final EventExecutor delegate; + private final List scheduleDelays; + + public TrackingEventExecutor() + { + this( GlobalEventExecutor.INSTANCE ); + } + + public TrackingEventExecutor( EventExecutor delegate ) + { + this.delegate = delegate; + this.scheduleDelays = new CopyOnWriteArrayList<>(); + } + + public List scheduleDelays() + { + return unmodifiableList( scheduleDelays ); + } + + @Override + public EventExecutor next() + { + return this; + } + + @Override + public EventExecutorGroup parent() + { + return this; + } + + @Override + public boolean inEventLoop() + { + return delegate.inEventLoop(); + } + + @Override + public boolean inEventLoop( Thread thread ) + { + return delegate.inEventLoop( thread ); + } + + @Override + public Promise newPromise() + { + return delegate.newPromise(); + } + + @Override + public ProgressivePromise newProgressivePromise() + { + return delegate.newProgressivePromise(); + } + + @Override + public Future newSucceededFuture( V result ) + { + return delegate.newSucceededFuture( result ); + } + + @Override + public Future newFailedFuture( Throwable cause ) + { + return delegate.newFailedFuture( cause ); + } + + @Override + public boolean isShuttingDown() + { + return delegate.isShuttingDown(); + } + + @Override + public Future shutdownGracefully() + { + return delegate.shutdownGracefully(); + } + + @Override + public Future shutdownGracefully( long quietPeriod, long timeout, TimeUnit unit ) + { + return delegate.shutdownGracefully( quietPeriod, timeout, unit ); + } + + @Override + public Future terminationFuture() + { + return delegate.terminationFuture(); + } + + @Override + @Deprecated + public void shutdown() + { + delegate.shutdown(); + } + + @Override + @Deprecated + public List shutdownNow() + { + return delegate.shutdownNow(); + } + + @Override + public Iterator iterator() + { + return delegate.iterator(); + } + + @Override + public Future submit( Runnable task ) + { + return delegate.submit( task ); + } + + @Override + public Future submit( Runnable task, T result ) + { + return delegate.submit( task, result ); + } + + @Override + public Future submit( Callable task ) + { + return delegate.submit( task ); + } + + @Override + public ScheduledFuture schedule( Runnable command, long delay, TimeUnit unit ) + { + scheduleDelays.add( unit.toMillis( delay ) ); + delegate.execute( command ); + return mock( ScheduledFuture.class ); + } + + @Override + public ScheduledFuture schedule( Callable callable, long delay, TimeUnit unit ) + { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( Runnable command, long initialDelay, long period, + TimeUnit unit ) + { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, + TimeUnit unit ) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isShutdown() + { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() + { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException + { + return delegate.awaitTermination( timeout, unit ); + } + + @Override + public List> invokeAll( Collection> tasks ) + throws InterruptedException + { + return delegate.invokeAll( tasks ); + } + + @Override + public List> invokeAll( Collection> tasks, long timeout, + TimeUnit unit ) throws InterruptedException + { + return delegate.invokeAll( tasks, timeout, unit ); + } + + @Override + public T invokeAny( Collection> tasks ) throws InterruptedException, ExecutionException + { + return delegate.invokeAny( tasks ); + } + + @Override + public T invokeAny( Collection> tasks, long timeout, TimeUnit unit ) + throws InterruptedException, ExecutionException, TimeoutException + { + return delegate.invokeAny( tasks, timeout, unit ); + } + + @Override + public void execute( Runnable command ) + { + delegate.execute( command ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java index 4edfff25ac..30ce5ee546 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionAsyncIT.java @@ -28,28 +28,38 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.neo4j.driver.internal.async.InternalPromise; +import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Response; import org.neo4j.driver.v1.ResponseListener; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; import org.neo4j.driver.v1.StatementResultCursor; +import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.exceptions.DatabaseException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; +import org.neo4j.driver.v1.exceptions.TransientException; import org.neo4j.driver.v1.summary.ResultSummary; import org.neo4j.driver.v1.summary.StatementType; import org.neo4j.driver.v1.types.Node; import org.neo4j.driver.v1.util.TestNeo4j; +import static java.util.Collections.emptyIterator; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -71,13 +81,13 @@ public class SessionAsyncIT private Session session; @Before - public void setUp() throws Exception + public void setUp() { session = neo4j.driver().session(); } @After - public void tearDown() throws Exception + public void tearDown() { await( session.closeAsync() ); } @@ -87,7 +97,7 @@ public void shouldRunQueryWithEmptyResult() { StatementResultCursor cursor = await( session.runAsync( "CREATE (:Person)" ) ); - assertThat( await( cursor.fetchAsync() ), is( false ) ); + assertNull( await( cursor.nextAsync() ) ); } @Test @@ -95,14 +105,13 @@ public void shouldRunQueryWithSingleResult() { StatementResultCursor cursor = await( session.runAsync( "CREATE (p:Person {name: 'Nick Fury'}) RETURN p" ) ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - - Record record = cursor.current(); + Record record = await( cursor.nextAsync() ); + assertNotNull( record ); Node node = record.get( 0 ).asNode(); assertEquals( "Person", single( node.labels() ) ); assertEquals( "Nick Fury", node.get( "name" ).asString() ); - assertThat( await( cursor.fetchAsync() ), is( false ) ); + assertNull( await( cursor.nextAsync() ) ); } @Test @@ -110,16 +119,19 @@ public void shouldRunQueryWithMultipleResults() { StatementResultCursor cursor = await( session.runAsync( "UNWIND [1,2,3] AS x RETURN x" ) ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - assertEquals( 1, cursor.current().get( 0 ).asInt() ); + Record record1 = await( cursor.nextAsync() ); + assertNotNull( record1 ); + assertEquals( 1, record1.get( 0 ).asInt() ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - assertEquals( 2, cursor.current().get( 0 ).asInt() ); + Record record2 = await( cursor.nextAsync() ); + assertNotNull( record2 ); + assertEquals( 2, record2.get( 0 ).asInt() ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - assertEquals( 3, cursor.current().get( 0 ).asInt() ); + Record record3 = await( cursor.nextAsync() ); + assertNotNull( record3 ); + assertEquals( 3, record3.get( 0 ).asInt() ); - assertThat( await( cursor.fetchAsync() ), is( false ) ); + assertNull( await( cursor.nextAsync() ) ); } @Test @@ -141,16 +153,17 @@ public void shouldFailWhenQueryFailsAtRuntime() { StatementResultCursor cursor = await( session.runAsync( "UNWIND [1, 2, 0] AS x RETURN 10 / x" ) ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - assertEquals( 10, cursor.current().get( 0 ).asInt() ); + Record record1 = await( cursor.nextAsync() ); + assertNotNull( record1 ); + assertEquals( 10, record1.get( 0 ).asInt() ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - assertEquals( 5, cursor.current().get( 0 ).asInt() ); + Record record2 = await( cursor.nextAsync() ); + assertNotNull( record2 ); + assertEquals( 5, record2.get( 0 ).asInt() ); try { - await( cursor.fetchAsync() ); - System.out.println( cursor.current() ); + await( cursor.nextAsync() ); fail( "Exception expected" ); } catch ( Exception e ) @@ -170,16 +183,17 @@ public void shouldFailWhenServerIsRestarted() throws Exception try { - Response recordAvailable = cursor.fetchAsync(); + Response recordResponse = cursor.nextAsync(); // kill db after receiving the first record // do it from a listener so that event loop thread executes the kill operation - recordAvailable.addListener( new KillDbListener( neo4j ) ); + recordResponse.addListener( new KillDbListener( neo4j ) ); - while ( await( recordAvailable ) ) + Record record; + while ( (record = await( recordResponse )) != null ) { - assertNotNull( cursor.current() ); - recordAvailable = cursor.fetchAsync(); + assertNotNull( record ); + recordResponse = cursor.nextAsync(); } fail( "Exception expected" ); } @@ -195,18 +209,19 @@ public void shouldAllowNestedQueries() StatementResultCursor cursor = await( session.runAsync( "UNWIND [1, 2, 3] AS x CREATE (p:Person {id: x}) RETURN p" ) ); - Future>> queriesExecuted = runNestedQueries( cursor ); - List> futures = await( queriesExecuted ); + Future>> queriesExecuted = runNestedQueries( cursor ); + List> futures = await( queriesExecuted ); - List futureResults = awaitAll( futures ); + List futureResults = awaitAll( futures ); assertEquals( 7, futureResults.size() ); StatementResultCursor personCursor = await( session.runAsync( "MATCH (p:Person) RETURN p ORDER BY p.id" ) ); List personNodes = new ArrayList<>(); - while ( await( personCursor.fetchAsync() ) ) + Record record; + while ( (record = await( personCursor.nextAsync() )) != null ) { - personNodes.add( personCursor.current().get( 0 ).asNode() ); + personNodes.add( record.get( 0 ).asNode() ); } assertEquals( 3, personNodes.size() ); @@ -233,21 +248,20 @@ public void shouldAllowMultipleAsyncRunsWithoutConsumingResults() throws Interru cursors.add( session.runAsync( "CREATE (:Person)" ) ); } - List> fetches = new ArrayList<>(); + List> records = new ArrayList<>(); for ( StatementResultCursor cursor : awaitAll( cursors ) ) { - fetches.add( cursor.fetchAsync() ); + records.add( cursor.nextAsync() ); } - awaitAll( fetches ); + awaitAll( records ); await( session.closeAsync() ); session = neo4j.driver().session(); StatementResultCursor cursor = await( session.runAsync( "MATCH (p:Person) RETURN count(p)" ) ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - - Record record = cursor.current(); + Record record = await( cursor.nextAsync() ); + assertNotNull( record ); assertEquals( queryCount, record.get( 0 ).asInt() ); } @@ -348,31 +362,173 @@ public void shouldExposeResultSummaryForProfileQuery() assertThat( summary.resultConsumedAfter( TimeUnit.MILLISECONDS ), greaterThanOrEqualTo( 0L ) ); } - private Future>> runNestedQueries( StatementResultCursor inputCursor ) + @Test + public void shouldRunAsyncTransactionWithoutRetries() + { + InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Apa) RETURN 42" ); + Response txResponse = session.writeTransactionAsync( work ); + + Record record = await( txResponse ); + assertNotNull( record ); + assertEquals( 42L, record.get( 0 ).asLong() ); + + assertEquals( 1, work.invocationCount() ); + assertEquals( 1, countNodesByLabel( "Apa" ) ); + } + + @Test + public void shouldRunAsyncTransactionWithRetriesOnAsyncFailures() { - Promise>> resultPromise = GlobalEventExecutor.INSTANCE.newPromise(); - runNestedQueries( inputCursor, new ArrayList>(), resultPromise ); + InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Node) RETURN 24" ).withAsyncFailures( + new ServiceUnavailableException( "Oh!" ), + new SessionExpiredException( "Ah!" ), + new TransientException( "Code", "Message" ) ); + + Response txResponse = session.writeTransactionAsync( work ); + + Record record = await( txResponse ); + assertNotNull( record ); + assertEquals( 24L, record.get( 0 ).asLong() ); + + assertEquals( 4, work.invocationCount() ); + assertEquals( 1, countNodesByLabel( "Node" ) ); + } + + @Test + public void shouldRunAsyncTransactionWithRetriesOnSyncFailures() + { + InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Test) RETURN 12" ).withSyncFailures( + new TransientException( "Oh!", "Deadlock!" ), + new ServiceUnavailableException( "Oh! Network Failure" ) ); + + Response txResponse = session.writeTransactionAsync( work ); + + Record record = await( txResponse ); + assertNotNull( record ); + assertEquals( 12L, record.get( 0 ).asLong() ); + + assertEquals( 3, work.invocationCount() ); + assertEquals( 1, countNodesByLabel( "Test" ) ); + } + + @Test + public void shouldRunAsyncTransactionThatCanNotBeRetried() + { + InvocationTrackingWork work = new InvocationTrackingWork( "UNWIND [10, 5, 0] AS x CREATE (:Hi) RETURN 10/x" ); + Response txResponse = session.writeTransactionAsync( work ); + + try + { + await( txResponse ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ClientException.class ) ); + } + + assertEquals( 1, work.invocationCount() ); + assertEquals( 0, countNodesByLabel( "Hi" ) ); + } + + @Test + public void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure() + { + // first throw TransientException directly from work, retry can happen afterwards + // then return a future failed with DatabaseException, retry can't happen afterwards + InvocationTrackingWork work = new InvocationTrackingWork( "CREATE (:Person) RETURN 1" ) + .withSyncFailures( new TransientException( "Oh!", "Deadlock!" ) ) + .withAsyncFailures( new DatabaseException( "Oh!", "OutOfMemory!" ) ); + Response txResponse = session.writeTransactionAsync( work ); + + try + { + await( txResponse ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( DatabaseException.class ) ); + assertEquals( 1, e.getSuppressed().length ); + assertThat( e.getSuppressed()[0], instanceOf( TransientException.class ) ); + } + + assertEquals( 2, work.invocationCount() ); + assertEquals( 0, countNodesByLabel( "Person" ) ); + } + + @Test + public void shouldPeekRecordFromCursor() + { + StatementResultCursor cursor = await( session.runAsync( "UNWIND [1, 2, 42] AS x RETURN x" ) ); + + assertEquals( 1, await( cursor.peekAsync() ).get( 0 ).asInt() ); + assertEquals( 1, await( cursor.peekAsync() ).get( 0 ).asInt() ); + assertEquals( 1, await( cursor.peekAsync() ).get( 0 ).asInt() ); + + assertEquals( 1, await( cursor.nextAsync() ).get( 0 ).asInt() ); + + assertEquals( 2, await( cursor.peekAsync() ).get( 0 ).asInt() ); + assertEquals( 2, await( cursor.peekAsync() ).get( 0 ).asInt() ); + + assertEquals( 2, await( cursor.nextAsync() ).get( 0 ).asInt() ); + + assertEquals( 42, await( cursor.nextAsync() ).get( 0 ).asInt() ); + + assertNull( await( cursor.peekAsync() ) ); + assertNull( await( cursor.nextAsync() ) ); + } + + @Test + public void shouldForEachWithEmptyCursor() + { + testForEach( "CREATE ()", 0 ); + } + + @Test + public void shouldForEachWithNonEmptyCursor() + { + testForEach( "UNWIND range(1, 10000) AS x RETURN x", 10000 ); + } + + @Test + public void shouldConvertToListWithEmptyCursor() + { + testList( "MATCH (n:NoSuchLabel) RETURN n", Collections.emptyList() ); + } + + @Test + public void shouldConvertToListWithNonEmptyCursor() + { + testList( "UNWIND range(1, 100, 10) AS x RETURN x", + Arrays.asList( 1L, 11L, 21L, 31L, 41L, 51L, 61L, 71L, 81L, 91L ) ); + } + + private Future>> runNestedQueries( StatementResultCursor inputCursor ) + { + Promise>> resultPromise = GlobalEventExecutor.INSTANCE.newPromise(); + runNestedQueries( inputCursor, new ArrayList>(), resultPromise ); return resultPromise; } - private void runNestedQueries( final StatementResultCursor inputCursor, final List> futures, - final Promise>> resultPromise ) + private void runNestedQueries( final StatementResultCursor inputCursor, final List> futures, + final Promise>> resultPromise ) { - final Response inputAvailable = inputCursor.fetchAsync(); - futures.add( inputAvailable ); + final Response recordResponse = inputCursor.nextAsync(); + futures.add( recordResponse ); - inputAvailable.addListener( new ResponseListener() + recordResponse.addListener( new ResponseListener() { @Override - public void operationCompleted( Boolean inputAvailable, Throwable error ) + public void operationCompleted( Record record, Throwable error ) { if ( error != null ) { resultPromise.setFailure( error ); } - else if ( inputAvailable ) + else if ( record != null ) { - runNestedQuery( inputCursor, futures, resultPromise ); + runNestedQuery( inputCursor, record, futures, resultPromise ); } else { @@ -382,10 +538,9 @@ else if ( inputAvailable ) } ); } - private void runNestedQuery( final StatementResultCursor inputCursor, final List> futures, - final Promise>> resultPromise ) + private void runNestedQuery( final StatementResultCursor inputCursor, Record record, + final List> futures, final Promise>> resultPromise ) { - Record record = inputCursor.current(); Node node = record.get( 0 ).asNode(); long id = node.get( "id" ).asLong(); long age = id * 10; @@ -397,7 +552,7 @@ private void runNestedQuery( final StatementResultCursor inputCursor, final List response.addListener( new ResponseListener() { @Override - public void operationCompleted( StatementResultCursor result, Throwable error ) + public void operationCompleted( StatementResultCursor cursor, Throwable error ) { if ( error != null ) { @@ -405,13 +560,48 @@ public void operationCompleted( StatementResultCursor result, Throwable error ) } else { - futures.add( result.fetchAsync() ); + futures.add( cursor.nextAsync() ); runNestedQueries( inputCursor, futures, resultPromise ); } } } ); } + private long countNodesByLabel( String label ) + { + return session.run( "MATCH (n:" + label + ") RETURN count(n)" ).single().get( 0 ).asLong(); + } + + private void testForEach( String query, int expectedSeenRecords ) + { + StatementResultCursor cursor = await( session.runAsync( query ) ); + + final AtomicInteger recordsSeen = new AtomicInteger(); + Response forEachDone = cursor.forEachAsync( new Consumer() + { + @Override + public void accept( Record record ) + { + recordsSeen.incrementAndGet(); + } + } ); + + assertNull( await( forEachDone ) ); + assertEquals( expectedSeenRecords, recordsSeen.get() ); + } + + private void testList( String query, List expectedList ) + { + StatementResultCursor cursor = await( session.runAsync( query ) ); + List records = await( cursor.listAsync() ); + List actualList = new ArrayList<>(); + for ( Record record : records ) + { + actualList.add( record.get( 0 ).asObject() ); + } + assertEquals( expectedList, actualList ); + } + private static void assertSyntaxError( Exception e ) { assertThat( e, instanceOf( ClientException.class ) ); @@ -425,7 +615,7 @@ private static void assertArithmeticError( Exception e ) assertThat( ((ClientException) e).code(), containsString( "ArithmeticError" ) ); } - private static class KillDbListener implements ResponseListener + private static class KillDbListener implements ResponseListener { final TestNeo4j neo4j; volatile boolean shouldKillDb = true; @@ -436,7 +626,7 @@ private static class KillDbListener implements ResponseListener } @Override - public void operationCompleted( Boolean result, Throwable error ) + public void operationCompleted( Record record, Throwable error ) { if ( shouldKillDb ) { @@ -457,4 +647,104 @@ void killDb() } } } + + private static class InvocationTrackingWork implements TransactionWork> + { + final String query; + final AtomicInteger invocationCount; + + Iterator asyncFailures = emptyIterator(); + Iterator syncFailures = emptyIterator(); + + InvocationTrackingWork( String query ) + { + this.query = query; + this.invocationCount = new AtomicInteger(); + } + + InvocationTrackingWork withAsyncFailures( RuntimeException... failures ) + { + asyncFailures = Arrays.asList( failures ).iterator(); + return this; + } + + InvocationTrackingWork withSyncFailures( RuntimeException... failures ) + { + syncFailures = Arrays.asList( failures ).iterator(); + return this; + } + + int invocationCount() + { + return invocationCount.get(); + } + + @Override + public Response execute( Transaction tx ) + { + invocationCount.incrementAndGet(); + + if ( syncFailures.hasNext() ) + { + throw syncFailures.next(); + } + + final InternalPromise resultPromise = new InternalPromise<>( GlobalEventExecutor.INSTANCE ); + + tx.runAsync( query ).addListener( new ResponseListener() + { + @Override + public void operationCompleted( final StatementResultCursor cursor, Throwable error ) + { + processQueryResult( cursor, error, resultPromise ); + } + } ); + + return resultPromise; + } + + private void processQueryResult( final StatementResultCursor cursor, final Throwable error, + final InternalPromise resultPromise ) + { + if ( error != null ) + { + resultPromise.setFailure( error ); + return; + } + + cursor.nextAsync().addListener( new ResponseListener() + { + @Override + public void operationCompleted( Record record, Throwable error ) + { + processFetchResult( record, error, resultPromise, cursor ); + } + } ); + } + + private void processFetchResult( Record record, Throwable error, + InternalPromise resultPromise, StatementResultCursor cursor ) + { + if ( error != null ) + { + resultPromise.setFailure( error ); + return; + } + + if ( record == null ) + { + resultPromise.setFailure( new AssertionError( "Record not available" ) ); + return; + } + + if ( asyncFailures.hasNext() ) + { + resultPromise.setFailure( asyncFailures.next() ); + } + else + { + resultPromise.setSuccess( record ); + } + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java index f21ebc8da9..e1a0740093 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/TransactionAsyncIT.java @@ -23,9 +23,15 @@ import org.junit.Rule; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.neo4j.driver.internal.util.Consumer; +import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Response; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; @@ -108,10 +114,13 @@ public void shouldBePossibleToRunSingleStatementAndCommit() Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor = await( tx.runAsync( "CREATE (n:Node {id: 42}) RETURN n" ) ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - Node node = cursor.current().get( 0 ).asNode(); + + Record record = await( cursor.nextAsync() ); + assertNotNull( record ); + Node node = record.get( 0 ).asNode(); assertEquals( "Node", single( node.labels() ) ); assertEquals( 42, node.get( "id" ).asInt() ); + assertNull( await( cursor.nextAsync() ) ); assertNull( await( tx.commitAsync() ) ); assertEquals( 1, countNodes( 42 ) ); @@ -123,10 +132,12 @@ public void shouldBePossibleToRunSingleStatementAndRollback() Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor = await( tx.runAsync( "CREATE (n:Node {id: 4242}) RETURN n" ) ); - assertThat( await( cursor.fetchAsync() ), is( true ) ); - Node node = cursor.current().get( 0 ).asNode(); + Record record = await( cursor.nextAsync() ); + assertNotNull( record ); + Node node = record.get( 0 ).asNode(); assertEquals( "Node", single( node.labels() ) ); assertEquals( 4242, node.get( "id" ).asInt() ); + assertNull( await( cursor.nextAsync() ) ); assertNull( await( tx.rollbackAsync() ) ); assertEquals( 0, countNodes( 4242 ) ); @@ -138,13 +149,13 @@ public void shouldBePossibleToRunMultipleStatementsAndCommit() Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor1 = await( tx.runAsync( "CREATE (n:Node {id: 1})" ) ); - assertThat( await( cursor1.fetchAsync() ), is( false ) ); + assertNull( await( cursor1.nextAsync() ) ); StatementResultCursor cursor2 = await( tx.runAsync( "CREATE (n:Node {id: 2})" ) ); - assertThat( await( cursor2.fetchAsync() ), is( false ) ); + assertNull( await( cursor2.nextAsync() ) ); StatementResultCursor cursor3 = await( tx.runAsync( "CREATE (n:Node {id: 2})" ) ); - assertThat( await( cursor3.fetchAsync() ), is( false ) ); + assertNull( await( cursor3.nextAsync() ) ); assertNull( await( tx.commitAsync() ) ); assertEquals( 1, countNodes( 1 ) ); @@ -171,10 +182,10 @@ public void shouldBePossibleToRunMultipleStatementsAndRollback() Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor1 = await( tx.runAsync( "CREATE (n:Node {id: 1})" ) ); - assertThat( await( cursor1.fetchAsync() ), is( false ) ); + assertNull( await( cursor1.nextAsync() ) ); StatementResultCursor cursor2 = await( tx.runAsync( "CREATE (n:Node {id: 42})" ) ); - assertThat( await( cursor2.fetchAsync() ), is( false ) ); + assertNull( await( cursor2.nextAsync() ) ); assertNull( await( tx.rollbackAsync() ) ); assertEquals( 0, countNodes( 1 ) ); @@ -244,12 +255,14 @@ public void shouldFailToCommitAfterCoupleCorrectAndSingleWrongStatement() Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor1 = await( tx.runAsync( "CREATE (n:Node) RETURN n" ) ); - assertThat( await( cursor1.fetchAsync() ), is( true ) ); - assertTrue( cursor1.current().get( 0 ).asNode().hasLabel( "Node" ) ); + Record record1 = await( cursor1.nextAsync() ); + assertNotNull( record1 ); + assertTrue( record1.get( 0 ).asNode().hasLabel( "Node" ) ); StatementResultCursor cursor2 = await( tx.runAsync( "RETURN 42" ) ); - assertThat( await( cursor2.fetchAsync() ), is( true ) ); - assertEquals( 42, cursor2.current().get( 0 ).asInt() ); + Record record2 = await( cursor2.nextAsync() ); + assertNotNull( record2 ); + assertEquals( 42, record2.get( 0 ).asInt() ); try { @@ -278,12 +291,14 @@ public void shouldAllowRollbackAfterCoupleCorrectAndSingleWrongStatement() Transaction tx = await( session.beginTransactionAsync() ); StatementResultCursor cursor1 = await( tx.runAsync( "RETURN 4242" ) ); - assertThat( await( cursor1.fetchAsync() ), is( true ) ); - assertEquals( 4242, cursor1.current().get( 0 ).asInt() ); + Record record1 = await( cursor1.nextAsync() ); + assertNotNull( record1 ); + assertEquals( 4242, record1.get( 0 ).asInt() ); StatementResultCursor cursor2 = await( tx.runAsync( "CREATE (n:Node) DELETE n RETURN 42" ) ); - assertThat( await( cursor2.fetchAsync() ), is( true ) ); - assertEquals( 42, cursor2.current().get( 0 ).asInt() ); + Record record2 = await( cursor2.nextAsync() ); + assertNotNull( record2 ); + assertEquals( 42, record2.get( 0 ).asInt() ); try { @@ -510,12 +525,92 @@ public void shouldExposeResultSummaryForProfileQuery() assertThat( summary.resultConsumedAfter( TimeUnit.MILLISECONDS ), greaterThanOrEqualTo( 0L ) ); } + @Test + public void shouldPeekRecordFromCursor() + { + Transaction tx = await( session.beginTransactionAsync() ); + StatementResultCursor cursor = await( tx.runAsync( "UNWIND ['a', 'b', 'c'] AS x RETURN x" ) ); + + assertEquals( "a", await( cursor.peekAsync() ).get( 0 ).asString() ); + assertEquals( "a", await( cursor.peekAsync() ).get( 0 ).asString() ); + + assertEquals( "a", await( cursor.nextAsync() ).get( 0 ).asString() ); + + assertEquals( "b", await( cursor.peekAsync() ).get( 0 ).asString() ); + assertEquals( "b", await( cursor.peekAsync() ).get( 0 ).asString() ); + assertEquals( "b", await( cursor.peekAsync() ).get( 0 ).asString() ); + + assertEquals( "b", await( cursor.nextAsync() ).get( 0 ).asString() ); + assertEquals( "c", await( cursor.nextAsync() ).get( 0 ).asString() ); + + assertNull( await( cursor.peekAsync() ) ); + assertNull( await( cursor.nextAsync() ) ); + + await( tx.rollbackAsync() ); + } + + @Test + public void shouldForEachWithEmptyCursor() + { + testForEach( "MATCH (n:SomeReallyStrangeLabel) RETURN n", 0 ); + } + + @Test + public void shouldForEachWithNonEmptyCursor() + { + testForEach( "UNWIND range(1, 12555) AS x CREATE (n:Node {id: x}) RETURN n", 12555 ); + } + + @Test + public void shouldConvertToListWithEmptyCursor() + { + testList( "CREATE (:Person)-[:KNOWS]->(:Person)", Collections.emptyList() ); + } + + @Test + public void shouldConvertToListWithNonEmptyCursor() + { + testList( "UNWIND [1, '1', 2, '2', 3, '3'] AS x RETURN x", Arrays.asList( 1L, "1", 2L, "2", 3L, "3" ) ); + } + private int countNodes( Object id ) { StatementResult result = session.run( "MATCH (n:Node {id: $id}) RETURN count(n)", parameters( "id", id ) ); return result.single().get( 0 ).asInt(); } + private void testForEach( String query, int expectedSeenRecords ) + { + Transaction tx = await( session.beginTransactionAsync() ); + StatementResultCursor cursor = await( tx.runAsync( query ) ); + + final AtomicInteger recordsSeen = new AtomicInteger(); + Response forEachDone = cursor.forEachAsync( new Consumer() + { + @Override + public void accept( Record record ) + { + recordsSeen.incrementAndGet(); + } + } ); + + assertNull( await( forEachDone ) ); + assertEquals( expectedSeenRecords, recordsSeen.get() ); + } + + private void testList( String query, List expectedList ) + { + Transaction tx = await( session.beginTransactionAsync() ); + StatementResultCursor cursor = await( tx.runAsync( query ) ); + List records = await( cursor.listAsync() ); + List actualList = new ArrayList<>(); + for ( Record record : records ) + { + actualList.add( record.get( 0 ).asObject() ); + } + assertEquals( expectedList, actualList ); + } + private static void assertSyntaxError( Exception e ) { assertThat( e, instanceOf( ClientException.class ) ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java index 429c5f62e8..e171f7d937 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestNeo4jSession.java @@ -134,12 +134,24 @@ public T readTransaction( TransactionWork work ) return realSession.readTransaction( work ); } + @Override + public Response readTransactionAsync( TransactionWork> work ) + { + return realSession.readTransactionAsync( work ); + } + @Override public T writeTransaction( TransactionWork work ) { return realSession.writeTransaction( work ); } + @Override + public Response writeTransactionAsync( TransactionWork> work ) + { + return realSession.writeTransactionAsync( work ); + } + @Override public String lastBookmark() { diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java index 91c4701673..97b244778e 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/TestUtil.java @@ -68,13 +68,7 @@ public static > T await( F future ) } catch ( ExecutionException e ) { - Throwable cause = e.getCause(); - StackTraceElement[] originalStackTrace = cause.getStackTrace(); - RuntimeException exceptionWithOriginalStackTrace = new RuntimeException(); - cause.setStackTrace( exceptionWithOriginalStackTrace.getStackTrace() ); - exceptionWithOriginalStackTrace.setStackTrace( originalStackTrace ); - cause.addSuppressed( exceptionWithOriginalStackTrace ); - throwException( cause ); + throwException( e.getCause() ); return null; } catch ( TimeoutException e )