Skip to content

Commit 2d86641

Browse files
committed
Make Session#reset() only send RESET message
Instead of sending RESET message and returning connection back to the pool. Queries & transactions should still be in control of the connection lifecycle even after reset. Previous behaviour was problematic because reset might race with query execution and make it try to use connection that has been returned to the pool. Also moved all RESET tests to a dedicated `SessionResetIT`. Made explicit transaction remain open after termination so that user code still needs to close it to return connection to the pool.
1 parent 8351b4f commit 2d86641

21 files changed

+1533
-711
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,32 +59,25 @@ public class ExplicitTransaction implements Transaction
5959
private enum State
6060
{
6161
/** The transaction is running with no explicit success or failure marked */
62-
ACTIVE( true ),
62+
ACTIVE,
6363

6464
/** Running, user marked for success, meaning it'll value committed */
65-
MARKED_SUCCESS( true ),
65+
MARKED_SUCCESS,
6666

6767
/** User marked as failed, meaning it'll be rolled back. */
68-
MARKED_FAILED( true ),
68+
MARKED_FAILED,
6969

7070
/**
7171
* This transaction has been terminated either because of explicit {@link Session#reset()} or because of a
7272
* fatal connection error.
7373
*/
74-
TERMINATED( false ),
74+
TERMINATED,
7575

7676
/** This transaction has successfully committed */
77-
COMMITTED( false ),
77+
COMMITTED,
7878

7979
/** This transaction has been rolled back */
80-
ROLLED_BACK( false );
81-
82-
final boolean txOpen;
83-
84-
State( boolean txOpen )
85-
{
86-
this.txOpen = txOpen;
87-
}
80+
ROLLED_BACK
8881
}
8982

9083
private final Connection connection;
@@ -181,6 +174,7 @@ else if ( state == State.ROLLED_BACK )
181174
}
182175
else if ( state == State.TERMINATED )
183176
{
177+
transactionClosed( State.ROLLED_BACK );
184178
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
185179
}
186180
else
@@ -320,7 +314,7 @@ else if ( state == State.TERMINATED )
320314
@Override
321315
public boolean isOpen()
322316
{
323-
return state.txOpen;
317+
return state != State.COMMITTED && state != State.ROLLED_BACK;
324318
}
325319

326320
@Override

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -258,13 +258,24 @@ public void reset()
258258

259259
private CompletionStage<Void> resetAsync()
260260
{
261-
return existingTransactionOrNull().thenAccept( tx ->
262-
{
263-
if ( tx != null )
264-
{
265-
tx.markTerminated();
266-
}
267-
} ).thenCompose( ignore -> releaseConnection() );
261+
return existingTransactionOrNull()
262+
.thenAccept( tx ->
263+
{
264+
if ( tx != null )
265+
{
266+
tx.markTerminated();
267+
}
268+
} )
269+
.thenCompose( ignore -> connectionStage )
270+
.thenCompose( connection ->
271+
{
272+
if ( connection != null )
273+
{
274+
// there exists an active connection, send a RESET message over it
275+
return connection.reset();
276+
}
277+
return completedWithNull();
278+
} );
268279
}
269280

270281
@Override

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import org.neo4j.driver.internal.BoltServerAddress;
3030
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
31+
import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler;
3132
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
3233
import org.neo4j.driver.internal.messaging.Message;
3334
import org.neo4j.driver.internal.messaging.PullAllMessage;
@@ -39,6 +40,7 @@
3940
import org.neo4j.driver.internal.util.ServerVersion;
4041
import org.neo4j.driver.v1.Value;
4142

43+
import static java.util.Collections.emptyMap;
4244
import static org.neo4j.driver.internal.async.ChannelAttributes.setTerminationReason;
4345

4446
public class NettyConnection implements Connection
@@ -108,15 +110,24 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
108110
}
109111
}
110112

