Skip to content

Commit 4b3a45e

Browse files
authored
Merge pull request #406 from lutovich/1.5-completable-future
Java 8, CompletionStage in async and stress test improvements
2 parents f29988f + 93a29b7 commit 4b3a45e

File tree

79 files changed

+2566
-2002
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+2566
-2002
lines changed

driver/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@
104104
<artifactId>maven-compiler-plugin</artifactId>
105105
<version>2.3.2</version>
106106
<configuration>
107-
<source>1.7</source>
108-
<target>1.7</target>
107+
<source>${java.version}</source>
108+
<target>${java.version}</target>
109109
</configuration>
110110
</plugin>
111111
<plugin>

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import java.util.concurrent.CompletionStage;
22+
2123
import org.neo4j.driver.internal.async.AsyncConnection;
22-
import org.neo4j.driver.internal.async.InternalFuture;
24+
import org.neo4j.driver.internal.async.Futures;
2325
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
2426
import org.neo4j.driver.internal.net.BoltServerAddress;
2527
import org.neo4j.driver.internal.spi.ConnectionPool;
@@ -53,7 +55,7 @@ public PooledConnection acquireConnection( AccessMode mode )
5355
}
5456

5557
@Override
56-
public InternalFuture<AsyncConnection> acquireAsyncConnection( AccessMode mode )
58+
public CompletionStage<AsyncConnection> acquireAsyncConnection( AccessMode mode )
5759
{
5860
return asyncPool.acquire( address );
5961
}
@@ -62,7 +64,7 @@ public InternalFuture<AsyncConnection> acquireAsyncConnection( AccessMode mode )
6264
public void close() throws Exception
6365
{
6466
pool.close();
65-
asyncPool.closeAsync().syncUninterruptibly();
67+
Futures.getBlocking( asyncPool.closeAsync() );
6668
}
6769

6870
public BoltServerAddress getAddress()

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import org.neo4j.driver.internal.async.AsyncConnectorImpl;
3030
import org.neo4j.driver.internal.async.BootstrapFactory;
31+
import org.neo4j.driver.internal.async.Futures;
3132
import org.neo4j.driver.internal.async.pool.ActiveChannelTracker;
3233
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
3334
import org.neo4j.driver.internal.async.pool.AsyncConnectionPoolImpl;
@@ -85,15 +86,15 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
8586
try
8687
{
8788
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
88-
asyncConnectionPool, eventLoopGroup );
89+
asyncConnectionPool );
8990
}
9091
catch ( Throwable driverError )
9192
{
9293
// we need to close the connection pool if driver creation threw exception
9394
try
9495
{
9596
connectionPool.close();
96-
asyncConnectionPool.closeAsync().syncUninterruptibly();
97+
Futures.getBlocking( asyncConnectionPool.closeAsync() );
9798
}
9899
catch ( Throwable closeError )
99100
{
@@ -120,19 +121,17 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
120121
}
121122

122123
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
123-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
124-
RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup )
124+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
125+
AsyncConnectionPool asyncConnectionPool )
125126
{
126127
String scheme = uri.getScheme().toLowerCase();
127128
switch ( scheme )
128129
{
129130
case BOLT_URI_SCHEME:
130131
assertNoRoutingContext( uri, routingSettings );
131-
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool,
132-
eventExecutorGroup );
132+
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
133133
case BOLT_ROUTING_URI_SCHEME:
134-
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
135-
eventExecutorGroup );
134+
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
136135
default:
137136
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
138137
}
@@ -144,13 +143,12 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
144143
* <b>This method is protected only for testing</b>
145144
*/
146145
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
147-
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool,
148-
EventExecutorGroup eventExecutorGroup )
146+
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
149147
{
150148
ConnectionProvider connectionProvider =
151149
new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
152150
SessionFactory sessionFactory =
153-
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
151+
createSessionFactory( connectionProvider, retryLogic, config );
154152
return createDriver( config, securityPlan, sessionFactory );
155153
}
156154

