Skip to content

Commit ac73660

Browse files
committed
Decouple load balancing logic from address set
Round-robin load balancing was previously coupled with the address set implementation and made it hard to use different load balancing algorithm. This commit turns RoundRobinAddressSet into a simple concurrent set of addresses and moves load balancing logic into a separate component that takes list of available addresses and returns one that should be used next. This allows easier implementation of new load balancing algorithms. Rediscovery procedure will now not use load balancing and query routers in a known order (which is randomized by the database).
1 parent dbc4912 commit ac73660

17 files changed

+614
-330
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java renamed to driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,39 +20,23 @@
2020

2121
import java.util.Arrays;
2222
import java.util.Set;
23-
import java.util.concurrent.atomic.AtomicInteger;
2423

2524
import org.neo4j.driver.internal.net.BoltServerAddress;
2625

27-
public class RoundRobinAddressSet
26+
public class AddressSet
2827
{
2928
private static final BoltServerAddress[] NONE = {};
30-
private final AtomicInteger offset = new AtomicInteger();
31-
private volatile BoltServerAddress[] addresses = NONE;
3229

33-
public int size()
34-
{
35-
return addresses.length;
36-
}
30+
private volatile BoltServerAddress[] addresses = NONE;
3731

38-
public BoltServerAddress next()
32+
public BoltServerAddress[] toArray()
3933
{
40-
BoltServerAddress[] addresses = this.addresses;
41-
if ( addresses.length == 0 )
42-
{
43-
return null;
44-
}
45-
return addresses[next( addresses.length )];
34+
return addresses;
4635
}
4736

48-
int next( int divisor )
37+
public int size()
4938
{
50-
int index = offset.getAndIncrement();
51-
for ( ; index == Integer.MAX_VALUE; index = offset.getAndIncrement() )
52-
{
53-
offset.compareAndSet( Integer.MIN_VALUE, index % divisor );
54-
}
55-
return index % divisor;
39+
return addresses.length;
5640
}
5741

5842
public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
@@ -132,12 +116,6 @@ public synchronized void remove( BoltServerAddress address )
132116
@Override
133117
public String toString()
134118
{
135-
return "RoundRobinAddressSet=" + Arrays.toString( addresses );
136-
}
137-
138-
/** breaking encapsulation in order to perform white-box testing of boundary case */
139-
void setOffset( int target )
140-
{
141-
offset.set( target );
119+
return "AddressSet=" + Arrays.toString( addresses );
142120
}
143121
}

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ public class ClusterRoutingTable implements RoutingTable
3636

3737
private final Clock clock;
3838
private volatile long expirationTimeout;
39-
private final RoundRobinAddressSet readers;
40-
private final RoundRobinAddressSet writers;
41-
private final RoundRobinAddressSet routers;
39+
private final AddressSet readers;
40+
private final AddressSet writers;
41+
private final AddressSet routers;
4242

4343
public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
4444
{
@@ -51,9 +51,9 @@ private ClusterRoutingTable( Clock clock )
5151
this.clock = clock;
5252
this.expirationTimeout = clock.millis() - 1;
5353

54-
this.readers = new RoundRobinAddressSet();
55-
this.writers = new RoundRobinAddressSet();
56-
this.routers = new RoundRobinAddressSet();
54+
this.readers = new AddressSet();
55+
this.writers = new AddressSet();
56+
this.routers = new AddressSet();
5757
}
5858

5959
@Override
@@ -85,27 +85,21 @@ public synchronized void forget( BoltServerAddress address )
8585
}
8686

8787
@Override
88-
public RoundRobinAddressSet readers()
88+
public AddressSet readers()
8989
{
9090
return readers;
9191
}
9292

9393
@Override
94-
public RoundRobinAddressSet writers()
94+
public AddressSet writers()
9595
{
9696
return writers;
9797
}
9898

9999
@Override
100-
public BoltServerAddress nextRouter()
100+
public AddressSet routers()
101101
{
102-
return routers.next();
103-
}
104-
105-
@Override
106-
public int routerSize()
107-
{
108-
return routers.size();
102+
return routers;
109103
}
110104

