Skip to content

Commit 558c61a

Browse files
Don’t swallow the cause of a TX termination. (neo4j#731)
* Don’t swallow the cause of a TX termination. There are a couple of scenarios in which a transaction gets terminated. The cause will be propagated by the pull handler and the transaction will be marked accordingly. There is a chance that a degradation from a leader to a follower on the server side happens in between two calls to a run method on that transaction: The transaction is still open, but cannot run queries any longer. Outside a transactional function this correctly leads to a ClientException. Inside a transactional function, this must not happen. The retry logic must be able to find the cause of the termination and, if there is one, decide based on that cause whether to retry or not. This PR changes the following: - Add a StateHolder to the UnmanagedTransaction - The holder is necessary to keep the single field volatile - The holder holds the state and a possible cause of termination - The holder is able to determine whether a transaction is still open or not. - It removes markTerminated from InternalAsyncTransaction as it was used only for tests. * Polishing. This removes the circular dependency between holder and state. I had introduced it because I wanted to avoid object allocation. This solution here is better: It also creates only 3 different holders for non-terminated states, but without the circular dependency. Co-authored-by: Gregory Woods <[email protected]>
1 parent 0eb52ba commit 558c61a

17 files changed

+514
-116
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/InternalAsyncTransaction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,6 @@ public CompletionStage<ResultCursor> runAsync(Query query)
5050
return tx.runAsync(query, true );
5151
}
5252

53-
public void markTerminated()
54-
{
55-
tx.markTerminated();
56-
}
57-
5853
public boolean isOpen()
5954
{
6055
return tx.isOpen();

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public CompletionStage<Void> resetAsync()
138138
{
139139
if ( tx != null )
140140
{
141-
tx.markTerminated();
141+
tx.markTerminated( null );
142142
}
143143
} )
144144
.thenCompose( ignore -> connectionStage )

driver/src/main/java/org/neo4j/driver/internal/async/UnmanagedTransaction.java

Lines changed: 71 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.async;
2020

21+
import java.util.EnumSet;
2122
import java.util.concurrent.CompletionException;
2223
import java.util.concurrent.CompletionStage;
2324
import java.util.function.BiFunction;
@@ -58,13 +59,66 @@ private enum State
5859
ROLLED_BACK
5960
}
6061

62+
/**
63+
* This is a holder so that we can have only the state volatile in the tx without having to synchronize the whole block.
64+
*/
65+
private static final class StateHolder
66+
{
67+
private static final EnumSet<State> OPEN_STATES = EnumSet.of( State.ACTIVE, State.TERMINATED );
68+
private static final StateHolder ACTIVE_HOLDER = new StateHolder( State.ACTIVE, null );
69+
private static final StateHolder COMMITTED_HOLDER = new StateHolder( State.COMMITTED, null );
70+
private static final StateHolder ROLLED_BACK_HOLDER = new StateHolder( State.ROLLED_BACK, null );
71+
72+
/**
73+
* The actual state.
74+
*/
75+
final State value;
76+
77+
/**
78+
* If this holder contains a state of {@link State#TERMINATED}, this represents the cause if any.
79+
*/
80+
final Throwable causeOfTermination;
81+
82+
static StateHolder of( State value )
83+
{
84+
switch ( value )
85+
{
86+
case ACTIVE:
87+
return ACTIVE_HOLDER;
88+
case COMMITTED:
89+
return COMMITTED_HOLDER;
90+
case ROLLED_BACK:
91+
return ROLLED_BACK_HOLDER;
92+
case TERMINATED:
93+
default:
94+
throw new IllegalArgumentException( "Cannot provide a default state holder for state " + value );
95+
}
96+
}
97+
98+
static StateHolder terminatedWith( Throwable cause )
99+
{
100+
return new StateHolder( State.TERMINATED, cause );
101+
}
102+
103+
private StateHolder( State value, Throwable causeOfTermination )
104+
{
105+
this.value = value;
106+
this.causeOfTermination = causeOfTermination;
107+
}
108+
109+
boolean isOpen()
110+
{
111+
return OPEN_STATES.contains( this.value );
112+
}
113+
}
114+
61115
private final Connection connection;
62116
private final BoltProtocol protocol;
63117
private final BookmarkHolder bookmarkHolder;
64118
private final ResultCursorsHolder resultCursors;
65119
private final long fetchSize;
66120

67-
private volatile State state = State.ACTIVE;
121+
private volatile StateHolder state = StateHolder.of( State.ACTIVE );
68122

69123
public UnmanagedTransaction(Connection connection, BookmarkHolder bookmarkHolder, long fetchSize )
70124
{
@@ -104,11 +158,11 @@ public CompletionStage<Void> closeAsync()
104158

105159
public CompletionStage<Void> commitAsync()
106160
{
107-
if ( state == State.COMMITTED )
161+
if ( state.value == State.COMMITTED )
108162
{
109163
return failedFuture( new ClientException( "Can't commit, transaction has been committed" ) );
110164
}
111-
else if ( state == State.ROLLED_BACK )
165+
else if ( state.value == State.ROLLED_BACK )
112166
{
113167
return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) );
114168
}
@@ -122,11 +176,11 @@ else if ( state == State.ROLLED_BACK )
122176

