Skip to content

Commit 28f288f

Browse files
committed
Execute blocking tx functions in caller thread
This commit brings back the blocking retry logic, which performs retries in the caller thread and uses `Thread#sleep()` between retries. It is needed because blocking tx functions cannot run in the event loop, as they perform blocking operations: an event loop thread can deadlock waiting for itself to read from the network. Previously, the code used a "hack" and executed the given function in `ForkJoinPool.commonPool()`. Also added a couple of tests for the retry logic and for async transaction functions.
1 parent d8c0d37 commit 28f288f

File tree

6 files changed

+719
-49
lines changed

6 files changed

+719
-49
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -289,32 +289,34 @@ CompletionStage<Boolean> currentConnectionIsOpen()
289289

290290
private <T> T transaction( AccessMode mode, TransactionWork<T> work )
291291
{
292-
return getBlocking( transactionAsync( mode, tx ->
292+
// use different code path compared to async so that work is executed in the caller thread
293+
// caller thread will also be the one who sleeps between retries;
294+
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
295+
// event loop thread will block and wait for itself to read some data
296+
return retryLogic.retry( () ->
293297
{
294-
try
295-
{
296-
// todo: given lambda can't be executed in event loop thread because it deadlocks
297-
// todo: event loop executes a blocking operation and waits for itself to read from the network
298-
// todo: this is most likely what happens...
299-
300-
// todo: use of supplyAsync is a hack and it makes blocking API very different from 1.4
301-
// todo: because we now execute function in FJP.commonPool()
302-
303-
// todo: bring back blocking retries with sleeps and etc. so that we execute TxWork in caller thread
304-
return CompletableFuture.supplyAsync( () -> work.execute( tx ) );
305-
// T result = work.execute( tx );
306-
// return completedFuture( result );
307-
}
308-
catch ( Throwable error )
298+
try ( Transaction tx = getBlocking( beginTransactionAsync( mode ) ) )
309299
{
310-
return failedFuture( error );
300+
try
301+
{
302+
T result = work.execute( tx );
303+
tx.success();
304+
return result;
305+
}
306+
catch ( Throwable t )
307+
{
308+
// mark transaction for failure if the given unit of work threw exception
309+
// this will override any success marks that were made by the unit of work
310+
tx.failure();
311+
throw t;
312+
}
311313
}
312-
} ) );
314+
} );
313315
}
314316

315317
private <T> CompletionStage<T> transactionAsync( AccessMode mode, TransactionWork<CompletionStage<T>> work )
316318
{
317-
return retryLogic.retry( () ->
319+
return retryLogic.retryAsync( () ->
318320
{
319321
CompletableFuture<T> resultFuture = new CompletableFuture<>();
320322
CompletionStage<ExplicitTransaction> txFuture = beginTransactionAsync( mode );

driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,48 @@ public ExponentialBackoffRetryLogic( RetrySettings settings, EventExecutorGroup
8080
}
8181

8282
@Override
83-
public <T> CompletionStage<T> retry( Supplier<CompletionStage<T>> work )
83+
public <T> T retry( Supplier<T> work )
84+
{
85+
List<Throwable> errors = null;
86+
long startTime = -1;
87+
long nextDelayMs = initialRetryDelayMs;
88+
89+
while ( true )
90+
{
91+
try
92+
{
93+
return work.get();
94+
}
95+
catch ( Throwable error )
96+
{
97+
if ( canRetryOn( error ) )
98+
{
99+
long currentTime = clock.millis();
100+
if ( startTime == -1 )
101+
{
102+
startTime = currentTime;
103+
}
104+
105+
long elapsedTime = currentTime - startTime;
106+
if ( elapsedTime < maxRetryTimeMs )
107+
{
108+
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
109+
log.warn( "Transaction failed and will be retried in " + delayWithJitterMs + "ms", error );
110+
111+
sleep( delayWithJitterMs );
112+
nextDelayMs = (long) (nextDelayMs * multiplier);
113+
errors = recordError( error, errors );
114+
continue;
115+
}
116+
}
117+
addSuppressed( error, errors );
118+
throw error;
119+
}
120+
}
121+
}
122+
123+
@Override
124+
public <T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work )
84125
{
85126
CompletableFuture<T> resultFuture = new CompletableFuture<>();
86127
executeWorkInEventLoop( resultFuture, work );
@@ -109,7 +150,7 @@ private <T> void retryWorkInEventLoop( CompletableFuture<T> resultFuture, Suppli
109150
EventExecutor eventExecutor = eventExecutorGroup.next();
110151

111152
long delayWithJitterMs = computeDelayWithJitter( delayMs );
112-
log.warn( "Transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );
153+
log.warn( "Async transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );
113154

114155
eventExecutor.schedule( () ->
115156
{
@@ -185,6 +226,19 @@ private long computeDelayWithJitter( long delayMs )
185226
return ThreadLocalRandom.current().nextLong( min, max + 1 );
186227
}
187228

229+
private void sleep( long delayMs )
230+
{
231+
try
232+
{
233+
clock.sleep( delayMs );
234+
}
235+
catch ( InterruptedException e )
236+
{
237+
Thread.currentThread().interrupt();
238+
throw new IllegalStateException( "Retries interrupted", e );
239+
}
240+
}
241+
188242
private void verifyAfterConstruction()
189243
{
190244
if ( maxRetryTimeMs < 0 )

driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,7 @@
2424

2525
public interface RetryLogic
2626
{
27-
<T> CompletionStage<T> retry( Supplier<CompletionStage<T>> work );
27+
<T> T retry( Supplier<T> work );
28+
29+
<T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work );
2830
}

0 commit comments

Comments
 (0)