Skip to content

Commit e246f31

Browse files
committed
Execute blocking tx functions in caller thread
This commit brings back blocking retry logic that performs retries in the caller thread and uses `Thread#sleep()` between retries. It is needed because blocking tx functions can't run in event loop as they perform blocking operations. Event loop thread can deadlock waiting for itself to read from the network. Previously code used a "hack" and executed given function in `ForkJoinPool.commonPool()`. Also added couple tests for retry logic and async transaction functions.
1 parent 1e805d0 commit e246f31

File tree

6 files changed

+719
-49
lines changed

6 files changed

+719
-49
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -277,32 +277,34 @@ CompletionStage<Boolean> currentConnectionIsOpen()
277277

278278
private <T> T transaction( AccessMode mode, TransactionWork<T> work )
279279
{
280-
return getBlocking( transactionAsync( mode, tx ->
280+
// use different code path compared to async so that work is executed in the caller thread
281+
// caller thread will also be the one who sleeps between retries;
282+
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
283+
// event loop thread will bock and wait for itself to read some data
284+
return retryLogic.retry( () ->
281285
{
282-
try
283-
{
284-
// todo: given lambda can't be executed in even loop thread because it deadlocks
285-
// todo: event loop executes a blocking operation and waits for itself to read from the network
286-
// todo: this is most likely what happens...
287-
288-
// todo: use of supplyAsync is a hack and it makes blocking API very different from 1.4
289-
// todo: because we now execute function in FJP.commonPool()
290-
291-
// todo: bring back blocking retries with sleeps and etc. so that we execute TxWork in caller thread
292-
return CompletableFuture.supplyAsync( () -> work.execute( tx ) );
293-
// T result = work.execute( tx );
294-
// return completedFuture( result );
295-
}
296-
catch ( Throwable error )
286+
try ( Transaction tx = getBlocking( beginTransactionAsync( mode ) ) )
297287
{
298-
return failedFuture( error );
288+
try
289+
{
290+
T result = work.execute( tx );
291+
tx.success();
292+
return result;
293+
}
294+
catch ( Throwable t )
295+
{
296+
// mark transaction for failure if the given unit of work threw exception
297+
// this will override any success marks that were made by the unit of work
298+
tx.failure();
299+
throw t;
300+
}
299301
}
300-
} ) );
302+
} );
301303
}
302304

303305
private <T> CompletionStage<T> transactionAsync( AccessMode mode, TransactionWork<CompletionStage<T>> work )
304306
{
305-
return retryLogic.retry( () ->
307+
return retryLogic.retryAsync( () ->
306308
{
307309
CompletableFuture<T> resultFuture = new CompletableFuture<>();
308310
CompletionStage<ExplicitTransaction> txFuture = beginTransactionAsync( mode );

driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,48 @@ public ExponentialBackoffRetryLogic( RetrySettings settings, EventExecutorGroup
8080
}
8181

8282
@Override
83-
public <T> CompletionStage<T> retry( Supplier<CompletionStage<T>> work )
83+
public <T> T retry( Supplier<T> work )
84+
{
85+
List<Throwable> errors = null;
86+
long startTime = -1;
87+
long nextDelayMs = initialRetryDelayMs;
88+
89+
while ( true )
90+
{
91+
try
92+
{
93+
return work.get();
94+
}
95+
catch ( Throwable error )
96+
{
97+
if ( canRetryOn( error ) )
98+
{
99+
long currentTime = clock.millis();
100+
if ( startTime == -1 )
101+
{
102+
startTime = currentTime;
103+
}
104+
105+
long elapsedTime = currentTime - startTime;
106+
if ( elapsedTime < maxRetryTimeMs )
107+
{
108+
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
109+
log.warn( "Transaction failed and will be retried in " + delayWithJitterMs + "ms", error );
110+
111+
sleep( delayWithJitterMs );
112+
nextDelayMs = (long) (nextDelayMs * multiplier);
113+
errors = recordError( error, errors );
114+
continue;
115+
}
116+
}
117+
addSuppressed( error, errors );
118+
throw error;
119+
}
120+
}
121+
}
122+
123+
@Override
124+
public <T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work )
84125
{
85126
CompletableFuture<T> resultFuture = new CompletableFuture<>();
86127
executeWorkInEventLoop( resultFuture, work );
@@ -109,7 +150,7 @@ private <T> void retryWorkInEventLoop( CompletableFuture<T> resultFuture, Suppli
109150
EventExecutor eventExecutor = eventExecutorGroup.next();
110151

111152
long delayWithJitterMs = computeDelayWithJitter( delayMs );
112-
log.warn( "Transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );
153+
log.warn( "Async transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );
113154

114155
eventExecutor.schedule( () ->
115156
{
@@ -185,6 +226,19 @@ private long computeDelayWithJitter( long delayMs )
185226
return ThreadLocalRandom.current().nextLong( min, max + 1 );
186227
}
187228

229+
private void sleep( long delayMs )
230+
{
231+
try
232+
{
233+
clock.sleep( delayMs );
234+
}
235+
catch ( InterruptedException e )
236+
{
237+
Thread.currentThread().interrupt();
238+
throw new IllegalStateException( "Retries interrupted", e );
239+
}
240+
}
241+
188242
private void verifyAfterConstruction()
189243
{
190244
if ( maxRetryTimeMs < 0 )

driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,7 @@
2424

2525
public interface RetryLogic
2626
{
27-
<T> CompletionStage<T> retry( Supplier<CompletionStage<T>> work );
27+
<T> T retry( Supplier<T> work );
28+
29+
<T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work );
2830
}

0 commit comments

Comments
 (0)