Skip to content

Commit aac5738

Browse files
authored
Merge pull request #426 from lutovich/1.5-access-mode-fix
Use AccessMode correctly in NetworkSession
2 parents 5c57d60 + d844095 commit aac5738

File tree

13 files changed

+342
-203
lines changed

13 files changed

+342
-203
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.neo4j.driver.internal.spi.Connection;
3636
import org.neo4j.driver.internal.spi.ResponseHandler;
3737
import org.neo4j.driver.internal.types.InternalTypeSystem;
38+
import org.neo4j.driver.internal.util.Futures;
3839
import org.neo4j.driver.v1.Record;
3940
import org.neo4j.driver.v1.Session;
4041
import org.neo4j.driver.v1.Statement;
@@ -115,7 +116,17 @@ public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark
115116
CompletableFuture<ExplicitTransaction> beginFuture = new CompletableFuture<>();
116117
connection.runAndFlush( BEGIN_QUERY, initialBookmark.asBeginTransactionParameters(),
117118
NoOpResponseHandler.INSTANCE, new BeginTxResponseHandler<>( beginFuture, this ) );
118-
return beginFuture;
119+
120+
return beginFuture.handle( ( tx, beginError ) ->
121+
{
122+
if ( beginError != null )
123+
{
124+
// release connection if begin failed, transaction can't be started
125+
connection.releaseNow();
126+
throw new CompletionException( Futures.completionErrorCause( beginError ) );
127+
}
128+
return tx;
129+
} );
119130
}
120131
}
121132

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 66 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
import org.neo4j.driver.internal.async.InternalStatementResultCursor;
2828
import org.neo4j.driver.internal.async.QueryRunner;
29-
import org.neo4j.driver.internal.async.ResultCursorsHolder;
3029
import org.neo4j.driver.internal.logging.DelegatingLogger;
3130
import org.neo4j.driver.internal.retry.RetryLogic;
3231
import org.neo4j.driver.internal.spi.Connection;
@@ -60,12 +59,12 @@ public class NetworkSession implements Session
6059
private final ConnectionProvider connectionProvider;
6160
private final AccessMode mode;
6261
private final RetryLogic retryLogic;
63-
private final ResultCursorsHolder resultCursors;
6462
protected final Logger logger;
6563

6664
private volatile Bookmark bookmark = Bookmark.empty();
6765
private volatile CompletionStage<ExplicitTransaction> transactionStage = completedFuture( null );
6866
private volatile CompletionStage<Connection> connectionStage = completedFuture( null );
67+
private volatile CompletionStage<InternalStatementResultCursor> resultCursorStage = completedFuture( null );
6968

7069
private final AtomicBoolean open = new AtomicBoolean( true );
7170

@@ -75,7 +74,6 @@ public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, R
7574
this.connectionProvider = connectionProvider;
7675
this.mode = mode;
7776
this.retryLogic = retryLogic;
78-
this.resultCursors = new ResultCursorsHolder();
7977
this.logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
8078
}
8179

@@ -163,22 +161,28 @@ public CompletionStage<Void> closeAsync()
163161
{
164162
if ( open.compareAndSet( true, false ) )
165163
{
166-
return resultCursors.retrieveNotConsumedError()
167-
.thenCompose( error -> releaseResources().thenApply( ignore ->
168-
{
169-
Throwable queryError = Futures.completionErrorCause( error );
170-
if ( queryError != null )
171-
{
172-
// connection has been acquired and there is an unconsumed error in result cursor
173-
throw new CompletionException( queryError );
174-
}
175-
else
176-
{
177-
// either connection acquisition failed or
178-
// there are no unconsumed errors in the result cursor
179-
return null;
180-
}
181-
} ) );
164+
return resultCursorStage.thenCompose( cursor ->
165+
{
166+
if ( cursor == null )
167+
{
168+
return completedFuture( null );
169+
}
170+
return cursor.failureAsync();
171+
} ).thenCompose( error -> releaseResources().thenApply( ignore ->
172+
{
173+
Throwable queryError = Futures.completionErrorCause( error );
174+
if ( queryError != null )
175+
{
176+
// connection has been acquired and there is an unconsumed error in result cursor
177+
throw new CompletionException( queryError );
178+
}
179+
else
180+
{
181+
// either connection acquisition failed or
182+
// there are no unconsumed errors in the result cursor
183+
return null;
184+
}
185+
} ) );
182186
}
183187
return completedFuture( null );
184188
}
@@ -275,7 +279,7 @@ CompletionStage<Boolean> currentConnectionIsOpen()
275279
return connectionStage.handle( ( connection, error ) ->
276280
error == null && // no acquisition error
277281
connection != null && // some connection has actually been acquired
278-
connection.isInUse() ); // and it's still being used
282+
connection.isOpen() ); // and it's still open
279283
}
280284

