Skip to content

Commit 4357602

Browse files
committed
Added least connected load balancing strategy
This replaces the existing round-robin. It gives us better performance with clusters composed of different machine capacities. Lease connected load balancing strategy selects start index in round-robin fashion to avoid always selecting same machine when cluster does not have any running transactions or all machines have same number of active connections.
1 parent ab54cc5 commit 4357602

File tree

9 files changed

+440
-3
lines changed

9 files changed

+440
-3
lines changed
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import org.neo4j.driver.internal.net.BoltServerAddress;
22+
import org.neo4j.driver.internal.spi.ConnectionPool;
23+
24+
public class LeastConnectedLoadBalancingStrategy implements LoadBalancingStrategy
25+
{
26+
private final RoundRobinArrayIndex readersIndex = new RoundRobinArrayIndex();
27+
private final RoundRobinArrayIndex writersIndex = new RoundRobinArrayIndex();
28+
29+
private final ConnectionPool connectionPool;
30+
31+
public LeastConnectedLoadBalancingStrategy( ConnectionPool connectionPool )
32+
{
33+
this.connectionPool = connectionPool;
34+
}
35+
36+
@Override
37+
public BoltServerAddress selectReader( BoltServerAddress[] knownReaders )
38+
{
39+
return select( knownReaders, readersIndex );
40+
}
41+
42+
@Override
43+
public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters )
44+
{
45+
return select( knownWriters, writersIndex );
46+
}
47+
48+
private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex addressesIndex )
49+
{
50+
int size = addresses.length;
51+
if ( size == 0 )
52+
{
53+
return null;
54+
}
55+
else
56+
{
57+
int startIndex = addressesIndex.next( size );
58+
int index = startIndex;
59+
60+
BoltServerAddress leastConnectedAddress = null;
61+
int leastActiveConnections = Integer.MAX_VALUE;
62+
63+
do
64+
{
65+
BoltServerAddress address = addresses[index];
66+
int activeConnections = connectionPool.activeConnections( address );
67+
68+
if ( activeConnections < leastActiveConnections )
69+
{
70+
leastConnectedAddress = address;
71+
leastActiveConnections = activeConnections;
72+
}
73+
74+
if ( index == size - 1 )
75+
{
76+
index = 0;
77+
}
78+
else
79+
{
80+
index++;
81+
}
82+
}
83+
while ( index != startIndex );
84+
85+
return leastConnectedAddress;
86+
}
87+
}
88+
}

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings,
6060
this.connections = connections;
6161
this.routingTable = routingTable;
6262
this.rediscovery = rediscovery;
63-
this.loadBalancingStrategy = new RoundRobinLoadBalancingStrategy();
63+
this.loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( connections );
6464
this.log = log;
6565

6666
refreshRoutingTable();

driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ public int size()
120120
return queue.size();
121121
}
122122

123+
public int activeConnections()
124+
{
125+
return acquiredConnections.size();
126+
}
127+
123128
public boolean contains( PooledConnection pooledConnection )
124129
{
125130
return queue.contains( pooledConnection );

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@ public boolean hasAddress( BoltServerAddress address )
9595
return pools.containsKey( address );
9696
}
9797

