diff --git a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java index 872238962a..fc9860e9bf 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java +++ b/driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java @@ -25,7 +25,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.util.context.Context; -import reactor.util.function.Tuples; +import reactor.util.retry.Retry; import java.time.Duration; import java.util.ArrayList; @@ -34,7 +34,6 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.function.Supplier; import org.neo4j.driver.Logger; @@ -144,7 +143,7 @@ public CompletionStage retryAsync( Supplier> work ) @Override public Publisher retryRx( Publisher work ) { - return Flux.from( work ).retryWhen( retryRxCondition() ); + return Flux.from( work ).retryWhen( exponentialBackoffRetryRx() ); } protected boolean canRetryOn( Throwable error ) @@ -175,48 +174,46 @@ private static Throwable extractPossibleTerminationCause( Throwable error ) return error; } - private Function,Publisher> retryRxCondition() + private Retry exponentialBackoffRetryRx() { - return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 -> - { - - Throwable throwable = t2.getT1(); - Throwable error = extractPossibleTerminationCause( throwable ); - - Context ctx = t2.getT2(); - - List errors = ctx.getOrDefault( "errors", null ); - - long startTime = ctx.getOrDefault( "startTime", -1L ); - long nextDelayMs = ctx.getOrDefault( "nextDelayMs", initialRetryDelayMs ); - - if ( canRetryOn( error ) ) - { - long currentTime = clock.millis(); - if ( startTime == -1 ) + return Retry.from( retrySignals -> retrySignals.flatMap( retrySignal -> Mono.deferContextual( + contextView -> { - startTime = currentTime; - } + Throwable throwable = retrySignal.failure(); + Throwable error = extractPossibleTerminationCause( throwable ); - long elapsedTime = currentTime - startTime; - if ( elapsedTime < maxRetryTimeMs ) - { - long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); - log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error ); - - nextDelayMs = (long) (nextDelayMs * multiplier); - errors = recordError( error, errors ); + List errors = contextView.getOrDefault( "errors", null ); - // retry on netty event loop thread - EventExecutor eventExecutor = eventExecutorGroup.next(); - return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) ).delayElement( - Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) ); - } - } - addSuppressed( throwable, errors ); + if ( canRetryOn( error ) ) + { + long currentTime = clock.millis(); + + long startTime = contextView.getOrDefault( "startTime", currentTime ); + long nextDelayMs = contextView.getOrDefault( "nextDelayMs", initialRetryDelayMs ); + + long elapsedTime = currentTime - startTime; + if ( elapsedTime < maxRetryTimeMs ) + { + long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); + log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error ); + + nextDelayMs = (long) (nextDelayMs * multiplier); + errors = recordError( error, errors ); + + // retry on netty event loop thread + EventExecutor eventExecutor = eventExecutorGroup.next(); + Context context = Context.of( + "errors", errors, + "startTime", startTime, + "nextDelayMs", nextDelayMs + ); + return Mono.just( context ).delayElement( Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) ); + } + } + addSuppressed( throwable, errors ); - return Mono.error( throwable ); - } ); + return Mono.error( throwable ); + } ) ) ); } private void executeWorkInEventLoop( CompletableFuture resultFuture, Supplier> work ) diff --git a/pom.xml b/pom.xml index b29b521bf3..3a1a703e50 100644 --- a/pom.xml +++ b/pom.xml @@ -28,22 +28,22 @@ - 4.1.65.Final + 4.1.70.Final - Dysprosium-SR21 + 2020.0.13 2.2.21 - 1.7.31 + 1.7.32 2.0.0.0 2.28.2 - 5.7.2 - 1.1.0 + 5.8.1 + 1.2.0 1.69 - 1.2.3 - 2.12.3 - 1.18.20 - 20.3.2 + 1.2.6 + 2.13.0 + 1.18.22 + 20.3.4