@@ -160,16 +158,14 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
160158
* <b>This method is protected only for testing</b>
161159
*/
162160
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
163-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
164-
EventExecutorGroup eventExecutorGroup )
161+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
165162
{
166163
if ( !securityPlan.isRoutingCompatible() )
167164
{
168165
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
169166
}
170167
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
171-
SessionFactory sessionFactory =
172-
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
168+
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
173169
return createDriver( config, securityPlan, sessionFactory );
174170
}
175171

@@ -251,9 +247,9 @@ protected Connector createConnector( final ConnectionSettings connectionSettings
251247
* <b>This method is protected only for testing</b>
252248
*/
253249
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, RetryLogic retryLogic,
254-
EventExecutorGroup eventExecutorGroup, Config config )
250+
Config config )
255251
{
256-
return new SessionFactoryImpl( connectionProvider, retryLogic, eventExecutorGroup, config );
252+
return new SessionFactoryImpl( connectionProvider, retryLogic, config );
257253
}
258254

259255
/**

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 39 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@
2020

2121
import java.util.Collections;
2222
import java.util.Map;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.CompletionStage;
25+
import java.util.function.BiConsumer;
2326

2427
import org.neo4j.driver.ResultResourcesHandler;
2528
import org.neo4j.driver.internal.async.AsyncConnection;
26-
import org.neo4j.driver.internal.async.InternalFuture;
27-
import org.neo4j.driver.internal.async.InternalPromise;
2829
import org.neo4j.driver.internal.async.QueryRunner;
2930
import org.neo4j.driver.internal.handlers.BeginTxResponseHandler;
3031
import org.neo4j.driver.internal.handlers.BookmarkResponseHandler;
@@ -33,9 +34,7 @@
3334
import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler;
3435
import org.neo4j.driver.internal.spi.Connection;
3536
import org.neo4j.driver.internal.types.InternalTypeSystem;
36-
import org.neo4j.driver.internal.util.BiConsumer;
3737
import org.neo4j.driver.v1.Record;
38-
import org.neo4j.driver.v1.Response;
3938
import org.neo4j.driver.v1.Statement;
4039
import org.neo4j.driver.v1.StatementResult;
4140
import org.neo4j.driver.v1.StatementResultCursor;
@@ -45,8 +44,9 @@
4544
import org.neo4j.driver.v1.exceptions.ClientException;
4645
import org.neo4j.driver.v1.exceptions.Neo4jException;
4746
import org.neo4j.driver.v1.types.TypeSystem;
48-
import org.neo4j.driver.v1.util.Function;
4947

48+
import static java.util.concurrent.CompletableFuture.completedFuture;
49+
import static org.neo4j.driver.internal.async.Futures.failedFuture;
5050
import static org.neo4j.driver.internal.util.ErrorUtil.isRecoverable;
5151
import static org.neo4j.driver.v1.Values.ofValue;
5252
import static org.neo4j.driver.v1.Values.value;
@@ -118,25 +118,23 @@ public void begin( Bookmark initialBookmark )
118118
}
119119
}
120120

121-
public InternalFuture<ExplicitTransaction> beginAsync( Bookmark initialBookmark )
121+
public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark )
122122
{
123-
InternalPromise<ExplicitTransaction> beginTxPromise = asyncConnection.newPromise();
124-
125123
Map<String,Value> parameters = initialBookmark.asBeginTransactionParameters();
126124
asyncConnection.run( BEGIN_QUERY, parameters, NoOpResponseHandler.INSTANCE );
127125

128126
if ( initialBookmark.isEmpty() )
129127
{
130128
asyncConnection.pullAll( NoOpResponseHandler.INSTANCE );
131-
beginTxPromise.setSuccess( this );
129+
return completedFuture( this );
132130
}
133131
else
134132
{
135-
asyncConnection.pullAll( new BeginTxResponseHandler<>( beginTxPromise, this ) );
133+
CompletableFuture<ExplicitTransaction> beginFuture = new CompletableFuture<>();
134+
asyncConnection.pullAll( new BeginTxResponseHandler<>( beginFuture, this ) );
136135
asyncConnection.flush();
136+
return beginFuture;
137137
}
138-
139-
return beginTxPromise;
140138
}
141139

142140
@Override
@@ -215,21 +213,15 @@ private void rollbackTx()
215213
}
216214

217215
@Override
218-
public Response<Void> commitAsync()
219-
{
220-
return internalCommitAsync();
221-
}
222-
223-
InternalFuture<Void> internalCommitAsync()
216+
public CompletionStage<Void> commitAsync()
224217
{
225218
if ( state == State.COMMITTED )
226219
{
227-
return asyncConnection.<Void>newPromise().setSuccess( null );
220+
return completedFuture( null );
228221
}
229222
else if ( state == State.ROLLED_BACK )
230223
{
231-
return asyncConnection.<Void>newPromise().setFailure(
232-
new ClientException( "Can't commit, transaction has already been rolled back" ) );
224+
return failedFuture( new ClientException( "Can't commit, transaction has already been rolled back" ) );
233225
}
234226
else
235227
{
@@ -238,21 +230,15 @@ else if ( state == State.ROLLED_BACK )
238230
}
239231

240232
@Override
241-
public Response<Void> rollbackAsync()
242-
{
243-
return internalRollbackAsync();
244-
}
245-
246-
InternalFuture<Void> internalRollbackAsync()
233+
public CompletionStage<Void> rollbackAsync()
247234
{
248235
if ( state == State.COMMITTED )
249236
{
250-
return asyncConnection.<Void>newPromise()
251-
.setFailure( new ClientException( "Can't rollback, transaction has already been committed" ) );
237+
return failedFuture( new ClientException( "Can't rollback, transaction has already been committed" ) );
252238
}
253239
else if ( state == State.ROLLED_BACK )
254240
{
255-
return asyncConnection.<Void>newPromise().setSuccess( null );
241+
return completedFuture( null );
256242
}
257243
else
258244
{
@@ -262,51 +248,39 @@ else if ( state == State.ROLLED_BACK )
262248

263249
private BiConsumer<Void,Throwable> releaseConnectionAndNotifySession()
264250
{
265-
return new BiConsumer<Void,Throwable>()
251+
return ( ignore, error ) ->
266252
{
267-
@Override
268-
public void accept( Void result, Throwable error )
269-
{
270-
asyncConnection.release();
271-
session.asyncTransactionClosed( ExplicitTransaction.this );
272-
}
253+
asyncConnection.release();
254+
session.asyncTransactionClosed( ExplicitTransaction.this );
273255
};
274256
}
275257

276-
private InternalFuture<Void> doCommitAsync()
258+
private CompletionStage<Void> doCommitAsync()
277259
{
278-
InternalPromise<Void> commitTxPromise = asyncConnection.newPromise();
260+
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
279261

280-
asyncConnection.run( COMMIT_QUERY, Collections.<String,Value>emptyMap(), NoOpResponseHandler.INSTANCE );
281-
asyncConnection.pullAll( new CommitTxResponseHandler( commitTxPromise, this ) );
262+
asyncConnection.run( COMMIT_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
263+
asyncConnection.pullAll( new CommitTxResponseHandler( commitFuture, this ) );
282264
asyncConnection.flush();
283265

284-
return commitTxPromise.thenApply( new Function<Void,Void>()
266+
return commitFuture.thenApply( ignore ->
285267
{
286-
@Override
287-
public Void apply( Void ignore )
288-
{
289-
ExplicitTransaction.this.state = State.COMMITTED;
290-
return null;
291-
}
268+
ExplicitTransaction.this.state = State.COMMITTED;
269+
return null;
292270
} );
293271
}
294272

295-
private InternalFuture<Void> doRollbackAsync()
273+
private CompletionStage<Void> doRollbackAsync()
296274
{
297-
InternalPromise<Void> rollbackTxPromise = asyncConnection.newPromise();
298-
asyncConnection.run( ROLLBACK_QUERY, Collections.<String,Value>emptyMap(), NoOpResponseHandler.INSTANCE );
299-
asyncConnection.pullAll( new RollbackTxResponseHandler( rollbackTxPromise ) );
275+
CompletableFuture<Void> rollbackFuture = new CompletableFuture<>();
276+
asyncConnection.run( ROLLBACK_QUERY, Collections.emptyMap(), NoOpResponseHandler.INSTANCE );
277+
asyncConnection.pullAll( new RollbackTxResponseHandler( rollbackFuture ) );
300278
asyncConnection.flush();
301279

302-
return rollbackTxPromise.thenApply( new Function<Void,Void>()
280+
return rollbackFuture.thenApply( ignore ->
303281
{
304-
@Override
305-
public Void apply( Void ignore )
306-
{
307-
ExplicitTransaction.this.state = State.ROLLED_BACK;
308-
return null;
309-
}
282+
ExplicitTransaction.this.state = State.ROLLED_BACK;
283+
return null;
310284
} );
311285
}
312286

@@ -317,7 +291,7 @@ public StatementResult run( String statementText, Value statementParameters )
317291
}
318292

319293
@Override
320-
public Response<StatementResultCursor> runAsync( String statementText, Value parameters )
294+
public CompletionStage<StatementResultCursor> runAsync( String statementText, Value parameters )
321295
{
322296
return runAsync( new Statement( statementText, parameters ) );
323297
}
@@ -329,7 +303,7 @@ public StatementResult run( String statementText )
329303
}
330304

331305
@Override
332-
public Response<StatementResultCursor> runAsync( String statementTemplate )
306+
public CompletionStage<StatementResultCursor> runAsync( String statementTemplate )
333307
{
334308
return runAsync( statementTemplate, Values.EmptyMap );
335309
}
@@ -342,7 +316,8 @@ public StatementResult run( String statementText, Map<String,Object> statementPa
342316
}
343317

344318
@Override
345-
public Response<StatementResultCursor> runAsync( String statementTemplate, Map<String,Object> statementParameters )
319+
public CompletionStage<StatementResultCursor> runAsync( String statementTemplate,
320+
Map<String,Object> statementParameters )
346321
{
347322
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters );
348323
return runAsync( statementTemplate, params );
@@ -356,7 +331,7 @@ public StatementResult run( String statementTemplate, Record statementParameters
356331
}
357332

358333
@Override
359-
public Response<StatementResultCursor> runAsync( String statementTemplate, Record statementParameters )
334+
public CompletionStage<StatementResultCursor> runAsync( String statementTemplate, Record statementParameters )
360335
{
361336
Value params = statementParameters == null ? Values.EmptyMap : value( statementParameters.asMap() );
362337
return runAsync( statementTemplate, params );
@@ -388,7 +363,7 @@ public StatementResult run( Statement statement )
388363
}
389364

390365
@Override
391-
public Response<StatementResultCursor> runAsync( Statement statement )
366+
public CompletionStage<StatementResultCursor> runAsync( Statement statement )
392367
{
393368
ensureNotFailed();
394369
return QueryRunner.runAsync( asyncConnection, statement, this );

driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import io.netty.util.concurrent.EventExecutorGroup;
22-
2321
import org.neo4j.driver.internal.retry.RetryLogic;
2422
import org.neo4j.driver.internal.spi.ConnectionProvider;
2523
import org.neo4j.driver.v1.AccessMode;
@@ -32,9 +30,9 @@ class LeakLoggingNetworkSession extends NetworkSession
3230
private final String stackTrace;
3331

3432
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
35-
EventExecutorGroup eventExecutorGroup, Logging logging )
33+
Logging logging )
3634
{
37-
super( connectionProvider, mode, retryLogic, eventExecutorGroup, logging );
35+
super( connectionProvider, mode, retryLogic, logging );
3836
this.stackTrace = captureStackTrace();
3937
}
4038

0 commit comments

Comments
 (0)