Skip to content

Commit 510de72

Browse files
committed
Experinental kill switch for using RoundRobinStrategy for load balancing
1 parent 190ed2c commit 510de72

File tree

6 files changed

+92
-19
lines changed

6 files changed

+92
-19
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@
2424

2525
import org.neo4j.driver.internal.cluster.RoutingContext;
2626
import org.neo4j.driver.internal.cluster.RoutingSettings;
27+
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
2728
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
29+
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancingStrategy;
30+
import org.neo4j.driver.internal.cluster.loadbalancing.RoundRobinLoadBalancingStrategy;
2831
import org.neo4j.driver.internal.net.BoltServerAddress;
2932
import org.neo4j.driver.internal.net.SocketConnector;
3033
import org.neo4j.driver.internal.net.pooling.PoolSettings;
@@ -146,7 +149,21 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
146149
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
147150
RoutingSettings routingSettings )
148151
{
149-
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging() );
152+
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging(),
153+
loadBalancingStrategy( config, connectionPool ) );
154+
}
155+
156+
private LoadBalancingStrategy loadBalancingStrategy( Config config,
157+
ConnectionPool connectionPool )
158+
{
159+
if ( config.loadBalancingStrategy() == Config.LoadBalancingStrategy.ROUND_ROBIN )
160+
{
161+
return new RoundRobinLoadBalancingStrategy();
162+
}
163+
else
164+
{
165+
return new LeastConnectedLoadBalancingStrategy( connectionPool );
166+
}
150167
}
151168

152169
/**

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,25 +53,28 @@ public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, Au
5353
private final Logger log;
5454

5555
public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections,
56-
Clock clock, Logging logging )
56+
Clock clock, Logging logging, LoadBalancingStrategy loadBalancingStrategy )
5757
{
5858
this( initialRouter, settings, connections, new ClusterRoutingTable( clock, initialRouter ), clock,
59-
logging.getLog( LOAD_BALANCER_LOG_NAME ) );
59+
logging.getLog( LOAD_BALANCER_LOG_NAME ), loadBalancingStrategy );
6060
}
6161

6262
private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections,
63-
RoutingTable routingTable, Clock clock, Logger log )
63+
RoutingTable routingTable, Clock clock, Logger log,
64+
LoadBalancingStrategy loadBalancingStrategy )
6465
{
65-
this( connections, routingTable, createRediscovery( initialRouter, settings, clock, log ), log );
66+
this( connections, routingTable, createRediscovery( initialRouter, settings, clock, log ), log,
67+
loadBalancingStrategy );
6668
}
6769

6870
// Used only in testing
69-
public LoadBalancer( ConnectionPool connections, RoutingTable routingTable, Rediscovery rediscovery, Logger log )
71+
public LoadBalancer( ConnectionPool connections, RoutingTable routingTable, Rediscovery rediscovery, Logger log,
72+
LoadBalancingStrategy loadBalancingStrategy )
7073
{
7174
this.connections = connections;
7275
this.routingTable = routingTable;
7376
this.rediscovery = rediscovery;
74-
this.loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( connections );
77+
this.loadBalancingStrategy = loadBalancingStrategy;
7578
this.log = log;
7679

7780
refreshRoutingTable();

driver/src/main/java/org/neo4j/driver/v1/Config.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
3030
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
3131
import org.neo4j.driver.v1.exceptions.TransientException;
32+
import org.neo4j.driver.v1.util.Experimental;
3233
import org.neo4j.driver.v1.util.Immutable;
3334
import org.neo4j.driver.v1.util.Resource;
3435

@@ -74,6 +75,8 @@ public class Config
7475
private final int connectionTimeoutMillis;
7576
private final RetrySettings retrySettings;
7677

78+
private final LoadBalancingStrategy loadBalancingStrategy;
79+
7780
private Config( ConfigBuilder builder)
7881
{
7982
this.logging = builder.logging;
@@ -88,6 +91,17 @@ private Config( ConfigBuilder builder)
8891
this.routingRetryDelayMillis = builder.routingRetryDelayMillis;
8992
this.connectionTimeoutMillis = builder.connectionTimeoutMillis;
9093
this.retrySettings = builder.retrySettings;
94+
this.loadBalancingStrategy = builder.loadBalancingStrategy;
95+
}
96+
97+
/**
98+
* Load Balancing Strategy
99+
*
100+
* @return load balancing strategy to use
101+
*/
102+
public LoadBalancingStrategy loadBalancingStrategy()
103+
{
104+
return loadBalancingStrategy;
91105
}
92106

