diff --git a/driver/pom.xml b/driver/pom.xml
index c5be24348b..2c2d92c744 100644
--- a/driver/pom.xml
+++ b/driver/pom.xml
@@ -104,8 +104,8 @@
maven-compiler-plugin
2.3.2
- 1.7
- 1.7
+ ${java.version}
+ ${java.version}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java
index 4bd40e7bd2..639dcd75ba 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java
@@ -18,8 +18,10 @@
*/
package org.neo4j.driver.internal;
+import java.util.concurrent.CompletionStage;
+
import org.neo4j.driver.internal.async.AsyncConnection;
-import org.neo4j.driver.internal.async.InternalFuture;
+import org.neo4j.driver.internal.async.Futures;
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.ConnectionPool;
@@ -53,7 +55,7 @@ public PooledConnection acquireConnection( AccessMode mode )
}
@Override
- public InternalFuture acquireAsyncConnection( AccessMode mode )
+ public CompletionStage acquireAsyncConnection( AccessMode mode )
{
return asyncPool.acquire( address );
}
@@ -62,7 +64,7 @@ public InternalFuture acquireAsyncConnection( AccessMode mode )
public void close() throws Exception
{
pool.close();
- asyncPool.closeAsync().syncUninterruptibly();
+ Futures.getBlocking( asyncPool.closeAsync() );
}
public BoltServerAddress getAddress()
diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
index 527874e44f..e1eb995d26 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
@@ -28,6 +28,7 @@
import org.neo4j.driver.internal.async.AsyncConnectorImpl;
import org.neo4j.driver.internal.async.BootstrapFactory;
+import org.neo4j.driver.internal.async.Futures;
import org.neo4j.driver.internal.async.pool.ActiveChannelTracker;
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
import org.neo4j.driver.internal.async.pool.AsyncConnectionPoolImpl;
@@ -85,7 +86,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
try
{
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
- asyncConnectionPool, eventLoopGroup );
+ asyncConnectionPool );
}
catch ( Throwable driverError )
{
@@ -93,7 +94,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
try
{
connectionPool.close();
- asyncConnectionPool.closeAsync().syncUninterruptibly();
+ Futures.getBlocking( asyncConnectionPool.closeAsync() );
}
catch ( Throwable closeError )
{
@@ -120,19 +121,17 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
}
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
- Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
- RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup )
+ Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
+ AsyncConnectionPool asyncConnectionPool )
{
String scheme = uri.getScheme().toLowerCase();
switch ( scheme )
{
case BOLT_URI_SCHEME:
assertNoRoutingContext( uri, routingSettings );
- return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool,
- eventExecutorGroup );
+ return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
case BOLT_ROUTING_URI_SCHEME:
- return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
- eventExecutorGroup );
+ return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
@@ -144,13 +143,12 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
* This method is protected only for testing
*/
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
- SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool,
- EventExecutorGroup eventExecutorGroup )
+ SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
{
ConnectionProvider connectionProvider =
new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
SessionFactory sessionFactory =
- createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
+ createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
}
@@ -160,16 +158,14 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
* This method is protected only for testing
*/
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
- Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
- EventExecutorGroup eventExecutorGroup )
+ Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
{
if ( !securityPlan.isRoutingCompatible() )
{
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
}
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
- SessionFactory sessionFactory =
- createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
+ SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
}
@@ -251,9 +247,9 @@ protected Connector createConnector( final ConnectionSettings connectionSettings
* This method is protected only for testing
*/
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, RetryLogic retryLogic,
- EventExecutorGroup eventExecutorGroup, Config config )
+ Config config )
{
- return new SessionFactoryImpl( connectionProvider, retryLogic, eventExecutorGroup, config );
+ return new SessionFactoryImpl( connectionProvider, retryLogic, config );
}
/**
diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
index 7551c88373..0ba43cf66d 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java
@@ -20,11 +20,12 @@
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.BiConsumer;
import org.neo4j.driver.ResultResourcesHandler;
import org.neo4j.driver.internal.async.AsyncConnection;
-import org.neo4j.driver.internal.async.InternalFuture;
-import org.neo4j.driver.internal.async.InternalPromise;
import org.neo4j.driver.internal.async.QueryRunner;
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
import org.neo4j.driver.internal.handlers.BookmarkResponseHandler;
@@ -33,9 +34,7 @@
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.types.InternalTypeSystem;
-import org.neo4j.driver.internal.util.BiConsumer;
import org.neo4j.driver.v1.Record;
-import org.neo4j.driver.v1.Response;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.StatementResultCursor;
@@ -45,8 +44,9 @@
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.neo4j.driver.v1.types.TypeSystem;
-import org.neo4j.driver.v1.util.Function;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.neo4j.driver.internal.async.Futures.failedFuture;
import static org.neo4j.driver.internal.util.ErrorUtil.isRecoverable;
import static org.neo4j.driver.v1.Values.ofValue;
import static org.neo4j.driver.v1.Values.value;
@@ -118,25 +118,23 @@ public void begin( Bookmark initialBookmark )
}
}
- public InternalFuture beginAsync( Bookmark initialBookmark )
+ public CompletionStage beginAsync( Bookmark initialBookmark )
{
- InternalPromise beginTxPromise = asyncConnection.newPromise();
-
Map parameters = initialBookmark.asBeginTransactionParameters();
asyncConnection.run( BEGIN_QUERY, parameters, NoOpResponseHandler.INSTANCE );
if ( initialBookmark.isEmpty() )
{
asyncConnection.pullAll( NoOpResponseHandler.INSTANCE );
- beginTxPromise.setSuccess( this );
+ return completedFuture( this );
}
else
{
- asyncConnection.pullAll( new BeginTxResponseHandler<>( beginTxPromise, this ) );
+ CompletableFuture beginFuture = new CompletableFuture<>();
+ asyncConnection.pullAll( new BeginTxResponseHandler<>( beginFuture, this ) );
asyncConnection.flush();
+ return beginFuture;
}
-
- return beginTxPromise;
}
@Override
@@ -215,21 +213,15 @@ private void rollbackTx()
}
@Override
- public Response commitAsync()
- {
- return internalCommitAsync();
- }
-
- InternalFuture internalCommitAsync()
+ public CompletionStage commitAsync()
{
if ( state == State.COMMITTED )
{
- return asyncConnection.newPromise().setSuccess( null );
+ return completedFuture( null );
}
else if ( state == State.ROLLED_BACK )
{
- return asyncConnection.newPromise().setFailure(
- new ClientException( "Can't commit, transaction has already been rolled back" ) );
+ return failedFuture( new ClientException( "Can't commit, transaction has already been rolled back" ) );
}
else
{
@@ -238,21 +230,15 @@ else if ( state == State.ROLLED_BACK )
}
@Override
- public Response rollbackAsync()
- {
- return internalRollbackAsync();
- }
-
- InternalFuture internalRollbackAsync()
+ public CompletionStage rollbackAsync()
{
if ( state == State.COMMITTED )
{
- return asyncConnection.newPromise()
- .setFailure( new ClientException( "Can't rollback, transaction has already been committed" ) );
+ return failedFuture( new ClientException( "Can't rollback, transaction has already been committed" ) );
}
else if ( state == State.ROLLED_BACK )
{
- return asyncConnection.newPromise().setSuccess( null );
+ return completedFuture( null );
}
else
{
@@ -262,51 +248,39 @@ else if ( state == State.ROLLED_BACK )
private BiConsumer releaseConnectionAndNotifySession()
{
- return new BiConsumer()
+ return ( ignore, error ) ->
{
- @Override
- public void accept( Void result, Throwable error )
- {
- asyncConnection.release();
- session.asyncTransactionClosed( ExplicitTransaction.this );
- }
+ asyncConnection.release();
+ session.asyncTransactionClosed( ExplicitTransaction.this );
};
}
- private InternalFuture doCommitAsync()
+ private CompletionStage doCommitAsync()
{
- InternalPromise commitTxPromise = asyncConnection.newPromise();
+ CompletableFuture commitFuture = new CompletableFuture<>();
- asyncConnection.run( COMMIT_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
- asyncConnection.pullAll( new CommitTxResponseHandler( commitTxPromise, this ) );
+ asyncConnection.run( COMMIT_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
+ asyncConnection.pullAll( new CommitTxResponseHandler( commitFuture, this ) );
asyncConnection.flush();
- return commitTxPromise.thenApply( new Function()
+ return commitFuture.thenApply( ignore ->
{
- @Override
- public Void apply( Void ignore )
- {
- ExplicitTransaction.this.state = State.COMMITTED;
- return null;
- }
+ ExplicitTransaction.this.state = State.COMMITTED;
+ return null;
} );
}
- private InternalFuture doRollbackAsync()
+ private CompletionStage doRollbackAsync()
{
- InternalPromise rollbackTxPromise = asyncConnection.newPromise();
- asyncConnection.run( ROLLBACK_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
- asyncConnection.pullAll( new RollbackTxResponseHandler( rollbackTxPromise ) );
+ CompletableFuture rollbackFuture = new CompletableFuture<>();
+ asyncConnection.run( ROLLBACK_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
+ asyncConnection.pullAll( new RollbackTxResponseHandler( rollbackFuture ) );
asyncConnection.flush();
- return rollbackTxPromise.thenApply( new Function()
+ return rollbackFuture.thenApply( ignore ->
{
- @Override
- public Void apply( Void ignore )
- {
- ExplicitTransaction.this.state = State.ROLLED_BACK;
- return null;
- }
+ ExplicitTransaction.this.state = State.ROLLED_BACK;
+ return null;
} );
}
@@ -317,7 +291,7 @@ public StatementResult run( String statementText, Value statementParameters )
}
@Override
- public Response runAsync( String statementText, Value parameters )
+ public CompletionStage runAsync( String statementText, Value parameters )
{
return runAsync( new Statement( statementText, parameters ) );
}
@@ -329,7 +303,7 @@ public StatementResult run( String statementText )
}
@Override
- public Response runAsync( String statementTemplate )
+ public CompletionStage runAsync( String statementTemplate )
{
return runAsync( statementTemplate, Values.EmptyMap );
}
@@ -342,7 +316,8 @@ public StatementResult run( String statementText, Map statementPa
}
@Override
- public Response runAsync( String statementTemplate, Map statementParameters )
+ public CompletionStage runAsync( String statementTemplate,
+ Map statementParameters )
{
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters );
return runAsync( statementTemplate, params );
@@ -356,7 +331,7 @@ public StatementResult run( String statementTemplate, Record statementParameters
}
@Override
- public Response runAsync( String statementTemplate, Record statementParameters )
+ public CompletionStage runAsync( String statementTemplate, Record statementParameters )
{
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters.asMap() );
return runAsync( statementTemplate, params );
@@ -388,7 +363,7 @@ public StatementResult run( Statement statement )
}
@Override
- public Response runAsync( Statement statement )
+ public CompletionStage runAsync( Statement statement )
{
ensureNotFailed();
return QueryRunner.runAsync( asyncConnection, statement, this );
diff --git a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java
index 63420be0e8..eeecf4f0fe 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java
@@ -18,8 +18,6 @@
*/
package org.neo4j.driver.internal;
-import io.netty.util.concurrent.EventExecutorGroup;
-
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.v1.AccessMode;
@@ -32,9 +30,9 @@ class LeakLoggingNetworkSession extends NetworkSession
private final String stackTrace;
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
- EventExecutorGroup eventExecutorGroup, Logging logging )
+ Logging logging )
{
- super( connectionProvider, mode, retryLogic, eventExecutorGroup, logging );
+ super( connectionProvider, mode, retryLogic, logging );
this.stackTrace = captureStackTrace();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
index 4a39bab241..7a59ea4f08 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
@@ -18,17 +18,14 @@
*/
package org.neo4j.driver.internal;
-import io.netty.util.concurrent.EventExecutorGroup;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import org.neo4j.driver.ResultResourcesHandler;
import org.neo4j.driver.internal.async.AsyncConnection;
-import org.neo4j.driver.internal.async.InternalFuture;
-import org.neo4j.driver.internal.async.InternalPromise;
+import org.neo4j.driver.internal.async.Futures;
import org.neo4j.driver.internal.async.QueryRunner;
import org.neo4j.driver.internal.logging.DelegatingLogger;
import org.neo4j.driver.internal.retry.RetryLogic;
@@ -41,8 +38,6 @@
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Record;
-import org.neo4j.driver.v1.Response;
-import org.neo4j.driver.v1.ResponseListener;
import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.StatementResult;
@@ -53,8 +48,8 @@
import org.neo4j.driver.v1.Values;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.types.TypeSystem;
-import org.neo4j.driver.v1.util.Function;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.neo4j.driver.v1.Values.value;
public class NetworkSession implements Session, SessionResourcesHandler, ResultResourcesHandler
@@ -64,25 +59,23 @@ public class NetworkSession implements Session, SessionResourcesHandler, ResultR
private final ConnectionProvider connectionProvider;
private final AccessMode mode;
private final RetryLogic retryLogic;
- private final EventExecutorGroup eventExecutorGroup;
protected final Logger logger;
private volatile Bookmark bookmark = Bookmark.empty();
private PooledConnection currentConnection;
private ExplicitTransaction currentTransaction;
- private volatile InternalFuture currentAsyncTransactionFuture;
+ private volatile CompletionStage asyncTransactionStage;
- private InternalFuture asyncConnectionFuture;
+ private CompletionStage asyncConnectionStage;
private final AtomicBoolean isOpen = new AtomicBoolean( true );
public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
- EventExecutorGroup eventExecutorGroup, Logging logging )
+ Logging logging )
{
this.connectionProvider = connectionProvider;
this.mode = mode;
this.retryLogic = retryLogic;
- this.eventExecutorGroup = eventExecutorGroup;
this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
}
@@ -93,7 +86,7 @@ public StatementResult run( String statementText )
}
@Override
- public Response runAsync( String statementText )
+ public CompletionStage runAsync( String statementText )
{
return runAsync( statementText, Values.EmptyMap );
}
@@ -106,7 +99,8 @@ public StatementResult run( String statementText, Map statementPa
}
@Override
- public Response runAsync( String statementText, Map statementParameters )
+ public CompletionStage runAsync( String statementText,
+ Map statementParameters )
{
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters );
return runAsync( statementText, params );
@@ -120,7 +114,7 @@ public StatementResult run( String statementTemplate, Record statementParameters
}
@Override
- public Response runAsync( String statementTemplate, Record statementParameters )
+ public CompletionStage runAsync( String statementTemplate, Record statementParameters )
{
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters.asMap() );
return runAsync( statementTemplate, params );
@@ -133,7 +127,7 @@ public StatementResult run( String statementText, Value statementParameters )
}
@Override
- public Response runAsync( String statementText, Value parameters )
+ public CompletionStage runAsync( String statementText, Value parameters )
{
return runAsync( new Statement( statementText, parameters ) );
}
@@ -151,21 +145,13 @@ public StatementResult run( Statement statement )
}
@Override
- public Response runAsync( final Statement statement )
+ public CompletionStage runAsync( final Statement statement )
{
ensureSessionIsOpen();
ensureNoOpenTransactionBeforeRunningSession();
- InternalFuture connectionFuture = acquireAsyncConnection( mode );
-
- return connectionFuture.thenCompose( new Function>()
- {
- @Override
- public InternalFuture apply( AsyncConnection connection )
- {
- return QueryRunner.runAsync( connection, statement );
- }
- } );
+ return acquireAsyncConnection( mode ).thenCompose( connection ->
+ QueryRunner.runAsync( connection, statement ) );
}
public static StatementResult run( Connection connection, Statement statement,
@@ -232,7 +218,7 @@ public void close()
try
{
- closeAsync().get();
+ closeAsync().toCompletableFuture().get();
}
catch ( Exception e )
{
@@ -241,33 +227,19 @@ public void close()
}
@Override
- public Response closeAsync()
+ public CompletionStage closeAsync()
{
- if ( asyncConnectionFuture != null )
+ if ( asyncConnectionStage != null )
{
- return asyncConnectionFuture.thenCompose( new Function>()
- {
- @Override
- public InternalFuture apply( AsyncConnection connection )
- {
- return connection.forceRelease();
- }
- } );
+ return asyncConnectionStage.thenCompose( AsyncConnection::forceRelease );
}
- else if ( currentAsyncTransactionFuture != null )
+ else if ( asyncTransactionStage != null )
{
- return currentAsyncTransactionFuture.thenCompose( new Function>()
- {
- @Override
- public InternalFuture apply( ExplicitTransaction tx )
- {
- return tx.internalRollbackAsync();
- }
- } );
+ return asyncTransactionStage.thenCompose( ExplicitTransaction::rollbackAsync );
}
else
{
- return new InternalPromise( eventExecutorGroup ).setSuccess( null );
+ return completedFuture( null );
}
}
@@ -286,10 +258,10 @@ public synchronized Transaction beginTransaction( String bookmark )
}
@Override
- public Response beginTransactionAsync()
+ public CompletionStage beginTransactionAsync()
{
//noinspection unchecked
- return (Response) beginTransactionAsync( mode );
+ return (CompletionStage) beginTransactionAsync( mode );
}
@Override
@@ -299,7 +271,7 @@ public T readTransaction( TransactionWork work )
}
@Override
- public Response readTransactionAsync( TransactionWork> work )
+ public CompletionStage readTransactionAsync( TransactionWork> work )
{
return transactionAsync( AccessMode.READ, work );
}
@@ -311,7 +283,7 @@ public T writeTransaction( TransactionWork work )
}
@Override
- public Response writeTransactionAsync( TransactionWork> work )
+ public CompletionStage writeTransactionAsync( TransactionWork> work )
{
return transactionAsync( AccessMode.WRITE, work );
}
@@ -368,7 +340,7 @@ public synchronized void onTransactionClosed( ExplicitTransaction tx )
public void asyncTransactionClosed( ExplicitTransaction tx )
{
setBookmark( tx.bookmark() );
- currentAsyncTransactionFuture = null;
+ asyncTransactionStage = null;
}
@Override
@@ -415,63 +387,47 @@ public T get()
} );
}
- private InternalFuture transactionAsync( final AccessMode mode, final TransactionWork> work )
+ private CompletionStage transactionAsync( AccessMode mode, TransactionWork> work )
{
- return retryLogic.retryAsync( new Supplier>()
+ return retryLogic.retryAsync( () ->
{
- @Override
- public InternalFuture get()
- {
- final InternalFuture txFuture = beginTransactionAsync( mode );
- final InternalPromise resultPromise = new InternalPromise<>( txFuture.eventExecutor() );
+ CompletableFuture resultFuture = new CompletableFuture<>();
+ CompletionStage txFuture = beginTransactionAsync( mode );
- txFuture.addListener( new FutureListener()
+ txFuture.whenComplete( ( tx, error ) ->
+ {
+ if ( error != null )
{
- @Override
- public void operationComplete( Future future ) throws Exception
- {
- if ( future.isCancelled() )
- {
- resultPromise.cancel( true );
- }
- else if ( future.isSuccess() )
- {
- executeWork( resultPromise, future.getNow(), work );
- }
- else
- {
- resultPromise.setFailure( future.cause() );
- }
- }
- } );
+ resultFuture.completeExceptionally( error );
+ }
+ else
+ {
+ executeWork( resultFuture, tx, work );
+ }
+ } );
- return resultPromise;
- }
+ return resultFuture;
} );
}
- private void executeWork( final InternalPromise resultPromise, final ExplicitTransaction tx,
- TransactionWork> work )
+ private void executeWork( CompletableFuture resultFuture, ExplicitTransaction tx,
+ TransactionWork> work )
{
- Response workResponse = safeExecuteWork( tx, work );
- workResponse.addListener( new ResponseListener()
+ CompletionStage workFuture = safeExecuteWork( tx, work );
+ workFuture.whenComplete( ( result, error ) ->
{
- @Override
- public void operationCompleted( T result, Throwable error )
+ if ( error != null )
{
- if ( error != null )
- {
- rollbackTxAfterFailedTransactionWork( tx, resultPromise, error );
- }
- else
- {
- commitTxAfterSucceededTransactionWork( tx, resultPromise, result );
- }
+ rollbackTxAfterFailedTransactionWork( tx, resultFuture, error );
+ }
+ else
+ {
+ commitTxAfterSucceededTransactionWork( tx, resultFuture, result );
}
} );
}
- private Response safeExecuteWork( ExplicitTransaction tx, TransactionWork> work )
+ private CompletionStage safeExecuteWork( ExplicitTransaction tx, TransactionWork> work )
{
// given work might fail in both async and sync way
// async failure will result in a failed future being returned
@@ -483,58 +439,50 @@ private Response safeExecuteWork( ExplicitTransaction tx, TransactionWork
catch ( Throwable workError )
{
// work threw an exception, wrap it in a future and proceed
- return new InternalPromise( eventExecutorGroup ).setFailure( workError );
+ return Futures.failedFuture( workError );
}
}
- private void rollbackTxAfterFailedTransactionWork( ExplicitTransaction tx,
- final InternalPromise resultPromise, final Throwable error )
+ private void rollbackTxAfterFailedTransactionWork( ExplicitTransaction tx, CompletableFuture resultFuture,
+ Throwable error )
{
if ( tx.isOpen() )
{
- tx.rollbackAsync().addListener( new ResponseListener()
+ tx.rollbackAsync().whenComplete( ( ignore, rollbackError ) ->
{
- @Override
- public void operationCompleted( Void ignore, Throwable rollbackError )
+ if ( rollbackError != null )
{
- if ( rollbackError != null )
- {
- error.addSuppressed( rollbackError );
- }
- resultPromise.setFailure( error );
+ error.addSuppressed( rollbackError );
}
+ resultFuture.completeExceptionally( error );
} );
}
else
{
- resultPromise.setFailure( error );
+ resultFuture.completeExceptionally( error );
}
}
- private void commitTxAfterSucceededTransactionWork( ExplicitTransaction tx,
- final InternalPromise resultPromise, final T result )
+ private void commitTxAfterSucceededTransactionWork( ExplicitTransaction tx, CompletableFuture resultFuture,
+ T result )
{
if ( tx.isOpen() )
{
- tx.commitAsync().addListener( new ResponseListener()
+ tx.commitAsync().whenComplete( ( ignore, commitError ) ->
{
- @Override
- public void operationCompleted( Void ignore, Throwable commitError )
+ if ( commitError != null )
{
- if ( commitError != null )
- {
- resultPromise.setFailure( commitError );
- }
- else
- {
- resultPromise.setSuccess( result );
- }
+ resultFuture.completeExceptionally( commitError );
+ }
+ else
+ {
+ resultFuture.complete( result );
}
} );
}
else
{
- resultPromise.setSuccess( result );
+ resultFuture.complete( result );
}
}
@@ -553,25 +501,18 @@ private synchronized Transaction beginTransaction( AccessMode mode )
return currentTransaction;
}
- private synchronized InternalFuture beginTransactionAsync( AccessMode mode )
+ private synchronized CompletionStage beginTransactionAsync( AccessMode mode )
{
ensureSessionIsOpen();
ensureNoOpenTransactionBeforeOpeningTransaction();
- InternalFuture connectionFuture = acquireAsyncConnection( mode );
-
- currentAsyncTransactionFuture = connectionFuture.thenCompose(
- new Function>()
- {
- @Override
- public InternalFuture apply( AsyncConnection connection )
- {
- ExplicitTransaction tx = new ExplicitTransaction( connection, NetworkSession.this );
- return tx.beginAsync( bookmark );
- }
- } );
+ asyncTransactionStage = acquireAsyncConnection( mode ).thenCompose( connection ->
+ {
+ ExplicitTransaction tx = new ExplicitTransaction( connection, NetworkSession.this );
+ return tx.beginAsync( bookmark );
+ } );
- return currentAsyncTransactionFuture;
+ return asyncTransactionStage;
}
private void ensureNoUnrecoverableError()
@@ -587,7 +528,7 @@ private void ensureNoUnrecoverableError()
//should be called from a synchronized block
private void ensureNoOpenTransactionBeforeRunningSession()
{
- if ( currentTransaction != null || currentAsyncTransactionFuture != null )
+ if ( currentTransaction != null || asyncTransactionStage != null )
{
throw new ClientException( "Statements cannot be run directly on a session with an open transaction;" +
" either run from within the transaction or use a different session." );
@@ -597,7 +538,7 @@ private void ensureNoOpenTransactionBeforeRunningSession()
//should be called from a synchronized block
private void ensureNoOpenTransactionBeforeOpeningTransaction()
{
- if ( currentTransaction != null || currentAsyncTransactionFuture != null )
+ if ( currentTransaction != null || asyncTransactionStage != null )
{
throw new ClientException( "You cannot begin a transaction on a session with an open transaction;" +
" either run from within the transaction or use a different session." );
@@ -624,36 +565,31 @@ private PooledConnection acquireConnection( AccessMode mode )
return connection;
}
- private InternalFuture acquireAsyncConnection( final AccessMode mode )
+ private CompletionStage acquireAsyncConnection( final AccessMode mode )
{
- if ( asyncConnectionFuture == null )
+ if ( asyncConnectionStage == null )
{
- asyncConnectionFuture = connectionProvider.acquireAsyncConnection( mode );
+ asyncConnectionStage = connectionProvider.acquireAsyncConnection( mode );
}
else
{
// memorize in local so same instance is transformed and used in callbacks
- final InternalFuture currentAsyncConnectionFuture = asyncConnectionFuture;
+ CompletionStage currentAsyncConnectionStage = asyncConnectionStage;
- asyncConnectionFuture = currentAsyncConnectionFuture.thenCompose(
- new Function>()
- {
- @Override
- public InternalFuture apply( AsyncConnection connection )
- {
- if ( connection.tryMarkInUse() )
- {
- return currentAsyncConnectionFuture;
- }
- else
- {
- return connectionProvider.acquireAsyncConnection( mode );
- }
- }
- } );
+ asyncConnectionStage = currentAsyncConnectionStage.thenCompose( connection ->
+ {
+ if ( connection.tryMarkInUse() )
+ {
+ return currentAsyncConnectionStage;
+ }
+ else
+ {
+ return connectionProvider.acquireAsyncConnection( mode );
+ }
+ } );
}
- return asyncConnectionFuture;
+ return asyncConnectionStage;
}
boolean currentConnectionIsOpen()
diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
index cc9c7050ac..a9070c7911 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java
@@ -18,8 +18,6 @@
*/
package org.neo4j.driver.internal;
-import io.netty.util.concurrent.EventExecutorGroup;
-
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.v1.AccessMode;
@@ -31,15 +29,12 @@ public class SessionFactoryImpl implements SessionFactory
{
private final ConnectionProvider connectionProvider;
private final RetryLogic retryLogic;
- private final EventExecutorGroup eventExecutorGroup;
private final Logging logging;
private final boolean leakedSessionsLoggingEnabled;
- SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic,
- EventExecutorGroup eventExecutorGroup, Config config )
+ SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic, Config config )
{
this.connectionProvider = connectionProvider;
- this.eventExecutorGroup = eventExecutorGroup;
this.leakedSessionsLoggingEnabled = config.logLeakedSessions();
this.retryLogic = retryLogic;
this.logging = config.logging();
@@ -57,8 +52,8 @@ protected NetworkSession createSession( ConnectionProvider connectionProvider, R
AccessMode mode, Logging logging )
{
return leakedSessionsLoggingEnabled
- ? new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, eventExecutorGroup, logging )
- : new NetworkSession( connectionProvider, mode, retryLogic, eventExecutorGroup, logging );
+ ? new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, logging )
+ : new NetworkSession( connectionProvider, mode, retryLogic, logging );
}
@Override
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnection.java b/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnection.java
index a5f75a59c0..79b8b883e9 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnection.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnection.java
@@ -19,6 +19,7 @@
package org.neo4j.driver.internal.async;
import java.util.Map;
+import java.util.concurrent.CompletionStage;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.v1.Value;
@@ -38,11 +39,9 @@ public interface AsyncConnection
void flush();
- InternalPromise newPromise();
-
void release();
- InternalFuture forceRelease();
+ CompletionStage forceRelease();
ServerInfo serverInfo();
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java b/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java
index 04b0240eee..93648a0eb8 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/Futures.java
@@ -18,13 +18,12 @@
*/
package org.neo4j.driver.internal.async;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
+import io.netty.util.internal.PlatformDependent;
-import org.neo4j.driver.internal.util.BiConsumer;
-import org.neo4j.driver.v1.util.Function;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
public final class Futures
{
@@ -32,182 +31,77 @@ private Futures()
{
}
- public static InternalFuture thenApply( InternalFuture future, Function fn )
+ public static CompletionStage asCompletionStage( io.netty.util.concurrent.Future future )
{
- InternalPromise result = new InternalPromise<>( future.eventExecutor() );
- future.addListener( new ThenApplyListener<>( result, fn ) );
- return result;
- }
-
- public static InternalFuture thenApply( Future future, Bootstrap bootstrap, Function fn )
- {
- InternalPromise result = new InternalPromise<>( bootstrap );
- future.addListener( new ThenApplyListener<>( result, fn ) );
- return result;
- }
-
- public static InternalFuture thenCompose( InternalFuture future, Function> fn )
- {
- InternalPromise result = new InternalPromise<>( future.eventExecutor() );
- future.addListener( new ThenComposeListener<>( result, fn ) );
- return result;
- }
-
- public static InternalFuture whenComplete( InternalFuture future, BiConsumer action )
- {
- InternalPromise result = new InternalPromise<>( future.eventExecutor() );
- future.addListener( new CompletionListener<>( result, action ) );
- return result;
- }
-
- private static class ThenApplyListener implements GenericFutureListener>
- {
- final Promise result;
- final Function fn;
-
- ThenApplyListener( Promise result, Function fn )
- {
- this.result = result;
- this.fn = fn;
- }
-
- @Override
- public void operationComplete( Future future ) throws Exception
+ CompletableFuture result = new CompletableFuture<>();
+ if ( future.isCancelled() )
{
- if ( future.isCancelled() )
- {
- result.cancel( true );
- }
- else if ( future.isSuccess() )
- {
- try
- {
- T originalValue = future.getNow();
- U newValue = fn.apply( originalValue );
- result.setSuccess( newValue );
- }
- catch ( Throwable t )
- {
- result.setFailure( t );
- }
- }
- else
- {
- result.setFailure( future.cause() );
- }
+ result.cancel( true );
}
- }
-
- private static class ThenComposeListener implements GenericFutureListener>
- {
- final Promise result;
- final Function> fn;
-
- ThenComposeListener( Promise result, Function> fn )
+ else if ( future.isSuccess() )
{
- this.result = result;
- this.fn = fn;
+ result.complete( future.getNow() );
}
-
- @Override
- public void operationComplete( Future future ) throws Exception
+ else
{
- if ( future.isCancelled() )
- {
- result.cancel( true );
- }
- else if ( future.isSuccess() )
+ future.addListener( ignore ->
{
- try
+ if ( future.isCancelled() )
{
- T value = future.getNow();
- InternalFuture newFuture = fn.apply( value );
- newFuture.addListener( new NestedThenCombineListener<>( result, newFuture ) );
+ result.cancel( true );
}
- catch ( Throwable t )
+ else if ( future.isSuccess() )
{
- result.setFailure( t );
+ result.complete( future.getNow() );
}
- }
- else
- {
- result.setFailure( future.cause() );
- }
+ else
+ {
+ result.completeExceptionally( future.cause() );
+ }
+ } );
}
+ return result;
}
- private static class NestedThenCombineListener implements GenericFutureListener>
+ public static CompletableFuture failedFuture( Throwable error )
{
-
- final Promise result;
- final Future future;
-
- NestedThenCombineListener( Promise result, Future future )
- {
- this.result = result;
- this.future = future;
- }
-
- @Override
- public void operationComplete( Future future ) throws Exception
- {
- if ( future.isCancelled() )
- {
- result.cancel( true );
- }
- else if ( future.isSuccess() )
- {
- result.setSuccess( future.getNow() );
- }
- else
- {
- result.setFailure( future.cause() );
- }
- }
+ CompletableFuture result = new CompletableFuture<>();
+ result.completeExceptionally( error );
+ return result;
}
- private static class CompletionListener implements GenericFutureListener>
+ public static V getBlocking( CompletionStage stage )
{
- final Promise result;
- final BiConsumer action;
-
- CompletionListener( Promise result, BiConsumer action )
- {
- this.result = result;
- this.action = action;
- }
+ Future future = stage.toCompletableFuture();
+ return getBlocking( future );
+ }
- @Override
- public void operationComplete( Future future ) throws Exception
+ public static V getBlocking( Future future )
+ {
+ boolean interrupted = false;
+ try
{
- if ( future.isCancelled() )
- {
- result.cancel( true );
- }
- else if ( future.isSuccess() )
+ while ( true )
{
try
{
- action.accept( future.getNow(), null );
- result.setSuccess( future.getNow() );
+ return future.get();
}
- catch ( Throwable t )
+ catch ( InterruptedException e )
{
- result.setFailure( t );
+ interrupted = true;
}
- }
- else
- {
- Throwable error = future.cause();
- try
- {
- action.accept( null, error );
- }
- catch ( Throwable t )
+ catch ( ExecutionException e )
{
- error.addSuppressed( t );
+ PlatformDependent.throwException( e.getCause() );
}
- result.setFailure( error );
+ }
+ }
+ finally
+ {
+ if ( interrupted )
+ {
+ Thread.currentThread().interrupt();
}
}
}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java
deleted file mode 100644
index 6a48065b4e..0000000000
--- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Copyright (c) 2002-2017 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.neo4j.driver.internal.async;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.EventExecutorGroup;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.neo4j.driver.internal.util.BiConsumer;
-import org.neo4j.driver.v1.ResponseListener;
-import org.neo4j.driver.v1.util.Function;
-
-public class InternalPromise implements InternalFuture, Promise
-{
- private final EventExecutor eventExecutor;
- private final Promise delegate;
-
- public InternalPromise( Bootstrap bootstrap )
- {
- this( bootstrap.config().group() );
- }
-
- public InternalPromise( EventExecutorGroup eventExecutorGroup )
- {
- this( eventExecutorGroup.next() );
- }
-
- public InternalPromise( EventExecutor eventExecutor )
- {
- this.eventExecutor = eventExecutor;
- this.delegate = eventExecutor.newPromise();
- }
-
- @Override
- public EventExecutor eventExecutor()
- {
- return eventExecutor;
- }
-
- @Override
- public InternalFuture thenApply( Function fn )
- {
- return Futures.thenApply( this, fn );
- }
-
- @Override
- public InternalFuture thenCompose( Function> fn )
- {
- return Futures.thenCompose( this, fn );
- }
-
- @Override
- public InternalFuture whenComplete( BiConsumer action )
- {
- return Futures.whenComplete( this, action );
- }
-
- @Override
- public void addListener( final ResponseListener listener )
- {
- delegate.addListener( new FutureListener()
- {
- @Override
- public void operationComplete( Future future )
- {
- if ( future.isSuccess() )
- {
- listener.operationCompleted( future.getNow(), null );
- }
- else
- {
- listener.operationCompleted( null, future.cause() );
- }
- }
- } );
- }
-
- @Override
- public InternalPromise setSuccess( T result )
- {
- delegate.setSuccess( result );
- return this;
- }
-
- @Override
- public boolean trySuccess( T result )
- {
- return delegate.trySuccess( result );
- }
-
- @Override
- public InternalPromise setFailure( Throwable cause )
- {
- delegate.setFailure( cause );
- return this;
- }
-
- @Override
- public boolean tryFailure( Throwable cause )
- {
- return delegate.tryFailure( cause );
- }
-
- @Override
- public boolean setUncancellable()
- {
- return delegate.setUncancellable();
- }
-
- @Override
- public InternalPromise addListener( GenericFutureListener extends Future super T>> listener )
- {
- delegate.addListener( listener );
- return this;
- }
-
- @Override
- public InternalPromise addListeners( GenericFutureListener extends Future super T>>... listeners )
- {
- delegate.addListeners( listeners );
- return this;
- }
-
- @Override
- public InternalPromise removeListener( GenericFutureListener extends Future super T>> listener )
- {
- delegate.removeListener( listener );
- return this;
- }
-
- @Override
- public InternalPromise removeListeners( GenericFutureListener extends Future super T>>... listeners )
- {
- delegate.removeListeners( listeners );
- return this;
- }
-
- @Override
- public InternalPromise await() throws InterruptedException
- {
- delegate.await();
- return this;
- }
-
- @Override
- public InternalPromise awaitUninterruptibly()
- {
- delegate.awaitUninterruptibly();
- return this;
- }
-
- @Override
- public InternalPromise sync() throws InterruptedException
- {
- delegate.sync();
- return this;
- }
-
- @Override
- public InternalPromise syncUninterruptibly()
- {
- delegate.syncUninterruptibly();
- return this;
- }
-
- @Override
- public boolean isSuccess()
- {
- return delegate.isSuccess();
- }
-
- @Override
- public boolean isCancellable()
- {
- return delegate.isCancellable();
- }
-
- @Override
- public Throwable cause()
- {
- return delegate.cause();
- }
-
- @Override
- public boolean await( long timeout, TimeUnit unit ) throws InterruptedException
- {
- return delegate.await( timeout, unit );
- }
-
- @Override
- public boolean await( long timeoutMillis ) throws InterruptedException
- {
- return delegate.await( timeoutMillis );
- }
-
- @Override
- public boolean awaitUninterruptibly( long timeout, TimeUnit unit )
- {
- return delegate.awaitUninterruptibly( timeout, unit );
- }
-
- @Override
- public boolean awaitUninterruptibly( long timeoutMillis )
- {
- return delegate.awaitUninterruptibly( timeoutMillis );
- }
-
- @Override
- public T getNow()
- {
- return delegate.getNow();
- }
-
- @Override
- public boolean cancel( boolean mayInterruptIfRunning )
- {
- return delegate.cancel( mayInterruptIfRunning );
- }
-
- @Override
- public boolean isCancelled()
- {
- return delegate.isCancelled();
- }
-
- @Override
- public boolean isDone()
- {
- return delegate.isDone();
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException
- {
- return delegate.get();
- }
-
- @Override
- public T get( long timeout, TimeUnit unit ) throws InterruptedException, ExecutionException, TimeoutException
- {
- return delegate.get( timeout, unit );
- }
-}
diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java
index bb2cd5ed4b..004ddceafb 100644
--- a/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java
+++ b/driver/src/main/java/org/neo4j/driver/internal/async/InternalStatementResultCursor.java
@@ -18,18 +18,16 @@
*/
package org.neo4j.driver.internal.async;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.FutureListener;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Consumer;
import org.neo4j.driver.internal.handlers.PullAllResponseHandler;
import org.neo4j.driver.internal.handlers.RunResponseHandler;
-import org.neo4j.driver.internal.util.Consumer;
import org.neo4j.driver.v1.Record;
-import org.neo4j.driver.v1.Response;
import org.neo4j.driver.v1.StatementResultCursor;
import org.neo4j.driver.v1.summary.ResultSummary;
@@ -37,16 +35,13 @@
public class InternalStatementResultCursor implements StatementResultCursor
{
- private final AsyncConnection connection;
private final RunResponseHandler runResponseHandler;
private final PullAllResponseHandler pullAllHandler;
- private InternalFuture peekedRecordResponse;
+ private CompletionStage peekedRecordFuture;
- public InternalStatementResultCursor( AsyncConnection connection, RunResponseHandler runResponseHandler,
- PullAllResponseHandler pullAllHandler )
+ public InternalStatementResultCursor( RunResponseHandler runResponseHandler, PullAllResponseHandler pullAllHandler )
{
- this.connection = requireNonNull( connection );
this.runResponseHandler = requireNonNull( runResponseHandler );
this.pullAllHandler = requireNonNull( pullAllHandler );
}
@@ -55,126 +50,101 @@ public InternalStatementResultCursor( AsyncConnection connection, RunResponseHan
public List keys()
{
List keys = runResponseHandler.statementKeys();
- return keys == null ? Collections.emptyList() : Collections.unmodifiableList( keys );
+ return keys == null ? Collections.emptyList() : Collections.unmodifiableList( keys );
}
@Override
- public Response summaryAsync()
+ public CompletionStage summaryAsync()
{
return pullAllHandler.summaryAsync();
}
@Override
- public Response nextAsync()
+ public CompletionStage nextAsync()
{
- return internalNextAsync();
+ if ( peekedRecordFuture != null )
+ {
+ CompletionStage result = peekedRecordFuture;
+ peekedRecordFuture = null;
+ return result;
+ }
+ else
+ {
+ return pullAllHandler.nextAsync();
+ }
}
@Override
- public Response peekAsync()
+ public CompletionStage peekAsync()
{
- if ( peekedRecordResponse == null )
+ if ( peekedRecordFuture == null )
{
- peekedRecordResponse = pullAllHandler.nextAsync();
+ peekedRecordFuture = pullAllHandler.nextAsync();
}
- return peekedRecordResponse;
+ return peekedRecordFuture;
}
@Override
- public Response forEachAsync( final Consumer action )
+ public CompletionStage forEachAsync( Consumer action )
{
- InternalPromise result = connection.newPromise();
- internalForEachAsync( action, result );
- return result;
+ CompletableFuture resultFuture = new CompletableFuture<>();
+ internalForEachAsync( action, resultFuture );
+ return resultFuture;
}
@Override
- public Response> listAsync()
+ public CompletionStage> listAsync()
{
- InternalPromise