113+
@Override
114+
public CompletionStage<Void> reset()
115+
{
116+
CompletableFuture<Void> result = new CompletableFuture<>();
117+
ResetResponseHandler handler = new ResetResponseHandler( messageDispatcher, result );
118+
reset( handler, true );
119+
return result;
120+
}
121+
111122
@Override
112123
public CompletionStage<Void> release()
113124
{
114125
if ( status.compareAndSet( Status.OPEN, Status.RELEASED ) )
115126
{
116-
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
117-
setAutoRead( true );
127+
ChannelReleasingResetResponseHandler handler = new ChannelReleasingResetResponseHandler( channel,
128+
channelPool, messageDispatcher, clock, releaseFuture );
118129

119-
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ) );
130+
reset( handler, false );
120131
}
121132
return releaseFuture;
122133
}
@@ -152,12 +163,21 @@ private void run( String statement, Map<String,Value> parameters, ResponseHandle
152163
pullAllHandler, flush );
153164
}
154165

155-
private void reset( ResponseHandler resetHandler )
166+
private void reset( ResponseHandler resetHandler, boolean sendNothingIfClosed )
156167
{
157168
channel.eventLoop().execute( () ->
158169
{
159-
messageDispatcher.muteAckFailure();
160-
writeAndFlushMessage( ResetMessage.RESET, resetHandler );
170+
if ( sendNothingIfClosed && !isOpen() )
171+
{
172+
resetHandler.onSuccess( emptyMap() );
173+
}
174+
else
175+
{
176+
messageDispatcher.muteAckFailure();
177+
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
178+
setAutoRead( true );
179+
writeAndFlushMessage( ResetMessage.RESET, resetHandler );
180+
}
161181
} );
162182
}
163183

driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
7171
newRoutingResponseHandler( pullAllHandler ) );
7272
}
7373

74+
@Override
75+
public CompletionStage<Void> reset()
76+
{
77+
return delegate.reset();
78+
}
79+
7480
@Override
7581
public boolean isOpen()
7682
{

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.neo4j.driver.v1.Logger;
3434
import org.neo4j.driver.v1.Logging;
3535
import org.neo4j.driver.v1.Value;
36+
import org.neo4j.driver.v1.exceptions.ClientException;
3637

3738
import static java.util.Objects.requireNonNull;
3839
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
@@ -157,14 +158,24 @@ public void handleIgnoredMessage()
157158
log.debug( "S: IGNORED" );
158159

159160
ResponseHandler handler = handlers.remove();
161+
162+
Throwable error;
160163
if ( currentError != null )
161164
{
162-
handler.onFailure( currentError );
165+
error = currentError;
166+
}
167+
else if ( ackFailureMuted )
168+
{
169+
error = new ClientException( "Database ignored the request because session has been reset" );
163170
}
164171
else
165172
{
166-
log.warn( "Received IGNORED message for handler %s but error is missing", handler );
173+
log.warn( "Received IGNORED message for handler %s but error is missing and RESET is not in progress. " +
174+
"Current handlers %s", handler, handlers );
175+
176+
error = new ClientException( "Database ignored the request" );
167177
}
178+
handler.onFailure( error );
168179
}
169180

