Skip to content

Commit 4764e85

Browse files
committed
Respect routingRetryDelay during async rediscovery
This commit makes async `Rediscovery` use and respect the routing retry delay exposed via the `Config#withRoutingRetryDelay()` method. This is done by scheduling retries in the Netty event loop. It's quite different from the blocking version because we can't call `Thread#sleep()` in async code. Also added more unit tests for async rediscovery.
1 parent d205c75 commit 4764e85

File tree

11 files changed

+562
-64
lines changed

11 files changed

+562
-64
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.neo4j.driver.internal;
2020

2121
import io.netty.bootstrap.Bootstrap;
22-
import io.netty.channel.EventLoopGroup;
2322
import io.netty.util.concurrent.EventExecutorGroup;
2423

2524
import java.io.IOException;
@@ -77,16 +76,16 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7776
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
7877

7978
Bootstrap bootstrap = createBootstrap();
80-
EventLoopGroup eventLoopGroup = bootstrap.config().group();
81-
RetryLogic retryLogic = createRetryLogic( retrySettings, eventLoopGroup, config.logging() );
79+
EventExecutorGroup eventExecutorGroup = bootstrap.config().group();
80+
RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() );
8281

8382
AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap,
8483
config );
8584

8685
try
8786
{
8887
return createDriver( uri, address, connectionPool, asyncConnectionPool, config, newRoutingSettings,
89-
securityPlan, retryLogic );
88+
eventExecutorGroup, securityPlan, retryLogic );
9089
}
9190
catch ( Throwable driverError )
9291
{
@@ -122,7 +121,7 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
122121

123122
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
124123
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
125-
SecurityPlan securityPlan, RetryLogic retryLogic )
124+
EventExecutorGroup eventExecutorGroup, SecurityPlan securityPlan, RetryLogic retryLogic )
126125
{
127126
String scheme = uri.getScheme().toLowerCase();
128127
switch ( scheme )
@@ -132,7 +131,7 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
132131
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
133132
case BOLT_ROUTING_URI_SCHEME:
134133
return createRoutingDriver( address, connectionPool, asyncConnectionPool, config, routingSettings,
135-
securityPlan, retryLogic );
134+
securityPlan, retryLogic, eventExecutorGroup );
136135
default:
137136
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
138137
}
@@ -160,14 +159,14 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
160159
*/
161160
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
162161
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
163-
SecurityPlan securityPlan, RetryLogic retryLogic )
162+
SecurityPlan securityPlan, RetryLogic retryLogic, EventExecutorGroup eventExecutorGroup )
164163
{
165164
if ( !securityPlan.isRoutingCompatible() )
166165
{
167166
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
168167
}
169168
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, asyncConnectionPool,
170-
config, routingSettings );
169+
eventExecutorGroup, config, routingSettings );
171170
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
172171
return createDriver( config, securityPlan, sessionFactory );
173172
}
@@ -188,10 +187,13 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
188187
* <b>This method is protected only for testing</b>
189188
*/
190189
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool,
191-
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings )
190+
AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup,
191+
Config config, RoutingSettings routingSettings )
192192
{
193-
return new LoadBalancer( address, routingSettings, connectionPool, asyncConnectionPool, createClock(),
194-
config.logging(), createLoadBalancingStrategy( config, connectionPool, asyncConnectionPool ) );
193+
LoadBalancingStrategy loadBalancingStrategy =
194+
createLoadBalancingStrategy( config, connectionPool, asyncConnectionPool );
195+
return new LoadBalancer( address, routingSettings, connectionPool, asyncConnectionPool, eventExecutorGroup,
196+
createClock(), config.logging(), loadBalancingStrategy );
195197
}
196198

