Skip to content

Commit ea3ca44

Browse files
authored
Merge pull request #427 from lutovich/1.5-session-reset-release-resources
Improve Session#reset() & cleanup of Connection#release()
2 parents 5c71e11 + 5a174eb commit ea3ca44

15 files changed

+234
-100
lines changed

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public CompletionStage<Connection> acquireConnection( AccessMode mode )
5252
@Override
5353
public CompletionStage<Void> verifyConnectivity()
5454
{
55-
return acquireConnection( READ ).thenCompose( Connection::releaseNow );
55+
return acquireConnection( READ ).thenCompose( Connection::release );
5656
}
5757

5858
@Override

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public CompletionStage<ExplicitTransaction> beginAsync( Bookmark initialBookmark
122122
if ( beginError != null )
123123
{
124124
// release connection if begin failed, transaction can't be started
125-
connection.releaseNow();
125+
connection.release();
126126
throw new CompletionException( Futures.completionErrorCause( beginError ) );
127127
}
128128
return tx;
@@ -397,7 +397,7 @@ private BiConsumer<Object,Throwable> transactionClosed( State newState )
397397
return ( ignore, error ) ->
398398
{
399399
state = newState;
400-
connection.releaseInBackground();
400+
connection.release(); // release in background
401401
session.setBookmark( bookmark );
402402
};
403403
}

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -254,14 +254,13 @@ public void reset()
254254

255255
private CompletionStage<Void> resetAsync()
256256
{
257-
return releaseConnectionNow().thenCompose( ignore -> existingTransactionOrNull() )
258-
.thenAccept( tx ->
259-
{
260-
if ( tx != null )
261-
{
262-
tx.markTerminated();
263-
}
264-
} );
257+
return existingTransactionOrNull().thenAccept( tx ->
258+
{
259+
if ( tx != null )
260+
{
261+
tx.markTerminated();
262+
}
263+
} ).thenCompose( ignore -> releaseConnection() );
265264
}
266265

267266
@Override
@@ -496,7 +495,7 @@ private CompletionStage<Connection> acquireConnection( AccessMode mode )
496495

497496
private CompletionStage<Void> releaseResources()
498497
{
499-
return rollbackTransaction().thenCompose( ignore -> releaseConnectionNow() );
498+
return rollbackTransaction().thenCompose( ignore -> releaseConnection() );
500499
}
501500

502501
private CompletionStage<Void> rollbackTransaction()
@@ -516,13 +515,13 @@ private CompletionStage<Void> rollbackTransaction()
516515
} );
517516
}
518517

