Skip to content

Commit f40f45d

Browse files
authored
Introduce updateRoutingTableTimeout option (neo4j#1267)
Sets maximum amount of time the driver may wait for routing table acquisition. This option allows setting API response time expectation. It does not limit the time the driver might need when getting routing table.
1 parent 2b9a2c2 commit f40f45d

File tree

9 files changed

+140
-15
lines changed

9 files changed

+140
-15
lines changed

driver/src/main/java/org/neo4j/driver/Config.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ public class Config implements Serializable {
7979

8080
private final boolean logLeakedSessions;
8181

82+
private final long updateRoutingTableTimeoutMillis;
83+
8284
private final int maxConnectionPoolSize;
8385

8486
private final long idleTimeBeforeConnectionTest;
@@ -102,6 +104,7 @@ private Config(ConfigBuilder builder) {
102104
this.logging = builder.logging;
103105
this.logLeakedSessions = builder.logLeakedSessions;
104106

107+
this.updateRoutingTableTimeoutMillis = builder.updateRoutingTableTimeoutMillis;
105108
this.idleTimeBeforeConnectionTest = builder.idleTimeBeforeConnectionTest;
106109
this.maxConnectionLifetimeMillis = builder.maxConnectionLifetimeMillis;
107110
this.maxConnectionPoolSize = builder.maxConnectionPoolSize;
@@ -137,6 +140,15 @@ public boolean logLeakedSessions() {
137140
return logLeakedSessions;
138141
}
139142

143+
/**
144+
* Returns maximum amount of time the driver may wait for routing table acquisition.
145+
*
146+
* @return the maximum time in milliseconds
147+
*/
148+
public long updateRoutingTableTimeoutMillis() {
149+
return updateRoutingTableTimeoutMillis;
150+
}
151+
140152
/**
141153
* Pooled connections that have been idle in the pool for longer than this timeout
142154
* will be tested before they are used again, to ensure they are still live.
@@ -257,6 +269,7 @@ public String userAgent() {
257269
public static class ConfigBuilder {
258270
private Logging logging = DEV_NULL_LOGGING;
259271
private boolean logLeakedSessions;
272+
private long updateRoutingTableTimeoutMillis = TimeUnit.SECONDS.toMillis(90);
260273
private int maxConnectionPoolSize = PoolSettings.DEFAULT_MAX_CONNECTION_POOL_SIZE;
261274
private long idleTimeBeforeConnectionTest = PoolSettings.DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST;
262275
private long maxConnectionLifetimeMillis = PoolSettings.DEFAULT_MAX_CONNECTION_LIFETIME;
@@ -310,6 +323,26 @@ public ConfigBuilder withLeakedSessionsLogging() {
310323
return this;
311324
}
312325

326+
/**
327+
* Sets maximum amount of time the driver may wait for routing table acquisition.
328+
* <p>
329+
* This option allows setting API response time expectation. It does not limit the time the driver might need when getting routing table.
330+
* <p>
331+
* Default is 90 seconds.
332+
*
333+
* @param value the maximum time amount
334+
* @param unit the time unit
335+
* @return this builder
336+
*/
337+
public ConfigBuilder withUpdateRoutingTableTimeout(long value, TimeUnit unit) {
338+
var millis = unit.toMillis(value);
339+
if (millis <= 0) {
340+
throw new IllegalArgumentException("The provided value must be at least 1 millisecond.");
341+
}
342+
this.updateRoutingTableTimeoutMillis = millis;
343+
return this;
344+
}
345+
313346
/**
314347
* Pooled connections that have been idle in the pool for longer than this timeout
315348
* will be tested before they are used again, to ensure they are still live.

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ protected LoadBalancer createLoadBalancer(
281281
address,
282282
routingSettings,
283283
connectionPool,
284+
config.updateRoutingTableTimeoutMillis(),
284285
eventExecutorGroup,
285286
createClock(),
286287
config.logging(),

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,16 @@
2727
import java.util.Optional;
2828
import java.util.Set;
2929
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.CompletionException;
3031
import java.util.concurrent.CompletionStage;
3132
import java.util.concurrent.ConcurrentHashMap;
3233
import java.util.concurrent.ConcurrentMap;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.TimeoutException;
3336
import java.util.concurrent.atomic.AtomicReference;
3437
import org.neo4j.driver.Logger;
3538
import org.neo4j.driver.Logging;
39+
import org.neo4j.driver.exceptions.ServiceUnavailableException;
3640
import org.neo4j.driver.internal.BoltServerAddress;
3741
import org.neo4j.driver.internal.DatabaseName;
3842
import org.neo4j.driver.internal.DatabaseNameUtil;
@@ -42,23 +46,27 @@
4246
import org.neo4j.driver.internal.util.Futures;
4347

4448
public class RoutingTableRegistryImpl implements RoutingTableRegistry {
49+
static final String TABLE_ACQUISITION_TIMEOUT_MESSAGE = "Failed to acquire routing table in configured timeout.";
4550
private final ConcurrentMap<DatabaseName, RoutingTableHandler> routingTableHandlers;
4651
private final Map<Principal, CompletionStage<DatabaseName>> principalToDatabaseNameStage;
4752
private final RoutingTableHandlerFactory factory;
4853
private final Logger log;
54+
private final long updateRoutingTableTimeoutMillis;
4955
private final Clock clock;
5056
private final ConnectionPool connectionPool;
5157
private final Rediscovery rediscovery;
5258

5359
public RoutingTableRegistryImpl(
5460
ConnectionPool connectionPool,
5561
Rediscovery rediscovery,
62+
long updateRoutingTableTimeoutMillis,
5663
Clock clock,
5764
Logging logging,
5865
long routingTablePurgeDelayMs) {
5966
this(
6067
new ConcurrentHashMap<>(),
6168
new RoutingTableHandlerFactory(connectionPool, rediscovery, clock, logging, routingTablePurgeDelayMs),
69+
updateRoutingTableTimeoutMillis,
6270
clock,
6371
connectionPool,
6472
rediscovery,
@@ -68,13 +76,15 @@ public RoutingTableRegistryImpl(
6876
RoutingTableRegistryImpl(
6977
ConcurrentMap<DatabaseName, RoutingTableHandler> routingTableHandlers,
7078
RoutingTableHandlerFactory factory,
79+
long updateRoutingTableTimeoutMillis,
7180
Clock clock,
7281
ConnectionPool connectionPool,
7382
Rediscovery rediscovery,
7483
Logging logging) {
7584
this.factory = factory;
7685
this.routingTableHandlers = routingTableHandlers;
7786
this.principalToDatabaseNameStage = new HashMap<>();
87+
this.updateRoutingTableTimeoutMillis = updateRoutingTableTimeoutMillis;
7888
this.clock = clock;
7989
this.connectionPool = connectionPool;
8090
this.rediscovery = rediscovery;
@@ -83,14 +93,18 @@ public RoutingTableRegistryImpl(
8393

8494
@Override
8595
public CompletionStage<RoutingTableHandler> ensureRoutingTable(ConnectionContext context) {
86-
return ensureDatabaseNameIsCompleted(context).thenCompose(ctxAndHandler -> {
87-
ConnectionContext completedContext = ctxAndHandler.getContext();
88-
RoutingTableHandler handler = ctxAndHandler.getHandler() != null
89-
? ctxAndHandler.getHandler()
90-
: getOrCreate(Futures.joinNowOrElseThrow(
91-
completedContext.databaseNameFuture(), PENDING_DATABASE_NAME_EXCEPTION_SUPPLIER));
92-
return handler.ensureRoutingTable(completedContext).thenApply(ignored -> handler);
93-
});
96+
return ensureDatabaseNameIsCompleted(context)
97+
.thenCompose(ctxAndHandler -> {
98+
ConnectionContext completedContext = ctxAndHandler.getContext();
99+
RoutingTableHandler handler = ctxAndHandler.getHandler() != null
100+
? ctxAndHandler.getHandler()
101+
: getOrCreate(Futures.joinNowOrElseThrow(
102+
completedContext.databaseNameFuture(), PENDING_DATABASE_NAME_EXCEPTION_SUPPLIER));
103+
return handler.ensureRoutingTable(completedContext).thenApply(ignored -> handler);
104+
})
105+
.toCompletableFuture()
106+
.orTimeout(updateRoutingTableTimeoutMillis, TimeUnit.MILLISECONDS)
107+
.handle(this::handleTimeoutException);
94108
}
95109

96110
private CompletionStage<ConnectionContextAndHandler> ensureDatabaseNameIsCompleted(ConnectionContext context) {
@@ -190,6 +204,19 @@ public Optional<RoutingTableHandler> getRoutingTableHandler(DatabaseName databas
190204
return Optional.ofNullable(routingTableHandlers.get(databaseName));
191205
}
192206

207+
private RoutingTableHandler handleTimeoutException(RoutingTableHandler handler, Throwable throwable) {
208+
if (throwable != null) {
209+
if (throwable instanceof TimeoutException) {
210+
throw new ServiceUnavailableException(TABLE_ACQUISITION_TIMEOUT_MESSAGE, throwable);
211+
} else if (throwable instanceof RuntimeException runtimeException) {
212+
throw runtimeException;
213+
} else {
214+
throw new CompletionException(throwable);
215+
}
216+
}
217+
return handler;
218+
}
219+
193220
// For tests
194221
public boolean contains(DatabaseName databaseName) {
195222
return routingTableHandlers.containsKey(databaseName);

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ public LoadBalancer(
7676
BoltServerAddress initialRouter,
7777
RoutingSettings settings,
7878
ConnectionPool connectionPool,
79+
long updateRoutingTableTimeoutMillis,
7980
EventExecutorGroup eventExecutorGroup,
8081
Clock clock,
8182
Logging logging,
@@ -88,6 +89,7 @@ public LoadBalancer(
8889
initialRouter, resolver, settings, clock, logging, requireNonNull(domainNameResolver)),
8990
settings,
9091
loadBalancingStrategy,
92+
updateRoutingTableTimeoutMillis,
9193
eventExecutorGroup,
9294
clock,
9395
logging);
@@ -98,12 +100,14 @@ private LoadBalancer(
98100
Rediscovery rediscovery,
99101
RoutingSettings settings,
100102
LoadBalancingStrategy loadBalancingStrategy,
103+
long updateRoutingTableTimeoutMillis,
101104
EventExecutorGroup eventExecutorGroup,
102105
Clock clock,
103106
Logging logging) {
104107
this(
105108
connectionPool,
106-
createRoutingTables(connectionPool, rediscovery, settings, clock, logging),
109+
createRoutingTables(
110+
connectionPool, rediscovery, settings, updateRoutingTableTimeoutMillis, clock, logging),
107111
rediscovery,
108112
loadBalancingStrategy,
109113
eventExecutorGroup,
@@ -275,10 +279,16 @@ private static RoutingTableRegistry createRoutingTables(
275279
ConnectionPool connectionPool,
276280
Rediscovery rediscovery,
277281
RoutingSettings settings,
282+
long updateRoutingTableTimeoutMillis,
278283
Clock clock,
279284
Logging logging) {
280285
return new RoutingTableRegistryImpl(
281-
connectionPool, rediscovery, clock, logging, settings.routingTablePurgeDelayMs());
286+
connectionPool,
287+
rediscovery,
288+
updateRoutingTableTimeoutMillis,
289+
clock,
290+
logging,
291+
settings.routingTablePurgeDelayMs());
282292
}
283293

284294
private static Rediscovery createRediscovery(

driver/src/test/java/org/neo4j/driver/ConfigTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,29 @@ void shouldTurnOnLeakedSessionsLogging() {
147147
assertTrue(Config.builder().withLeakedSessionsLogging().build().logLeakedSessions());
148148
}
149149

150+
@Test
151+
void shouldHaveDefaultUpdateRoutingTableTimeout() {
152+
var defaultConfig = Config.defaultConfig();
153+
assertEquals(TimeUnit.SECONDS.toMillis(90), defaultConfig.updateRoutingTableTimeoutMillis());
154+
}
155+
156+
@Test
157+
void shouldSetUpdateRoutingTableTimeout() {
158+
var value = 1;
159+
var config = Config.builder()
160+
.withUpdateRoutingTableTimeout(value, TimeUnit.HOURS)
161+
.build();
162+
assertEquals(TimeUnit.HOURS.toMillis(value), config.updateRoutingTableTimeoutMillis());
163+
}
164+
165+
@Test
166+
void shouldRejectLessThen1Millisecond() {
167+
var builder = Config.builder();
168+
assertThrows(
169+
IllegalArgumentException.class,
170+
() -> builder.withUpdateRoutingTableTimeout(999_999, TimeUnit.NANOSECONDS));
171+
}
172+
150173
@Test
151174
void shouldHaveDefaultConnectionTimeout() {
152175
Config defaultConfig = Config.defaultConfig();

driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImplTest.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,20 @@
2525
import static org.hamcrest.Matchers.empty;
2626
import static org.hamcrest.Matchers.equalTo;
2727
import static org.junit.jupiter.api.Assertions.assertEquals;
28+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
29+
import static org.junit.jupiter.api.Assertions.assertThrows;
2830
import static org.junit.jupiter.api.Assertions.assertTrue;
2931
import static org.mockito.ArgumentMatchers.any;
3032
import static org.mockito.ArgumentMatchers.eq;
33+
import static org.mockito.BDDMockito.given;
3134
import static org.mockito.Mockito.mock;
3235
import static org.mockito.Mockito.verify;
3336
import static org.mockito.Mockito.when;
3437
import static org.neo4j.driver.internal.DatabaseNameUtil.SYSTEM_DATABASE_NAME;
3538
import static org.neo4j.driver.internal.DatabaseNameUtil.database;
3639
import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;
3740
import static org.neo4j.driver.internal.cluster.RoutingSettings.STALE_ROUTING_TABLE_PURGE_DELAY_MS;
41+
import static org.neo4j.driver.internal.cluster.RoutingTableRegistryImpl.TABLE_ACQUISITION_TIMEOUT_MESSAGE;
3842
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
3943
import static org.neo4j.driver.internal.util.ClusterCompositionUtil.A;
4044
import static org.neo4j.driver.internal.util.ClusterCompositionUtil.B;
@@ -48,13 +52,16 @@
4852
import java.util.Collections;
4953
import java.util.HashSet;
5054
import java.util.Set;
55+
import java.util.concurrent.CompletableFuture;
5156
import java.util.concurrent.ConcurrentHashMap;
5257
import java.util.concurrent.ConcurrentMap;
58+
import java.util.concurrent.TimeoutException;
5359
import org.junit.jupiter.api.Test;
5460
import org.junit.jupiter.params.ParameterizedTest;
5561
import org.junit.jupiter.params.provider.EnumSource;
5662
import org.junit.jupiter.params.provider.ValueSource;
5763
import org.neo4j.driver.AccessMode;
64+
import org.neo4j.driver.exceptions.ServiceUnavailableException;
5865
import org.neo4j.driver.internal.BoltServerAddress;
5966
import org.neo4j.driver.internal.DatabaseName;
6067
import org.neo4j.driver.internal.async.ImmutableConnectionContext;
@@ -135,7 +142,7 @@ void shouldReturnFreshRoutingTable(AccessMode mode) throws Throwable {
135142
RoutingTableHandler handler = mockedRoutingTableHandler();
136143
RoutingTableHandlerFactory factory = mockedHandlerFactory(handler);
137144
RoutingTableRegistryImpl routingTables =
138-
new RoutingTableRegistryImpl(map, factory, null, null, null, DEV_NULL_LOGGING);
145+
new RoutingTableRegistryImpl(map, factory, Long.MAX_VALUE, null, null, null, DEV_NULL_LOGGING);
139146

140147
ImmutableConnectionContext context =
141148
new ImmutableConnectionContext(defaultDatabase(), Collections.emptySet(), mode);
@@ -155,7 +162,7 @@ void shouldReturnServersInAllRoutingTables() throws Throwable {
155162
map.put(database("Orange"), mockedRoutingTableHandler(E, F, C));
156163
RoutingTableHandlerFactory factory = mockedHandlerFactory();
157164
RoutingTableRegistryImpl routingTables =
158-
new RoutingTableRegistryImpl(map, factory, null, null, null, DEV_NULL_LOGGING);
165+
new RoutingTableRegistryImpl(map, factory, Long.MAX_VALUE, null, null, null, DEV_NULL_LOGGING);
159166

160167
// When
161168
Set<BoltServerAddress> servers = routingTables.allServers();
@@ -198,6 +205,26 @@ void shouldRemoveStaleRoutingTableHandlers() throws Throwable {
198205
assertThat(routingTables.allServers(), empty());
199206
}
200207

208+
@Test
209+
void shouldReturnExistingRoutingTableHandlerWhenFreshRoutingTables() throws Throwable {
210+
// Given
211+
var map = new ConcurrentHashMap<DatabaseName, RoutingTableHandler>();
212+
var handler = mock(RoutingTableHandler.class);
213+
given(handler.ensureRoutingTable(any())).willReturn(new CompletableFuture<>());
214+
var database = database("neo4j");
215+
map.put(database, handler);
216+
217+
var factory = mockedHandlerFactory();
218+
var routingTables = new RoutingTableRegistryImpl(map, factory, 250, null, null, null, DEV_NULL_LOGGING);
219+
var context = new ImmutableConnectionContext(database, Collections.emptySet(), AccessMode.READ);
220+
221+
// When & Then
222+
var actual =
223+
assertThrows(ServiceUnavailableException.class, () -> await(routingTables.ensureRoutingTable(context)));
224+
assertEquals(TABLE_ACQUISITION_TIMEOUT_MESSAGE, actual.getMessage());
225+
assertInstanceOf(TimeoutException.class, actual.getCause());
226+
}
227+
201228
private RoutingTableHandler mockedRoutingTableHandler(BoltServerAddress... servers) {
202229
RoutingTableHandler handler = mock(RoutingTableHandler.class);
203230
when(handler.servers()).thenReturn(new HashSet<>(Arrays.asList(servers)));
@@ -207,7 +234,7 @@ private RoutingTableHandler mockedRoutingTableHandler(BoltServerAddress... serve
207234

208235
private RoutingTableRegistryImpl newRoutingTables(
209236
ConcurrentMap<DatabaseName, RoutingTableHandler> handlers, RoutingTableHandlerFactory factory) {
210-
return new RoutingTableRegistryImpl(handlers, factory, null, null, null, DEV_NULL_LOGGING);
237+
return new RoutingTableRegistryImpl(handlers, factory, Long.MAX_VALUE, null, null, null, DEV_NULL_LOGGING);
211238
}
212239

213240
private RoutingTableHandlerFactory mockedHandlerFactory(RoutingTableHandler handler) {

driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/RoutingTableAndConnectionPoolTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ private ConnectionPool newConnectionPool() {
323323

324324
private RoutingTableRegistryImpl newRoutingTables(ConnectionPool connectionPool, Rediscovery rediscovery) {
325325
return new RoutingTableRegistryImpl(
326-
connectionPool, rediscovery, clock, logging, STALE_ROUTING_TABLE_PURGE_DELAY_MS);
326+
connectionPool, rediscovery, Long.MAX_VALUE, clock, logging, STALE_ROUTING_TABLE_PURGE_DELAY_MS);
327327
}
328328

329329
private LoadBalancer newLoadBalancer(ConnectionPool connectionPool, RoutingTableRegistry routingTables) {

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public class GetFeatures implements TestkitRequest {
5757
"Detail:DefaultSecurityConfigValueEquality",
5858
"Optimization:ImplicitDefaultArguments",
5959
"Feature:Bolt:Patch:UTC",
60-
"Feature:API:Type.Temporal"));
60+
"Feature:API:Type.Temporal",
61+
"Feature:API:UpdateRoutingTableTimeout"));
6162

6263
private static final Set<String> SYNC_FEATURES = new HashSet<>(Arrays.asList(
6364
"Feature:Bolt:3.0",

testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ public TestkitResponse process(TestkitState testkitState) {
101101
domainNameResolver = callbackDomainNameResolver(testkitState);
102102
}
103103
Optional.ofNullable(data.userAgent).ifPresent(configBuilder::withUserAgent);
104+
Optional.ofNullable(data.updateRoutingTableTimeoutMs)
105+
.ifPresent(timeout -> configBuilder.withUpdateRoutingTableTimeout(timeout, TimeUnit.MILLISECONDS));
104106
Optional.ofNullable(data.connectionTimeoutMs)
105107
.ifPresent(timeout -> configBuilder.withConnectionTimeout(timeout, TimeUnit.MILLISECONDS));
106108
Optional.ofNullable(data.fetchSize).ifPresent(configBuilder::withFetchSize);
@@ -278,6 +280,7 @@ public static class NewDriverBody {
278280
private String userAgent;
279281
private boolean resolverRegistered;
280282
private boolean domainNameResolverRegistered;
283+
private Long updateRoutingTableTimeoutMs;
281284
private Long connectionTimeoutMs;
282285
private Integer fetchSize;
283286
private Long maxTxRetryTimeMs;

0 commit comments

Comments
 (0)