Skip to content

Commit b23a50b

Browse files
authored
Merge pull request #456 from lutovich/1.5-session-reset-fix
Fixed `Session#reset()` to only send a RESET message
2 parents 4711db9 + f3ae1d4 commit b23a50b

26 files changed

+1864
-741
lines changed

driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.Map;
2222
import java.util.concurrent.CompletableFuture;
2323
import java.util.concurrent.CompletionStage;
24-
import java.util.function.BiConsumer;
2524
import java.util.function.BiFunction;
2625

2726
import org.neo4j.driver.internal.async.QueryRunner;
@@ -60,31 +59,25 @@ public class ExplicitTransaction implements Transaction
6059
private enum State
6160
{
6261
/** The transaction is running with no explicit success or failure marked */
63-
ACTIVE( true ),
62+
ACTIVE,
6463

6564
/** Running, user marked for success, meaning it'll value committed */
66-
MARKED_SUCCESS( true ),
65+
MARKED_SUCCESS,
6766

6867
/** User marked as failed, meaning it'll be rolled back. */
69-
MARKED_FAILED( true ),
68+
MARKED_FAILED,
7069

7170
/**
72-
* This transaction has been explicitly terminated by calling {@link Session#reset()}.
71+
* This transaction has been terminated either because of explicit {@link Session#reset()} or because of a
72+
* fatal connection error.
7373
*/
74-
TERMINATED( false ),
74+
TERMINATED,
7575

7676
/** This transaction has successfully committed */
77-
COMMITTED( false ),
77+
COMMITTED,
7878

7979
/** This transaction has been rolled back */
80-
ROLLED_BACK( false );
81-
82-
final boolean txOpen;
83-
84-
State( boolean txOpen )
85-
{
86-
this.txOpen = txOpen;
87-
}
80+
ROLLED_BACK
8881
}
8982

9083
private final Connection connection;
@@ -158,7 +151,7 @@ CompletionStage<Void> closeAsync()
158151
{
159152
return commitAsync();
160153
}
161-
else if ( state == State.ACTIVE || state == State.MARKED_FAILED || state == State.TERMINATED )
154+
else if ( state != State.COMMITTED && state != State.ROLLED_BACK )
162155
{
163156
return rollbackAsync();
164157
}
@@ -181,14 +174,14 @@ else if ( state == State.ROLLED_BACK )
181174
}
182175
else if ( state == State.TERMINATED )
183176
{
184-
return failedFuture(
185-
new ClientException( "Can't commit, transaction has been terminated by `Session#reset()`" ) );
177+
transactionClosed( State.ROLLED_BACK );
178+
return failedFuture( new ClientException( "Can't commit, transaction has been terminated" ) );
186179
}
187180
else
188181
{
189182
return resultCursors.retrieveNotConsumedError()
190183
.thenCompose( error -> doCommitAsync().handle( handleCommitOrRollback( error ) ) )
191-
.whenComplete( transactionClosed( State.COMMITTED ) );
184+
.whenComplete( ( ignore, error ) -> transactionClosed( State.COMMITTED ) );
192185
}
193186
}
194187

@@ -205,15 +198,15 @@ else if ( state == State.ROLLED_BACK )
205198
}
206199
else if ( state == State.TERMINATED )
207200
{
208-
// transaction has been terminated by RESET and should be rolled back by the database
209-
state = State.ROLLED_BACK;
201+
// no need for explicit rollback, transaction should've been rolled back by the database
202+
transactionClosed( State.ROLLED_BACK );
210203
return completedWithNull();
211204
}
212205
else
213206
{
214207
return resultCursors.retrieveNotConsumedError()
215208
.thenCompose( error -> doRollbackAsync().handle( handleCommitOrRollback( error ) ) )
216-
.whenComplete( transactionClosed( State.ROLLED_BACK ) );
209+
.whenComplete( ( ignore, error ) -> transactionClosed( State.ROLLED_BACK ) );
217210
}
218211
}
219212

@@ -314,15 +307,14 @@ else if ( state == State.MARKED_FAILED )
314307
}
315308
else if ( state == State.TERMINATED )
316309
{
317-
throw new ClientException(
318-
"Cannot run more statements in this transaction, it has been terminated by `Session#reset()`" );
310+
throw new ClientException( "Cannot run more statements in this transaction, it has been terminated" );
319311
}
320312
}
321313

322314
@Override
323315
public boolean isOpen()
324316
{
325-
return state.txOpen;
317+
return state != State.COMMITTED && state != State.ROLLED_BACK;
326318
}
327319

328320
@Override
@@ -394,14 +386,11 @@ else if ( commitOrRollbackError != null )
394386
};
395387
}
396388