170181
public void handleFatalError( Throwable error )
@@ -212,15 +223,9 @@ public void muteAckFailure()
212223
* {@link #muteAckFailure()} when sending RESET message.
213224
* <p>
214225
* <b>This method is not thread-safe</b> and should only be executed by the event loop thread.
215-
*
216-
* @throws IllegalStateException if ACK_FAILURE is not muted right now.
217226
*/
218227
public void unMuteAckFailure()
219228
{
220-
if ( !ackFailureMuted )
221-
{
222-
throw new IllegalStateException( "Can't un-mute ACK_FAILURE because it's not muted" );
223-
}
224229
ackFailureMuted = false;
225230
}
226231

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright (c) 2002-2018 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.handlers;
20+
21+
import io.netty.channel.Channel;
22+
import io.netty.channel.pool.ChannelPool;
23+
import io.netty.util.concurrent.Future;
24+
25+
import java.util.concurrent.CompletableFuture;
26+
27+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
28+
import org.neo4j.driver.internal.util.Clock;
29+
30+
import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp;
31+
32+
public class ChannelReleasingResetResponseHandler extends ResetResponseHandler
33+
{
34+
private final Channel channel;
35+
private final ChannelPool pool;
36+
private final Clock clock;
37+
38+
public ChannelReleasingResetResponseHandler( Channel channel, ChannelPool pool,
39+
InboundMessageDispatcher messageDispatcher, Clock clock, CompletableFuture<Void> releaseFuture )
40+
{
41+
super( messageDispatcher, releaseFuture );
42+
this.channel = channel;
43+
this.pool = pool;
44+
this.clock = clock;
45+
}
46+
47+
@Override
48+
protected void resetCompleted( CompletableFuture<Void> completionFuture )
49+
{
50+
setLastUsedTimestamp( channel, clock.millis() );
51+
52+
Future<Void> released = pool.release( channel );
53+
released.addListener( ignore -> completionFuture.complete( null ) );
54+
}
55+
}

driver/src/main/java/org/neo4j/driver/internal/handlers/ResetResponseHandler.java

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,62 +18,50 @@
1818
*/
1919
package org.neo4j.driver.internal.handlers;
2020

21-
import io.netty.channel.Channel;
22-
import io.netty.channel.pool.ChannelPool;
23-
import io.netty.util.concurrent.Future;
24-
2521
import java.util.Map;
2622
import java.util.concurrent.CompletableFuture;
2723

2824
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
2925
import org.neo4j.driver.internal.spi.ResponseHandler;
30-
import org.neo4j.driver.internal.util.Clock;
3126
import org.neo4j.driver.v1.Value;
3227

33-
import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp;
34-
3528
public class ResetResponseHandler implements ResponseHandler
3629
{
37-
private final Channel channel;
38-
private final ChannelPool pool;
3930
private final InboundMessageDispatcher messageDispatcher;
40-
private final Clock clock;
41-
private final CompletableFuture<Void> releaseFuture;
31+
private final CompletableFuture<Void> completionFuture;
4232

43-
public ResetResponseHandler( Channel channel, ChannelPool pool, InboundMessageDispatcher messageDispatcher,
44-
Clock clock, CompletableFuture<Void> releaseFuture )
33+
public ResetResponseHandler( InboundMessageDispatcher messageDispatcher, CompletableFuture<Void> completionFuture )
4534
{
46-
this.channel = channel;
47-
this.pool = pool;
4835
this.messageDispatcher = messageDispatcher;
49-
this.clock = clock;
50-
this.releaseFuture = releaseFuture;
36+
this.completionFuture = completionFuture;
5137
}
5238

5339
@Override
54-
public void onSuccess( Map<String,Value> metadata )
40+
public final void onSuccess( Map<String,Value> metadata )
5541
{
56-
releaseChannel();
42+
resetCompleted();
5743
}
5844

5945
@Override
60-
public void onFailure( Throwable error )
46+
public final void onFailure( Throwable error )
6147
{
62-
releaseChannel();
48+
resetCompleted();
6349
}
6450

6551
@Override
66-
public void onRecord( Value[] fields )
52+
public final void onRecord( Value[] fields )
6753
{
6854
throw new UnsupportedOperationException();
6955
}
7056

71-
private void releaseChannel()
57+
private void resetCompleted()
7258
{
7359
messageDispatcher.unMuteAckFailure();
74-
setLastUsedTimestamp( channel, clock.millis() );
60+
resetCompleted( completionFuture );
61+
}
7562

76-
Future<Void> released = pool.release( channel );
77-
released.addListener( ignore -> releaseFuture.complete( null ) );
63+
protected void resetCompleted( CompletableFuture<Void> completionFuture )
64+
{
65+
completionFuture.complete( null );
7866
}
7967
}

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ void run( String statement, Map<String,Value> parameters, ResponseHandler runHan
3939
void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandler runHandler,
4040
ResponseHandler pullAllHandler );
4141

42+
CompletionStage<Void> reset();
43+
4244
CompletionStage<Void> release();
4345

4446
void terminateAndRelease( String reason );

0 commit comments

Comments
 (0)