Skip to content

Commit 9a2d07e

Browse files
committed
Use CompletableFuture internally
This commit makes driver use `CompletableFuture` instead of netty-based `InternalFuture` and `InternalPromise`.
1 parent dff58cb commit 9a2d07e

40 files changed

+406
-1085
lines changed

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import java.util.concurrent.CompletionStage;
22+
2123
import org.neo4j.driver.internal.async.AsyncConnection;
22-
import org.neo4j.driver.internal.async.InternalFuture;
24+
import org.neo4j.driver.internal.async.Futures;
2325
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
2426
import org.neo4j.driver.internal.net.BoltServerAddress;
2527
import org.neo4j.driver.internal.spi.ConnectionPool;
@@ -53,7 +55,7 @@ public PooledConnection acquireConnection( AccessMode mode )
5355
}
5456

5557
@Override
56-
public InternalFuture<AsyncConnection> acquireAsyncConnection( AccessMode mode )
58+
public CompletionStage<AsyncConnection> acquireAsyncConnection( AccessMode mode )
5759
{
5860
return asyncPool.acquire( address );
5961
}
@@ -62,7 +64,7 @@ public InternalFuture<AsyncConnection> acquireAsyncConnection( AccessMode mode )
6264
public void close() throws Exception
6365
{
6466
pool.close();
65-
asyncPool.closeAsync().syncUninterruptibly();
67+
Futures.getBlocking( asyncPool.closeAsync() );
6668
}
6769

6870
public BoltServerAddress getAddress()

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import org.neo4j.driver.internal.async.AsyncConnectorImpl;
3030
import org.neo4j.driver.internal.async.BootstrapFactory;
31+
import org.neo4j.driver.internal.async.Futures;
3132
import org.neo4j.driver.internal.async.pool.ActiveChannelTracker;
3233
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
3334
import org.neo4j.driver.internal.async.pool.AsyncConnectionPoolImpl;
@@ -85,15 +86,15 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
8586
try
8687
{
8788
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
88-
asyncConnectionPool, eventLoopGroup );
89+
asyncConnectionPool );
8990
}
9091
catch ( Throwable driverError )
9192
{
9293
// we need to close the connection pool if driver creation threw exception
9394
try
9495
{
9596
connectionPool.close();
96-
asyncConnectionPool.closeAsync().syncUninterruptibly();
97+
Futures.getBlocking( asyncConnectionPool.closeAsync() );
9798
}
9899
catch ( Throwable closeError )
99100
{
@@ -120,19 +121,17 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
120121
}
121122

122123
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
123-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
124-
RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup )
124+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
125+
AsyncConnectionPool asyncConnectionPool )
125126
{
126127
String scheme = uri.getScheme().toLowerCase();
127128
switch ( scheme )
128129
{
129130
case BOLT_URI_SCHEME:
130131
assertNoRoutingContext( uri, routingSettings );
131-
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool,
132-
eventExecutorGroup );
132+
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
133133
case BOLT_ROUTING_URI_SCHEME:
134-
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
135-
eventExecutorGroup );
134+
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
136135
default:
137136
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
138137
}
@@ -144,13 +143,12 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
144143
* <b>This method is protected only for testing</b>
145144
*/
146145
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
147-
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool,
148-
EventExecutorGroup eventExecutorGroup )
146+
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
149147
{
150148
ConnectionProvider connectionProvider =
151149
new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
152150
SessionFactory sessionFactory =
153-
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
151+
createSessionFactory( connectionProvider, retryLogic, config );
154152
return createDriver( config, securityPlan, sessionFactory );
155153
}
156154

@@ -160,16 +158,14 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
160158
* <b>This method is protected only for testing</b>
161159
*/
162160
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
163-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
164-
EventExecutorGroup eventExecutorGroup )
161+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
165162
{
166163
if ( !securityPlan.isRoutingCompatible() )
167164
{
168165
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
169166
}
170167
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
171-
SessionFactory sessionFactory =
172-
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
168+
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
173169
return createDriver( config, securityPlan, sessionFactory );
174170
}
175171

@@ -251,9 +247,9 @@ protected Connector createConnector( final ConnectionSettings connectionSettings
251247
* <b>This method is protected only for testing</b>
252248
*/
253249
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, RetryLogic retryLogic,
254-
EventExecutorGroup eventExecutorGroup, Config config )
250+
Config config )
255251
{
256-
return new SessionFactoryImpl( connectionProvider, retryLogic, eventExecutorGroup, config );
252+
return new SessionFactoryImpl( connectionProvider, retryLogic, config );
257253
}
258254

