Skip to content

Commit 5417654

Browse files
authored
Allow supplying a Rediscovery implementation (#1350)
* Allow supplying a Rediscovery implementation This is only for internal purposes. * Add rediscoverySupplier tests to DriverFactory * Add null protection
1 parent 7519388 commit 5417654

File tree

9 files changed

+168
-70
lines changed

9 files changed

+168
-70
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import io.netty.util.concurrent.EventExecutorGroup;
2828
import io.netty.util.internal.logging.InternalLoggerFactory;
2929
import java.net.URI;
30+
import java.util.Objects;
31+
import java.util.function.Supplier;
3032
import org.neo4j.driver.AuthToken;
3133
import org.neo4j.driver.AuthTokens;
3234
import org.neo4j.driver.Config;
@@ -39,11 +41,13 @@
3941
import org.neo4j.driver.internal.async.connection.ChannelConnectorImpl;
4042
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
4143
import org.neo4j.driver.internal.async.pool.PoolSettings;
44+
import org.neo4j.driver.internal.cluster.Rediscovery;
45+
import org.neo4j.driver.internal.cluster.RediscoveryImpl;
4246
import org.neo4j.driver.internal.cluster.RoutingContext;
47+
import org.neo4j.driver.internal.cluster.RoutingProcedureClusterCompositionProvider;
4348
import org.neo4j.driver.internal.cluster.RoutingSettings;
4449
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
4550
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
46-
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancingStrategy;
4751
import org.neo4j.driver.internal.logging.NettyLogging;
4852
import org.neo4j.driver.internal.metrics.DevNullMetricsProvider;
4953
import org.neo4j.driver.internal.metrics.InternalMetricsProvider;
@@ -70,7 +74,7 @@ public final Driver newInstance(
7074
RetrySettings retrySettings,
7175
Config config,
7276
SecurityPlan securityPlan) {
73-
return newInstance(uri, authToken, routingSettings, retrySettings, config, null, securityPlan);
77+
return newInstance(uri, authToken, routingSettings, retrySettings, config, null, securityPlan, null);
7478
}
7579

7680
public final Driver newInstance(
@@ -80,7 +84,8 @@ public final Driver newInstance(
8084
RetrySettings retrySettings,
8185
Config config,
8286
EventLoopGroup eventLoopGroup,
83-
SecurityPlan securityPlan) {
87+
SecurityPlan securityPlan,
88+
Supplier<Rediscovery> rediscoverySupplier) {
8489
Bootstrap bootstrap;
8590
boolean ownsEventLoopGroup;
8691
if (eventLoopGroup == null) {
@@ -119,6 +124,7 @@ public final Driver newInstance(
119124
newRoutingSettings,
120125
retryLogic,
121126
metricsProvider,
127+
rediscoverySupplier,
122128
config);
123129
}
124130

@@ -185,6 +191,7 @@ private InternalDriver createDriver(
185191
RoutingSettings routingSettings,
186192
RetryLogic retryLogic,
187193
MetricsProvider metricsProvider,
194+
Supplier<Rediscovery> rediscoverySupplier,
188195
Config config) {
189196
try {
190197
String scheme = uri.getScheme().toLowerCase();
@@ -198,6 +205,7 @@ private InternalDriver createDriver(
198205
routingSettings,
199206
retryLogic,
200207
metricsProvider,
208+
rediscoverySupplier,
201209
config);
202210
} else {
203211
assertNoRoutingContext(uri, routingSettings);
@@ -243,9 +251,10 @@ protected InternalDriver createRoutingDriver(
243251
RoutingSettings routingSettings,
244252
RetryLogic retryLogic,
245253
MetricsProvider metricsProvider,
254+
Supplier<Rediscovery> rediscoverySupplier,
246255
Config config) {
247-
ConnectionProvider connectionProvider =
248-
createLoadBalancer(address, connectionPool, eventExecutorGroup, config, routingSettings);
256+
ConnectionProvider connectionProvider = createLoadBalancer(
257+
address, connectionPool, eventExecutorGroup, config, routingSettings, rediscoverySupplier);
249258
SessionFactory sessionFactory = createSessionFactory(connectionProvider, retryLogic, config);
250259
InternalDriver driver = createDriver(securityPlan, sessionFactory, metricsProvider, config);
251260
Logger log = config.logging().getLog(getClass());
@@ -273,24 +282,41 @@ protected LoadBalancer createLoadBalancer(
273282
ConnectionPool connectionPool,
274283
EventExecutorGroup eventExecutorGroup,
275284
Config config,
276-
RoutingSettings routingSettings) {
277-
LoadBalancingStrategy loadBalancingStrategy =
278-
new LeastConnectedLoadBalancingStrategy(connectionPool, config.logging());
279-
ServerAddressResolver resolver = createResolver(config);
280-
LoadBalancer loadBalancer = new LoadBalancer(
281-
address,
282-
routingSettings,
285+
RoutingSettings routingSettings,
286+
Supplier<Rediscovery> rediscoverySupplier) {
287+
var loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(connectionPool, config.logging());
288+
var resolver = createResolver(config);
289+
var domainNameResolver = Objects.requireNonNull(getDomainNameResolver(), "domainNameResolver must not be null");
290+
var clock = createClock();
291+
var logging = config.logging();
292+
if (rediscoverySupplier == null) {
293+
rediscoverySupplier =
294+
() -> createRediscovery(address, resolver, routingSettings, clock, logging, domainNameResolver);
295+
}
296+
var loadBalancer = new LoadBalancer(
283297
connectionPool,
284-
eventExecutorGroup,
285-
createClock(),
286-
config.logging(),
298+
rediscoverySupplier.get(),
299+
routingSettings,
287300
loadBalancingStrategy,
288-
resolver,
289-
getDomainNameResolver());
301+
eventExecutorGroup,
302+
clock,
303+
logging);
290304
handleNewLoadBalancer(loadBalancer);
291305
return loadBalancer;
292306
}
293307

308+
protected Rediscovery createRediscovery(
309+
BoltServerAddress initialRouter,
310+
ServerAddressResolver resolver,
311+
RoutingSettings settings,
312+
Clock clock,
313+
Logging logging,
314+
DomainNameResolver domainNameResolver) {
315+
var clusterCompositionProvider =
316+
new RoutingProcedureClusterCompositionProvider(clock, settings.routingContext());
317+
return new RediscoveryImpl(initialRouter, clusterCompositionProvider, resolver, logging, domainNameResolver);
318+
}
319+
294320
/**
295321
* Handles new {@link LoadBalancer} instance.
296322
* <p>
@@ -307,8 +333,6 @@ private static ServerAddressResolver createResolver(Config config) {
307333

308334
/**
309335
* Creates new {@link Clock}.
310-
* <p>
311-
* <b>This method is protected only for testing</b>
312336
*/
313337
protected Clock createClock() {
314338
return Clock.SYSTEM;

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import static java.util.Objects.requireNonNull;
2122
import static org.neo4j.driver.internal.async.ConnectionContext.PENDING_DATABASE_NAME_EXCEPTION_SUPPLIER;
2223

2324
import java.util.HashMap;
@@ -72,6 +73,7 @@ public RoutingTableRegistryImpl(
7273
ConnectionPool connectionPool,
7374
Rediscovery rediscovery,
7475
Logging logging) {
76+
requireNonNull(rediscovery, "rediscovery must not be null");
7577
this.factory = factory;
7678
this.routingTableHandlers = routingTableHandlers;
7779
this.principalToDatabaseNameStage = new HashMap<>();

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,9 @@
4040
import org.neo4j.driver.exceptions.ServiceUnavailableException;
4141
import org.neo4j.driver.exceptions.SessionExpiredException;
4242
import org.neo4j.driver.internal.BoltServerAddress;
43-
import org.neo4j.driver.internal.DomainNameResolver;
4443
import org.neo4j.driver.internal.async.ConnectionContext;
4544
import org.neo4j.driver.internal.async.connection.RoutingConnection;
46-
import org.neo4j.driver.internal.cluster.ClusterCompositionProvider;
4745
import org.neo4j.driver.internal.cluster.Rediscovery;
48-
import org.neo4j.driver.internal.cluster.RediscoveryImpl;
49-
import org.neo4j.driver.internal.cluster.RoutingProcedureClusterCompositionProvider;
5046
import org.neo4j.driver.internal.cluster.RoutingSettings;
5147
import org.neo4j.driver.internal.cluster.RoutingTable;
5248
import org.neo4j.driver.internal.cluster.RoutingTableRegistry;
@@ -56,7 +52,6 @@
5652
import org.neo4j.driver.internal.spi.ConnectionProvider;
5753
import org.neo4j.driver.internal.util.Clock;
5854
import org.neo4j.driver.internal.util.Futures;
59-
import org.neo4j.driver.net.ServerAddressResolver;
6055

6156
public class LoadBalancer implements ConnectionProvider {
6257
private static final String CONNECTION_ACQUISITION_COMPLETION_FAILURE_MESSAGE =
@@ -73,27 +68,6 @@ public class LoadBalancer implements ConnectionProvider {
7368
private final Rediscovery rediscovery;
7469

7570
public LoadBalancer(
76-
BoltServerAddress initialRouter,
77-
RoutingSettings settings,
78-
ConnectionPool connectionPool,
79-
EventExecutorGroup eventExecutorGroup,
80-
Clock clock,
81-
Logging logging,
82-
LoadBalancingStrategy loadBalancingStrategy,
83-
ServerAddressResolver resolver,
84-
DomainNameResolver domainNameResolver) {
85-
this(
86-
connectionPool,
87-
createRediscovery(
88-
initialRouter, resolver, settings, clock, logging, requireNonNull(domainNameResolver)),
89-
settings,
90-
loadBalancingStrategy,
91-
eventExecutorGroup,
92-
clock,
93-
logging);
94-
}
95-
96-
private LoadBalancer(
9771
ConnectionPool connectionPool,
9872
Rediscovery rediscovery,
9973
RoutingSettings settings,
@@ -117,6 +91,7 @@ private LoadBalancer(
11791
LoadBalancingStrategy loadBalancingStrategy,
11892
EventExecutorGroup eventExecutorGroup,
11993
Logging logging) {
94+
requireNonNull(rediscovery, "rediscovery must not be null");
12095
this.connectionPool = connectionPool;
12196
this.routingTables = routingTables;
12297
this.rediscovery = rediscovery;
@@ -281,19 +256,14 @@ private static RoutingTableRegistry createRoutingTables(
281256
connectionPool, rediscovery, clock, logging, settings.routingTablePurgeDelayMs());
282257
}
283258

284-
private static Rediscovery createRediscovery(
285-
BoltServerAddress initialRouter,
286-
ServerAddressResolver resolver,
287-
RoutingSettings settings,
288-
Clock clock,
289-
Logging logging,
290-
DomainNameResolver domainNameResolver) {
291-
ClusterCompositionProvider clusterCompositionProvider =
292-
new RoutingProcedureClusterCompositionProvider(clock, settings.routingContext());
293-
return new RediscoveryImpl(initialRouter, clusterCompositionProvider, resolver, logging, domainNameResolver);
294-
}
295-
296259
private static RuntimeException unknownMode(AccessMode mode) {
297260
return new IllegalArgumentException("Mode '" + mode + "' is not supported");
298261
}
262+
263+
/**
264+
* <b>This method is only for testing</b>
265+
*/
266+
public Rediscovery getRediscovery() {
267+
return rediscovery;
268+
}
299269
}

driver/src/test/java/org/neo4j/driver/GraphDatabaseTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@
3232
import java.net.URI;
3333
import java.util.Iterator;
3434
import java.util.List;
35+
import java.util.function.Supplier;
3536
import org.junit.jupiter.api.Test;
3637
import org.neo4j.driver.exceptions.ServiceUnavailableException;
3738
import org.neo4j.driver.internal.BoltServerAddress;
3839
import org.neo4j.driver.internal.DriverFactory;
3940
import org.neo4j.driver.internal.InternalDriver;
41+
import org.neo4j.driver.internal.cluster.Rediscovery;
4042
import org.neo4j.driver.internal.cluster.RoutingSettings;
4143
import org.neo4j.driver.internal.metrics.MetricsProvider;
4244
import org.neo4j.driver.internal.retry.RetryLogic;
@@ -147,6 +149,7 @@ protected InternalDriver createRoutingDriver(
147149
RoutingSettings routingSettings,
148150
RetryLogic retryLogic,
149151
MetricsProvider metricsProvider,
152+
Supplier<Rediscovery> rediscoverySupplier,
150153
Config config) {
151154
return driverIterator.next();
152155
}

driver/src/test/java/org/neo4j/driver/integration/SharedEventLoopIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ private Driver createDriver(EventLoopGroup eventLoopGroup) {
8989
RetrySettings.DEFAULT,
9090
Config.defaultConfig(),
9191
eventLoopGroup,
92-
SecurityPlanImpl.insecure());
92+
SecurityPlanImpl.insecure(),
93+
null);
9394
}
9495

9596
private void testConnection(Driver driver) {

driver/src/test/java/org/neo4j/driver/internal/CustomSecurityPlanTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ void testCustomSecurityPlanUsed() {
5151
RetrySettings.DEFAULT,
5252
Config.defaultConfig(),
5353
null,
54-
securityPlan);
54+
securityPlan,
55+
null);
5556

5657
assertFalse(driverFactory.capturedSecurityPlans.isEmpty());
5758
assertTrue(driverFactory.capturedSecurityPlans.stream().allMatch(capturePlan -> capturePlan == securityPlan));

driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@
2424
import static org.hamcrest.Matchers.is;
2525
import static org.hamcrest.junit.MatcherAssert.assertThat;
2626
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
27+
import static org.junit.jupiter.api.Assertions.assertEquals;
2728
import static org.junit.jupiter.api.Assertions.assertNotNull;
2829
import static org.junit.jupiter.api.Assertions.assertThrows;
30+
import static org.junit.jupiter.api.Assertions.assertTrue;
2931
import static org.junit.jupiter.api.Assertions.fail;
32+
import static org.mockito.BDDMockito.given;
33+
import static org.mockito.BDDMockito.then;
3034
import static org.mockito.Mockito.any;
3135
import static org.mockito.Mockito.mock;
3236
import static org.mockito.Mockito.never;
@@ -41,6 +45,7 @@
4145
import io.netty.bootstrap.Bootstrap;
4246
import io.netty.util.concurrent.EventExecutorGroup;
4347
import java.net.URI;
48+
import java.util.function.Supplier;
4449
import java.util.stream.Stream;
4550
import org.junit.jupiter.api.Test;
4651
import org.junit.jupiter.params.ParameterizedTest;
@@ -55,6 +60,8 @@
5560
import org.neo4j.driver.internal.async.LeakLoggingNetworkSession;
5661
import org.neo4j.driver.internal.async.NetworkSession;
5762
import org.neo4j.driver.internal.async.connection.BootstrapFactory;
63+
import org.neo4j.driver.internal.cluster.Rediscovery;
64+
import org.neo4j.driver.internal.cluster.RediscoveryImpl;
5865
import org.neo4j.driver.internal.cluster.RoutingContext;
5966
import org.neo4j.driver.internal.cluster.RoutingSettings;
6067
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
@@ -191,6 +198,61 @@ void shouldCreateAppropriateDriverType(String uri) {
191198
}
192199
}
193200

201+
@Test
202+
void shouldUseBuiltInRediscoveryByDefault() {
203+
// GIVEN
204+
var driverFactory = new DriverFactory();
205+
var securityPlan =
206+
new SecuritySettings.SecuritySettingsBuilder().build().createSecurityPlan("neo4j");
207+
208+
// WHEN
209+
var driver = driverFactory.newInstance(
210+
URI.create("neo4j://localhost:7687"),
211+
AuthTokens.none(),
212+
RoutingSettings.DEFAULT,
213+
RetrySettings.DEFAULT,
214+
Config.defaultConfig(),
215+
null,
216+
securityPlan,
217+
null);
218+
219+
// THEN
220+
var sessionFactory = ((InternalDriver) driver).getSessionFactory();
221+
var connectionProvider = ((SessionFactoryImpl) sessionFactory).getConnectionProvider();
222+
var rediscovery = ((LoadBalancer) connectionProvider).getRediscovery();
223+
assertTrue(rediscovery instanceof RediscoveryImpl);
224+
}
225+
226+
@Test
227+
void shouldUseSuppliedRediscovery() {
228+
// GIVEN
229+
var driverFactory = new DriverFactory();
230+
var securityPlan =
231+
new SecuritySettings.SecuritySettingsBuilder().build().createSecurityPlan("neo4j");
232+
@SuppressWarnings("unchecked")
233+
Supplier<Rediscovery> rediscoverySupplier = mock(Supplier.class);
234+
var rediscovery = mock(Rediscovery.class);
235+
given(rediscoverySupplier.get()).willReturn(rediscovery);
236+
237+
// WHEN
238+
var driver = driverFactory.newInstance(
239+
URI.create("neo4j://localhost:7687"),
240+
AuthTokens.none(),
241+
RoutingSettings.DEFAULT,
242+
RetrySettings.DEFAULT,
243+
Config.defaultConfig(),
244+
null,
245+
securityPlan,
246+
rediscoverySupplier);
247+
248+
// THEN
249+
var sessionFactory = ((InternalDriver) driver).getSessionFactory();
250+
var connectionProvider = ((SessionFactoryImpl) sessionFactory).getConnectionProvider();
251+
var actualRediscovery = ((LoadBalancer) connectionProvider).getRediscovery();
252+
then(rediscoverySupplier).should().get();
253+
assertEquals(rediscovery, actualRediscovery);
254+
}
255+
194256
private Driver createDriver(String uri, DriverFactory driverFactory) {
195257
return createDriver(uri, driverFactory, defaultConfig());
196258
}
@@ -239,6 +301,7 @@ protected InternalDriver createRoutingDriver(
239301
RoutingSettings routingSettings,
240302
RetryLogic retryLogic,
241303
MetricsProvider metricsProvider,
304+
Supplier<Rediscovery> rediscoverySupplier,
242305
Config config) {
243306
throw new UnsupportedOperationException("Can't create routing driver");
244307
}
@@ -276,7 +339,8 @@ protected LoadBalancer createLoadBalancer(
276339
ConnectionPool connectionPool,
277340
EventExecutorGroup eventExecutorGroup,
278341
Config config,
279-
RoutingSettings routingSettings) {
342+
RoutingSettings routingSettings,
343+
Supplier<Rediscovery> rediscoverySupplier) {
280344
return null;
281345
}
282346

0 commit comments

Comments
 (0)