281285
private <T> T transaction( AccessMode mode, TransactionWork<T> work )
@@ -412,7 +416,7 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
412416
{
413417
ensureSessionIsOpen();
414418

415-
CompletionStage<InternalStatementResultCursor> cursorStage = ensureNoOpenTxBeforeRunningQuery()
419+
CompletionStage<InternalStatementResultCursor> newResultCursorStage = ensureNoOpenTxBeforeRunningQuery()
416420
.thenCompose( ignore -> acquireConnection( mode ) )
417421
.thenCompose( connection ->
418422
{
@@ -426,8 +430,9 @@ private CompletionStage<InternalStatementResultCursor> runAsync( Statement state
426430
}
427431
} );
428432

429-
resultCursors.add( cursorStage );
430-
return cursorStage;
433+
resultCursorStage = newResultCursorStage.exceptionally( error -> null );
434+
435+
return newResultCursorStage;
431436
}
432437

433438
private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode mode )
@@ -447,28 +452,46 @@ private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode m
447452

448453
private CompletionStage<Connection> acquireConnection( AccessMode mode )
449454
{
450-
// memorize in local so same instance is transformed and used in callbacks
451-
CompletionStage<Connection> currentAsyncConnectionStage = connectionStage;
455+
CompletionStage<Connection> currentConnectionStage = connectionStage;
452456

453-
connectionStage = currentAsyncConnectionStage
454-
.exceptionally( error -> null ) // handle previous acquisition failures
455-
.thenCompose( connection ->
456-
{
457-
if ( connection != null && connection.tryMarkInUse() )
458-
{
459-
// previous acquisition attempt was successful and connection has not been released yet
460-
// continue using same connection
461-
return currentAsyncConnectionStage;
462-
}
463-
else
464-
{
465-
// previous acquisition attempt failed or connection has been released
466-
// acquire new connection
467-
return connectionProvider.acquireConnection( mode );
468-
}
469-
} );
457+
CompletionStage<Connection> newConnectionStage = resultCursorStage.thenCompose( cursor ->
458+
{
459+
if ( cursor == null )
460+
{
461+
return completedFuture( null );
462+
}
463+
// make sure previous result is fully consumed and connection is released back to the pool
464+
return cursor.failureAsync();
465+
} ).thenCompose( error ->
466+
{
467+
if ( error == null )
468+
{
469+
// there is no unconsumed error, so one of the following is true:
470+
// 1) this is first time connection is acquired in this session
471+
// 2) previous result has been successful and is fully consumed
472+
// 3) previous result failed and error has been consumed
473+
474+
// return existing connection, which should've been released back to the pool by now
475+
return currentConnectionStage.exceptionally( ignore -> null );
476+
}
477+
else
478+
{
479+
// there exists unconsumed error, re-throw it
480+
throw new CompletionException( error );
481+
}
482+
} ).thenCompose( existingConnection ->
483+
{
484+
if ( existingConnection != null && existingConnection.isOpen() )
485+
{
486+
// there somehow is an existing open connection, this should not happen, just a precondition
487+
throw new IllegalStateException( "Existing open connection detected" );
488+
}
489+
return connectionProvider.acquireConnection( mode );
490+
} );
491+
492+
connectionStage = newConnectionStage.exceptionally( error -> null );
470493

471-
return connectionStage;
494+
return newConnectionStage;
472495
}
473496

474497
private CompletionStage<Void> releaseResources()

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,9 @@ public class NettyConnection implements Connection
5050
private final ChannelPool channelPool;
5151
private final Clock clock;
5252

53+
private final AtomicBoolean open = new AtomicBoolean( true );
5354
private final AtomicBoolean autoReadEnabled = new AtomicBoolean( true );
5455

55-
private final NettyConnectionState state = new NettyConnectionState();
56-
5756
public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
5857
{
5958
this.channel = channel;
@@ -63,15 +62,9 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
6362
}
6463

6564
@Override
66-
public boolean isInUse()
67-
{
68-
return state.isInUse();
69-
}
70-
71-
@Override
72-
public boolean tryMarkInUse()
65+
public boolean isOpen()
7366
{
74-
return state.markInUse();
67+
return open.get();
7568
}
7669

7770
@Override
@@ -109,7 +102,7 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
109102
@Override
110103
public void releaseInBackground()
111104
{
112-
if ( state.release() )
105+
if ( open.compareAndSet( true, false ) )
113106
{
114107
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock ) );
115108
}
@@ -118,7 +111,7 @@ public void releaseInBackground()
118111
@Override
119112
public CompletionStage<Void> releaseNow()
120113
{
121-
if ( state.forceRelease() )
114+
if ( open.compareAndSet( true, false ) )
122115
{
123116
Promise<Void> releasePromise = channel.eventLoop().newPromise();
124117
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releasePromise ) );

driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,6 @@ public RoutingConnection( Connection delegate, AccessMode accessMode, RoutingErr
4141
this.errorHandler = errorHandler;
4242
}
4343

44-
@Override
45-
public boolean tryMarkInUse()
46-
{
47-
return delegate.tryMarkInUse();
48-
}
49-
5044
@Override
5145
public void enableAutoRead()
5246
{
@@ -82,9 +76,9 @@ public void releaseInBackground()
8276
}
8377

