acquireAsyncConnection( AccessMode mode )
+ {
+ return asyncPool.acquire( address );
+ }
+
@Override
public void close() throws Exception
{
pool.close();
+ asyncPool.closeAsync().syncUninterruptibly();
}
public BoltServerAddress getAddress()
diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
index 746fc4ed95..45ad5e0b8a 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
@@ -18,10 +18,17 @@
*/
package org.neo4j.driver.internal;
+import io.netty.bootstrap.Bootstrap;
+
import java.io.IOException;
import java.net.URI;
import java.security.GeneralSecurityException;
+import org.neo4j.driver.internal.async.AsyncConnectorImpl;
+import org.neo4j.driver.internal.async.BootstrapFactory;
+import org.neo4j.driver.internal.async.pool.ActiveChannelTracker;
+import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
+import org.neo4j.driver.internal.async.pool.AsyncConnectionPoolImpl;
import org.neo4j.driver.internal.cluster.RoutingContext;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
@@ -59,15 +66,20 @@ public class DriverFactory
public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings routingSettings,
RetrySettings retrySettings, Config config )
{
+ authToken = authToken == null ? AuthTokens.none() : authToken;
+
BoltServerAddress address = new BoltServerAddress( uri );
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
SecurityPlan securityPlan = createSecurityPlan( address, config );
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
RetryLogic retryLogic = createRetryLogic( retrySettings, config.logging() );
+ AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, config );
+
try
{
- return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic );
+ return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
+ asyncConnectionPool );
}
catch ( Throwable driverError )
{
@@ -75,6 +87,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
try
{
connectionPool.close();
+ asyncConnectionPool.closeAsync().syncUninterruptibly();
}
catch ( Throwable closeError )
{
@@ -84,16 +97,33 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
}
}
+ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
+ Config config )
+ {
+ Clock clock = createClock();
+ ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
+ ActiveChannelTracker activeChannelTracker = new ActiveChannelTracker( config.logging() );
+ AsyncConnectorImpl connector = new AsyncConnectorImpl( connectionSettings, securityPlan,
+ activeChannelTracker, config.logging(), clock );
+ Bootstrap bootstrap = BootstrapFactory.newBootstrap();
+ PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
+ config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
+ config.maxConnectionPoolSize(),
+ config.connectionAcquisitionTimeoutMillis() );
+ return new AsyncConnectionPoolImpl( connector, bootstrap, activeChannelTracker, poolSettings, config.logging(),
+ clock );
+ }
+
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
- RetryLogic retryLogic )
+ RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
{
String scheme = uri.getScheme().toLowerCase();
switch ( scheme )
{
case BOLT_URI_SCHEME:
assertNoRoutingContext( uri, routingSettings );
- return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic );
+ return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
case BOLT_ROUTING_URI_SCHEME:
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
default:
@@ -107,9 +137,10 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
* This method is protected only for testing
*/
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
- SecurityPlan securityPlan, RetryLogic retryLogic )
+ SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
{
- ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool );
+ ConnectionProvider connectionProvider =
+ new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
}
@@ -173,11 +204,10 @@ private static LoadBalancingStrategy createLoadBalancingStrategy( Config config,
*/
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config )
{
- authToken = authToken == null ? AuthTokens.none() : authToken;
-
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
- config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetime() );
+ config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
+ config.maxConnectionPoolSize(), config.connectionAcquisitionTimeoutMillis() );
Connector connector = createConnector( connectionSettings, securityPlan, config.logging() );
return new SocketConnectionPool( poolSettings, connector, createClock(), config.logging() );
@@ -198,7 +228,7 @@ protected Clock createClock()
*
* This method is protected only for testing
*/
- protected Connector createConnector( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
+ protected Connector createConnector( final ConnectionSettings connectionSettings, SecurityPlan securityPlan,
Logging logging )
{
return new SocketConnector( connectionSettings, securityPlan, logging );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
index 4701ec2314..8d0ff5a364 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
@@ -21,24 +21,41 @@
import java.util.Collections;
import java.util.Map;
-import org.neo4j.driver.internal.spi.Collector;
+import org.neo4j.driver.ResultResourcesHandler;
+import org.neo4j.driver.internal.async.AsyncConnection;
+import org.neo4j.driver.internal.async.InternalFuture;
+import org.neo4j.driver.internal.async.InternalPromise;
+import org.neo4j.driver.internal.async.QueryRunner;
+import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
+import org.neo4j.driver.internal.handlers.BookmarkResponseHandler;
+import org.neo4j.driver.internal.handlers.CommitTxResponseHandler;
+import org.neo4j.driver.internal.handlers.NoOpResponseHandler;
+import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.v1.Record;
+import org.neo4j.driver.v1.Response;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.StatementResult;
+import org.neo4j.driver.v1.StatementResultCursor;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.Values;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.neo4j.driver.v1.types.TypeSystem;
+import org.neo4j.driver.v1.util.Function;
+import static org.neo4j.driver.internal.util.ErrorUtil.isRecoverable;
import static org.neo4j.driver.v1.Values.ofValue;
import static org.neo4j.driver.v1.Values.value;
-public class ExplicitTransaction implements Transaction
+public class ExplicitTransaction implements Transaction, ResultResourcesHandler
{
+ private static final String BEGIN_QUERY = "BEGIN";
+ private static final String COMMIT_QUERY = "COMMIT";
+ private static final String ROLLBACK_QUERY = "ROLLBACK";
+
private enum State
{
/** The transaction is running with no explicit success or failure marked */
@@ -57,28 +74,68 @@ private enum State
FAILED,
/** This transaction has successfully committed */
- SUCCEEDED,
+ COMMITTED,
/** This transaction has been rolled back */
ROLLED_BACK
}
private final SessionResourcesHandler resourcesHandler;
- private final Connection conn;
+ private final Connection connection;
+ private final AsyncConnection asyncConnection;
+ private final NetworkSession session;
- private Bookmark bookmark = Bookmark.empty();
- private State state = State.ACTIVE;
+ private volatile Bookmark bookmark = Bookmark.empty();
+ private volatile State state = State.ACTIVE;
- public ExplicitTransaction( Connection conn, SessionResourcesHandler resourcesHandler )
+ public ExplicitTransaction( Connection connection, SessionResourcesHandler resourcesHandler )
{
- this( conn, resourcesHandler, Bookmark.empty() );
+ this.connection = connection;
+ this.asyncConnection = null;
+ this.session = null;
+ this.resourcesHandler = resourcesHandler;
}
- ExplicitTransaction( Connection conn, SessionResourcesHandler resourcesHandler, Bookmark initialBookmark )
+ public ExplicitTransaction( AsyncConnection asyncConnection, NetworkSession session )
{
- this.conn = conn;
- this.resourcesHandler = resourcesHandler;
- runBeginStatement( conn, initialBookmark );
+ this.connection = null;
+ this.asyncConnection = asyncConnection;
+ this.session = session;
+ this.resourcesHandler = SessionResourcesHandler.NO_OP;
+ }
+
+ public void begin( Bookmark initialBookmark )
+ {
+ Map parameters = initialBookmark.asBeginTransactionParameters();
+
+ connection.run( BEGIN_QUERY, parameters, NoOpResponseHandler.INSTANCE );
+ connection.pullAll( NoOpResponseHandler.INSTANCE );
+
+ if ( !initialBookmark.isEmpty() )
+ {
+ connection.sync();
+ }
+ }
+
+ public InternalFuture beginAsync( Bookmark initialBookmark )
+ {
+ InternalPromise beginTxPromise = asyncConnection.newPromise();
+
+ Map parameters = initialBookmark.asBeginTransactionParameters();
+ asyncConnection.run( BEGIN_QUERY, parameters, NoOpResponseHandler.INSTANCE );
+
+ if ( initialBookmark.isEmpty() )
+ {
+ asyncConnection.pullAll( NoOpResponseHandler.INSTANCE );
+ beginTxPromise.setSuccess( this );
+ }
+ else
+ {
+ asyncConnection.pullAll( new BeginTxResponseHandler<>( beginTxPromise, this ) );
+ asyncConnection.flush();
+ }
+
+ return beginTxPromise;
}
@Override
@@ -104,25 +161,26 @@ public void close()
{
try
{
- if ( conn != null && conn.isOpen() )
+ if ( connection != null && connection.isOpen() )
{
if ( state == State.MARKED_SUCCESS )
{
try
{
- conn.run( "COMMIT", Collections.emptyMap(), Collector.NO_OP );
- conn.pullAll( new BookmarkCollector( this ) );
- conn.sync();
- state = State.SUCCEEDED;
+ connection.run( COMMIT_QUERY, Collections.emptyMap(),
+ NoOpResponseHandler.INSTANCE );
+ connection.pullAll( new BookmarkResponseHandler( this ) );
+ connection.sync();
+ state = State.COMMITTED;
}
- catch( Throwable e )
+ catch ( Throwable e )
{
// failed to commit
try
{
rollbackTx();
}
- catch( Throwable ignored )
+ catch ( Throwable ignored )
{
// best effort.
}
@@ -149,32 +207,146 @@ else if ( state == State.FAILED )
private void rollbackTx()
{
- conn.run( "ROLLBACK", Collections.emptyMap(), Collector.NO_OP );
- conn.pullAll( new BookmarkCollector( this ) );
- conn.sync();
+ connection.run( ROLLBACK_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
+ connection.pullAll( new BookmarkResponseHandler( this ) );
+ connection.sync();
state = State.ROLLED_BACK;
}
@Override
- @SuppressWarnings( "unchecked" )
+ public Response commitAsync()
+ {
+ return internalCommitAsync();
+ }
+
+ private InternalFuture internalCommitAsync()
+ {
+ if ( state == State.COMMITTED )
+ {
+ return asyncConnection.newPromise().setSuccess( null );
+ }
+ else if ( state == State.ROLLED_BACK )
+ {
+ return asyncConnection.newPromise().setFailure(
+ new ClientException( "Can't commit, transaction has already been rolled back" ) );
+ }
+ else
+ {
+ return doCommitAsync().whenComplete( releaseConnectionAndNotifySession() );
+ }
+ }
+
+ @Override
+ public Response rollbackAsync()
+ {
+ return internalRollbackAsync();
+ }
+
+ InternalFuture internalRollbackAsync()
+ {
+ if ( state == State.COMMITTED )
+ {
+ return asyncConnection.newPromise()
+ .setFailure( new ClientException( "Can't rollback, transaction has already been committed" ) );
+ }
+ else if ( state == State.ROLLED_BACK )
+ {
+ return asyncConnection.newPromise().setSuccess( null );
+ }
+ else
+ {
+ return doRollbackAsync().whenComplete( releaseConnectionAndNotifySession() );
+ }
+ }
+
+ private Runnable releaseConnectionAndNotifySession()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ asyncConnection.release();
+ session.asyncTransactionClosed( ExplicitTransaction.this );
+ }
+ };
+ }
+
+ private InternalFuture doCommitAsync()
+ {
+ InternalPromise commitTxPromise = asyncConnection.newPromise();
+
+ asyncConnection.run( COMMIT_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
+ asyncConnection.pullAll( new CommitTxResponseHandler( commitTxPromise, this ) );
+ asyncConnection.flush();
+
+ return commitTxPromise.thenApply( new Function()
+ {
+ @Override
+ public Void apply( Void ignore )
+ {
+ ExplicitTransaction.this.state = State.COMMITTED;
+ return null;
+ }
+ } );
+ }
+
+ private InternalFuture doRollbackAsync()
+ {
+ InternalPromise rollbackTxPromise = asyncConnection.newPromise();
+ asyncConnection.run( ROLLBACK_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
+ asyncConnection.pullAll( new RollbackTxResponseHandler( rollbackTxPromise ) );
+ asyncConnection.flush();
+
+ return rollbackTxPromise.thenApply( new Function()
+ {
+ @Override
+ public Void apply( Void ignore )
+ {
+ ExplicitTransaction.this.state = State.ROLLED_BACK;
+ return null;
+ }
+ } );
+ }
+
+ @Override
public StatementResult run( String statementText, Value statementParameters )
{
return run( new Statement( statementText, statementParameters ) );
}
+ @Override
+ public Response runAsync( String statementText, Value parameters )
+ {
+ return runAsync( new Statement( statementText, parameters ) );
+ }
+
@Override
public StatementResult run( String statementText )
{
return run( statementText, Values.EmptyMap );
}
+ @Override
+ public Response runAsync( String statementTemplate )
+ {
+ return runAsync( statementTemplate, Values.EmptyMap );
+ }
+
@Override
public StatementResult run( String statementText, Map statementParameters )
{
- Value params = statementParameters == null ? Values.EmptyMap : value(statementParameters);
+ Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters );
return run( statementText, params );
}
+ @Override
+ public Response runAsync( String statementTemplate, Map statementParameters )
+ {
+ Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters );
+ return runAsync( statementTemplate, params );
+ }
+
@Override
public StatementResult run( String statementTemplate, Record statementParameters )
{
@@ -183,19 +355,26 @@ public StatementResult run( String statementTemplate, Record statementParameters
}
@Override
- public synchronized StatementResult run( Statement statement )
+ public Response runAsync( String statementTemplate, Record statementParameters )
+ {
+ Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters.asMap() );
+ return runAsync( statementTemplate, params );
+ }
+
+ @Override
+ public StatementResult run( Statement statement )
{
ensureNotFailed();
try
{
InternalStatementResult result =
- new InternalStatementResult( conn, SessionResourcesHandler.NO_OP, this, statement );
- conn.run( statement.text(),
+ new InternalStatementResult( statement, connection, ResultResourcesHandler.NO_OP );
+ connection.run( statement.text(),
statement.parameters().asMap( ofValue() ),
- result.runResponseCollector() );
- conn.pullAll( result.pullAllResponseCollector() );
- conn.flush();
+ result.runResponseHandler() );
+ connection.pullAll( result.pullAllResponseHandler() );
+ connection.flush();
return result;
}
catch ( Neo4jException e )
@@ -207,10 +386,17 @@ public synchronized StatementResult run( Statement statement )
}
}
+ @Override
+ public Response runAsync( Statement statement )
+ {
+ ensureNotFailed();
+ return QueryRunner.runAsync( asyncConnection, statement, this );
+ }
+
@Override
public boolean isOpen()
{
- return state != State.SUCCEEDED && state != State.ROLLED_BACK;
+ return state != State.COMMITTED && state != State.ROLLED_BACK;
}
private void ensureNotFailed()
@@ -218,9 +404,9 @@ private void ensureNotFailed()
if ( state == State.FAILED || state == State.MARKED_FAILED || state == State.ROLLED_BACK )
{
throw new ClientException(
- "Cannot run more statements in this transaction, because previous statements in the " +
- "transaction has failed and the transaction has been rolled back. Please start a new" +
- " transaction to run another statement."
+ "Cannot run more statements in this transaction, because previous statements in the " +
+ "transaction has failed and the transaction has been rolled back. Please start a new " +
+ "transaction to run another statement."
);
}
}
@@ -231,7 +417,27 @@ public TypeSystem typeSystem()
return InternalTypeSystem.TYPE_SYSTEM;
}
- public synchronized void markToClose()
+ @Override
+ public void resultFetched()
+ {
+ // no resources to release when result is fully fetched
+ }
+
+ @Override
+ public void resultFailed( Throwable error )
+ {
+ // RUN failed, this transaction should not commit
+ if ( isRecoverable( error ) )
+ {
+ failure();
+ }
+ else
+ {
+ markToClose();
+ }
+ }
+
+ public void markToClose()
{
state = State.FAILED;
}
@@ -241,25 +447,11 @@ public Bookmark bookmark()
return bookmark;
}
- void setBookmark( Bookmark bookmark )
+ public void setBookmark( Bookmark bookmark )
{
if ( bookmark != null && !bookmark.isEmpty() )
{
this.bookmark = bookmark;
}
}
-
- private static void runBeginStatement( Connection connection, Bookmark bookmark )
- {
- Bookmark initialBookmark = bookmark == null ? Bookmark.empty() : bookmark;
- Map parameters = initialBookmark.asBeginTransactionParameters();
-
- connection.run( "BEGIN", parameters, Collector.NO_OP );
- connection.pullAll( Collector.NO_OP );
-
- if ( !initialBookmark.isEmpty() )
- {
- connection.sync();
- }
- }
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java
index 23f2c785d5..21833b86fd 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java
@@ -19,27 +19,20 @@
package org.neo4j.driver.internal;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Queue;
-import org.neo4j.driver.internal.spi.Collector;
+import org.neo4j.driver.ResultResourcesHandler;
+import org.neo4j.driver.internal.handlers.RecordsResponseHandler;
+import org.neo4j.driver.internal.handlers.RunResponseHandler;
import org.neo4j.driver.internal.spi.Connection;
-import org.neo4j.driver.internal.summary.SummaryBuilder;
+import org.neo4j.driver.internal.spi.ResponseHandler;
+import org.neo4j.driver.internal.summary.InternalResultSummary;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.StatementResult;
-import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.NoSuchRecordException;
-import org.neo4j.driver.v1.summary.Notification;
-import org.neo4j.driver.v1.summary.Plan;
-import org.neo4j.driver.v1.summary.ProfiledPlan;
import org.neo4j.driver.v1.summary.ResultSummary;
-import org.neo4j.driver.v1.summary.ServerInfo;
-import org.neo4j.driver.v1.summary.StatementType;
-import org.neo4j.driver.v1.summary.SummaryCounters;
import org.neo4j.driver.v1.util.Function;
import org.neo4j.driver.v1.util.Functions;
@@ -47,144 +40,39 @@
public class InternalStatementResult implements StatementResult
{
+ private final Statement statement;
private final Connection connection;
- private final SessionResourcesHandler resourcesHandler;
- private final Collector runResponseCollector;
- private final Collector pullAllResponseCollector;
- private final Queue recordBuffer = new LinkedList<>();
+ private final ResultResourcesHandler resourcesHandler;
+ private final RunResponseHandler runResponseHandler;
+ private final RecordsResponseHandler pullAllResponseHandler;
- private List keys = null;
- private ResultSummary summary = null;
-
- private boolean done = false;
-
- InternalStatementResult( Connection connection, SessionResourcesHandler resourcesHandler,
- ExplicitTransaction transaction, Statement statement )
+ InternalStatementResult( Statement statement, Connection connection, ResultResourcesHandler resourcesHandler )
{
+ this.statement = statement;
this.connection = connection;
- this.runResponseCollector = newRunResponseCollector();
- this.pullAllResponseCollector = newStreamResponseCollector( transaction, statement, connection.server() );
+ this.runResponseHandler = new RunResponseHandler( null );
+ this.pullAllResponseHandler = new RecordsResponseHandler( runResponseHandler );
this.resourcesHandler = resourcesHandler;
}
- private Collector newRunResponseCollector()
+ ResponseHandler runResponseHandler()
{
- return new Collector.NoOperationCollector()
- {
- @Override
- public void keys( String[] names )
- {
- keys = Arrays.asList( names );
- }
-
- @Override
- public void done()
- {
- if ( keys == null )
- {
- keys = new ArrayList<>();
- }
- }
-
- @Override
- public void resultAvailableAfter( long l )
- {
- pullAllResponseCollector.resultAvailableAfter( l );
- }
- };
+ return runResponseHandler;
}
- private Collector newStreamResponseCollector( final ExplicitTransaction transaction, final Statement statement,
- final ServerInfo serverInfo )
+ ResponseHandler pullAllResponseHandler()
{
- final SummaryBuilder summaryBuilder = new SummaryBuilder( statement, serverInfo );
-
- return new Collector.NoOperationCollector()
- {
- @Override
- public void record( Value[] fields )
- {
- recordBuffer.add( new InternalRecord( keys, fields ) );
- }
-
- @Override
- public void statementType( StatementType type )
- {
- summaryBuilder.statementType( type );
- }
-
- @Override
- public void statementStatistics( SummaryCounters statistics )
- {
- summaryBuilder.statementStatistics( statistics );
- }
-
- @Override
- public void plan( Plan plan )
- {
- summaryBuilder.plan( plan );
- }
-
- @Override
- public void profile( ProfiledPlan plan )
- {
- summaryBuilder.profile( plan );
- }
-
- @Override
- public void notifications( List notifications )
- {
- summaryBuilder.notifications( notifications );
- }
-
- @Override
- public void bookmark( Bookmark bookmark )
- {
- if ( transaction != null )
- {
- transaction.setBookmark( bookmark );
- }
- }
-
- @Override
- public void done()
- {
- summary = summaryBuilder.build();
- done = true;
- }
-
- @Override
- public void resultAvailableAfter(long l)
- {
- summaryBuilder.resultAvailableAfter( l );
- }
-
- @Override
- public void resultConsumedAfter(long l)
- {
- summaryBuilder.resultConsumedAfter( l );
- }
- };
- }
-
- Collector runResponseCollector()
- {
- return runResponseCollector;
- }
-
- Collector pullAllResponseCollector()
- {
- return pullAllResponseCollector;
+ return pullAllResponseHandler;
}
@Override
public List keys()
{
- if ( keys == null )
+ if ( runResponseHandler.statementKeys() == null )
{
tryFetchNext();
}
- return keys;
+ return runResponseHandler.statementKeys();
}
@Override
@@ -196,15 +84,9 @@ public boolean hasNext()
@Override
public Record next()
{
- // Implementation note:
- // We've chosen to use Iterator over a cursor-based version,
- // after tests show escape analysis will eliminate short-lived allocations
- // in a way that makes the two equivalent in performance.
- // To get the intended benefit, we need to allocate Record in this method,
- // and have it copy out its fields from some lower level data structure.
if ( tryFetchNext() )
{
- return recordBuffer.poll();
+ return pullAllResponseHandler.recordBuffer().poll();
}
else
{
@@ -242,7 +124,7 @@ public Record peek()
{
if ( tryFetchNext() )
{
- return recordBuffer.peek();
+ return pullAllResponseHandler.recordBuffer().peek();
}
else
{
@@ -280,32 +162,32 @@ public List list( Function mapFunction )
@Override
public ResultSummary consume()
{
- if ( done )
+ if ( pullAllResponseHandler.isCompleted() )
{
- recordBuffer.clear();
+ pullAllResponseHandler.recordBuffer().clear();
}
else
{
do
{
receiveOne();
- recordBuffer.clear();
+ pullAllResponseHandler.recordBuffer().clear();
}
- while ( !done );
+ while ( !pullAllResponseHandler.isCompleted() );
}
- return summary;
+ return createResultSummary();
}
@Override
public ResultSummary summary()
{
- while( !done )
+ while ( !pullAllResponseHandler.isCompleted() )
{
receiveOne();
}
- return summary;
+ return createResultSummary();
}
@Override
@@ -316,9 +198,9 @@ public void remove()
private boolean tryFetchNext()
{
- while ( recordBuffer.isEmpty() )
+ while ( pullAllResponseHandler.recordBuffer().isEmpty() )
{
- if ( done )
+ if ( pullAllResponseHandler.isCompleted() )
{
return false;
}
@@ -336,12 +218,27 @@ private void receiveOne()
}
catch ( Throwable error )
{
- resourcesHandler.onResultConsumed();
+ resourcesHandler.resultFailed( error );
throw error;
}
- if ( done )
+ if ( pullAllResponseHandler.isCompleted() )
{
- resourcesHandler.onResultConsumed();
+ resourcesHandler.resultFetched();
}
}
+
+ private ResultSummary createResultSummary()
+ {
+ return new InternalResultSummary(
+ statement,
+ connection.server(),
+ pullAllResponseHandler.statementType(),
+ pullAllResponseHandler.counters(),
+ pullAllResponseHandler.plan(),
+ pullAllResponseHandler.profile(),
+ pullAllResponseHandler.notifications(),
+ runResponseHandler.resultAvailableAfter(),
+ pullAllResponseHandler.resultConsumedAfter()
+ );
+ }
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
index 8f2c78fe7d..51881a2c2b 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
@@ -18,9 +18,16 @@
*/
package org.neo4j.driver.internal;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.neo4j.driver.ResultResourcesHandler;
+import org.neo4j.driver.internal.async.AsyncConnection;
+import org.neo4j.driver.internal.async.InternalFuture;
+import org.neo4j.driver.internal.async.InternalPromise;
+import org.neo4j.driver.internal.async.QueryRunner;
import org.neo4j.driver.internal.logging.DelegatingLogger;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.Connection;
@@ -32,19 +39,22 @@
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Record;
+import org.neo4j.driver.v1.Response;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.StatementResult;
+import org.neo4j.driver.v1.StatementResultCursor;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.TransactionWork;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.Values;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.types.TypeSystem;
+import org.neo4j.driver.v1.util.Function;
import static org.neo4j.driver.v1.Values.value;
-public class NetworkSession implements Session, SessionResourcesHandler
+public class NetworkSession implements Session, SessionResourcesHandler, ResultResourcesHandler
{
private static final String LOG_NAME = "Session";
@@ -53,9 +63,12 @@ public class NetworkSession implements Session, SessionResourcesHandler
private final RetryLogic retryLogic;
protected final Logger logger;
- private Bookmark bookmark = Bookmark.empty();
+ private volatile Bookmark bookmark = Bookmark.empty();
private PooledConnection currentConnection;
private ExplicitTransaction currentTransaction;
+ private volatile InternalFuture currentAsyncTransactionFuture;
+
+ private InternalFuture asyncConnectionFuture;
private final AtomicBoolean isOpen = new AtomicBoolean( true );
@@ -74,6 +87,12 @@ public StatementResult run( String statementText )
return run( statementText, Values.EmptyMap );
}
+ @Override
+ public Response runAsync( String statementText )
+ {
+ return runAsync( statementText, Values.EmptyMap );
+ }
+
@Override
public StatementResult run( String statementText, Map statementParameters )
{
@@ -81,6 +100,13 @@ public StatementResult run( String statementText, Map statementPa
return run( statementText, params );
}
+ @Override
+ public Response runAsync( String statementText, Map statementParameters )
+ {
+ Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters );
+ return runAsync( statementText, params );
+ }
+
@Override
public StatementResult run( String statementTemplate, Record statementParameters )
{
@@ -88,12 +114,25 @@ public StatementResult run( String statementTemplate, Record statementParameters
return run( statementTemplate, params );
}
+ @Override
+ public Response runAsync( String statementTemplate, Record statementParameters )
+ {
+ Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters.asMap() );
+ return runAsync( statementTemplate, params );
+ }
+
@Override
public StatementResult run( String statementText, Value statementParameters )
{
return run( new Statement( statementText, statementParameters ) );
}
+ @Override
+ public Response runAsync( String statementText, Value parameters )
+ {
+ return runAsync( new Statement( statementText, parameters ) );
+ }
+
@Override
public StatementResult run( Statement statement )
{
@@ -106,13 +145,31 @@ public StatementResult run( Statement statement )
return run( currentConnection, statement, this );
}
+ @Override
+ public Response runAsync( final Statement statement )
+ {
+ ensureSessionIsOpen();
+ ensureNoOpenTransactionBeforeRunningSession();
+
+ InternalFuture connectionFuture = acquireAsyncConnection( mode );
+
+ return connectionFuture.thenCombine( new Function>()
+ {
+ @Override
+ public InternalFuture apply( AsyncConnection connection )
+ {
+ return QueryRunner.runAsync( connection, statement );
+ }
+ } );
+ }
+
public static StatementResult run( Connection connection, Statement statement,
- SessionResourcesHandler resourcesHandler )
+ ResultResourcesHandler resourcesHandler )
{
- InternalStatementResult result = new InternalStatementResult( connection, resourcesHandler, null, statement );
+ InternalStatementResult result = new InternalStatementResult( statement, connection, resourcesHandler );
connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ),
- result.runResponseCollector() );
- connection.pullAll( result.pullAllResponseCollector() );
+ result.runResponseHandler() );
+ connection.pullAll( result.pullAllResponseHandler() );
connection.flush();
return result;
}
@@ -167,6 +224,46 @@ public void close()
}
syncAndCloseCurrentConnection();
+
+ try
+ {
+ closeAsync().get();
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException( e );
+ }
+ }
+
+ @Override
+ public Response closeAsync()
+ {
+ if ( asyncConnectionFuture != null )
+ {
+ return asyncConnectionFuture.thenCombine( new Function>()
+ {
+ @Override
+ public InternalFuture apply( AsyncConnection connection )
+ {
+ return connection.forceRelease();
+ }
+ } );
+ }
+ else if ( currentAsyncTransactionFuture != null )
+ {
+ return currentAsyncTransactionFuture.thenCombine( new Function>()
+ {
+ @Override
+ public InternalFuture apply( ExplicitTransaction tx )
+ {
+ return tx.internalRollbackAsync();
+ }
+ } );
+ }
+ else
+ {
+ return new InternalPromise( GlobalEventExecutor.INSTANCE ).setSuccess( null );
+ }
}
@Override
@@ -183,6 +280,12 @@ public synchronized Transaction beginTransaction( String bookmark )
return beginTransaction();
}
+ @Override
+ public Response beginTransactionAsync()
+ {
+ return beginTransactionAsync( mode );
+ }
+
@Override
public T readTransaction( TransactionWork work )
{
@@ -221,6 +324,18 @@ public synchronized void onResultConsumed()
closeCurrentConnection();
}
+ @Override
+ public void resultFetched()
+ {
+ closeCurrentConnection();
+ }
+
+ @Override
+ public void resultFailed( Throwable error )
+ {
+ resultFetched();
+ }
+
@Override
public synchronized void onTransactionClosed( ExplicitTransaction tx )
{
@@ -232,6 +347,12 @@ public synchronized void onTransactionClosed( ExplicitTransaction tx )
}
}
+ public void asyncTransactionClosed( ExplicitTransaction tx )
+ {
+ setBookmark( tx.bookmark() );
+ currentAsyncTransactionFuture = null;
+ }
+
@Override
public synchronized void onConnectionError( boolean recoverable )
{
@@ -284,11 +405,35 @@ private synchronized Transaction beginTransaction( AccessMode mode )
syncAndCloseCurrentConnection();
currentConnection = acquireConnection( mode );
- currentTransaction = new ExplicitTransaction( currentConnection, this, bookmark);
+ ExplicitTransaction tx = new ExplicitTransaction( currentConnection, this );
+ tx.begin( bookmark );
+ currentTransaction = tx;
currentConnection.setResourcesHandler( this );
return currentTransaction;
}
+ private synchronized Response beginTransactionAsync( AccessMode mode )
+ {
+ ensureSessionIsOpen();
+ ensureNoOpenTransactionBeforeOpeningTransaction();
+
+ InternalFuture connectionFuture = acquireAsyncConnection( mode );
+
+ currentAsyncTransactionFuture = connectionFuture.thenCombine(
+ new Function>()
+ {
+ @Override
+ public InternalFuture apply( AsyncConnection connection )
+ {
+ ExplicitTransaction tx = new ExplicitTransaction( connection, NetworkSession.this );
+ return tx.beginAsync( bookmark );
+ }
+ } );
+
+ //noinspection unchecked
+ return (Response) currentAsyncTransactionFuture;
+ }
+
private void ensureNoUnrecoverableError()
{
if ( currentConnection != null && currentConnection.hasUnrecoverableErrors() )
@@ -302,7 +447,7 @@ private void ensureNoUnrecoverableError()
//should be called from a synchronized block
private void ensureNoOpenTransactionBeforeRunningSession()
{
- if ( currentTransaction != null )
+ if ( currentTransaction != null || currentAsyncTransactionFuture != null )
{
throw new ClientException( "Statements cannot be run directly on a session with an open transaction;" +
" either run from within the transaction or use a different session." );
@@ -312,7 +457,7 @@ private void ensureNoOpenTransactionBeforeRunningSession()
//should be called from a synchronized block
private void ensureNoOpenTransactionBeforeOpeningTransaction()
{
- if ( currentTransaction != null )
+ if ( currentTransaction != null || currentAsyncTransactionFuture != null )
{
throw new ClientException( "You cannot begin a transaction on a session with an open transaction;" +
" either run from within the transaction or use a different session." );
@@ -339,6 +484,38 @@ private PooledConnection acquireConnection( AccessMode mode )
return connection;
}
+ private InternalFuture acquireAsyncConnection( final AccessMode mode )
+ {
+ if ( asyncConnectionFuture == null )
+ {
+ asyncConnectionFuture = connectionProvider.acquireAsyncConnection( mode );
+ }
+ else
+ {
+ // memorize in local so same instance is transformed and used in callbacks
+ final InternalFuture currentAsyncConnectionFuture = asyncConnectionFuture;
+
+ asyncConnectionFuture = currentAsyncConnectionFuture.thenCombine(
+ new Function>()
+ {
+ @Override
+ public InternalFuture apply( AsyncConnection connection )
+ {
+ if ( connection.tryMarkInUse() )
+ {
+ return currentAsyncConnectionFuture;
+ }
+ else
+ {
+ return connectionProvider.acquireAsyncConnection( mode );
+ }
+ }
+ } );
+ }
+
+ return asyncConnectionFuture;
+ }
+
boolean currentConnectionIsOpen()
{
return currentConnection != null && currentConnection.isOpen();
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnection.java
new file mode 100644
index 0000000000..a5f75a59c0
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnection.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async;
+
+import java.util.Map;
+
+import org.neo4j.driver.internal.spi.ResponseHandler;
+import org.neo4j.driver.v1.Value;
+import org.neo4j.driver.v1.summary.ServerInfo;
+
+public interface AsyncConnection
+{
+ boolean tryMarkInUse();
+
+ void enableAutoRead();
+
+ void disableAutoRead();
+
+ void run( String statement, Map parameters, ResponseHandler handler );
+
+ void pullAll( ResponseHandler handler );
+
+ void flush();
+
+ InternalPromise newPromise();
+
+ void release();
+
+ InternalFuture forceRelease();
+
+ ServerInfo serverInfo();
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnector.java b/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnector.java
new file mode 100644
index 0000000000..aed2d9dabe
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnector.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+
+import org.neo4j.driver.internal.net.BoltServerAddress;
+
+public interface AsyncConnector
+{
+ ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap );
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnectorImpl.java b/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnectorImpl.java
new file mode 100644
index 0000000000..3b51abd726
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnectorImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.pool.ChannelPoolHandler;
+
+import java.util.Map;
+
+import org.neo4j.driver.internal.ConnectionSettings;
+import org.neo4j.driver.internal.net.BoltServerAddress;
+import org.neo4j.driver.internal.security.InternalAuthToken;
+import org.neo4j.driver.internal.security.SecurityPlan;
+import org.neo4j.driver.internal.util.Clock;
+import org.neo4j.driver.v1.AuthToken;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Logging;
+import org.neo4j.driver.v1.Value;
+import org.neo4j.driver.v1.exceptions.ClientException;
+
+import static java.util.Objects.requireNonNull;
+
+public class AsyncConnectorImpl implements AsyncConnector
+{
+ private final String userAgent;
+ private final Map authToken;
+ private final SecurityPlan securityPlan;
+ private final int connectTimeoutMillis;
+ private final ChannelPoolHandler channelPoolHandler;
+ private final Logging logging;
+ private final Clock clock;
+
+ public AsyncConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
+ ChannelPoolHandler channelPoolHandler, Logging logging, Clock clock )
+ {
+ this.userAgent = connectionSettings.userAgent();
+ this.authToken = tokenAsMap( connectionSettings.authToken() );
+ this.connectTimeoutMillis = connectionSettings.connectTimeoutMillis();
+ this.securityPlan = requireNonNull( securityPlan );
+ this.channelPoolHandler = requireNonNull( channelPoolHandler );
+ this.logging = requireNonNull( logging );
+ this.clock = requireNonNull( clock );
+ }
+
+ @Override
+ public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap )
+ {
+ bootstrap.option( ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis );
+ bootstrap.handler( new NettyChannelInitializer( address, securityPlan, channelPoolHandler, clock ) );
+
+ ChannelFuture channelConnected = bootstrap.connect( address.toSocketAddress() );
+
+ Channel channel = channelConnected.channel();
+ ChannelPromise handshakeCompleted = channel.newPromise();
+ ChannelPromise connectionInitialized = channel.newPromise();
+
+ channelConnected.addListener( new ChannelConnectedListener( address, handshakeCompleted, logging ) );
+ handshakeCompleted.addListener( new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );
+
+ return connectionInitialized;
+ }
+
+ private static Map tokenAsMap( AuthToken token )
+ {
+ if ( token instanceof InternalAuthToken )
+ {
+ return ((InternalAuthToken) token).toMap();
+ }
+ else
+ {
+ throw new ClientException(
+ "Unknown authentication token, `" + token + "`. Please use one of the supported " +
+ "tokens from `" + AuthTokens.class.getSimpleName() + "`." );
+ }
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/BootstrapFactory.java b/driver/src/main/java/org/neo4j/driver/internal/async/BootstrapFactory.java
new file mode 100644
index 0000000000..40f5245e22
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/BootstrapFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+public final class BootstrapFactory
+{
+ private BootstrapFactory()
+ {
+ }
+
+ public static Bootstrap newBootstrap()
+ {
+ return newBootstrap( new NioEventLoopGroup() );
+ }
+
+ public static Bootstrap newBootstrap( int threadCount )
+ {
+ return newBootstrap( new NioEventLoopGroup( threadCount ) );
+ }
+
+ private static Bootstrap newBootstrap( NioEventLoopGroup eventLoopGroup )
+ {
+ Bootstrap bootstrap = new Bootstrap();
+ bootstrap.group( eventLoopGroup );
+ bootstrap.channel( NioSocketChannel.class );
+ bootstrap.option( ChannelOption.SO_KEEPALIVE, true );
+ bootstrap.option( ChannelOption.SO_REUSEADDR, true );
+ return bootstrap;
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java
new file mode 100644
index 0000000000..9654695118
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async;
+
+import io.netty.channel.Channel;
+import io.netty.util.AttributeKey;
+
+import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
+import org.neo4j.driver.internal.net.BoltServerAddress;
+
+import static io.netty.util.AttributeKey.newInstance;
+
+public final class ChannelAttributes
+{
+ private static final AttributeKey ADDRESS = newInstance( "address" );
+ private static final AttributeKey CREATION_TIMESTAMP = newInstance( "creationTimestamp" );
+ private static final AttributeKey LAST_USED_TIMESTAMP = newInstance( "lastUsedTimestamp" );
+ private static final AttributeKey MESSAGE_DISPATCHER = newInstance( "messageDispatcher" );
+ private static final AttributeKey SERVER_VERSION = newInstance( "serverVersion" );
+
+ private ChannelAttributes()
+ {
+ }
+
+ public static BoltServerAddress address( Channel channel )
+ {
+ return get( channel, ADDRESS );
+ }
+
+ public static void setAddress( Channel channel, BoltServerAddress address )
+ {
+ setOnce( channel, ADDRESS, address );
+ }
+
+ public static long creationTimestamp( Channel channel )
+ {
+ return get( channel, CREATION_TIMESTAMP );
+ }
+
+ public static void setCreationTimestamp( Channel channel, long creationTimestamp )
+ {
+ setOnce( channel, CREATION_TIMESTAMP, creationTimestamp );
+ }
+
+ public static Long lastUsedTimestamp( Channel channel )
+ {
+ return get( channel, LAST_USED_TIMESTAMP );
+ }
+
+ public static void setLastUsedTimestamp( Channel channel, long lastUsedTimestamp )
+ {
+ set( channel, LAST_USED_TIMESTAMP, lastUsedTimestamp );
+ }
+
+ public static InboundMessageDispatcher messageDispatcher( Channel channel )
+ {
+ return get( channel, MESSAGE_DISPATCHER );
+ }
+
+ public static void setMessageDispatcher( Channel channel, InboundMessageDispatcher messageDispatcher )
+ {
+ setOnce( channel, MESSAGE_DISPATCHER, messageDispatcher );
+ }
+
+ public static String serverVersion( Channel channel )
+ {
+ return get( channel, SERVER_VERSION );
+ }
+
+ public static void setServerVersion( Channel channel, String serverVersion )
+ {
+ setOnce( channel, SERVER_VERSION, serverVersion );
+ }
+
+ private static T get( Channel channel, AttributeKey key )
+ {
+ return channel.attr( key ).get();
+ }
+
+ private static void set( Channel channel, AttributeKey key, T value )
+ {
+ channel.attr( key ).set( value );
+ }
+
+ private static void setOnce( Channel channel, AttributeKey key, T value )
+ {
+ T existingValue = channel.attr( key ).setIfAbsent( value );
+ if ( existingValue != null )
+ {
+ throw new IllegalStateException(
+ "Unable to set " + key.name() + " because it is already set to " + existingValue );
+ }
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java
new file mode 100644
index 0000000000..2382cfb23d
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/ChannelConnectedListener.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelPromise;
+
+import org.neo4j.driver.internal.net.BoltServerAddress;
+import org.neo4j.driver.v1.Logging;
+import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.neo4j.driver.internal.async.ProtocolUtil.handshake;
+
+public class ChannelConnectedListener implements ChannelFutureListener
+{
+ private final BoltServerAddress address;
+ private final ChannelPromise handshakeCompletedPromise;
+ private final Logging logging;
+
+ public ChannelConnectedListener( BoltServerAddress address, ChannelPromise handshakeCompletedPromise,
+ Logging logging )
+ {
+ this.address = requireNonNull( address );
+ this.handshakeCompletedPromise = requireNonNull( handshakeCompletedPromise );
+ this.logging = requireNonNull( logging );
+ }
+
+ @Override
+ public void operationComplete( ChannelFuture future )
+ {
+ Channel channel = future.channel();
+
+ if ( future.isSuccess() )
+ {
+ channel.pipeline().addLast( new HandshakeResponseHandler( handshakeCompletedPromise, logging ) );
+ ChannelFuture handshakeFuture = channel.writeAndFlush( handshake() );
+
+ handshakeFuture.addListener( new ChannelFutureListener()
+ {
+ @Override
+ public void operationComplete( ChannelFuture future ) throws Exception
+ {
+ if ( !future.isSuccess() )
+ {
+ handshakeCompletedPromise.setFailure( future.cause() );
+ }
+ }
+ } );
+ }
+ else
+ {
+ handshakeCompletedPromise.setFailure( databaseUnavailableError( address, future.cause() ) );
+ }
+ }
+
+ private static Throwable databaseUnavailableError( BoltServerAddress address, Throwable cause )
+ {
+ return new ServiceUnavailableException( format(
+ "Unable to connect to %s, ensure the database is running and that there " +
+ "is a working network connection to it.", address ), cause );
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java b/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java
new file mode 100644
index 0000000000..8f14c7d182
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
+
+import org.neo4j.driver.v1.util.Function;
+
+public final class Futures
+{
+ private Futures()
+ {
+ }
+
+ public static InternalFuture thenApply( InternalFuture future, Function fn )
+ {
+ InternalPromise result = new InternalPromise<>( future.eventExecutor() );
+ future.addListener( new ThenApplyListener<>( result, fn ) );
+ return result;
+ }
+
+ public static InternalFuture thenApply( Future future, Bootstrap bootstrap, Function fn )
+ {
+ InternalPromise result = new InternalPromise<>( bootstrap );
+ future.addListener( new ThenApplyListener<>( result, fn ) );
+ return result;
+ }
+
+ public static InternalFuture thenCombine( InternalFuture future, Function> fn )
+ {
+ InternalPromise result = new InternalPromise<>( future.eventExecutor() );
+ future.addListener( new ThenCombineListener<>( result, fn ) );
+ return result;
+ }
+
+ public static InternalFuture whenComplete( InternalFuture future, Runnable action )
+ {
+ InternalPromise result = new InternalPromise<>( future.eventExecutor() );
+ future.addListener( new CompletionListener<>( result, action ) );
+ return result;
+ }
+
+ private static class ThenApplyListener implements GenericFutureListener>
+ {
+ final Promise result;
+ final Function fn;
+
+ ThenApplyListener( Promise result, Function fn )
+ {
+ this.result = result;
+ this.fn = fn;
+ }
+
+ @Override
+ public void operationComplete( Future future ) throws Exception
+ {
+ if ( future.isCancelled() )
+ {
+ result.cancel( true );
+ }
+ else if ( future.isSuccess() )
+ {
+ try
+ {
+ T originalValue = future.getNow();
+ U newValue = fn.apply( originalValue );
+ result.setSuccess( newValue );
+ }
+ catch ( Throwable t )
+ {
+ result.setFailure( t );
+ }
+ }
+ else
+ {
+ result.setFailure( future.cause() );
+ }
+ }
+ }
+
+ private static class ThenCombineListener implements GenericFutureListener>
+ {
+ final Promise result;
+ final Function> fn;
+
+ ThenCombineListener( Promise result, Function> fn )
+ {
+ this.result = result;
+ this.fn = fn;
+ }
+
+ @Override
+ public void operationComplete( Future future ) throws Exception
+ {
+ if ( future.isCancelled() )
+ {
+ result.cancel( true );
+ }
+ else if ( future.isSuccess() )
+ {
+ try
+ {
+ T value = future.getNow();
+ InternalFuture newFuture = fn.apply( value );
+ newFuture.addListener( new NestedThenCombineListener<>( result, newFuture ) );
+ }
+ catch ( Throwable t )
+ {
+ result.setFailure( t );
+ }
+ }
+ else
+ {
+ result.setFailure( future.cause() );
+ }
+ }
+ }
+
+ private static class NestedThenCombineListener implements GenericFutureListener>
+ {
+
+ final Promise result;
+ final Future future;
+
+ NestedThenCombineListener( Promise result, Future future )
+ {
+ this.result = result;
+ this.future = future;
+ }
+
+ @Override
+ public void operationComplete( Future future ) throws Exception
+ {
+ if ( future.isCancelled() )
+ {
+ result.cancel( true );
+ }
+ else if ( future.isSuccess() )
+ {
+ result.setSuccess( future.getNow() );
+ }
+ else
+ {
+ result.setFailure( future.cause() );
+ }
+ }
+ }
+
+ private static class CompletionListener implements GenericFutureListener>
+ {
+ final Promise result;
+ final Runnable action;
+
+ CompletionListener( Promise result, Runnable action )
+ {
+ this.result = result;
+ this.action = action;
+ }
+
+ @Override
+ public void operationComplete( Future future ) throws Exception
+ {
+ if ( future.isCancelled() )
+ {
+ result.cancel( true );
+ }
+ else if ( future.isSuccess() )
+ {
+ try
+ {
+ action.run();
+ result.setSuccess( future.getNow() );
+ }
+ catch ( Throwable t )
+ {
+ result.setFailure( t );
+ }
+ }
+ else
+ {
+ Throwable error = future.cause();
+ try
+ {
+ action.run();
+ }
+ catch ( Throwable t )
+ {
+ error.addSuppressed( t );
+ }
+ result.setFailure( error );
+ }
+ }
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java b/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java
new file mode 100644
index 0000000000..3094b13c79
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeCompletedListener.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelPromise;
+
+import java.util.Map;
+
+import org.neo4j.driver.internal.handlers.AsyncInitResponseHandler;
+import org.neo4j.driver.internal.messaging.InitMessage;
+import org.neo4j.driver.v1.Value;
+
+import static java.util.Objects.requireNonNull;
+import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
+
+public class HandshakeCompletedListener implements ChannelFutureListener
+{
+ private final String userAgent;
+ private final Map authToken;
+ private final ChannelPromise connectionInitializedPromise;
+
+ public HandshakeCompletedListener( String userAgent, Map authToken,
+ ChannelPromise connectionInitializedPromise )
+ {
+ this.userAgent = requireNonNull( userAgent );
+ this.authToken = requireNonNull( authToken );
+ this.connectionInitializedPromise = requireNonNull( connectionInitializedPromise );
+ }
+
+ @Override
+ public void operationComplete( ChannelFuture future )
+ {
+ if ( future.isSuccess() )
+ {
+ Channel channel = future.channel();
+
+ InitMessage message = new InitMessage( userAgent, authToken );
+ AsyncInitResponseHandler handler = new AsyncInitResponseHandler( connectionInitializedPromise );
+
+ messageDispatcher( channel ).queue( handler );
+ channel.writeAndFlush( message );
+ }
+ else
+ {
+ connectionInitializedPromise.setFailure( future.cause() );
+ }
+ }
+}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeResponseHandler.java
new file mode 100644
index 0000000000..c9e1f6d2d3
--- /dev/null
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/HandshakeResponseHandler.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2002-2017 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.internal.async;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.ReplayingDecoder;
+
+import java.util.List;
+
+import org.neo4j.driver.internal.async.inbound.ChunkDecoder;
+import org.neo4j.driver.internal.async.inbound.InboundMessageHandler;
+import org.neo4j.driver.internal.async.inbound.MessageDecoder;
+import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler;
+import org.neo4j.driver.internal.messaging.MessageFormat;
+import org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1;
+import org.neo4j.driver.v1.Logger;
+import org.neo4j.driver.v1.Logging;
+import org.neo4j.driver.v1.exceptions.ClientException;
+
+import static java.util.Objects.requireNonNull;
+import static org.neo4j.driver.internal.async.ProtocolUtil.HTTP;
+import static org.neo4j.driver.internal.async.ProtocolUtil.NO_PROTOCOL_VERSION;
+import static org.neo4j.driver.internal.async.ProtocolUtil.PROTOCOL_VERSION_1;
+import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
+
+public class HandshakeResponseHandler extends ReplayingDecoder
+{
+ private final ChannelPromise handshakeCompletedPromise;
+ private final Logger log;
+
+ public HandshakeResponseHandler( ChannelPromise handshakeCompletedPromise, Logging logging )
+ {
+ this.handshakeCompletedPromise = requireNonNull( handshakeCompletedPromise );
+ this.log = logging.getLog( getClass().getSimpleName() );
+ }
+
+ @Override
+ public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) throws Exception
+ {
+ fail( ctx, cause );
+ }
+
+ @Override
+ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List