93107
/**
@@ -210,6 +224,7 @@ public static class ConfigBuilder
210224
private long idleTimeBeforeConnectionTest = PoolSettings.DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST;
211225
private boolean encrypted = true;
212226
private TrustStrategy trustStrategy = trustAllCertificates();
227+
private LoadBalancingStrategy loadBalancingStrategy = LoadBalancingStrategy.LEAST_CONNECTED;
213228
private int routingFailureLimit = 1;
214229
private long routingRetryDelayMillis = TimeUnit.SECONDS.toMillis( 5 );
215230
private int connectionTimeoutMillis = (int) TimeUnit.SECONDS.toMillis( 5 );
@@ -229,6 +244,23 @@ public ConfigBuilder withLogging( Logging logging )
229244
return this;
230245
}
231246

247+
/**
248+
* Provide an alternative load balancing implementation for the driver to use. By default we use
249+
* {@link LoadBalancingStrategy#LEAST_CONNECTED}.
250+
* <p>
251+
* We are experimnenting with different strategies before sticking to one. This could be removed in the next
252+
* minor version
253+
*
254+
* @param loadBalancingStrategy the strategy to use
255+
* @return this builder
256+
*/
257+
@Experimental
258+
public ConfigBuilder withLoadBalancingStrategy( LoadBalancingStrategy loadBalancingStrategy )
259+
{
260+
this.loadBalancingStrategy = loadBalancingStrategy;
261+
return this;
262+
}
263+
232264
/**
233265
* Enable logging of leaked sessions.
234266
* <p>
@@ -542,6 +574,13 @@ public enum EncryptionLevel
542574
REQUIRED
543575
}
544576

577+
@Experimental
578+
public enum LoadBalancingStrategy
579+
{
580+
ROUND_ROBIN,
581+
LEAST_CONNECTED
582+
}
583+
545584
/**
546585
* Control how the driver determines if it can trust the encryption certificates provided by the Neo4j instance it is connected to.
547586
*/

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Map;
3131

3232
import org.neo4j.driver.internal.cluster.RoutingSettings;
33+
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
3334
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
3435
import org.neo4j.driver.internal.net.BoltServerAddress;
3536
import org.neo4j.driver.internal.retry.FixedRetryLogic;
@@ -347,7 +348,8 @@ private final Driver driverWithServers( long ttl, Map<String,Object>... serverIn
347348
private Driver driverWithPool( ConnectionPool pool )
348349
{
349350
RoutingSettings settings = new RoutingSettings( 10, 5_000, null );
350-
ConnectionProvider connectionProvider = new LoadBalancer( SEED, settings, pool, clock, logging );
351+
ConnectionProvider connectionProvider = new LoadBalancer( SEED, settings, pool, clock, logging,
352+
new LeastConnectedLoadBalancingStrategy( pool ) );
351353
Config config = Config.build().withLogging( logging ).toConfig();
352354
SessionFactory sessionFactory = new NetworkSessionWithAddressFactory( connectionProvider, config );
353355
return new InternalDriver( insecure(), sessionFactory, logging );

driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.HashSet;
3131
import java.util.List;
3232

33+
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
3334
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
3435
import org.neo4j.driver.internal.net.BoltServerAddress;
3536
import org.neo4j.driver.internal.net.pooling.PoolSettings;
@@ -325,7 +326,8 @@ private static LoadBalancer newLoadBalancer( ClusterComposition clusterCompositi
325326
{
326327
Rediscovery rediscovery = mock( Rediscovery.class );
327328
when( rediscovery.lookupClusterComposition( routingTable, connectionPool ) ).thenReturn( clusterComposition );
328-
return new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER );
329+
return new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER,
330+
new LeastConnectedLoadBalancingStrategy( connectionPool ) );
329331
}
330332

331333
private interface ConnectionMethod

driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ public void ensureRoutingShouldUpdateRoutingTableAndPurgeConnectionPoolWhenStale
9393
when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( set );
9494

9595
// when
96-
LoadBalancer balancer = new LoadBalancer( conns, routingTable, rediscovery, DEV_NULL_LOGGER );
96+
LoadBalancer balancer = new LoadBalancer( conns, routingTable, rediscovery, DEV_NULL_LOGGER,
97+
new LeastConnectedLoadBalancingStrategy( conns ) );
9798

9899
// then
99100
assertNotNull( balancer );
@@ -109,7 +110,8 @@ public void shouldRefreshRoutingTableOnInitialization() throws Exception
109110
// given & when
110111
final AtomicInteger refreshRoutingTableCounter = new AtomicInteger( 0 );
111112
LoadBalancer balancer = new LoadBalancer( mock( ConnectionPool.class ), mock( RoutingTable.class ),
112-
mock( Rediscovery.class ), DEV_NULL_LOGGER )
113+
mock( Rediscovery.class ), DEV_NULL_LOGGER,
114+
new LeastConnectedLoadBalancingStrategy( mock( ConnectionPool.class ) ) )
113115
{
114116
@Override
115117
synchronized void refreshRoutingTable()
@@ -163,7 +165,8 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing
163165
RoutingTable routingTable = mock( RoutingTable.class );
164166
ConnectionPool connectionPool = mock( ConnectionPool.class );
165167
Rediscovery rediscovery = mock( Rediscovery.class );
166-
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER );
168+
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER,
169+
new LeastConnectedLoadBalancingStrategy( connectionPool ) );
167170
BoltServerAddress address = new BoltServerAddress( "host", 42 );
168171

