Skip to content

Commit fb45ba6

Browse files
author
Zhen Li
committed
Schedule reactive retries in Netty event loop thread pool.
By default, Project Reactor schedules delayed operations on a parallel scheduler, which is created lazily when needed. We can avoid creating such an extra resource by schedule the delayed work on our existing netty event loop thread pool. This behaviour is exactly the same as our existing async retries. Added tests to verify that the Parallel scheduler is not created.
1 parent a87bf79 commit fb45ba6

File tree

4 files changed

+49
-37
lines changed

4 files changed

+49
-37
lines changed

driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.reactivestreams.Publisher;
2424
import reactor.core.publisher.Flux;
2525
import reactor.core.publisher.Mono;
26+
import reactor.core.scheduler.Schedulers;
2627
import reactor.util.context.Context;
2728
import reactor.util.function.Tuples;
2829

@@ -33,16 +34,16 @@
3334
import java.util.concurrent.CompletionStage;
3435
import java.util.concurrent.ThreadLocalRandom;
3536
import java.util.concurrent.TimeUnit;
36-
import java.util.function.Supplier;
3737
import java.util.function.Function;
38+
import java.util.function.Supplier;
3839

39-
import org.neo4j.driver.internal.util.Clock;
40-
import org.neo4j.driver.internal.util.Futures;
4140
import org.neo4j.driver.Logger;
4241
import org.neo4j.driver.Logging;
4342
import org.neo4j.driver.exceptions.ServiceUnavailableException;
4443
import org.neo4j.driver.exceptions.SessionExpiredException;
4544
import org.neo4j.driver.exceptions.TransientException;
45+
import org.neo4j.driver.internal.util.Clock;
46+
import org.neo4j.driver.internal.util.Futures;
4647

4748
import static java.util.concurrent.TimeUnit.SECONDS;
4849

@@ -179,12 +180,15 @@ private Function<Flux<Throwable>,Publisher<Context>> retryRxCondition()
179180
nextDelayMs = (long) (nextDelayMs * multiplier);
180181
errors = recordError( lastError, errors );
181182

183+
// retry on netty event loop thread
184+
EventExecutor eventExecutor = eventExecutorGroup.next();
182185
return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) )
183-
.delayElement( Duration.ofMillis( delayWithJitterMs ) );
186+
.delayElement( Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) );
184187
}
185188
else
186189
{
187190
addSuppressed( lastError, errors );
191+
188192
return Mono.error( lastError );
189193
}
190194
} );

driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import java.util.Arrays;
2929
import java.util.Iterator;
30+
import java.util.Set;
3031
import java.util.concurrent.atomic.AtomicInteger;
3132

3233
import org.neo4j.driver.Session;
@@ -46,6 +47,8 @@
4647

4748
import static java.util.Collections.emptyIterator;
4849
import static org.hamcrest.CoreMatchers.instanceOf;
50+
import static org.hamcrest.CoreMatchers.not;
51+
import static org.hamcrest.CoreMatchers.startsWith;
4952
import static org.hamcrest.MatcherAssert.assertThat;
5053
import static org.junit.jupiter.api.Assertions.assertEquals;
5154
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
@@ -119,6 +122,7 @@ void shouldRunAsyncTransactionWithRetriesOnAsyncFailures()
119122

120123
assertEquals( 4, work.invocationCount() );
121124
assertEquals( 1, countNodesByLabel( "Node" ) );
125+
assertNoParallelScheduler();
122126
}
123127

124128
@Test
@@ -134,6 +138,7 @@ void shouldRunAsyncTransactionWithRetriesOnSyncFailures()
134138

135139
assertEquals( 3, work.invocationCount() );
136140
assertEquals( 1, countNodesByLabel( "Test" ) );
141+
assertNoParallelScheduler();
137142
}
138143

139144
@Test
@@ -150,6 +155,7 @@ void shouldRunAsyncTransactionThatCanNotBeRetried()
150155