519-
private CompletionStage<Void> releaseConnectionNow()
518+
private CompletionStage<Void> releaseConnection()
520519
{
521520
return existingConnectionOrNull().thenCompose( connection ->
522521
{
523522
if ( connection != null )
524523
{
525-
return connection.releaseNow();
524+
return connection.release();
526525
}
527526
return completedFuture( null );
528527
} );

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,14 @@
3939
import org.neo4j.driver.v1.Value;
4040

4141
import static java.util.concurrent.CompletableFuture.completedFuture;
42-
import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
4342
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
4443

45-
// todo: keep state flags to prohibit interaction with released connections
4644
public class NettyConnection implements Connection
4745
{
4846
private final Channel channel;
4947
private final InboundMessageDispatcher messageDispatcher;
48+
private final BoltServerAddress serverAddress;
49+
private final ServerVersion serverVersion;
5050
private final ChannelPool channelPool;
5151
private final Clock clock;
5252

@@ -56,7 +56,9 @@ public class NettyConnection implements Connection
5656
public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
5757
{
5858
this.channel = channel;
59-
this.messageDispatcher = messageDispatcher( channel );
59+
this.messageDispatcher = ChannelAttributes.messageDispatcher( channel );
60+
this.serverAddress = ChannelAttributes.serverAddress( channel );
61+
this.serverVersion = ChannelAttributes.serverVersion( channel );
6062
this.channelPool = channelPool;
6163
this.clock = clock;
6264
}
@@ -70,6 +72,7 @@ public boolean isOpen()
7072
@Override
7173
public void enableAutoRead()
7274
{
75+
assertOpen();
7376
if ( autoReadEnabled.compareAndSet( false, true ) )
7477
{
7578
setAutoRead( true );
@@ -79,6 +82,7 @@ public void enableAutoRead()
7982
@Override
8083
public void disableAutoRead()
8184
{
85+
assertOpen();
8286
if ( autoReadEnabled.compareAndSet( true, false ) )
8387
{
8488
setAutoRead( false );
@@ -89,27 +93,20 @@ public void disableAutoRead()
8993
public void run( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
9094
ResponseHandler pullAllHandler )
9195
{
96+
assertOpen();
9297
run( statement, parameters, runHandler, pullAllHandler, false );
9398
}
9499

95100
@Override
96101
public void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
97102
ResponseHandler pullAllHandler )
98103
{
104+
assertOpen();
99105
run( statement, parameters, runHandler, pullAllHandler, true );
100106
}
101107

102108
@Override
103-
public void releaseInBackground()
104-
{
105-
if ( open.compareAndSet( true, false ) )
106-
{
107-
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock ) );
108-
}
109-
}
110-
111-
@Override
112-
public CompletionStage<Void> releaseNow()
109+
public CompletionStage<Void> release()
113110
{
114111
if ( open.compareAndSet( true, false ) )
115112
{
@@ -126,13 +123,13 @@ public CompletionStage<Void> releaseNow()
126123
@Override
127124
public BoltServerAddress serverAddress()
128125
{
129-
return ChannelAttributes.serverAddress( channel );
126+
return serverAddress;
130127
}
131128

132129
@Override
133130
public ServerVersion serverVersion()
134131
{
135-
return ChannelAttributes.serverVersion( channel );
132+
return serverVersion;
136133
}
137134

138135
private void run( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
@@ -185,4 +182,12 @@ private void setAutoRead( boolean value )
185182
{
186183
channel.config().setAutoRead( value );
187184
}
185+
186+
private void assertOpen()
187+
{
188+
if ( !open.get() )
189+
{
190+
throw new IllegalStateException( "Connection has been released to the pool and can't be reused" );
191+
}
192+
}
188193
}

driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,22 +69,16 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
6969
newRoutingResponseHandler( pullAllHandler ) );
7070
}
7171

72-
@Override
73-
public void releaseInBackground()
74-
{
75-
delegate.releaseInBackground();
76-
}
77-
7872
@Override
7973
public boolean isOpen()
8074
{
8175
return delegate.isOpen();
8276
}
8377

8478
@Override
85-
public CompletionStage<Void> releaseNow()
79+
public CompletionStage<Void> release()
8680
{
87-
return delegate.releaseNow();
81+
return delegate.release();
8882
}
8983

9084
@Override

driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,6 @@ public class ResetResponseHandler implements ResponseHandler
3939
private final Clock clock;
4040
private final Promise<Void> releasePromise;
4141

42-
public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher,
43-
Clock clock )
44-
{
45-
this( channel, pool, messageDispatcher, clock, null );
46-
}
47-
4842
public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher,
4943
Clock clock, Promise<Void> releasePromise )
5044
{

driver/src/main/java/org/neo4j/driver/internal/handlers/SessionPullAllResponseHandler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,17 @@ public SessionPullAllResponseHandler( Statement statement, RunResponseHandler ru
3232
@Override
3333
protected void afterSuccess()
3434
{
35-
connection.releaseInBackground();
35+
releaseConnection();
3636
}
3737

3838
@Override
3939
protected void afterFailure( Throwable error )
4040
{
41-
connection.releaseInBackground();
41+
releaseConnection();
42+
}
43+
44+
private void releaseConnection()
45+
{
46+
connection.release(); // release in background
4247
}
4348
}

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@ void run( String statement, Map<String,Value> parameters, ResponseHandler runHan
3939
void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
4040
ResponseHandler pullAllHandler );
4141

42-
void releaseInBackground();
43-
44-
CompletionStage<Void> releaseNow();
42+
CompletionStage<Void> release();
4543

4644
BoltServerAddress serverAddress();
4745

driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void shouldRollbackOnImplicitFailure()
5858
InOrder order = inOrder( connection );
5959
order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() );
6060
order.verify( connection ).runAndFlush( eq( "ROLLBACK" ), any(), any(), any() );
61-
order.verify( connection ).releaseInBackground();
61+
order.verify( connection ).release();
6262
}
6363

6464
@Test
@@ -77,7 +77,7 @@ public void shouldRollbackOnExplicitFailure()
7777
InOrder order = inOrder( connection );
7878
order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() );
7979
order.verify( connection ).runAndFlush( eq( "ROLLBACK" ), any(), any(), any() );
80-
order.verify( connection ).releaseInBackground();
80+
order.verify( connection ).release();
8181
}
8282

8383
@Test
@@ -95,7 +95,7 @@ public void shouldCommitOnSuccess()
9595
InOrder order = inOrder( connection );
9696
order.verify( connection ).run( eq( "BEGIN" ), any(), any(), any() );
9797
order.verify( connection ).runAndFlush( eq( "COMMIT" ), any(), any(), any() );
98-
order.verify( connection ).releaseInBackground();
98+
order.verify( connection ).release();
9999
}
100100