8478
@Override
85-
public boolean isInUse()
79+
public boolean isOpen()
8680
{
87-
return delegate.isInUse();
81+
return delegate.isOpen();
8882
}
8983

9084
@Override

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727

2828
public interface Connection
2929
{
30-
boolean isInUse();
31-
32-
boolean tryMarkInUse();
30+
boolean isOpen();
3331

3432
void enableAutoRead();
3533

driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,20 @@
2121
import org.junit.Test;
2222
import org.mockito.InOrder;
2323

24+
import java.util.function.Consumer;
25+
2426
import org.neo4j.driver.internal.spi.Connection;
27+
import org.neo4j.driver.internal.spi.ResponseHandler;
2528
import org.neo4j.driver.v1.Transaction;
2629

30+
import static java.util.Collections.emptyMap;
2731
import static org.junit.Assert.assertEquals;
2832
import static org.junit.Assert.assertFalse;
2933
import static org.junit.Assert.assertTrue;
34+
import static org.junit.Assert.fail;
3035
import static org.mockito.Matchers.any;
3136
import static org.mockito.Matchers.eq;
37+
import static org.mockito.Mockito.doAnswer;
3238
import static org.mockito.Mockito.inOrder;
3339
import static org.mockito.Mockito.mock;
3440
import static org.mockito.Mockito.never;
@@ -220,6 +226,36 @@ public void shouldNotOverwriteBookmarkWithEmptyBookmark()
220226
assertEquals( "Cat", tx.bookmark().maxBookmarkAsString() );
221227
}
222228

229+
@Test
230+
public void shouldReleaseConnectionWhenBeginFails()
231+
{
232+
RuntimeException error = new RuntimeException( "Wrong bookmark!" );
233+
Connection connection = connectionWithBegin( handler -> handler.onFailure( error ) );
234+
ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) );
235+
236+
try
237+
{
238+
getBlocking( tx.beginAsync( Bookmark.from( "SomeBookmark" ) ) );
239+
fail( "Exception expected" );
240+
}
241+
catch ( RuntimeException e )
242+
{
243+
assertEquals( error, e );
244+
}
245+
246+
verify( connection ).releaseNow();
247+
}
248+
249+
@Test
250+
public void shouldNotReleaseConnectionWhenBeginSucceeds()
251+
{
252+
Connection connection = connectionWithBegin( handler -> handler.onSuccess( emptyMap() ) );
253+
ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) );
254+
getBlocking( tx.beginAsync( Bookmark.from( "SomeBookmark" ) ) );
255+
256+
verify( connection, never() ).releaseNow();
257+
}
258+
223259
private static ExplicitTransaction beginTx( Connection connection )
224260
{
225261
return beginTx( connection, Bookmark.empty() );
@@ -236,4 +272,18 @@ private static ExplicitTransaction beginTx( Connection connection, NetworkSessio
236272
ExplicitTransaction tx = new ExplicitTransaction( connection, session );
237273
return getBlocking( tx.beginAsync( initialBookmark ) );
238274
}
275+
276+
private static Connection connectionWithBegin( Consumer<ResponseHandler> beginBehaviour )
277+
{
278+
Connection connection = mock( Connection.class );
279+
280+
doAnswer( invocation ->
281+
{
282+
ResponseHandler beginHandler = invocation.getArgumentAt( 3, ResponseHandler.class );
283+
beginBehaviour.accept( beginHandler );
284+
return null;
285+
} ).when( connection ).runAndFlush( eq( "BEGIN" ), any(), any(), any() );
286+
287+
return connection;
288+
}
239289
}

driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,24 +95,24 @@ private static void finalize( Session session ) throws Exception
9595
finalizeMethod.invoke( session );
9696
}
9797

98-
private static LeakLoggingNetworkSession newSession( Logging logging, boolean inUseConnection )
98+
private static LeakLoggingNetworkSession newSession( Logging logging, boolean openConnection )
9999
{
100-
return new LeakLoggingNetworkSession( connectionProviderMock( inUseConnection ), READ,
100+
return new LeakLoggingNetworkSession( connectionProviderMock( openConnection ), READ,
101101
new FixedRetryLogic( 0 ), logging );
102102
}
103103

104-
private static ConnectionProvider connectionProviderMock( boolean inUseConnection )
104+
private static ConnectionProvider connectionProviderMock( boolean openConnection )
105105
{
106106
ConnectionProvider provider = mock( ConnectionProvider.class );
107-
Connection connection = connectionMock( inUseConnection );
107+
Connection connection = connectionMock( openConnection );
108108
when( provider.acquireConnection( any( AccessMode.class ) ) ).thenReturn( completedFuture( connection ) );
109109
return provider;
110110
}
111111

112-
private static Connection connectionMock( boolean inUse )
112+
private static Connection connectionMock( boolean open )
113113
{
114114
Connection connection = mock( Connection.class );
115-
when( connection.isInUse() ).thenReturn( inUse );
115+
when( connection.isOpen() ).thenReturn( open );
116116
return connection;
117117
}
118118
}

0 commit comments

Comments
 (0)