Skip to content

Don’t swallow the cause of a TX termination. (#731) #737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ public CompletionStage<ResultCursor> runAsync(Query query)
return tx.runAsync(query, true );
}

public void markTerminated()
{
tx.markTerminated();
}

public boolean isOpen()
{
return tx.isOpen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public CompletionStage<Void> resetAsync()
{
if ( tx != null )
{
tx.markTerminated();
tx.markTerminated( null );
}
} )
.thenCompose( ignore -> connectionStage )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.neo4j.driver.internal.async;

import java.util.EnumSet;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -58,13 +59,66 @@ private enum State
ROLLED_BACK
}

/**
* This is a holder so that we can have ony the state volatile in the tx without having to synchronize the whole block.
*/
private static final class StateHolder
{
private static final EnumSet<State> OPEN_STATES = EnumSet.of( State.ACTIVE, State.TERMINATED );
private static final StateHolder ACTIVE_HOLDER = new StateHolder( State.ACTIVE, null );
private static final StateHolder COMMITTED_HOLDER = new StateHolder( State.COMMITTED, null );
private static final StateHolder ROLLED_BACK_HOLDER = new StateHolder( State.ROLLED_BACK, null );

/**
* The actual state.
*/
final State value;

/**
* If this holder contains a state of {@link State#TERMINATED}, this represents the cause if any.
*/
final Throwable causeOfTermination;

static StateHolder of( State value )
{
switch ( value )
{
case ACTIVE:
return ACTIVE_HOLDER;
case COMMITTED:
return COMMITTED_HOLDER;
case ROLLED_BACK:
return ROLLED_BACK_HOLDER;
case TERMINATED:
default:
throw new IllegalArgumentException( "Cannot provide a default state holder for state " + value );
}
}

static StateHolder terminatedWith( Throwable cause )
{
return new StateHolder( State.TERMINATED, cause );
}

private StateHolder( State value, Throwable causeOfTermination )
{
this.value = value;
this.causeOfTermination = causeOfTermination;
}

boolean isOpen()
{
return OPEN_STATES.contains( this.value );
}
}

private final Connection connection;
private final BoltProtocol protocol;
private final BookmarkHolder bookmarkHolder;
private final ResultCursorsHolder resultCursors;
private final long fetchSize;

private volatile State state = State.ACTIVE;
private volatile StateHolder state = StateHolder.of( State.ACTIVE );

public UnmanagedTransaction(Connection connection, BookmarkHolder bookmarkHolder, long fetchSize )
{
Expand Down Expand Up @@ -104,11 +158,11 @@ public CompletionStage<Void> closeAsync()

public CompletionStage<Void> commitAsync()
{
if ( state == State.COMMITTED )
if ( state.value == State.COMMITTED )
{
return failedFuture( new ClientException( "Can't commit, transaction has been committed" ) );
}
else if ( state == State.ROLLED_BACK )
else if ( state.value == State.ROLLED_BACK )
{
return failedFuture( new ClientException( "Can't commit, transaction has been rolled back" ) );
}
Expand All @@ -122,11 +176,11 @@ else if ( state == State.ROLLED_BACK )

public CompletionStage<Void> rollbackAsync()
{
if ( state == State.COMMITTED )
if ( state.value == State.COMMITTED )
{
return failedFuture( new ClientException( "Can't rollback, transaction has been committed" ) );
}
else if ( state == State.ROLLED_BACK )
else if ( state.value == State.ROLLED_BACK )
{
return failedFuture( new ClientException( "Can't rollback, transaction has been rolled back" ) );
}
Expand Down Expand Up @@ -158,12 +212,12 @@ public CompletionStage<RxResultCursor> runRx(Query query)

public boolean isOpen()
{
return state != State.COMMITTED && state != State.ROLLED_BACK;
return state.isOpen();
}

public void markTerminated()
public void markTerminated( Throwable cause )
{
state = State.TERMINATED;
state = StateHolder.terminatedWith( cause );
}

public Connection connection()
Expand All @@ -173,34 +227,34 @@ public Connection connection()

private void ensureCanRunQueries()
{
if ( state == State.COMMITTED )
if ( state.value == State.COMMITTED )
{
throw new ClientException( "Cannot run more queries in this transaction, it has been committed" );
}
else if ( state == State.ROLLED_BACK )
else if ( state.value == State.ROLLED_BACK )
{
throw new ClientException( "Cannot run more queries in this transaction, it has been rolled back" );
}
else if ( state == State.TERMINATED )
else if ( state.value == State.TERMINATED )
{
throw new ClientException( "Cannot run more queries in this transaction, " +
"it has either experienced an fatal error or was explicitly terminated" );
"it has either experienced an fatal error or was explicitly terminated", state.causeOfTermination );
}
}

private CompletionStage<Void> doCommitAsync()
{
if ( state == State.TERMINATED )
if ( state.value == State.TERMINATED )
{
return failedFuture( new ClientException( "Transaction can't be committed. " +
"It has been rolled back either because of an error or explicit termination" ) );
"It has been rolled back either because of an error or explicit termination", state.causeOfTermination ) );
}
return protocol.commitTransaction( connection ).thenAccept( bookmarkHolder::setBookmark );
}