169172
PooledConnection connection = newConnectionWithFailingSync( address );
@@ -197,7 +200,8 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing
197200
PooledConnection connectionWithFailingSync = newConnectionWithFailingSync( address );
198201
when( connectionPool.acquire( any( BoltServerAddress.class ) ) ).thenReturn( connectionWithFailingSync );
199202
Rediscovery rediscovery = mock( Rediscovery.class );
200-
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER );
203+
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER,
204+
new LeastConnectedLoadBalancingStrategy( connectionPool ) );
201205

202206
Session session = newSession( loadBalancer );
203207
// begin transaction to make session obtain a connection
@@ -243,7 +247,8 @@ public void shouldThrowWhenRediscoveryReturnsNoSuitableServers()
243247
when( routingTable.readers() ).thenReturn( new AddressSet() );
244248
when( routingTable.writers() ).thenReturn( new AddressSet() );
245249

246-
LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER );
250+
LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER,
251+
new LeastConnectedLoadBalancingStrategy( connections ) );
247252

248253
try
249254
{
@@ -283,7 +288,8 @@ public void shouldSelectLeastConnectedAddress()
283288

284289
Rediscovery rediscovery = mock( Rediscovery.class );
285290

286-
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER );
291+
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER,
292+
new LeastConnectedLoadBalancingStrategy( connectionPool ) );
287293

288294
Set<BoltServerAddress> seenAddresses = new HashSet<>();
289295
for ( int i = 0; i < 10; i++ )
@@ -309,7 +315,8 @@ public void shouldRoundRobinWhenNoActiveConnections()
309315

310316
Rediscovery rediscovery = mock( Rediscovery.class );
311317

312-
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER );
318+
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER,
319+
new LeastConnectedLoadBalancingStrategy( connectionPool ) );
313320

314321
Set<BoltServerAddress> seenAddresses = new HashSet<>();
315322
for ( int i = 0; i < 10; i++ )
@@ -330,7 +337,8 @@ private void testRediscoveryWhenStale( AccessMode mode )
330337
RoutingTable routingTable = newStaleRoutingTableMock( mode );
331338
Rediscovery rediscovery = newRediscoveryMock();
332339

333-
LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER );
340+
LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER,
341+
new LeastConnectedLoadBalancingStrategy( connections ) );
334342
verify( rediscovery ).lookupClusterComposition( routingTable, connections );
335343

336344
assertNotNull( loadBalancer.acquireConnection( mode ) );
@@ -346,7 +354,8 @@ private void testNoRediscoveryWhenNotStale( AccessMode staleMode, AccessMode not
346354
RoutingTable routingTable = newStaleRoutingTableMock( staleMode );
347355
Rediscovery rediscovery = newRediscoveryMock();
348356

349-
LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER );
357+
LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER,
358+
new LeastConnectedLoadBalancingStrategy( connections ) );
350359
verify( rediscovery ).lookupClusterComposition( routingTable, connections );
351360

352361
assertNotNull( loadBalancer.acquireConnection( notStaleMode ) );
@@ -379,7 +388,8 @@ private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConne
379388
when( routingTable.readers() ).thenReturn( readerAddrs );
380389
when( routingTable.writers() ).thenReturn( writerAddrs );
381390

382-
return new LoadBalancer( connPool, routingTable, rediscovery, DEV_NULL_LOGGER );
391+
return new LoadBalancer( connPool, routingTable, rediscovery, DEV_NULL_LOGGER,
392+
new LeastConnectedLoadBalancingStrategy( connPool ) );
383393
}
384394

385395
private static Session newSession( LoadBalancer loadBalancer )

0 commit comments

Comments
 (0)