|
25 | 25 | import reactor.core.publisher.Mono;
|
26 | 26 | import reactor.core.scheduler.Schedulers;
|
27 | 27 | import reactor.util.context.Context;
|
28 |
| -import reactor.util.function.Tuples; |
| 28 | +import reactor.util.retry.Retry; |
29 | 29 |
|
30 | 30 | import java.time.Duration;
|
31 | 31 | import java.util.ArrayList;
|
|
34 | 34 | import java.util.concurrent.CompletionStage;
|
35 | 35 | import java.util.concurrent.ThreadLocalRandom;
|
36 | 36 | import java.util.concurrent.TimeUnit;
|
37 |
| -import java.util.function.Function; |
38 | 37 | import java.util.function.Supplier;
|
39 | 38 |
|
40 | 39 | import org.neo4j.driver.Logger;
|
@@ -144,7 +143,7 @@ public <T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work )
|
144 | 143 | @Override
|
145 | 144 | public <T> Publisher<T> retryRx( Publisher<T> work )
|
146 | 145 | {
|
147 |
| - return Flux.from( work ).retryWhen( retryRxCondition() ); |
| 146 | + return Flux.from( work ).retryWhen( exponentialBackoffRetryRx() ); |
148 | 147 | }
|
149 | 148 |
|
150 | 149 | protected boolean canRetryOn( Throwable error )
|
@@ -175,48 +174,46 @@ private static Throwable extractPossibleTerminationCause( Throwable error )
|
175 | 174 | return error;
|
176 | 175 | }
|
177 | 176 |
|
178 |
| - private Function<Flux<Throwable>,Publisher<Context>> retryRxCondition() |
| 177 | + private Retry exponentialBackoffRetryRx() |
179 | 178 | {
|
180 |
| - return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 -> |
181 |
| - { |
182 |
| - |
183 |
| - Throwable throwable = t2.getT1(); |
184 |
| - Throwable error = extractPossibleTerminationCause( throwable ); |
185 |
| - |
186 |
| - Context ctx = t2.getT2(); |
187 |
| - |
188 |
| - List<Throwable> errors = ctx.getOrDefault( "errors", null ); |
189 |
| - |
190 |
| - long startTime = ctx.getOrDefault( "startTime", -1L ); |
191 |
| - long nextDelayMs = ctx.getOrDefault( "nextDelayMs", initialRetryDelayMs ); |
192 |
| - |
193 |
| - if ( canRetryOn( error ) ) |
194 |
| - { |
195 |
| - long currentTime = clock.millis(); |
196 |
| - if ( startTime == -1 ) |
| 179 | + return Retry.from( retrySignals -> retrySignals.flatMap( retrySignal -> Mono.deferContextual( |
| 180 | + contextView -> |
197 | 181 | {
|
198 |
| - startTime = currentTime; |
199 |
| - } |
| 182 | + Throwable throwable = retrySignal.failure(); |
| 183 | + Throwable error = extractPossibleTerminationCause( throwable ); |
200 | 184 |
|
201 |
| - long elapsedTime = currentTime - startTime; |
202 |
| - if ( elapsedTime < maxRetryTimeMs ) |
203 |
| - { |
204 |
| - long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); |
205 |
| - log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error ); |
206 |
| - |
207 |
| - nextDelayMs = (long) (nextDelayMs * multiplier); |
208 |
| - errors = recordError( error, errors ); |
| 185 | + List<Throwable> errors = contextView.getOrDefault( "errors", null ); |
209 | 186 |
|
210 |
| - // retry on netty event loop thread |
211 |
| - EventExecutor eventExecutor = eventExecutorGroup.next(); |
212 |
| - return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) ).delayElement( |
213 |
| - Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) ); |
214 |
| - } |
215 |
| - } |
216 |
| - addSuppressed( throwable, errors ); |
| 187 | + if ( canRetryOn( error ) ) |
| 188 | + { |
| 189 | + long currentTime = clock.millis(); |
| 190 | + |
| 191 | + long startTime = contextView.getOrDefault( "startTime", currentTime ); |
| 192 | + long nextDelayMs = contextView.getOrDefault( "nextDelayMs", initialRetryDelayMs ); |
| 193 | + |
| 194 | + long elapsedTime = currentTime - startTime; |
| 195 | + if ( elapsedTime < maxRetryTimeMs ) |
| 196 | + { |
| 197 | + long delayWithJitterMs = computeDelayWithJitter( nextDelayMs ); |
| 198 | + log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error ); |
| 199 | + |
| 200 | + nextDelayMs = (long) (nextDelayMs * multiplier); |
| 201 | + errors = recordError( error, errors ); |
| 202 | + |
| 203 | + // retry on netty event loop thread |
| 204 | + EventExecutor eventExecutor = eventExecutorGroup.next(); |
| 205 | + Context context = Context.of( |
| 206 | + "errors", errors, |
| 207 | + "startTime", startTime, |
| 208 | + "nextDelayMs", nextDelayMs |
| 209 | + ); |
| 210 | + return Mono.just( context ).delayElement( Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) ); |
| 211 | + } |
| 212 | + } |
| 213 | + addSuppressed( throwable, errors ); |
217 | 214 |
|
218 |
| - return Mono.error( throwable ); |
219 |
| - } ); |
| 215 | + return Mono.error( throwable ); |
| 216 | + } ) ) ); |
220 | 217 | }
|
221 | 218 |
|
222 | 219 | private <T> void executeWorkInEventLoop( CompletableFuture<T> resultFuture, Supplier<CompletionStage<T>> work )
|
|
0 commit comments