111105
@Override

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, Au
3939
private final ConnectionPool connections;
4040
private final RoutingTable routingTable;
4141
private final Rediscovery rediscovery;
42+
private final LoadBalancingStrategy loadBalancingStrategy;
4243
private final Logger log;
4344

4445
public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections,
@@ -59,6 +60,7 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings,
5960
this.connections = connections;
6061
this.routingTable = routingTable;
6162
this.rediscovery = rediscovery;
63+
this.loadBalancingStrategy = new RoundRobinLoadBalancingStrategy();
6264
this.log = log;
6365

6466
refreshRoutingTable();
@@ -67,7 +69,7 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings,
6769
@Override
6870
public PooledConnection acquireConnection( AccessMode mode )
6971
{
70-
RoundRobinAddressSet addressSet = addressSetFor( mode );
72+
AddressSet addressSet = addressSetFor( mode );
7173
PooledConnection connection = acquireConnection( mode, addressSet );
7274
return new RoutingPooledConnection( connection, this, mode );
7375
}
@@ -90,10 +92,10 @@ public void close() throws Exception
9092
connections.close();
9193
}
9294

93-
private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSet servers )
95+
private PooledConnection acquireConnection( AccessMode mode, AddressSet servers )
9496
{
9597
ensureRouting( mode );
96-
for ( BoltServerAddress address; (address = servers.next()) != null; )
98+
for ( BoltServerAddress address; (address = selectAddress( mode, servers )) != null; )
9799
{
98100
try
99101
{
@@ -141,7 +143,7 @@ synchronized void refreshRoutingTable()
141143
log.info( "Refreshed routing information. %s", routingTable );
142144
}
143145

144-
private RoundRobinAddressSet addressSetFor( AccessMode mode )
146+
private AddressSet addressSetFor( AccessMode mode )
145147
{
146148
switch ( mode )
147149
{
@@ -150,7 +152,22 @@ private RoundRobinAddressSet addressSetFor( AccessMode mode )
150152
case WRITE:
151153
return routingTable.writers();
152154
default:
153-
throw new IllegalArgumentException( "Mode '" + mode + "' is not supported" );
155+
throw unknownMode( mode );
156+
}
157+
}
158+
159+
private BoltServerAddress selectAddress( AccessMode mode, AddressSet servers )
160+
{
161+
BoltServerAddress[] addresses = servers.toArray();
162+
163+
switch ( mode )
164+
{
165+
case READ:
166+
return loadBalancingStrategy.selectReader( addresses );
167+
case WRITE:
168+
return loadBalancingStrategy.selectWriter( addresses );
169+
default:
170+
throw unknownMode( mode );
154171
}
155172
}
156173

@@ -161,4 +178,9 @@ private static Rediscovery createRediscovery( BoltServerAddress initialRouter, R
161178
new RoutingProcedureClusterCompositionProvider( clock, log, settings );
162179
return new Rediscovery( initialRouter, settings, clock, log, clusterComposition, new DnsResolver( log ) );
163180
}
181+
182+
private static RuntimeException unknownMode( AccessMode mode )
183+
{
184+
return new IllegalArgumentException( "Mode '" + mode + "' is not supported" );
185+
}
164186
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import org.neo4j.driver.internal.net.BoltServerAddress;
22+
23+
public interface LoadBalancingStrategy
24+
{
25+
BoltServerAddress selectReader( BoltServerAddress[] knownReaders );
26+
27+
BoltServerAddress selectWriter( BoltServerAddress[] knownWriters );
28+
}

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,10 @@ private ClusterComposition lookupOnInitialRouterThenOnKnownRouters( RoutingTable
136136
private ClusterComposition lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connections,
137137
Set<BoltServerAddress> seenServers )
138138
{
139-
int size = routingTable.routerSize();
140-
for ( int i = 0; i < size; i++ )
141-
{
142-
BoltServerAddress address = routingTable.nextRouter();
143-
if ( address == null )
144-
{
145-
break;
146-
}
139+
BoltServerAddress[] addresses = routingTable.routers().toArray();
147140

141+
for ( BoltServerAddress address : addresses )
142+
{
148143
ClusterComposition composition = lookupOnRouter( address, routingTable, connections );
149144
if ( composition != null )
150145
{
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
public class RoundRobinArrayIndex
24+
{
25+
private final AtomicInteger offset;
26+
27+
RoundRobinArrayIndex()
28+
{
29+
this( 0 );
30+
}
31+
32+
// only for testing
33+
RoundRobinArrayIndex( int initialOffset )
34+
{
35+
this.offset = new AtomicInteger( initialOffset );
36+
}
37+
38+
public int next( int arrayLength )
39+
{
40+
if ( arrayLength == 0 )
41+
{
42+
return -1;
43+
}
44+
45+
int nextOffset;
46+
while ( (nextOffset = offset.getAndIncrement()) < 0 )
47+
{
48+
// overflow, try resetting back to zero
49+
offset.compareAndSet( nextOffset + 1, 0 );
50+
}
51+
return nextOffset % arrayLength;
52+
}
53+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import org.neo4j.driver.internal.net.BoltServerAddress;
22+
23+
public class RoundRobinLoadBalancingStrategy implements LoadBalancingStrategy
24+
{
25+
private final RoundRobinArrayIndex readersIndex = new RoundRobinArrayIndex();
26+
private final RoundRobinArrayIndex writersIndex = new RoundRobinArrayIndex();
27+
28+
@Override
29+
public BoltServerAddress selectReader( BoltServerAddress[] knownReaders )
30+
{
31+
return select( knownReaders, readersIndex );
32+
}
33+
34+
@Override
35+
public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters )
36+
{
37+
return select( knownWriters, writersIndex );
38+
}
39+
40+
private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex roundRobinIndex )
41+
{
42+
int length = addresses.length;
43+
if ( length == 0 )
44+
{
45+
return null;
46+
}
47+
int index = roundRobinIndex.next( length );
48+
return addresses[index];
49+
}
50+
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,11 @@ public interface RoutingTable
3131

3232
void forget( BoltServerAddress address );
3333

34-
RoundRobinAddressSet readers();
34+
AddressSet readers();
3535

36-
RoundRobinAddressSet writers();
36+
AddressSet writers();
3737

38-
BoltServerAddress nextRouter();
39-
40-
int routerSize();
38+
AddressSet routers();
4139

4240
void removeWriter( BoltServerAddress toRemove );
4341
}

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -808,11 +808,11 @@ public void shouldRetryReadTransactionAndPerformRediscoveryUntilSuccess() throws
808808
@Test
809809
public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throws Exception
810810
{
811-
StubServer router1 = StubServer.start( "acquire_endpoints.script", 9010 );
812-
StubServer brokenWriter1 = StubServer.start( "dead_write_server.script", 9007 );
811+
StubServer router1 = StubServer.start( "discover_servers.script", 9010 );
812+
StubServer brokenWriter1 = StubServer.start( "dead_write_server.script", 9001 );
813+
StubServer router2 = StubServer.start( "acquire_endpoints.script", 9002 );
813814
StubServer brokenWriter2 = StubServer.start( "dead_write_server.script", 9008 );
814-
StubServer router2 = StubServer.start( "discover_servers.script", 9002 );
815-
StubServer writer = StubServer.start( "write_server.script", 9001 );
815+
StubServer writer = StubServer.start( "write_server.script", 9007 );
816816

817817
try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9010" );
818818
Session session = driver.session() )
@@ -827,9 +827,9 @@ public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throw
827827
{
828828
assertEquals( 0, router1.exitStatus() );
829829
assertEquals( 0, brokenWriter1.exitStatus() );
830-
assertEquals( 0, brokenWriter2.exitStatus() );
831830
assertEquals( 0, router2.exitStatus() );
832831
assertEquals( 0, writer.exitStatus() );
832+
assertEquals( 0, brokenWriter2.exitStatus() );
833833
}
834834
}
835835

0 commit comments

Comments
 (0)