25
25
import reactor .core .publisher .Mono ;
26
26
import reactor .core .scheduler .Schedulers ;
27
27
import reactor .util .context .Context ;
28
- import reactor .util .function . Tuples ;
28
+ import reactor .util .retry . Retry ;
29
29
30
30
import java .time .Duration ;
31
31
import java .util .ArrayList ;
34
34
import java .util .concurrent .CompletionStage ;
35
35
import java .util .concurrent .ThreadLocalRandom ;
36
36
import java .util .concurrent .TimeUnit ;
37
- import java .util .function .Function ;
38
37
import java .util .function .Supplier ;
39
38
40
39
import org .neo4j .driver .Logger ;
@@ -143,7 +142,7 @@ public <T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work )
143
142
@ Override
144
143
public <T > Publisher <T > retryRx ( Publisher <T > work )
145
144
{
146
- return Flux .from ( work ).retryWhen ( retryRxCondition () );
145
+ return Flux .from ( work ).retryWhen ( exponentialBackoffRetryRx () );
147
146
}
148
147
149
148
protected boolean canRetryOn ( Throwable error )
@@ -168,48 +167,46 @@ private static Throwable extractPossibleTerminationCause( Throwable error )
168
167
return error ;
169
168
}
170
169
171
- private Function < Flux < Throwable >, Publisher < Context >> retryRxCondition ()
170
+ private Retry exponentialBackoffRetryRx ()
172
171
{
173
- return errorCurrentAttempt -> errorCurrentAttempt .flatMap ( e -> Mono .subscriberContext ().map ( ctx -> Tuples .of ( e , ctx ) ) ).flatMap ( t2 ->
174
- {
175
-
176
- Throwable throwable = t2 .getT1 ();
177
- Throwable error = extractPossibleTerminationCause ( throwable );
178
-
179
- Context ctx = t2 .getT2 ();
180
-
181
- List <Throwable > errors = ctx .getOrDefault ( "errors" , null );
182
-
183
- long startTime = ctx .getOrDefault ( "startTime" , -1L );
184
- long nextDelayMs = ctx .getOrDefault ( "nextDelayMs" , initialRetryDelayMs );
185
-
186
- if ( canRetryOn ( error ) )
187
- {
188
- long currentTime = clock .millis ();
189
- if ( startTime == -1 )
172
+ return Retry .from ( retrySignals -> retrySignals .flatMap ( retrySignal -> Mono .deferContextual (
173
+ contextView ->
190
174
{
191
- startTime = currentTime ;
192
- }
175
+ Throwable throwable = retrySignal . failure () ;
176
+ Throwable error = extractPossibleTerminationCause ( throwable );
193
177
194
- long elapsedTime = currentTime - startTime ;
195
- if ( elapsedTime < maxRetryTimeMs )
196
- {
197
- long delayWithJitterMs = computeDelayWithJitter ( nextDelayMs );
198
- log .warn ( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms" , error );
199
-
200
- nextDelayMs = (long ) (nextDelayMs * multiplier );
201
- errors = recordError ( error , errors );
178
+ List <Throwable > errors = contextView .getOrDefault ( "errors" , null );
202
179
203
- // retry on netty event loop thread
204
- EventExecutor eventExecutor = eventExecutorGroup .next ();
205
- return Mono .just ( ctx .put ( "errors" , errors ).put ( "startTime" , startTime ).put ( "nextDelayMs" , nextDelayMs ) ).delayElement (
206
- Duration .ofMillis ( delayWithJitterMs ), Schedulers .fromExecutorService ( eventExecutor ) );
207
- }
208
- }
209
- addSuppressed ( throwable , errors );
180
+ if ( canRetryOn ( error ) )
181
+ {
182
+ long currentTime = clock .millis ();
183
+
184
+ long startTime = contextView .getOrDefault ( "startTime" , currentTime );
185
+ long nextDelayMs = contextView .getOrDefault ( "nextDelayMs" , initialRetryDelayMs );
186
+
187
+ long elapsedTime = currentTime - startTime ;
188
+ if ( elapsedTime < maxRetryTimeMs )
189
+ {
190
+ long delayWithJitterMs = computeDelayWithJitter ( nextDelayMs );
191
+ log .warn ( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms" , error );
192
+
193
+ nextDelayMs = (long ) (nextDelayMs * multiplier );
194
+ errors = recordError ( error , errors );
195
+
196
+ // retry on netty event loop thread
197
+ EventExecutor eventExecutor = eventExecutorGroup .next ();
198
+ Context context = Context .of (
199
+ "errors" , errors ,
200
+ "startTime" , startTime ,
201
+ "nextDelayMs" , nextDelayMs
202
+ );
203
+ return Mono .just ( context ).delayElement ( Duration .ofMillis ( delayWithJitterMs ), Schedulers .fromExecutorService ( eventExecutor ) );
204
+ }
205
+ }
206
+ addSuppressed ( throwable , errors );
210
207
211
- return Mono .error ( throwable );
212
- } );
208
+ return Mono .error ( throwable );
209
+ } ) ) );
213
210
}
214
211
215
212
private <T > void executeWorkInEventLoop ( CompletableFuture <T > resultFuture , Supplier <CompletionStage <T >> work )
0 commit comments