98+
@Override
99+
public int activeConnections( BoltServerAddress address )
100+
{
101+
BlockingPooledConnectionQueue connectionQueue = pools.get( address );
102+
return connectionQueue == null ? 0 : connectionQueue.activeConnections();
103+
}
104+
98105
@Override
99106
public void close()
100107
{

driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,29 @@ public interface ConnectionPool extends AutoCloseable
2626
* Acquire a connection - if a live connection exists in the pool, it will
2727
* be used, otherwise a new connection will be created.
2828
*
29-
* @param address The address to acquire
29+
* @param address the address to acquire
3030
*/
3131
PooledConnection acquire( BoltServerAddress address );
3232

3333
/**
3434
* Removes all connections to a given address from the pool.
35-
* @param address The address to remove.
35+
* @param address the address to remove.
3636
*/
3737
void purge( BoltServerAddress address );
3838

39+
/**
40+
* Check if pool has connections for the given address.
41+
*
42+
* @param address the address to check connections.
43+
* @return {@code true} when pool has connections towards the given address, {@code false} otherwise.
44+
*/
3945
boolean hasAddress( BoltServerAddress address );
46+
47+
/**
48+
* Gen number of active connections pool has towards the given address.
49+
*
50+
* @param address the address to get connections.
51+
* @return number of active (checked out of this pool) connections.
52+
*/
53+
int activeConnections( BoltServerAddress address );
4054
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import org.junit.Before;
22+
import org.junit.Test;
23+
import org.mockito.Mock;
24+
25+
import org.neo4j.driver.internal.net.BoltServerAddress;
26+
import org.neo4j.driver.internal.spi.ConnectionPool;
27+
28+
import static org.junit.Assert.assertEquals;
29+
import static org.junit.Assert.assertNull;
30+
import static org.mockito.Mockito.when;
31+
import static org.mockito.MockitoAnnotations.initMocks;
32+
33+
public class LeastConnectedLoadBalancingStrategyTest
34+
{
35+
@Mock
36+
private ConnectionPool connectionPool;
37+
private LeastConnectedLoadBalancingStrategy strategy;
38+
39+
@Before
40+
public void setUp() throws Exception
41+
{
42+
initMocks( this );
43+
strategy = new LeastConnectedLoadBalancingStrategy( connectionPool );
44+
}
45+
46+
@Test
47+
public void shouldHandleEmptyReadersArray()
48+
{
49+
assertNull( strategy.selectReader( new BoltServerAddress[0] ) );
50+
}
51+
52+
@Test
53+
public void shouldHandleEmptyWritersArray()
54+
{
55+
assertNull( strategy.selectWriter( new BoltServerAddress[0] ) );
56+
}
57+
58+
@Test
59+
public void shouldHandleSingleReaderWithoutActiveConnections()
60+
{
61+
BoltServerAddress address = new BoltServerAddress( "reader", 9999 );
62+
63+
assertEquals( address, strategy.selectReader( new BoltServerAddress[]{address} ) );
64+
}
65+
66+
@Test
67+
public void shouldHandleSingleWriterWithoutActiveConnections()
68+
{
69+
BoltServerAddress address = new BoltServerAddress( "writer", 9999 );
70+
71+
assertEquals( address, strategy.selectWriter( new BoltServerAddress[]{address} ) );
72+
}
73+
74+
@Test
75+
public void shouldHandleSingleReaderWithActiveConnections()
76+
{
77+
BoltServerAddress address = new BoltServerAddress( "reader", 9999 );
78+
when( connectionPool.activeConnections( address ) ).thenReturn( 42 );
79+
80+
assertEquals( address, strategy.selectReader( new BoltServerAddress[]{address} ) );
81+
}
82+
83+
@Test
84+
public void shouldHandleSingleWriterWithActiveConnections()
85+
{
86+
BoltServerAddress address = new BoltServerAddress( "writer", 9999 );
87+
when( connectionPool.activeConnections( address ) ).thenReturn( 24 );
88+
89+
assertEquals( address, strategy.selectWriter( new BoltServerAddress[]{address} ) );
90+
}
91+
92+
@Test
93+
public void shouldHandleMultipleReadersWithActiveConnections()
94+
{
95+
BoltServerAddress address1 = new BoltServerAddress( "reader", 1 );
96+
BoltServerAddress address2 = new BoltServerAddress( "reader", 2 );
97+
BoltServerAddress address3 = new BoltServerAddress( "reader", 3 );
98+
99+
when( connectionPool.activeConnections( address1 ) ).thenReturn( 3 );
100+
when( connectionPool.activeConnections( address2 ) ).thenReturn( 4 );
101+
when( connectionPool.activeConnections( address3 ) ).thenReturn( 1 );
102+
103+
assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
104+
}
105+
106+
@Test
107+
public void shouldHandleMultipleWritersWithActiveConnections()
108+
{
109+
BoltServerAddress address1 = new BoltServerAddress( "writer", 1 );
110+
BoltServerAddress address2 = new BoltServerAddress( "writer", 2 );
111+
BoltServerAddress address3 = new BoltServerAddress( "writer", 3 );
112+
BoltServerAddress address4 = new BoltServerAddress( "writer", 4 );
113+
114+
when( connectionPool.activeConnections( address1 ) ).thenReturn( 5 );
115+
when( connectionPool.activeConnections( address2 ) ).thenReturn( 6 );
116+
when( connectionPool.activeConnections( address3 ) ).thenReturn( 0 );
117+
when( connectionPool.activeConnections( address4 ) ).thenReturn( 1 );
118+
119+
assertEquals( address3,
120+
strategy.selectWriter( new BoltServerAddress[]{address1, address2, address3, address4} ) );
121+
}
122+
123+
@Test
124+
public void shouldReturnDifferentReaderOnEveryInvocationWhenNoActiveConnections()
125+
{
126+
BoltServerAddress address1 = new BoltServerAddress( "reader", 1 );
127+
BoltServerAddress address2 = new BoltServerAddress( "reader", 2 );
128+
BoltServerAddress address3 = new BoltServerAddress( "reader", 3 );
129+
130+
assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
131+
assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
132+
assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
133+
134+
assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
135+
assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
136+
assertEquals( address3, strategy.selectReader( new BoltServerAddress[]{address1, address2, address3} ) );
137+
}
138+
139+
@Test
140+
public void shouldReturnDifferentWriterOnEveryInvocationWhenNoActiveConnections()
141+
{
142+
BoltServerAddress address1 = new BoltServerAddress( "writer", 1 );
143+
BoltServerAddress address2 = new BoltServerAddress( "writer", 2 );
144+
145+
assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) );
146+
assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) );
147+
148+
assertEquals( address1, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) );
149+
assertEquals( address2, strategy.selectReader( new BoltServerAddress[]{address1, address2} ) );
150+
}
151+
}

0 commit comments

Comments
 (0)