397-
private BiConsumer<Object,Throwable> transactionClosed( State newState )
389+
private void transactionClosed( State newState )
398390
{
399-
return ( ignore, error ) ->
400-
{
401-
state = newState;
402-
connection.release(); // release in background
403-
session.setBookmark( bookmark );
404-
};
391+
state = newState;
392+
connection.release(); // release in background
393+
session.setBookmark( bookmark );
405394
}
406395

407396
private void terminateConnectionOnThreadInterrupt( String reason )

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.neo4j.driver.v1.exceptions.ClientException;
4747
import org.neo4j.driver.v1.types.TypeSystem;
4848

49+
import static java.util.concurrent.CompletableFuture.completedFuture;
4950
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
5051
import static org.neo4j.driver.internal.util.Futures.failedFuture;
5152
import static org.neo4j.driver.v1.Values.value;
@@ -257,13 +258,24 @@ public void reset()
257258

258259
private CompletionStage<Void> resetAsync()
259260
{
260-
return existingTransactionOrNull().thenAccept( tx ->
261-
{
262-
if ( tx != null )
263-
{
264-
tx.markTerminated();
265-
}
266-
} ).thenCompose( ignore -> releaseConnection() );
261+
return existingTransactionOrNull()
262+
.thenAccept( tx ->
263+
{
264+
if ( tx != null )
265+
{
266+
tx.markTerminated();
267+
}
268+
} )
269+
.thenCompose( ignore -> connectionStage )
270+
.thenCompose( connection ->
271+
{
272+
if ( connection != null )
273+
{
274+
// there exists an active connection, send a RESET message over it
275+
return connection.reset();
276+
}
277+
return completedWithNull();
278+
} );
267279
}
268280

269281
@Override
@@ -436,15 +448,32 @@ private CompletionStage<ExplicitTransaction> beginTransactionAsync( AccessMode m
436448
{
437449
ensureSessionIsOpen();
438450

439-
transactionStage = ensureNoOpenTxBeforeStartingTx()
451+
// create a chain that acquires connection and starts a transaction
452+
CompletionStage<ExplicitTransaction> newTransactionStage = ensureNoOpenTxBeforeStartingTx()
440453
.thenCompose( ignore -> acquireConnection( mode ) )
441454
.thenCompose( connection ->
442455
{
443456
ExplicitTransaction tx = new ExplicitTransaction( connection, NetworkSession.this );
444457
return tx.beginAsync( bookmark );
445458
} );
446459

447-
return transactionStage;
460+
// update the reference to the only known transaction
461+
CompletionStage<ExplicitTransaction> currentTransactionStage = transactionStage;
462+
463+
transactionStage = newTransactionStage
464+
.exceptionally( error -> null ) // ignore errors from starting new transaction
465+
.thenCompose( tx ->
466+
{
467+
if ( tx == null )
468+
{
469+
// failed to begin new transaction, keep reference to the existing one
470+
return currentTransactionStage;
471+
}
472+
// new transaction started, keep reference to it
473+
return completedFuture( tx );
474+
} );
475+
476+
return newTransactionStage;
448477
}
449478

450479
private CompletionStage<Connection> acquireConnection( AccessMode mode )

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import org.neo4j.driver.internal.BoltServerAddress;
3030
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
31+
import org.neo4j.driver.internal.handlers.ChannelReleasingResetResponseHandler;
3132
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
3233
import org.neo4j.driver.internal.messaging.Message;
3334
import org.neo4j.driver.internal.messaging.PullAllMessage;
@@ -39,6 +40,7 @@
3940
import org.neo4j.driver.internal.util.ServerVersion;
4041
import org.neo4j.driver.v1.Value;
4142

43+
import static java.util.Collections.emptyMap;
4244
import static org.neo4j.driver.internal.async.ChannelAttributes.setTerminationReason;
4345

4446
public class NettyConnection implements Connection
@@ -108,15 +110,24 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
108110
}
109111
}
110112

113+
@Override
114+
public CompletionStage<Void> reset()
115+
{
116+
CompletableFuture<Void> result = new CompletableFuture<>();
117+
ResetResponseHandler handler = new ResetResponseHandler( messageDispatcher, result );
118+
writeResetMessageIfNeeded( handler, true );
119+
return result;
120+
}
121+
111122
@Override
112123
public CompletionStage<Void> release()
113124
{
114125
if ( status.compareAndSet( Status.OPEN, Status.RELEASED ) )
115126
{
116-
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
117-
setAutoRead( true );
127+
ChannelReleasingResetResponseHandler handler = new ChannelReleasingResetResponseHandler( channel,
128+
channelPool, messageDispatcher, clock, releaseFuture );
118129

119-
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ) );
130+
writeResetMessageIfNeeded( handler, false );
120131
}
121132
return releaseFuture;
122133
}
@@ -152,12 +163,21 @@ private void run( String statement, Map<String,Value> parameters, ResponseHandle
152163
pullAllHandler, flush );
153164
}
154165

