Skip to content

Commit 5ecb109

Browse files
authored
Merge pull request #420 from lutovich/1.5-tx-function-in-caller-thread
Execute blocking tx functions in caller thread
2 parents d8c0d37 + 28f288f commit 5ecb109

File tree

6 files changed

+719
-49
lines changed

6 files changed

+719
-49
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -289,32 +289,34 @@ CompletionStage<Boolean> currentConnectionIsOpen()
289289

290290
private <T> T transaction( AccessMode mode, TransactionWork<T> work )
291291
{
292-
return getBlocking( transactionAsync( mode, tx ->
292+
// use different code path compared to async so that work is executed in the caller thread
293+
// caller thread will also be the one who sleeps between retries;
294+
// it is unsafe to execute retries in the event loop threads because this can cause a deadlock
295+
// event loop thread will block and wait for itself to read some data
296+
return retryLogic.retry( () ->
293297
{
294-
try
295-
{
296-
// todo: given lambda can't be executed in even loop thread because it deadlocks
297-
// todo: event loop executes a blocking operation and waits for itself to read from the network
298-
// todo: this is most likely what happens...
299-
300-
// todo: use of supplyAsync is a hack and it makes blocking API very different from 1.4
301-
// todo: because we now execute function in FJP.commonPool()
302-
303-
// todo: bring back blocking retries with sleeps and etc. so that we execute TxWork in caller thread
304-
return CompletableFuture.supplyAsync( () -> work.execute( tx ) );
305-
// T result = work.execute( tx );
306-
// return completedFuture( result );
307-
}
308-
catch ( Throwable error )
298+
try ( Transaction tx = getBlocking( beginTransactionAsync( mode ) ) )
309299
{
310-
return failedFuture( error );
300+
try
301+
{
302+
T result = work.execute( tx );
303+
tx.success();
304+
return result;
305+
}
306+
catch ( Throwable t )
307+
{
308+
// mark transaction for failure if the given unit of work threw an exception
309+
// this will override any success marks that were made by the unit of work
310+
tx.failure();
311+
throw t;
312+
}
311313
}
312-
} ) );
314+
} );
313315
}
314316

315317
private <T> CompletionStage<T> transactionAsync( AccessMode mode, TransactionWork<CompletionStage<T>> work )
316318
{
317-
return retryLogic.retry( () ->
319+
return retryLogic.retryAsync( () ->
318320
{
319321
CompletableFuture<T> resultFuture = new CompletableFuture<>();
320322
CompletionStage<ExplicitTransaction> txFuture = beginTransactionAsync( mode );

driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,48 @@ public ExponentialBackoffRetryLogic( RetrySettings settings, EventExecutorGroup
8080
}
8181

8282
@Override
83-
public <T> CompletionStage<T> retry( Supplier<CompletionStage<T>> work )
83+
public <T> T retry( Supplier<T> work )
84+
{
85+
List<Throwable> errors = null;
86+
long startTime = -1;
87+
long nextDelayMs = initialRetryDelayMs;
88+
89+
while ( true )
90+
{
91+
try
92+
{
93+
return work.get();
94+
}
95+
catch ( Throwable error )
96+
{
97+
if ( canRetryOn( error ) )
98+
{
99+
long currentTime = clock.millis();
100+
if ( startTime == -1 )
101+
{
102+
startTime = currentTime;
103+
}
104+
105+
long elapsedTime = currentTime - startTime;
106+
if ( elapsedTime < maxRetryTimeMs )
107+
{
108+
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
109+
log.warn( "Transaction failed and will be retried in " + delayWithJitterMs + "ms", error );
110+
111+
sleep( delayWithJitterMs );
112+
nextDelayMs = (long) (nextDelayMs * multiplier);
113+
errors = recordError( error, errors );
114+
continue;
115+
}
116+
}
117+
addSuppressed( error, errors );
118+
throw error;
119+
}
120+
}
121+
}
122+
123+
@Override
124+
public <T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work )
84125
{
85126
CompletableFuture<T> resultFuture = new CompletableFuture<>();
86127
executeWorkInEventLoop( resultFuture, work );
@@ -109,7 +150,7 @@ private <T> void retryWorkInEventLoop( CompletableFuture<T> resultFuture, Suppli
109150
EventExecutor eventExecutor = eventExecutorGroup.next();
110151

111152
long delayWithJitterMs = computeDelayWithJitter( delayMs );
112-
log.warn( "Transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );
153+
log.warn( "Async transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );
113154

114155
eventExecutor.schedule( () ->
115156
{
@@ -185,6 +226,19 @@ private long computeDelayWithJitter( long delayMs )
185226
return ThreadLocalRandom.current().nextLong( min, max + 1 );
186227
}
187228

229+
private void sleep( long delayMs )
230+
{
231+
try
232+
{
233+
clock.sleep( delayMs );
234+
}
235+
catch ( InterruptedException e )
236+
{
237+
Thread.currentThread().interrupt();
238+
throw new IllegalStateException( "Retries interrupted", e );
239+
}
240+
}
241+
188242
private void verifyAfterConstruction()
189243
{
190244
if ( maxRetryTimeMs < 0 )

driver/src/main/java/org/neo4j/driver/internal/retry/RetryLogic.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,7 @@
2424

2525
public interface RetryLogic
2626
{
27-
<T> CompletionStage<T> retry( Supplier<CompletionStage<T>> work );
27+
<T> T retry( Supplier<T> work );
28+
29+
<T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work );
2830
}

0 commit comments

Comments
 (0)