101101
@Test
@@ -243,7 +243,7 @@ public void shouldReleaseConnectionWhenBeginFails()
243243
assertEquals( error, e );
244244
}
245245

246-
verify( connection ).releaseNow();
246+
verify( connection ).release();
247247
}
248248

249249
@Test
@@ -253,7 +253,7 @@ public void shouldNotReleaseConnectionWhenBeginSucceeds()
253253
ExplicitTransaction tx = new ExplicitTransaction( connection, mock( NetworkSession.class ) );
254254
getBlocking( tx.beginAsync( Bookmark.from( "SomeBookmark" ) ) );
255255

256-
verify( connection, never() ).releaseNow();
256+
verify( connection, never() ).release();
257257
}
258258

259259
private static ExplicitTransaction beginTx( Connection connection )

driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import static org.mockito.Matchers.any;
6161
import static org.mockito.Matchers.eq;
6262
import static org.mockito.Mockito.RETURNS_MOCKS;
63+
import static org.mockito.Mockito.atLeastOnce;
6364
import static org.mockito.Mockito.doAnswer;
6465
import static org.mockito.Mockito.inOrder;
6566
import static org.mockito.Mockito.mock;
@@ -88,7 +89,7 @@ public class NetworkSessionTest
8889
public void setUp()
8990
{
9091
connection = connectionMock();
91-
when( connection.releaseNow() ).thenReturn( completedFuture( null ) );
92+
when( connection.release() ).thenReturn( completedFuture( null ) );
9293
when( connection.serverVersion() ).thenReturn( ServerVersion.v3_2_0 );
9394
connectionProvider = mock( ConnectionProvider.class );
9495
when( connectionProvider.acquireConnection( any( AccessMode.class ) ) )
@@ -214,7 +215,7 @@ public void releasesOpenConnectionUsedForRunWhenSessionIsClosed()
214215

215216
InOrder inOrder = inOrder( connection );
216217
inOrder.verify( connection ).runAndFlush( eq( "RETURN 1" ), any(), any(), any() );
217-
inOrder.verify( connection ).releaseNow();
218+
inOrder.verify( connection, atLeastOnce() ).release();
218219
}
219220

220221
@SuppressWarnings( "deprecation" )
@@ -274,7 +275,7 @@ public void releasesConnectionWhenTxIsClosed()
274275
verify( connection ).runAndFlush( eq( query ), any(), any(), any() );
275276

276277
tx.close();
277-
verify( connection ).releaseInBackground();
278+
verify( connection ).release();
278279
}
279280

280281
@Test
@@ -484,22 +485,18 @@ public void writeTxRetriedUntilFailureWhenTxCloseThrows()
484485
}
485486

486487
@Test
487-
@SuppressWarnings( "deprecation" )
488488
public void connectionShouldBeReleasedAfterSessionReset()
489489
{
490490
NetworkSession session = newSession( connectionProvider, READ );
491491
session.run( "RETURN 1" );
492492

493-
verify( connection, never() ).releaseInBackground();
494-
verify( connection, never() ).releaseNow();
493+
verify( connection, never() ).release();
495494

496495
session.reset();
497-
verify( connection, never() ).releaseInBackground();
498-
verify( connection ).releaseNow();
496+
verify( connection ).release();
499497
}
500498

501499
@Test
502-
@SuppressWarnings( "deprecation" )
503500
public void transactionShouldBeRolledBackAfterSessionReset()
504501
{
505502
NetworkSession session = newSession( connectionProvider, READ );
@@ -663,6 +660,25 @@ public void shouldBeginTxAfterRunFailureToAcquireConnection()
663660
verifyBeginTx( connection, times( 1 ) );
664661
}
665662

663+
@Test
664+
public void shouldMarkTransactionAsTerminatedAndThenReleaseConnectionOnReset()
665+
{
666+
NetworkSession session = newSession( connectionProvider, READ );
667+
Transaction tx = session.beginTransaction();
668+
669+
assertTrue( tx.isOpen() );
670+
when( connection.release() ).then( invocation ->
671+
{
672+
// verify that tx is not open when connection is released
673+
assertFalse( tx.isOpen() );
674+
return completedFuture( null );
675+
} );
676+
677+
session.reset();
678+
679+
verify( connection ).release();
680+
}
681+
666682
private void testConnectionAcquisition( AccessMode sessionMode, AccessMode transactionMode )
667683
{
668684
NetworkSession session = newSession( connectionProvider, sessionMode );

0 commit comments

Comments
 (0)