155-
private void reset( ResponseHandler resetHandler )
166+
private void writeResetMessageIfNeeded( ResponseHandler resetHandler, boolean isSessionReset )
156167
{
157168
channel.eventLoop().execute( () ->
158169
{
159-
messageDispatcher.muteAckFailure();
160-
writeAndFlushMessage( ResetMessage.RESET, resetHandler );
170+
if ( isSessionReset && !isOpen() )
171+
{
172+
resetHandler.onSuccess( emptyMap() );
173+
}
174+
else
175+
{
176+
messageDispatcher.muteAckFailure();
177+
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
178+
setAutoRead( true );
179+
writeAndFlushMessage( ResetMessage.RESET, resetHandler );
180+
}
161181
} );
162182
}
163183

driver/src/main/java/org/neo4j/driver/internal/async/RoutingConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ public void runAndFlush( String statement, Map<String,Value> parameters, Respons
7171
newRoutingResponseHandler( pullAllHandler ) );
7272
}
7373

74+
@Override
75+
public CompletionStage<Void> reset()
76+
{
77+
return delegate.reset();
78+
}
79+
7480
@Override
7581
public boolean isOpen()
7682
{

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.neo4j.driver.v1.Logger;
3434
import org.neo4j.driver.v1.Logging;
3535
import org.neo4j.driver.v1.Value;
36+
import org.neo4j.driver.v1.exceptions.ClientException;
3637

3738
import static java.util.Objects.requireNonNull;
3839
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
@@ -157,14 +158,24 @@ public void handleIgnoredMessage()
157158
log.debug( "S: IGNORED" );
158159

159160
ResponseHandler handler = handlers.remove();
161+
162+
Throwable error;
160163
if ( currentError != null )
161164
{
162-
handler.onFailure( currentError );
165+
error = currentError;
166+
}
167+
else if ( ackFailureMuted )
168+
{
169+
error = new ClientException( "Database ignored the request because session has been reset" );
163170
}
164171
else
165172
{
166-
log.warn( "Received IGNORED message for handler %s but error is missing", handler );
173+
log.warn( "Received IGNORED message for handler %s but error is missing and RESET is not in progress. " +
174+
"Current handlers %s", handler, handlers );
175+
176+
error = new ClientException( "Database ignored the request" );
167177
}
178+
handler.onFailure( error );
168179
}
169180

170181
public void handleFatalError( Throwable error )
@@ -212,15 +223,9 @@ public void muteAckFailure()
212223
* {@link #muteAckFailure()} when sending RESET message.
213224
* <p>
214225
* <b>This method is not thread-safe</b> and should only be executed by the event loop thread.
215-
*
216-
* @throws IllegalStateException if ACK_FAILURE is not muted right now.
217226
*/
218227
public void unMuteAckFailure()
219228
{
220-
if ( !ackFailureMuted )
221-
{
222-
throw new IllegalStateException( "Can't un-mute ACK_FAILURE because it's not muted" );
223-
}
224229
ackFailureMuted = false;
225230
}
226231

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright (c) 2002-2018 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.handlers;
20+
21+
import io.netty.channel.Channel;
22+
import io.netty.channel.pool.ChannelPool;
23+
import io.netty.util.concurrent.Future;
24+
25+
import java.util.concurrent.CompletableFuture;
26+
27+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
28+
import org.neo4j.driver.internal.util.Clock;
29+
30+
import static org.neo4j.driver.internal.async.ChannelAttributes.setLastUsedTimestamp;
31+
32+
public class ChannelReleasingResetResponseHandler extends ResetResponseHandler
33+
{
34+
private final Channel channel;
35+
private final ChannelPool pool;
36+
private final Clock clock;
37+
38+
public ChannelReleasingResetResponseHandler( Channel channel, ChannelPool pool,
39+
InboundMessageDispatcher messageDispatcher, Clock clock, CompletableFuture<Void> releaseFuture )
40+
{
41+
super( messageDispatcher, releaseFuture );
42+
this.channel = channel;
43+
this.pool = pool;
44+
this.clock = clock;
45+
}
46+
47+
@Override
48+
protected void resetCompleted( CompletableFuture<Void> completionFuture )
49+
{
50+
setLastUsedTimestamp( channel, clock.millis() );
51+
52+
Future<Void> released = pool.release( channel );
53+
released.addListener( ignore -> completionFuture.complete( null ) );
54+
}
55+
}

0 commit comments

Comments
 (0)