197199
private static LoadBalancingStrategy createLoadBalancingStrategy( Config config, ConnectionPool connectionPool,

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.LinkedHashSet;
22+
import java.util.Objects;
2223
import java.util.Set;
2324

2425
import org.neo4j.driver.internal.net.BoltServerAddress;
@@ -94,15 +95,39 @@ public long expirationTimestamp() {
9495
return this.expirationTimestamp;
9596
}
9697

98+
@Override
99+
public boolean equals( Object o )
100+
{
101+
if ( this == o )
102+
{
103+
return true;
104+
}
105+
if ( o == null || getClass() != o.getClass() )
106+
{
107+
return false;
108+
}
109+
ClusterComposition that = (ClusterComposition) o;
110+
return expirationTimestamp == that.expirationTimestamp &&
111+
Objects.equals( readers, that.readers ) &&
112+
Objects.equals( writers, that.writers ) &&
113+
Objects.equals( routers, that.routers );
114+
}
115+
116+
@Override
117+
public int hashCode()
118+
{
119+
return Objects.hash( readers, writers, routers, expirationTimestamp );
120+
}
121+
97122
@Override
98123
public String toString()
99124
{
100125
return "ClusterComposition{" +
101-
"expirationTimestamp=" + expirationTimestamp +
102-
", readers=" + readers +
103-
", writers=" + writers +
104-
", routers=" + routers +
105-
'}';
126+
"readers=" + readers +
127+
", writers=" + writers +
128+
", routers=" + routers +
129+
", expirationTimestamp=" + expirationTimestamp +
130+
'}';
106131
}
107132

108133
public static ClusterComposition parse( Record record, long now )

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import io.netty.util.concurrent.EventExecutorGroup;
22+
2123
import java.util.Collections;
2224
import java.util.HashSet;
2325
import java.util.Set;
2426
import java.util.concurrent.CompletableFuture;
2527
import java.util.concurrent.CompletionException;
2628
import java.util.concurrent.CompletionStage;
29+
import java.util.concurrent.TimeUnit;
2730

2831
import org.neo4j.driver.internal.async.AsyncConnection;
2932
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
@@ -37,7 +40,6 @@
3740

3841
import static java.lang.String.format;
3942
import static java.util.concurrent.CompletableFuture.completedFuture;
40-
import static org.neo4j.driver.internal.async.Futures.failedFuture;
4143

4244
public class Rediscovery
4345
{
@@ -49,18 +51,30 @@ public class Rediscovery
4951
private final Logger logger;
5052
private final ClusterCompositionProvider provider;
5153
private final HostNameResolver hostNameResolver;
54+
private final EventExecutorGroup eventExecutorGroup;
5255

5356
private volatile boolean useInitialRouter;
5457

55-
public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, Clock clock, Logger logger,
56-
ClusterCompositionProvider provider, HostNameResolver hostNameResolver )
58+
public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, ClusterCompositionProvider provider,
59+
EventExecutorGroup eventExecutorGroup, HostNameResolver hostNameResolver, Clock clock, Logger logger )
60+
{
61+
// todo: set useInitialRouter to true when driver only does async
62+
this( initialRouter, settings, provider, hostNameResolver, eventExecutorGroup, clock, logger, false );
63+
}
64+
65+
// Test-only constructor
66+
public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, ClusterCompositionProvider provider,
67+
HostNameResolver hostNameResolver, EventExecutorGroup eventExecutorGroup, Clock clock, Logger logger,
68+
boolean useInitialRouter )
5769
{
5870
this.initialRouter = initialRouter;
5971
this.settings = settings;
6072
this.clock = clock;
6173
this.logger = logger;
6274
this.provider = provider;
6375
this.hostNameResolver = hostNameResolver;
76+
this.eventExecutorGroup = eventExecutorGroup;
77+
this.useInitialRouter = useInitialRouter;
6478
}
6579

