Skip to content

Commit cb4941b

Browse files
committed
Remove reactor-bom and upgrade to reactor 3.4.7
1 parent 7b643cd commit cb4941b

File tree

2 files changed

+54
-45
lines changed

2 files changed

+54
-45
lines changed

driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java

Lines changed: 45 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,17 @@
2525
import reactor.core.publisher.Mono;
2626
import reactor.core.scheduler.Schedulers;
2727
import reactor.util.context.Context;
28+
import reactor.util.context.ContextView;
2829
import reactor.util.function.Tuples;
30+
import reactor.util.retry.Retry;
2931

3032
import java.time.Duration;
31-
import java.util.ArrayList;
33+
import java.util.LinkedList;
3234
import java.util.List;
3335
import java.util.concurrent.CompletableFuture;
3436
import java.util.concurrent.CompletionStage;
3537
import java.util.concurrent.ThreadLocalRandom;
3638
import java.util.concurrent.TimeUnit;
37-
import java.util.function.Function;
3839
import java.util.function.Supplier;
3940

4041
import org.neo4j.driver.Logger;
@@ -145,7 +146,7 @@ public <T> CompletionStage<T> retryAsync( Supplier<CompletionStage<T>> work )
145146
@Override
146147
public <T> Publisher<T> retryRx( Publisher<T> work )
147148
{
148-
return Flux.from( work ).retryWhen( retryRxCondition() );
149+
return Flux.from( work ).retryWhen( exponentialBackoffRetryRx() );
149150
}
150151

151152
protected boolean canRetryOn( Throwable error )
@@ -177,48 +178,52 @@ private static Throwable extractPossibleTerminationCause( Throwable error )
177178
return error;
178179
}
179180

180-
private Function<Flux<Throwable>,Publisher<Context>> retryRxCondition()
181+
private Retry exponentialBackoffRetryRx()
181182
{
182-
return errorCurrentAttempt -> errorCurrentAttempt.flatMap( e -> Mono.subscriberContext().map( ctx -> Tuples.of( e, ctx ) ) ).flatMap( t2 ->
183-
{
184-
185-
Throwable throwable = t2.getT1();
186-
Throwable error = extractPossibleTerminationCause( throwable );
187-
188-
Context ctx = t2.getT2();
189-
190-
List<Throwable> errors = ctx.getOrDefault( "errors", null );
191-
192-
long startTime = ctx.getOrDefault( "startTime", -1L );
193-
long nextDelayMs = ctx.getOrDefault( "nextDelayMs", initialRetryDelayMs );
194-
195-
if ( canRetryOn( error ) )
196-
{
197-
long currentTime = clock.millis();
198-
if ( startTime == -1 )
183+
return Retry.from( retrySignals -> retrySignals.flatMap( retrySignal -> Mono.deferContextual(
184+
contextView -> Mono.just( Tuples.of( retrySignal, contextView ) ) ) ).flatMap(
185+
tuple ->
199186
{
200-
startTime = currentTime;
201-
}
187+
Throwable throwable = tuple.getT1().failure();
188+
Throwable error = extractPossibleTerminationCause( throwable );
202189

203-
long elapsedTime = currentTime - startTime;
204-
if ( elapsedTime < maxRetryTimeMs )
205-
{
206-
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
207-
log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );
190+
ContextView contextView = tuple.getT2();
191+
List<Throwable> errors = contextView.getOrDefault( "errors", null );
208192

209-
nextDelayMs = (long) (nextDelayMs * multiplier);
210-
errors = recordError( error, errors );
193+
long startTime = contextView.getOrDefault( "startTime", -1L );
194+
long nextDelayMs = contextView.getOrDefault( "nextDelayMs", initialRetryDelayMs );
211195

212-
// retry on netty event loop thread
213-
EventExecutor eventExecutor = eventExecutorGroup.next();
214-
return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) ).delayElement(
215-
Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) );
216-
}
217-
}
218-
addSuppressed( throwable, errors );
196+
if ( canRetryOn( error ) )
197+
{
198+
long currentTime = clock.millis();
199+
if ( startTime == -1 )
200+
{
201+
startTime = currentTime;
202+
}
203+
204+
long elapsedTime = currentTime - startTime;
205+
if ( elapsedTime < maxRetryTimeMs )
206+
{
207+
long delayWithJitterMs = computeDelayWithJitter( nextDelayMs );
208+
log.warn( "Reactive transaction failed and is scheduled to retry in " + delayWithJitterMs + "ms", error );
209+
210+
nextDelayMs = (long) (nextDelayMs * multiplier);
211+
errors = recordError( error, errors );
212+
213+
// retry on netty event loop thread
214+
EventExecutor eventExecutor = eventExecutorGroup.next();
215+
Context context = Context.of(
216+
"errors", errors,
217+
"startTime", startTime,
218+
"nextDelayMs", nextDelayMs
219+
);
220+
return Mono.just( context ).delayElement( Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) );
221+
}
222+
}
223+
addSuppressed( throwable, errors );
219224

220-
return Mono.error( throwable );
221-
} );
225+
return Mono.error( throwable );
226+
} ) );
222227
}
223228

224229
private <T> void executeWorkInEventLoop( CompletableFuture<T> resultFuture, Supplier<CompletionStage<T>> work )
@@ -373,7 +378,7 @@ private static List<Throwable> recordError( Throwable error, List<Throwable> err
373378
{
374379
if ( errors == null )
375380
{
376-
errors = new ArrayList<>();
381+
errors = new LinkedList<>();
377382
}
378383
errors.add( error );
379384
return errors;

pom.xml

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
<!-- Please note that when updating this dependency -->
3333
<!-- (i.e. due to a security vulnerability or bug) that the -->
3434
<!-- corresponding server dependency also needs updating.-->
35-
<reactor-bom.version>Dysprosium-SR21</reactor-bom.version>
35+
<reactor.version>3.4.7</reactor.version>
3636
<rxjava.version>2.2.21</rxjava.version>
3737
<slf4j-api.version>1.7.31</slf4j-api.version>
3838
<hamcrest-junit.version>2.0.0.0</hamcrest-junit.version>
@@ -90,10 +90,8 @@
9090
</dependency>
9191
<dependency>
9292
<groupId>io.projectreactor</groupId>
93-
<artifactId>reactor-bom</artifactId>
94-
<version>${reactor-bom.version}</version>
95-
<type>pom</type>
96-
<scope>import</scope>
93+
<artifactId>reactor-core</artifactId>
94+
<version>${reactor.version}</version>
9795
</dependency>
9896

9997
<!--Compile dependencies only used by Examples-->
@@ -111,6 +109,12 @@
111109
</dependency>
112110

113111
<!-- Test dependencies -->
112+
<dependency>
113+
<groupId>io.projectreactor</groupId>
114+
<artifactId>reactor-test</artifactId>
115+
<version>${reactor.version}</version>
116+
<scope>test</scope>
117+
</dependency>
114118
<dependency>
115119
<groupId>org.hamcrest</groupId>
116120
<artifactId>hamcrest-junit</artifactId>

0 commit comments

Comments
 (0)