Skip to content

Commit 73fcafd

Browse files
author
Zhen Li
committed
Schedule reactive retries in Netty event loop thread pool.
By default, Project Reactor schedules delayed operations on a parallel scheduler, which is created lazily when needed. We can avoid creating such an extra resource by schedule the delayed work on our existing netty event loop thread pool. This behaviour is exactly the same as our existing async retries. Added tests to verify that the Parallel scheduler is not created.
1 parent dbce261 commit 73fcafd

File tree

2 files changed

+25
-4
lines changed

2 files changed

+25
-4
lines changed

driver/src/main/java/org/neo4j/driver/internal/retry/ExponentialBackoffRetryLogic.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.reactivestreams.Publisher;
2424
import reactor.core.publisher.Flux;
2525
import reactor.core.publisher.Mono;
26+
import reactor.core.scheduler.Schedulers;
2627
import reactor.util.context.Context;
2728
import reactor.util.function.Tuples;
2829

@@ -33,16 +34,16 @@
3334
import java.util.concurrent.CompletionStage;
3435
import java.util.concurrent.ThreadLocalRandom;
3536
import java.util.concurrent.TimeUnit;
36-
import java.util.function.Supplier;
3737
import java.util.function.Function;
38+
import java.util.function.Supplier;
3839

39-
import org.neo4j.driver.internal.util.Clock;
40-
import org.neo4j.driver.internal.util.Futures;
4140
import org.neo4j.driver.Logger;
4241
import org.neo4j.driver.Logging;
4342
import org.neo4j.driver.exceptions.ServiceUnavailableException;
4443
import org.neo4j.driver.exceptions.SessionExpiredException;
4544
import org.neo4j.driver.exceptions.TransientException;
45+
import org.neo4j.driver.internal.util.Clock;
46+
import org.neo4j.driver.internal.util.Futures;
4647

4748
import static java.util.concurrent.TimeUnit.SECONDS;
4849

@@ -179,12 +180,15 @@ private Function<Flux<Throwable>,Publisher<Context>> retryRxCondition()
179180
nextDelayMs = (long) (nextDelayMs * multiplier);
180181
errors = recordError( lastError, errors );
181182

183+
// retry on netty event loop thread
184+
EventExecutor eventExecutor = eventExecutorGroup.next();
182185
return Mono.just( ctx.put( "errors", errors ).put( "startTime", startTime ).put( "nextDelayMs", nextDelayMs ) )
183-
.delayElement( Duration.ofMillis( delayWithJitterMs ) );
186+
.delayElement( Duration.ofMillis( delayWithJitterMs ), Schedulers.fromExecutorService( eventExecutor ) );
184187
}
185188
else
186189
{
187190
addSuppressed( lastError, errors );
191+
188192
return Mono.error( lastError );
189193
}
190194
} );

driver/src/test/java/org/neo4j/driver/integration/reactive/RxSessionIT.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import java.util.Arrays;
2929
import java.util.Iterator;
30+
import java.util.Set;
3031
import java.util.concurrent.atomic.AtomicInteger;
3132

3233
import org.neo4j.driver.Session;
@@ -46,6 +47,8 @@
4647

4748
import static java.util.Collections.emptyIterator;
4849
import static org.hamcrest.CoreMatchers.instanceOf;
50+
import static org.hamcrest.CoreMatchers.not;
51+
import static org.hamcrest.CoreMatchers.startsWith;
4952
import static org.hamcrest.MatcherAssert.assertThat;
5053
import static org.junit.jupiter.api.Assertions.assertEquals;
5154
import static org.neo4j.driver.internal.util.Neo4jFeature.BOLT_V4;
@@ -119,6 +122,7 @@ void shouldRunAsyncTransactionWithRetriesOnAsyncFailures()
119122

120123
assertEquals( 4, work.invocationCount() );
121124
assertEquals( 1, countNodesByLabel( "Node" ) );
125+
assertNoParallelScheduler();
122126
}
123127

124128
@Test
@@ -134,6 +138,7 @@ void shouldRunAsyncTransactionWithRetriesOnSyncFailures()
134138

135139
assertEquals( 3, work.invocationCount() );
136140
assertEquals( 1, countNodesByLabel( "Test" ) );
141+
assertNoParallelScheduler();
137142
}
138143

139144
@Test
@@ -150,6 +155,7 @@ void shouldRunAsyncTransactionThatCanNotBeRetried()
150155

151156
assertEquals( 1, work.invocationCount() );
152157
assertEquals( 0, countNodesByLabel( "Hi" ) );
158+
assertNoParallelScheduler();
153159
}
154160

155161
@Test
@@ -173,6 +179,17 @@ void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure()
173179

174180
assertEquals( 2, work.invocationCount() );
175181
assertEquals( 0, countNodesByLabel( "Person" ) );
182+
assertNoParallelScheduler();
183+
}
184+
185+
private void assertNoParallelScheduler()
186+
{
187+
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
188+
for ( Thread t : threadSet )
189+
{
190+
String name = t.getName();
191+
assertThat( name, not( startsWith( "parallel" ) ) );
192+
}
176193
}
177194

178195
private long countNodesByLabel( String label )

0 commit comments

Comments
 (0)