151156
assertEquals( 1, work.invocationCount() );
152157
assertEquals( 0, countNodesByLabel( "Hi" ) );
158+
assertNoParallelScheduler();
153159
}
154160

155161
@Test
@@ -173,6 +179,17 @@ void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure()
173179

174180
assertEquals( 2, work.invocationCount() );
175181
assertEquals( 0, countNodesByLabel( "Person" ) );
182+
assertNoParallelScheduler();
183+
}
184+
185+
private void assertNoParallelScheduler()
186+
{
187+
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
188+
for ( Thread t : threadSet )
189+
{
190+
String name = t.getName();
191+
assertThat( name, not( startsWith( "parallel" ) ) );
192+
}
176193
}
177194

178195
private long countNodesByLabel( String label )

driver/src/test/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogicTest.java

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,19 @@
2222
import org.mockito.ArgumentCaptor;
2323
import reactor.core.publisher.Flux;
2424
import reactor.test.StepVerifier;
25-
import reactor.util.function.Tuple2;
2625

27-
import java.time.Duration;
2826
import java.util.ArrayList;
2927
import java.util.List;
3028
import java.util.concurrent.CompletionStage;
3129
import java.util.function.Supplier;
3230

33-
import org.neo4j.driver.internal.util.Clock;
34-
import org.neo4j.driver.internal.util.ImmediateSchedulingEventExecutor;
3531
import org.neo4j.driver.Logger;
3632
import org.neo4j.driver.Logging;
3733
import org.neo4j.driver.exceptions.ServiceUnavailableException;
3834
import org.neo4j.driver.exceptions.SessionExpiredException;
3935
import org.neo4j.driver.exceptions.TransientException;
36+
import org.neo4j.driver.internal.util.Clock;
37+
import org.neo4j.driver.internal.util.ImmediateSchedulingEventExecutor;
4038

4139
import static java.lang.Long.MAX_VALUE;
4240
import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -751,53 +749,44 @@ public CompletionStage<Void> get()
751749
}
752750

753751
@Test
754-
void shouldRetryWithBackOff() {
752+
void shouldRetryWithBackOff()
753+
{
755754
Exception exception = new TransientException( "Unknown", "Retry this error." );
756-
List<Long> elapsedList = new ArrayList<>();
757-
758755
ExponentialBackoffRetryLogic retryLogic = new ExponentialBackoffRetryLogic( 1000, 100, 2, 0,
759-
null, Clock.SYSTEM, DEV_NULL_LOGGING );
756+
eventExecutor, Clock.SYSTEM, DEV_NULL_LOGGING );
760757

761758
Flux<Integer> source = Flux.concat( Flux.range( 0, 2 ), Flux.error( exception ) );
762-
StepVerifier.withVirtualTime( () -> // This test uses a virtual time. So do not panic if you saw this test runs faster than it should be.
763-
Flux.from( retryLogic.retryRx( source ) )
764-
.elapsed()
765-
.doOnNext( elapsed -> { if ( elapsed.getT2() == 0 ) elapsedList.add( elapsed.getT1() ); } )
766-
.map( Tuple2::getT2 ) )
767-
.thenAwait( Duration.ofSeconds( 2 ) )
759+
Flux<Integer> retriedSource = Flux.from( retryLogic.retryRx( source ) );
760+
StepVerifier.create( retriedSource )
768761
.expectNext( 0, 1 ) // first run
769762
.expectNext( 0, 1, 0, 1, 0, 1, 0, 1 ) //4 retry attempts
770763
.verifyErrorSatisfies( e -> assertThat( e, equalTo( exception ) ) );
771764

772-
assertThat( elapsedList.size(), equalTo( 5 ) );
773-
assertThat( elapsedList, contains( 0L, 100L, 200L, 400L, 800L ) );
765+
List<Long> delays = eventExecutor.scheduleDelays();
766+
assertThat( delays.size(), equalTo( 4 ) );
767+
assertThat( delays, contains( 100L, 200L, 400L, 800L ) );
774768
}
775769