6680
/**
@@ -97,25 +111,39 @@ public ClusterComposition lookupClusterComposition( RoutingTable routingTable, C
97111
public CompletionStage<ClusterComposition> lookupClusterCompositionAsync( RoutingTable routingTable,
98112
AsyncConnectionPool connectionPool )
99113
{
100-
return lookupClusterCompositionAsync( routingTable, connectionPool, 0 );
114+
CompletableFuture<ClusterComposition> result = new CompletableFuture<>();
115+
lookupClusterComposition( routingTable, connectionPool, 0, 0, result );
116+
return result;
101117
}
102118

103-
private CompletionStage<ClusterComposition> lookupClusterCompositionAsync( RoutingTable routingTable,
104-
AsyncConnectionPool connectionPool, int failures )
119+
private void lookupClusterComposition( RoutingTable routingTable, AsyncConnectionPool pool,
120+
int failures, long previousDelay, CompletableFuture<ClusterComposition> result )
105121
{
106122
if ( failures >= settings.maxRoutingFailures() )
107123
{
108-
return failedFuture( new ServiceUnavailableException( NO_ROUTERS_AVAILABLE ) );
124+
result.completeExceptionally( new ServiceUnavailableException( NO_ROUTERS_AVAILABLE ) );
125+
return;
109126
}
110127

111-
// todo: use settings.retryTimeoutDelay()?
112-
return lookupAsync( routingTable, connectionPool ).thenCompose( composition ->
128+
lookupAsync( routingTable, pool ).whenComplete( ( composition, error ) ->
113129
{
114-
if ( composition != null )
130+
if ( error != null )
115131
{
116-
return completedFuture( composition );
132+
result.completeExceptionally( error );
133+
}
134+
else if ( composition != null )
135+
{
136+
result.complete( composition );
137+
}
138+
else
139+
{
140+
long nextDelay = Math.max( settings.retryTimeoutDelay(), previousDelay * 2 );
141+
logger.info( "Unable to fetch new routing table, will try again in " + nextDelay + "ms" );
142+
eventExecutorGroup.next().schedule(
143+
() -> lookupClusterComposition( routingTable, pool, failures + 1, nextDelay, result ),
144+
nextDelay, TimeUnit.MILLISECONDS
145+
);
117146
}
118-
return lookupClusterCompositionAsync( routingTable, connectionPool, failures + 1 );
119147
} );
120148
}
121149

@@ -356,6 +384,7 @@ private ClusterComposition handleRoutingProcedureError( Throwable error, Routing
356384
}
357385
else
358386
{
387+
System.err.println( error );
359388
// connection turned out to be broken
360389
logger.error( format( "Failed to connect to routing server '%s'.", routerAddress ), error );
361390
routingTable.forget( routerAddress );

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster.loadbalancing;
2020

21+
import io.netty.util.concurrent.EventExecutorGroup;
22+
2123
import java.util.Set;
2224
import java.util.concurrent.CompletableFuture;
2325
import java.util.concurrent.CompletionStage;
@@ -65,12 +67,12 @@ public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, Au
6567
private CompletableFuture<RoutingTable> refreshRoutingTableFuture;
6668

6769
public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections,
68-
AsyncConnectionPool asyncConnectionPool, Clock clock, Logging logging,
69-
LoadBalancingStrategy loadBalancingStrategy )
70+
AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup, Clock clock,
71+
Logging logging, LoadBalancingStrategy loadBalancingStrategy )
7072
{
7173
this( connections, asyncConnectionPool, new ClusterRoutingTable( clock, initialRouter ),
72-
createRediscovery( initialRouter, settings, clock, logging ), loadBalancerLogger( logging ),
73-
loadBalancingStrategy );
74+
createRediscovery( initialRouter, settings, eventExecutorGroup, clock, logging ),
75+
loadBalancerLogger( logging ), loadBalancingStrategy );
7476
}
7577

7678
// Used only in testing
@@ -302,12 +304,13 @@ private BoltServerAddress selectAddressAsync( AccessMode mode, AddressSet server
302304
}
303305

304306
private static Rediscovery createRediscovery( BoltServerAddress initialRouter, RoutingSettings settings,
305-
Clock clock, Logging logging )
307+
EventExecutorGroup eventExecutorGroup, Clock clock, Logging logging )
306308
{
307309
Logger log = loadBalancerLogger( logging );
308-
ClusterCompositionProvider clusterComposition =
310+
ClusterCompositionProvider clusterCompositionProvider =
309311
new RoutingProcedureClusterCompositionProvider( clock, log, settings );
310-
return new Rediscovery( initialRouter, settings, clock, log, clusterComposition, new DnsResolver( log ) );
312+
return new Rediscovery( initialRouter, settings, clusterCompositionProvider, eventExecutorGroup,
313+
new DnsResolver( log ), clock, log );
311314
}
312315

313316
private static Logger loadBalancerLogger( Logging logging )

driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import io.netty.util.concurrent.EventExecutorGroup;
2122
import org.junit.Test;
2223
import org.junit.runner.RunWith;
2324
import org.junit.runners.Parameterized;
@@ -171,7 +172,7 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
171172
@Override
172173
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
173174
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
174-
SecurityPlan securityPlan, RetryLogic retryLogic )
175+
SecurityPlan securityPlan, RetryLogic retryLogic, EventExecutorGroup eventExecutorGroup )
175176
{
176177
throw new UnsupportedOperationException( "Can't create routing driver" );
177178
}
@@ -195,7 +196,8 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
195196

196197
@Override
197198
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool,
198-
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings )
199+
AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup,
200+
Config config, RoutingSettings routingSettings )
199201
{
200202
return null;
201203
}

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21+
import io.netty.util.concurrent.GlobalEventExecutor;
2122
import org.junit.After;
2223
import org.junit.Rule;
2324
import org.junit.Test;
@@ -365,8 +366,8 @@ private Driver driverWithPool( ConnectionPool pool )
365366
AsyncConnectionPool asyncConnectionPool = mock( AsyncConnectionPool.class );
366367
LoadBalancingStrategy loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( pool,
367368
asyncConnectionPool, logging );
368-
ConnectionProvider connectionProvider = new LoadBalancer( SEED, settings, pool, asyncConnectionPool, clock,
369-
logging, loadBalancingStrategy );
369+
ConnectionProvider connectionProvider = new LoadBalancer( SEED, settings, pool, asyncConnectionPool,
370+
GlobalEventExecutor.INSTANCE, clock, logging, loadBalancingStrategy );
370371
Config config = Config.build().withLogging( logging ).toConfig();
371372
SessionFactory sessionFactory = new NetworkSessionWithAddressFactory( connectionProvider, config );
372373
return new InternalDriver( insecure(), sessionFactory, logging );

0 commit comments

Comments
 (0)