private CompletionStage<Void> doRollbackAsync()
{
if ( state == State.TERMINATED )
if ( state.value == State.TERMINATED )
{
return completedWithNull();
}
Expand All @@ -224,11 +278,11 @@ private void transactionClosed( boolean isCommitted )
{
if ( isCommitted )
{
state = State.COMMITTED;
state = StateHolder.of( State.COMMITTED );
}
else
{
state = State.ROLLED_BACK;
state = StateHolder.of( State.ROLLED_BACK );
}
connection.release(); // release in background
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ public void afterFailure( Throwable error )
// always mark transaction as terminated because every error is "acknowledged" with a RESET message
// so database forgets about the transaction after the first error
// such transaction should not attempt to commit and can be considered as rolled back
tx.markTerminated();
tx.markTerminated( error );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public RxResult run(Query query)
// The logic here shall be the same as `TransactionPullResponseHandler#afterFailure` as that is where cursor handling failure
// This is optional as tx still holds a reference to all cursor futures and they will be clean up properly in commit
Throwable error = Futures.completionExceptionCause( completionError );
tx.markTerminated();
tx.markTerminated( error );
cursorFuture.completeExceptionally( error );
}
} );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.exceptions.TransientException;
Expand Down Expand Up @@ -100,8 +101,9 @@ public <T> T retry( Supplier<T> work )
{
return work.get();
}
catch ( Throwable error )
catch ( Throwable throwable )
{
Throwable error = extractPossibleTerminationCause( throwable );
if ( canRetryOn( error ) )
{
long currentTime = clock.millis();
Expand All @@ -122,8 +124,10 @@ public <T> T retry( Supplier<T> work )
continue;
}
}
addSuppressed( error, errors );
throw error;

// Add the original error in case we didn't continue the loop from within the if above.
addSuppressed( throwable, errors );
throw throwable;
}
}
}
Expand All @@ -144,54 +148,67 @@ public <T> Publisher<T> retryRx( Publisher<T> work )

protected boolean canRetryOn( Throwable error )
{
return error instanceof SessionExpiredException ||
error instanceof ServiceUnavailableException ||
isTransientError( error );
return error instanceof SessionExpiredException || error instanceof ServiceUnavailableException || isTransientError( error );
}

/**
* Extracts the possible cause of a transaction that has been marked terminated.
*
* @param error
* @return
*/
private static Throwable extractPossibleTerminationCause( Throwable error )
{

// Having a dedicated "TerminatedException" inheriting from ClientException might be a good idea.
if ( error instanceof ClientException && error.getCause() != null )
{
return error.getCause();
}
return error;
}

private Function<Flux<Throwable>,Publisher<Context>> retryRxCondition()
{
return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 -> {
Throwable lastError = t2.getT1();
return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 ->
{

Throwable throwable = t2.getT1();
Throwable error = extractPossibleTerminationCause( throwable );

Context ctx = t2.getT2();

List<Throwable> errors = ctx.getOrDefault( "errors", null );

long startTime = ctx.getOrDefault( "startTime", -1L );
long startTime = ctx.getOrDefault( "startTime", -1L );
long nextDelayMs = ctx.getOrDefault( "nextDelayMs", initialRetryDelayMs );

if( !canRetryOn( lastError ) )
if ( canRetryOn( error ) )
{
addSuppressed( lastError, errors );
return Mono.error( lastError );
}

long currentTime = clock.millis();
if ( startTime == -1 )
{
startTime = currentTime;
}
long currentTime = clock.millis();
if ( startTime == -1 )
{
startTime = currentTime;
}

long elapsedTime = currentTime - startTime;
if ( elapsedTime < maxRetryTimeMs )
{
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", lastError );
long elapsedTime = currentTime - startTime;
if ( elapsedTime < maxRetryTimeMs )
{
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );

nextDelayMs = (long) (nextDelayMs * multiplier);
errors = recordError( lastError, errors );
nextDelayMs = (long) (nextDelayMs * multiplier);
errors = recordError( error, errors );

// retry on netty event loop thread
EventExecutor eventExecutor = eventExecutorGroup.next();
return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) )
.delayElement( Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) );
// retry on netty event loop thread
EventExecutor eventExecutor = eventExecutorGroup.next();
return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) ).delayElement(
Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) );
}
}
else
{
addSuppressed( lastError, errors );
addSuppressed( throwable, errors );

return Mono.error( lastError );
}
return Mono.error( throwable );
} );
}

Expand Down Expand Up @@ -249,9 +266,10 @@ private <T> void executeWork( CompletableFuture<T> resultFuture, Supplier<Comple
} );
}

private <T> void retryOnError( CompletableFuture<T> resultFuture, Supplier<CompletionStage<T>> work,
long startTime, long retryDelayMs, Throwable error, List<Throwable> errors )
private <T> void retryOnError( CompletableFuture<T> resultFuture, Supplier<CompletionStage<T>> work, long startTime, long retryDelayMs, Throwable throwable,
List<Throwable> errors )
{
Throwable error = extractPossibleTerminationCause( throwable );
if ( canRetryOn( error ) )
{
long currentTime = clock.millis();
Expand All @@ -269,8 +287,8 @@ private <T> void retryOnError( CompletableFuture<T> resultFuture, Supplier<Compl
}
}

addSuppressed( error, errors );
resultFuture.completeExceptionally( error );
addSuppressed( throwable, errors );
resultFuture.completeExceptionally( throwable );
}

private long computeDelayWithJitter( long delayMs )
Expand Down
Loading