Skip to content

Commit be6225d

Browse files
committed
Added async transaction functions
Via `Session#readTransactionAsync()` and `#writeTransactionAsync()`. Both accept a single parameter - async function that takes a transaction and returns a `Response`. When function returns a failed response or throws exception it will be retried. Retries only happen on non-fatal & transient errors. API is not fully complete yet - there is no way to create a `Result` object so it'll be inconvenient to use. Helpers to create some sort of "settable" response will come in subsequent commits.
1 parent af7c2a6 commit be6225d

15 files changed

+355
-35
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal;
2020

2121
import io.netty.bootstrap.Bootstrap;
22+
import io.netty.channel.EventLoopGroup;
2223
import io.netty.util.concurrent.EventExecutorGroup;
2324

2425
import java.io.IOException;
@@ -75,15 +76,16 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7576
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
7677

7778
Bootstrap bootstrap = BootstrapFactory.newBootstrap();
78-
RetryLogic retryLogic = createRetryLogic( retrySettings, bootstrap.config().group(), config.logging() );
79+
EventLoopGroup eventLoopGroup = bootstrap.config().group();
80+
RetryLogic retryLogic = createRetryLogic( retrySettings, eventLoopGroup, config.logging() );
7981

8082
AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap,
8183
config );
8284

8385
try
8486
{
8587
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
86-
asyncConnectionPool );
88+
asyncConnectionPool, eventLoopGroup );
8789
}
8890
catch ( Throwable driverError )
8991
{
@@ -119,16 +121,18 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
119121

120122
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
121123
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
122-
RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
124+
RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup )
123125
{
124126
String scheme = uri.getScheme().toLowerCase();
125127
switch ( scheme )
126128
{
127129
case BOLT_URI_SCHEME:
128130
assertNoRoutingContext( uri, routingSettings );
129-
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
131+
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool,
132+
eventExecutorGroup );
130133
case BOLT_ROUTING_URI_SCHEME:
131-
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
134+
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
135+
eventExecutorGroup );
132136
default:
133137
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
134138
}
@@ -140,11 +144,13 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
140144
* <b>This method is protected only for testing</b>
141145
*/
142146
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
143-
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
147+
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool,
148+
EventExecutorGroup eventExecutorGroup )
144149
{
145150
ConnectionProvider connectionProvider =
146151
new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
147-
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
152+
SessionFactory sessionFactory =
153+
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
148154
return createDriver( config, securityPlan, sessionFactory );
149155
}
150156

@@ -154,14 +160,16 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
154160
* <b>This method is protected only for testing</b>
155161
*/
156162
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
157-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
163+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
164+
EventExecutorGroup eventExecutorGroup )
158165
{
159166
if ( !securityPlan.isRoutingCompatible() )
160167
{
161168
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
162169
}
163170
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
164-
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
171+
SessionFactory sessionFactory =
172+
createSessionFactory( connectionProvider, retryLogic, eventExecutorGroup, config );
165173
return createDriver( config, securityPlan, sessionFactory );
166174
}
167175

@@ -242,10 +250,10 @@ protected Connector createConnector( final ConnectionSettings connectionSettings
242250
* <p>
243251
* <b>This method is protected only for testing</b>
244252
*/
245-
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider,
246-
RetryLogic retryLogic, Config config )
253+
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, RetryLogic retryLogic,
254+
EventExecutorGroup eventExecutorGroup, Config config )
247255
{
248-
return new SessionFactoryImpl( connectionProvider, retryLogic, config );
256+
return new SessionFactoryImpl( connectionProvider, retryLogic, eventExecutorGroup, config );
249257
}
250258

