Skip to content

Execute blocking tx functions in caller thread #420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 21 additions & 19 deletions driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -289,32 +289,34 @@ CompletionStage<Boolean> currentConnectionIsOpen()

private <T> T transaction( AccessMode mode, TransactionWork<T> work )
{
return getBlocking( transactionAsync( mode, tx ->
// use different code path compared to async so that work is executed in the caller thread
// caller thread will also be the one who sleeps between retries;
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
// event loop thread will bock and wait for itself to read some data
return retryLogic.retry( () ->
{
try
{
// todo: given lambda can't be executed in even loop thread because it deadlocks
// todo: event loop executes a blocking operation and waits for itself to read from the network
// todo: this is most likely what happens...

// todo: use of supplyAsync is a hack and it makes blocking API very different from 1.4
// todo: because we now execute function in FJP.commonPool()

// todo: bring back blocking retries with sleeps and etc. so that we execute TxWork in caller thread
return CompletableFuture.supplyAsync( () -> work.execute( tx ) );
// T result = work.execute( tx );
// return completedFuture( result );
}
catch ( Throwable error )
try ( Transaction tx = getBlocking( beginTransactionAsync( mode ) ) )
{
return failedFuture( error );
try
{
T result = work.execute( tx );
tx.success();
return result;
}
catch ( Throwable t )
{
// mark transaction for failure if the given unit of work threw exception
// this will override any success marks that were made by the unit of work
tx.failure();
throw t;
}
}
} ) );
} );
}

private <T> CompletionStage<T> transactionAsync( AccessMode mode, TransactionWork<CompletionStage<T>> work )
{
return retryLogic.retry( () ->
return retryLogic.retryAsync( () ->
{
CompletableFuture<T> resultFuture = new CompletableFuture<>();
CompletionStage<ExplicitTransaction> txFuture = beginTransactionAsync( mode );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,48 @@ public ExponentialBackoffRetryLogic( RetrySettings settings, EventExecutorGroup
}

@Override
public <T> CompletionStage<T> retry( Supplier<CompletionStage<T>> work )
public <T> T retry( Supplier<T> work )
{
List<Throwable> errors = null;
long startTime = -1;
long nextDelayMs = initialRetryDelayMs;

while ( true )
{
try
{
return work.get();
}
catch ( Throwable error )
{
if ( canRetryOn( error ) )
{
long currentTime = clock.millis();
if ( startTime == -1 )
{
startTime = currentTime;
}

long elapsedTime = currentTime - startTime;
if ( elapsedTime < maxRetryTimeMs )
{
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
log.warn( "Transaction failed and will be retried in " + delayWithJitterMs + "ms", error );

sleep( delayWithJitterMs );
nextDelayMs = (long) (nextDelayMs * multiplier);
errors = recordError( error, errors );
continue;
}
}
addSuppressed( error, errors );
throw error;
}
}
}

@Override
public <T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work )
{
CompletableFuture<T> resultFuture = new CompletableFuture<>();
executeWorkInEventLoop( resultFuture, work );
Expand Down Expand Up @@ -109,7 +150,7 @@ private <T> void retryWorkInEventLoop( CompletableFuture<T> resultFuture, Suppli
EventExecutor eventExecutor = eventExecutorGroup.next();

long delayWithJitterMs = computeDelayWithJitter( delayMs );
log.warn( "Transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );
log.warn( "Async transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );

eventExecutor.schedule( () ->
{
Expand Down Expand Up @@ -185,6 +226,19 @@ private long computeDelayWithJitter( long delayMs )
return ThreadLocalRandom.current().nextLong( min, max + 1 );
}

private void sleep( long delayMs )
{
try
{
clock.sleep( delayMs );
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
throw new IllegalStateException( "Retries interrupted", e );
}
}

private void verifyAfterConstruction()
{
if ( maxRetryTimeMs < 0 )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@

public interface RetryLogic
{
<T> CompletionStage<T> retry( Supplier<CompletionStage<T>> work );
<T> T retry( Supplier<T> work );

<T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work );
}
Loading