123177
public CompletionStage<Void> rollbackAsync()
124178
{
125-
if ( state == State.COMMITTED )
179+
if ( state.value == State.COMMITTED )
126180
{
127181
return failedFuture( new ClientException( "Can't rollback, transaction has been committed" ) );
128182
}
129-
else if ( state == State.ROLLED_BACK )
183+
else if ( state.value == State.ROLLED_BACK )
130184
{
131185
return failedFuture( new ClientException( "Can't rollback, transaction has been rolled back" ) );
132186
}
@@ -158,12 +212,12 @@ public CompletionStage<RxResultCursor> runRx(Query query)
158212

159213
public boolean isOpen()
160214
{
161-
return state != State.COMMITTED && state != State.ROLLED_BACK;
215+
return state.isOpen();
162216
}
163217

164-
public void markTerminated()
218+
public void markTerminated( Throwable cause )
165219
{
166-
state = State.TERMINATED;
220+
state = StateHolder.terminatedWith( cause );
167221
}
168222

169223
public Connection connection()
@@ -173,34 +227,34 @@ public Connection connection()
173227

174228
private void ensureCanRunQueries()
175229
{
176-
if ( state == State.COMMITTED )
230+
if ( state.value == State.COMMITTED )
177231
{
178232
throw new ClientException( "Cannot run more queries in this transaction, it has been committed" );
179233
}
180-
else if ( state == State.ROLLED_BACK )
234+
else if ( state.value == State.ROLLED_BACK )
181235
{
182236
throw new ClientException( "Cannot run more queries in this transaction, it has been rolled back" );
183237
}
184-
else if ( state == State.TERMINATED )
238+
else if ( state.value == State.TERMINATED )
185239
{
186240
throw new ClientException( "Cannot run more queries in this transaction, " +
187-
"it has either experienced an fatal error or was explicitly terminated" );
241+
"it has either experienced an fatal error or was explicitly terminated", state.causeOfTermination );
188242
}
189243
}
190244

191245
private CompletionStage<Void> doCommitAsync()
192246
{
193-
if ( state == State.TERMINATED )
247+
if ( state.value == State.TERMINATED )
194248
{
195249
return failedFuture( new ClientException( "Transaction can't be committed. " +
196-
"It has been rolled back either because of an error or explicit termination" ) );
250+
"It has been rolled back either because of an error or explicit termination", state.causeOfTermination ) );
197251
}
198252
return protocol.commitTransaction( connection ).thenAccept( bookmarkHolder::setBookmark );
199253
}
200254

201255
private CompletionStage<Void> doRollbackAsync()
202256
{
203-
if ( state == State.TERMINATED )
257+
if ( state.value == State.TERMINATED )
204258
{
205259
return completedWithNull();
206260
}
@@ -224,11 +278,11 @@ private void transactionClosed( boolean isCommitted )
224278
{
225279
if ( isCommitted )
226280
{
227-
state = State.COMMITTED;
281+
state = StateHolder.of( State.COMMITTED );
228282
}
229283
else
230284
{
231-
state = State.ROLLED_BACK;
285+
state = StateHolder.of( State.ROLLED_BACK );
232286
}
233287
connection.release(); // release in background
234288
}

driver/src/main/java/org/neo4j/driver/internal/handlers/TransactionPullResponseCompletionListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,6 @@ public void afterFailure( Throwable error )
4545
// always mark transaction as terminated because every error is "acknowledged" with a RESET message
4646
// so database forgets about the transaction after the first error
4747
// such transaction should not attempt to commit and can be considered as rolled back
48-
tx.markTerminated();
48+
tx.markTerminated( error );
4949
}
5050
}

driver/src/main/java/org/neo4j/driver/internal/reactive/InternalRxTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public RxResult run(Query query)
5656
// The logic here shall be the same as `TransactionPullResponseHandler#afterFailure` as that is where cursor failures are handled
5757
// This is optional as tx still holds a reference to all cursor futures and they will be cleaned up properly in commit
5858
Throwable error = Futures.completionExceptionCause( completionError );
59-
tx.markTerminated();
59+
tx.markTerminated( error );
6060
cursorFuture.completeExceptionally( error );
6161
}
6262
} );

driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import org.neo4j.driver.Logger;
4141
import org.neo4j.driver.Logging;
42+
import org.neo4j.driver.exceptions.ClientException;
4243
import org.neo4j.driver.exceptions.ServiceUnavailableException;
4344
import org.neo4j.driver.exceptions.SessionExpiredException;
4445
import org.neo4j.driver.exceptions.TransientException;
@@ -100,8 +101,9 @@ public <T> T retry( Supplier<T> work )
100101
{
101102
return work.get();
102103
}
103-
catch ( Throwable error )
104+
catch ( Throwable throwable )
104105
{
106+
Throwable error = extractPossibleTerminationCause( throwable );
105107
if ( canRetryOn( error ) )
106108
{
107109
long currentTime = clock.millis();
@@ -122,8 +124,10 @@ public <T> T retry( Supplier<T> work )
122124
continue;
123125
}
124126
}
125-
addSuppressed( error, errors );
126-
throw error;
127+
128+
// Add the original error in case we didn't continue the loop from within the if above.
129+
addSuppressed( throwable, errors );
130+
throw throwable;
127131
}
128132
}
129133
}
@@ -144,54 +148,67 @@ public <T> Publisher<T> retryRx( Publisher<T> work )
144148