259255
/**

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 31 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020

2121
import java.util.Collections;
2222
import java.util.Map;
23+
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.CompletionStage;
25+
import java.util.function.BiConsumer;
2426

2527
import org.neo4j.driver.ResultResourcesHandler;
2628
import org.neo4j.driver.internal.async.AsyncConnection;
27-
import org.neo4j.driver.internal.async.InternalFuture;
28-
import org.neo4j.driver.internal.async.InternalPromise;
2929
import org.neo4j.driver.internal.async.QueryRunner;
3030
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
3131
import org.neo4j.driver.internal.handlers.BookmarkResponseHandler;
@@ -34,7 +34,6 @@
3434
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
3535
import org.neo4j.driver.internal.spi.Connection;
3636
import org.neo4j.driver.internal.types.InternalTypeSystem;
37-
import org.neo4j.driver.internal.util.BiConsumer;
3837
import org.neo4j.driver.v1.Record;
3938
import org.neo4j.driver.v1.Statement;
4039
import org.neo4j.driver.v1.StatementResult;
@@ -45,9 +44,9 @@
4544
import org.neo4j.driver.v1.exceptions.ClientException;
4645
import org.neo4j.driver.v1.exceptions.Neo4jException;
4746
import org.neo4j.driver.v1.types.TypeSystem;
48-
import org.neo4j.driver.v1.util.Function;
4947

50-
import static org.neo4j.driver.internal.async.Futures.asCompletionStage;
48+
import static java.util.concurrent.CompletableFuture.completedFuture;
49+
import static org.neo4j.driver.internal.async.Futures.failedFuture;
5150
import static org.neo4j.driver.internal.util.ErrorUtil.isRecoverable;
5251
import static org.neo4j.driver.v1.Values.ofValue;
5352
import static org.neo4j.driver.v1.Values.value;
@@ -119,25 +118,23 @@ public void begin( Bookmark initialBookmark )
119118
}
120119
}
121120

122-
public InternalFuture<ExplicitTransaction> beginAsync( Bookmark initialBookmark )
121+
public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark )
123122
{
124-
InternalPromise<ExplicitTransaction> beginTxPromise = asyncConnection.newPromise();
125-
126123
Map<String,Value> parameters = initialBookmark.asBeginTransactionParameters();
127124
asyncConnection.run( BEGIN_QUERY, parameters, NoOpResponseHandler.INSTANCE );
128125

129126
if ( initialBookmark.isEmpty() )
130127
{
131128
asyncConnection.pullAll( NoOpResponseHandler.INSTANCE );
132-
beginTxPromise.setSuccess( this );
129+
return completedFuture( this );
133130
}
134131
else
135132
{
136-
asyncConnection.pullAll( new BeginTxResponseHandler<>( beginTxPromise, this ) );
133+
CompletableFuture<ExplicitTransaction> beginFuture = new CompletableFuture<>();
134+
asyncConnection.pullAll( new BeginTxResponseHandler<>( beginFuture, this ) );
137135
asyncConnection.flush();
136+
return beginFuture;
138137
}
139-
140-
return beginTxPromise;
141138
}
142139

143140
@Override
@@ -217,20 +214,14 @@ private void rollbackTx()
217214

218215
@Override
219216
public CompletionStage<Void> commitAsync()
220-
{
221-
return asCompletionStage( internalCommitAsync() );
222-
}
223-
224-
InternalFuture<Void> internalCommitAsync()
225217
{
226218
if ( state == State.COMMITTED )
227219
{
228-
return asyncConnection.<Void>newPromise().setSuccess( null );
220+
return completedFuture( null );
229221
}
230222
else if ( state == State.ROLLED_BACK )
231223
{
232-
return asyncConnection.<Void>newPromise().setFailure(
233-
new ClientException( "Can't commit, transaction has already been rolled back" ) );
224+
return failedFuture( new ClientException( "Can't commit, transaction has already been rolled back" ) );
234225
}
235226
else
236227
{
@@ -240,20 +231,14 @@ else if ( state == State.ROLLED_BACK )
240231

241232
@Override
242233
public CompletionStage<Void> rollbackAsync()
243-
{
244-
return asCompletionStage( internalRollbackAsync() );
245-
}
246-
247-
InternalFuture<Void> internalRollbackAsync()
248234
{
249235
if ( state == State.COMMITTED )
250236
{
251-
return asyncConnection.<Void>newPromise()
252-
.setFailure( new ClientException( "Can't rollback, transaction has already been committed" ) );
237+
return failedFuture( new ClientException( "Can't rollback, transaction has already been committed" ) );
253238
}
254239
else if ( state == State.ROLLED_BACK )
255240
{
256-
return asyncConnection.<Void>newPromise().setSuccess( null );
241+
return completedFuture( null );
257242
}
258243
else
259244
{
@@ -263,51 +248,39 @@ else if ( state == State.ROLLED_BACK )
263248

264249
private BiConsumer<Void,Throwable> releaseConnectionAndNotifySession()
265250
{
266-
return new BiConsumer<Void,Throwable>()
251+
return ( ignore, error ) ->
267252
{
268-
@Override
269-
public void accept( Void result, Throwable error )
270-
{
271-
asyncConnection.release();
272-
session.asyncTransactionClosed( ExplicitTransaction.this );
273-
}
253+
asyncConnection.release();
254+
session.asyncTransactionClosed( ExplicitTransaction.this );
274255
};
275256
}
276257

277-
private InternalFuture<Void> doCommitAsync()
258+
private CompletionStage<Void> doCommitAsync()
278259
{
279-
InternalPromise<Void> commitTxPromise = asyncConnection.newPromise();
260+
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
280261

281-
asyncConnection.run( COMMIT_QUERY, Collections.<String,Value>emptyMap(), NoOpResponseHandler.INSTANCE );
282-
asyncConnection.pullAll( new CommitTxResponseHandler( commitTxPromise, this ) );
262+
asyncConnection.run( COMMIT_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
263+
asyncConnection.pullAll( new CommitTxResponseHandler( commitFuture, this ) );
283264
asyncConnection.flush();
284265

285-
return commitTxPromise.thenApply( new Function<Void,Void>()
266+
return commitFuture.thenApply( ignore ->
286267
{
287-
@Override
288-
public Void apply( Void ignore )
289-
{
290-
ExplicitTransaction.this.state = State.COMMITTED;
291-
return null;
292-
}
268+
ExplicitTransaction.this.state = State.COMMITTED;
269+
return null;
293270
} );
294271
}
295272

296-
private InternalFuture<Void> doRollbackAsync()
273+
private CompletionStage<Void> doRollbackAsync()
297274
{
298-
InternalPromise<Void> rollbackTxPromise = asyncConnection.newPromise();
299-
asyncConnection.run( ROLLBACK_QUERY, Collections.<String,Value>emptyMap(), NoOpResponseHandler.INSTANCE );
300-
asyncConnection.pullAll( new RollbackTxResponseHandler( rollbackTxPromise ) );
275+
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
276+
asyncConnection.run( ROLLBACK_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
277+
asyncConnection.pullAll( new RollbackTxResponseHandler( rollbackFuture ) );
301278
asyncConnection.flush();
302279

303-
return rollbackTxPromise.thenApply( new Function<Void,Void>()
280+
return rollbackFuture.thenApply( ignore ->
304281
{
305-
@Override
306-
public Void apply( Void ignore )
307-
{
308-
ExplicitTransaction.this.state = State.ROLLED_BACK;
309-
return null;
310-
}
282+
ExplicitTransaction.this.state = State.ROLLED_BACK;
283+
return null;
311284
} );
312285
}
313286

@@ -393,7 +366,7 @@ public StatementResult run( Statement statement )
393366
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
394367
{
395368
ensureNotFailed();
396-
return asCompletionStage( QueryRunner.runAsync( asyncConnection, statement, this ) );
369+
return QueryRunner.runAsync( asyncConnection, statement, this );
397370
}
398371

399372
@Override

driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import io.netty.util.concurrent.EventExecutorGroup;
22-
2321
import org.neo4j.driver.internal.retry.RetryLogic;
2422
import org.neo4j.driver.internal.spi.ConnectionProvider;
2523
import org.neo4j.driver.v1.AccessMode;
@@ -32,9 +30,9 @@ class LeakLoggingNetworkSession extends NetworkSession
3230
private final String stackTrace;
3331

3432
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
35-
EventExecutorGroup eventExecutorGroup, Logging logging )
33+
Logging logging )
3634
{
37-
super( connectionProvider, mode, retryLogic, eventExecutorGroup, logging );
35+
super( connectionProvider, mode, retryLogic, logging );
3836
this.stackTrace = captureStackTrace();
3937
}
4038

0 commit comments

Comments
 (0)