776770
@Test
777-
void shouldRetryWithRandomBackOff() {
778-
List<Long> elapsedList = new ArrayList<>();
771+
void shouldRetryWithRandomBackOff()
772+
{
779773
Exception exception = new TransientException( "Unknown", "Retry this error." );
780-
781774
ExponentialBackoffRetryLogic retryLogic = new ExponentialBackoffRetryLogic( 1000, 100, 2, 0.1,
782-
null, Clock.SYSTEM, DEV_NULL_LOGGING );
775+
eventExecutor, Clock.SYSTEM, DEV_NULL_LOGGING );
783776

784777
Flux<Integer> source = Flux.concat( Flux.range( 0, 2 ), Flux.error( exception ) );
785-
StepVerifier.withVirtualTime( () -> // This test uses a virtual time. So do not panic if you saw this test runs faster than it should be.
786-
Flux.from( retryLogic.retryRx( source ) )
787-
.elapsed()
788-
.doOnNext( elapsed -> { if ( elapsed.getT2() == 0 ) elapsedList.add( elapsed.getT1() ); } )
789-
.map( Tuple2::getT2 ) )
790-
.thenAwait( Duration.ofSeconds( 2 ) )
778+
Flux<Integer> retriedSource = Flux.from( retryLogic.retryRx( source ) );
779+
StepVerifier.create( retriedSource )
791780
.expectNext( 0, 1 ) // first run
792781
.expectNext( 0, 1, 0, 1, 0, 1, 0, 1 ) // 4 retry attempts
793782
.verifyErrorSatisfies( e -> assertThat( e, equalTo( exception ) ) );
794783

795-
assertThat( elapsedList.size(), equalTo( 5 ) );
796-
assertThat( elapsedList.get( 0 ), equalTo( 0L ) );
797-
assertThat( elapsedList.get( 1 ), allOf( greaterThanOrEqualTo( 90L ), lessThanOrEqualTo( 110L ) ) );
798-
assertThat( elapsedList.get( 2 ), allOf( greaterThanOrEqualTo( 180L ), lessThanOrEqualTo( 220L ) ) );
799-
assertThat( elapsedList.get( 3 ), allOf( greaterThanOrEqualTo( 260L ), lessThanOrEqualTo( 440L ) ) );
800-
assertThat( elapsedList.get( 4 ), allOf( greaterThanOrEqualTo( 720L ), lessThanOrEqualTo( 880L ) ) );
784+
List<Long> delays = eventExecutor.scheduleDelays();
785+
assertThat( delays.size(), equalTo( 4 ) );
786+
assertThat( delays.get( 0 ), allOf( greaterThanOrEqualTo( 90L ), lessThanOrEqualTo( 110L ) ) );
787+
assertThat( delays.get( 1 ), allOf( greaterThanOrEqualTo( 180L ), lessThanOrEqualTo( 220L ) ) );
788+
assertThat( delays.get( 2 ), allOf( greaterThanOrEqualTo( 260L ), lessThanOrEqualTo( 440L ) ) );
789+
assertThat( delays.get( 3 ), allOf( greaterThanOrEqualTo( 720L ), lessThanOrEqualTo( 880L ) ) );
801790
}
802791

803792
private static void retry( ExponentialBackoffRetryLogic retryLogic, final int times )

driver/src/test/java/org/neo4j/driver/internal/util/ImmediateSchedulingEventExecutor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,9 @@ public ScheduledFuture<?> schedule( Runnable command, long delay, TimeUnit unit
180180
@Override
181181
public <V> ScheduledFuture<V> schedule( Callable<V> callable, long delay, TimeUnit unit )
182182
{
183-
throw new UnsupportedOperationException();
183+
scheduleDelays.add( unit.toMillis( delay ) );
184+
delegate.submit( callable );
185+
return mock( ScheduledFuture.class );
184186
}
185187

186188
@Override

0 commit comments

Comments
 (0)