145149
protected boolean canRetryOn( Throwable error )
146150
{
147-
return error instanceof SessionExpiredException ||
148-
error instanceof ServiceUnavailableException ||
149-
isTransientError( error );
151+
return error instanceof SessionExpiredException || error instanceof ServiceUnavailableException || isTransientError( error );
152+
}
153+
154+
/**
155+
* Extracts the possible cause of a transaction that has been marked terminated.
156+
*
157+
* @param error
158+
* @return
159+
*/
160+
private static Throwable extractPossibleTerminationCause( Throwable error )
161+
{
162+
163+
// Having a dedicated "TerminatedException" inheriting from ClientException might be a good idea.
164+
if ( error instanceof ClientException && error.getCause() != null )
165+
{
166+
return error.getCause();
167+
}
168+
return error;
150169
}
151170

152171
private Function<Flux<Throwable>,Publisher<Context>> retryRxCondition()
153172
{
154-
return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 -> {
155-
Throwable lastError = t2.getT1();
173+
return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 ->
174+
{
175+
176+
Throwable throwable = t2.getT1();
177+
Throwable error = extractPossibleTerminationCause( throwable );
178+
156179
Context ctx = t2.getT2();
157180

158181
List<Throwable> errors = ctx.getOrDefault( "errors", null );
159182

160-
long startTime = ctx.getOrDefault( "startTime", -1L );
183+
long startTime = ctx.getOrDefault( "startTime", -1L );
161184
long nextDelayMs = ctx.getOrDefault( "nextDelayMs", initialRetryDelayMs );
162185

163-
if( !canRetryOn( lastError ) )
186+
if ( canRetryOn( error ) )
164187
{
165-
addSuppressed( lastError, errors );
166-
return Mono.error( lastError );
167-
}
168-
169-
long currentTime = clock.millis();
170-
if ( startTime == -1 )
171-
{
172-
startTime = currentTime;
173-
}
188+
long currentTime = clock.millis();
189+
if ( startTime == -1 )
190+
{
191+
startTime = currentTime;
192+
}
174193

175-
long elapsedTime = currentTime - startTime;
176-
if ( elapsedTime < maxRetryTimeMs )
177-
{
178-
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
179-
log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", lastError );
194+
long elapsedTime = currentTime - startTime;
195+
if ( elapsedTime < maxRetryTimeMs )
196+
{
197+
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
198+
log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );
180199

181-
nextDelayMs = (long) (nextDelayMs * multiplier);
182-
errors = recordError( lastError, errors );
200+
nextDelayMs = (long) (nextDelayMs * multiplier);
201+
errors = recordError( error, errors );
183202

184-
// retry on netty event loop thread
185-
EventExecutor eventExecutor = eventExecutorGroup.next();
186-
return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) )
187-
.delayElement( Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) );
203+
// retry on netty event loop thread
204+
EventExecutor eventExecutor = eventExecutorGroup.next();
205+
return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) ).delayElement(
206+
Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) );
207+
}
188208
}
189-
else
190-
{
191-
addSuppressed( lastError, errors );
209+
addSuppressed( throwable, errors );
192210

193-
return Mono.error( lastError );
194-
}
211+
return Mono.error( throwable );
195212
} );
196213
}
197214

@@ -249,9 +266,10 @@ private <T> void executeWork( CompletableFuture<T> resultFuture, Supplier<Comple
249266
} );
250267
}
251268

252-
private <T> void retryOnError( CompletableFuture<T> resultFuture, Supplier<CompletionStage<T>> work,
253-
long startTime, long retryDelayMs, Throwable error, List<Throwable> errors )
269+
private <T> void retryOnError( CompletableFuture<T> resultFuture, Supplier<CompletionStage<T>> work, long startTime, long retryDelayMs, Throwable throwable,
270+
List<Throwable> errors )
254271
{
272+
Throwable error = extractPossibleTerminationCause( throwable );
255273
if ( canRetryOn( error ) )
256274
{
257275
long currentTime = clock.millis();
@@ -269,8 +287,8 @@ private <T> void retryOnError( CompletableFuture<T> resultFuture, Supplier<Compl
269287
}
270288
}
271289

272-
addSuppressed( error, errors );
273-
resultFuture.completeExceptionally( error );
290+
addSuppressed( throwable, errors );
291+
resultFuture.completeExceptionally( throwable );
274292
}
275293

276294
private long computeDelayWithJitter( long delayMs )

0 commit comments

Comments
 (0)