251259
/**

driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import io.netty.util.concurrent.EventExecutorGroup;
22+
2123
import org.neo4j.driver.internal.retry.RetryLogic;
2224
import org.neo4j.driver.internal.spi.ConnectionProvider;
2325
import org.neo4j.driver.v1.AccessMode;
@@ -30,9 +32,9 @@ class LeakLoggingNetworkSession extends NetworkSession
3032
private final String stackTrace;
3133

3234
LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
33-
Logging logging )
35+
EventExecutorGroup eventExecutorGroup, Logging logging )
3436
{
35-
super( connectionProvider, mode, retryLogic, logging );
37+
super( connectionProvider, mode, retryLogic, eventExecutorGroup, logging );
3638
this.stackTrace = captureStackTrace();
3739
}
3840

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 127 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import io.netty.util.concurrent.GlobalEventExecutor;
21+
import io.netty.util.concurrent.EventExecutorGroup;
22+
import io.netty.util.concurrent.Future;
23+
import io.netty.util.concurrent.FutureListener;
2224

2325
import java.util.Map;
2426
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,6 +42,7 @@
4042
import org.neo4j.driver.v1.Logging;
4143
import org.neo4j.driver.v1.Record;
4244
import org.neo4j.driver.v1.Response;
45+
import org.neo4j.driver.v1.ResponseListener;
4346
import org.neo4j.driver.v1.Session;
4447
import org.neo4j.driver.v1.Statement;
4548
import org.neo4j.driver.v1.StatementResult;
@@ -61,6 +64,7 @@ public class NetworkSession implements Session, SessionResourcesHandler, ResultR
6164
private final ConnectionProvider connectionProvider;
6265
private final AccessMode mode;
6366
private final RetryLogic retryLogic;
67+
private final EventExecutorGroup eventExecutorGroup;
6468
protected final Logger logger;
6569

6670
private volatile Bookmark bookmark = Bookmark.empty();
@@ -73,11 +77,12 @@ public class NetworkSession implements Session, SessionResourcesHandler, ResultR
7377
private final AtomicBoolean isOpen = new AtomicBoolean( true );
7478

7579
public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, RetryLogic retryLogic,
76-
Logging logging )
80+
EventExecutorGroup eventExecutorGroup, Logging logging )
7781
{
7882
this.connectionProvider = connectionProvider;
7983
this.mode = mode;
8084
this.retryLogic = retryLogic;
85+
this.eventExecutorGroup = eventExecutorGroup;
8186
this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
8287
}
8388

@@ -262,7 +267,7 @@ public InternalFuture<Void> apply( ExplicitTransaction tx )
262267
}
263268
else
264269
{
265-
return new InternalPromise<Void>( GlobalEventExecutor.INSTANCE ).setSuccess( null );
270+
return new InternalPromise<Void>( eventExecutorGroup ).setSuccess( null );
266271
}
267272
}
268273

@@ -293,12 +298,24 @@ public <T> T readTransaction( TransactionWork<T> work )
293298
return transaction( AccessMode.READ, work );
294299
}
295300

301+
@Override
302+
public <T> Response<T> readTransactionAsync( TransactionWork<Response<T>> work )
303+
{
304+
return transactionAsync( AccessMode.READ, work );
305+
}
306+
296307
@Override
297308
public <T> T writeTransaction( TransactionWork<T> work )
298309
{
299310
return transaction( AccessMode.WRITE, work );
300311
}
301312

313+
@Override
314+
public <T> Response<T> writeTransactionAsync( TransactionWork<Response<T>> work )
315+
{
316+
return transactionAsync( AccessMode.WRITE, work );
317+
}
318+
302319
void setBookmark( Bookmark bookmark )
303320
{
304321
if ( bookmark != null && !bookmark.isEmpty() )
@@ -398,6 +415,113 @@ public T get()
398415
} );
399416
}
400417

418+
private <T> InternalFuture<T> transactionAsync( final AccessMode mode, final TransactionWork<Response<T>> work )
419+
{
420+
return retryLogic.retryAsync( new Supplier<InternalFuture<T>>()
421+
{
422+
@Override
423+
public InternalFuture<T> get()
424+
{
425+
final InternalFuture<ExplicitTransaction> txFuture = beginTransactionAsync( mode );
426+
final InternalPromise<T> resultPromise = new InternalPromise<>( txFuture.eventExecutor() );
427+
428+
txFuture.addListener( new FutureListener<ExplicitTransaction>()
429+
{
430+
@Override
431+
public void operationComplete( Future<ExplicitTransaction> future ) throws Exception
432+
{
433+
if ( future.isCancelled() )
434+
{
435+
resultPromise.cancel( true );
436+
}
437+
else if ( future.isSuccess() )
438+
{
439+
executeWork( resultPromise, future.getNow(), work );
440+
}
441+
else
442+
{
443+
resultPromise.setFailure( future.cause() );
444+
}
445+
}
446+
} );
447+
448+
return resultPromise;
449+
}
450+
} );
451+
}
452+
453+
private <T> void executeWork( final InternalPromise<T> resultPromise, final ExplicitTransaction tx,
454+
TransactionWork<Response<T>> work )
455+
{
456+
Response<T> workResponse = work.execute( tx );
457+
workResponse.addListener( new ResponseListener<T>()
458+
{
459+
@Override
460+
public void operationCompleted( T result, Throwable error )
461+
{
462+
if ( error != null )
463+
{
464+
rollbackTxAfterFailedTransactionWork( tx, resultPromise, error );
465+
}
466+
else
467+
{
468+
commitTxAfterSucceededTransactionWork( tx, resultPromise, result );
469+
}
470+
}
471+
} );
472+
}
473+
474+
private <T> void rollbackTxAfterFailedTransactionWork( ExplicitTransaction tx,
475+
final InternalPromise<T> resultPromise, final Throwable error )
476+
{
477+
if ( tx.isOpen() )
478+
{
479+
tx.rollbackAsync().addListener( new ResponseListener<Void>()
480+
{
481+
@Override
482+
public void operationCompleted( Void ignore, Throwable rollbackError )
483+
{
484+
if ( rollbackError != null )
485+
{
486+
error.addSuppressed( rollbackError );
487+
}
488+
resultPromise.setFailure( error );
489+
}
490+
} );
491+
}
492+
else
493+
{
494+
resultPromise.setFailure( error );
495+
}
496+
}
497+
498+
private <T> void commitTxAfterSucceededTransactionWork( ExplicitTransaction tx,
499+
final InternalPromise<T> resultPromise, final T result )
500+
{
501+
if ( tx.isOpen() )
502+
{
503+
tx.commitAsync().addListener( new ResponseListener<Void>()
504+
{
505+
@Override
506+
public void operationCompleted( Void ignore, Throwable commitError )
507+
{
508+
if ( commitError != null )
509+
{
510+
resultPromise.setFailure( commitError );
511+
}
512+
else
513+
{
514+
resultPromise.setSuccess( result );
515+
}
516+
}
517+
} );
518+
}
519+
else
520+
{
521+
resultPromise.setSuccess( result );
522+
}
523+
}
524+
401525
private synchronized Transaction beginTransaction( AccessMode mode )
402526
{
403527
ensureSessionIsOpen();
@@ -431,7 +555,6 @@ public InternalFuture<ExplicitTransaction> apply( AsyncConnection connection )
431555
}
432556
} );
433557

434-
//noinspection unchecked
435558
return currentAsyncTransactionFuture;
436559
}
437560

driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import io.netty.util.concurrent.EventExecutorGroup;
22+
2123
import org.neo4j.driver.internal.retry.RetryLogic;
2224
import org.neo4j.driver.internal.spi.ConnectionProvider;
2325
import org.neo4j.driver.v1.AccessMode;
@@ -29,12 +31,15 @@ public class SessionFactoryImpl implements SessionFactory
2931
{
3032
private final ConnectionProvider connectionProvider;
3133
private final RetryLogic retryLogic;
34+
private final EventExecutorGroup eventExecutorGroup;
3235
private final Logging logging;
3336
private final boolean leakedSessionsLoggingEnabled;
3437

35-
SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic, Config config )
38+
SessionFactoryImpl( ConnectionProvider connectionProvider, RetryLogic retryLogic,
39+
EventExecutorGroup eventExecutorGroup, Config config )
3640
{
3741
this.connectionProvider = connectionProvider;
42+
this.eventExecutorGroup = eventExecutorGroup;
3843
this.leakedSessionsLoggingEnabled = config.logLeakedSessions();
3944
this.retryLogic = retryLogic;
4045
this.logging = config.logging();
@@ -51,9 +56,9 @@ public final Session newInstance( AccessMode mode, Bookmark bookmark )
5156
protected NetworkSession createSession( ConnectionProvider connectionProvider, RetryLogic retryLogic,
5257
AccessMode mode, Logging logging )
5358
{
54-
return leakedSessionsLoggingEnabled ?
55-
new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, logging ) :
56-
new NetworkSession( connectionProvider, mode, retryLogic, logging );
59+
return leakedSessionsLoggingEnabled
60+
? new LeakLoggingNetworkSession( connectionProvider, mode, retryLogic, eventExecutorGroup, logging )
61+
: new NetworkSession( connectionProvider, mode, retryLogic, eventExecutorGroup, logging );
5762
}
5863

5964
@Override

driver/src/main/java/org/neo4j/driver/internal/async/InternalPromise.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.netty.bootstrap.Bootstrap;
2222
import io.netty.util.concurrent.EventExecutor;
23+
import io.netty.util.concurrent.EventExecutorGroup;
2324
import io.netty.util.concurrent.Future;
2425
import io.netty.util.concurrent.FutureListener;
2526
import io.netty.util.concurrent.GenericFutureListener;
@@ -40,7 +41,12 @@ public class InternalPromise<T> implements InternalFuture<T>, Promise<T>
4041

4142
public InternalPromise( Bootstrap bootstrap )
4243
{
43-
this( bootstrap.config().group().next() );
44+
this( bootstrap.config().group() );
45+
}
46+
47+
public InternalPromise( EventExecutorGroup eventExecutorGroup )
48+
{
49+
this( eventExecutorGroup.next() );
4450
}
4551

4652
public InternalPromise( EventExecutor eventExecutor )

driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public <T> T retry( Supplier<T> work )
124124
@Override
125125
public <T> InternalFuture<T> retryAsync( Supplier<InternalFuture<T>> work )
126126
{
127-
InternalPromise<T> result = new InternalPromise<>( eventExecutorGroup.next() );
127+
InternalPromise<T> result = new InternalPromise<>( eventExecutorGroup );
128128
executeWorkInEventLoop( result, work );
129129
return result;
130130
}

0 commit comments

Comments
 (0)