Skip to content

Commit f024d8b

Browse files
committed
Fixed issue with connection pool deactivation
Previously load balancer moved connection pool to deactivated state when corresponding cluster member had a network error. This was made to disallow any new connections towards that member. Member has also been removed from the routing table, so that callers never try to acquire connection from a deactivated pool. Deactivated pools were re-activated during rediscovery. However, there was a case when deactivated pool towards the seed router would remain deactivated forever without a chance to be re-activated. It happened when connections to all cores failed and driver had to perform rediscovery using seed router. In this case all connections pools were deactivated, and rediscovery was not able to complete because it failed trying to obtain connection from a deactivated pool. This commit fixes the problem by removing pool activation/deactivation. Instead driver will simply make instance non-routable by removing it's address from the routing table. Corresponding connection pool will not be changed. Later rediscovery will cleanup pools for non-routable addresses that have no active connections.
1 parent 915d6a6 commit f024d8b

16 files changed

+400
-407
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ public boolean isStaleFor( AccessMode mode )
6464
mode == AccessMode.WRITE && writers.size() == 0;
6565
}
6666

67-
private Set<BoltServerAddress> servers()
67+
@Override
68+
public Set<BoltServerAddress> servers()
6869
{
6970
Set<BoltServerAddress> servers = new HashSet<>();
7071
servers.addAll( readers.servers() );

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -110,16 +110,13 @@ private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSe
110110

111111
private synchronized void forget( BoltServerAddress address )
112112
{
113-
// First remove from the load balancer, to prevent concurrent threads from making connections to them.
113+
// remove from the routing table, to prevent concurrent threads from making connections to this address
114114
routingTable.forget( address );
115+
115116
if ( PURGE_ON_ERROR )
116117
{
117118
connections.purge( address );
118119
}
119-
else
120-
{
121-
connections.deactivate( address );
122-
}
123120
}
124121

125122
synchronized void ensureRouting( AccessMode mode )
@@ -153,15 +150,7 @@ private void updateConnectionPool( RoutingTableChange routingTableChange )
153150
}
154151
else
155152
{
156-
for ( BoltServerAddress addedAddress : routingTableChange.added() )
157-
{
158-
connections.activate( addedAddress );
159-
}
160-
for ( BoltServerAddress removedAddress : routingTableChange.removed() )
161-
{
162-
connections.deactivate( removedAddress );
163-
}
164-
connections.compact();
153+
connections.retainAll( routingTable.servers() );
165154
}
166155
}
167156

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import java.util.Set;
22+
2123
import org.neo4j.driver.internal.net.BoltServerAddress;
2224
import org.neo4j.driver.v1.AccessMode;
2325

@@ -37,5 +39,7 @@ public interface RoutingTable
3739

3840
int routerSize();
3941

42+
Set<BoltServerAddress> servers();
43+
4044
void removeWriter( BoltServerAddress toRemove );
4145
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.util.concurrent.BlockingQueue;
2424
import java.util.concurrent.ConcurrentHashMap;
2525
import java.util.concurrent.LinkedBlockingQueue;
26-
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2727

2828
import org.neo4j.driver.internal.logging.DelegatingLogger;
2929
import org.neo4j.driver.internal.net.BoltServerAddress;
@@ -40,15 +40,11 @@ public class BlockingPooledConnectionQueue
4040
{
4141
public static final String LOG_NAME = "ConnectionQueue";
4242

43-
private static final int ACTIVE = 1;
44-
private static final int INACTIVE = 2;
45-
private static final int TERMINATED = 3;
46-
4743
/** The backing queue, keeps track of connections currently in queue */
4844
private final BlockingQueue<PooledConnection> queue;
4945
private final Logger logger;
5046

51-
private final AtomicInteger state = new AtomicInteger( ACTIVE );
47+
private final AtomicBoolean terminated = new AtomicBoolean();
5248

5349
/** Keeps track of acquired connections */
5450
private final Set<PooledConnection> acquiredConnections =
@@ -75,7 +71,7 @@ public boolean offer( PooledConnection pooledConnection )
7571
{
7672
disposeSafely( pooledConnection );
7773
}
78-
if ( state.get() != ACTIVE )
74+
if ( terminated.get() )
7975
{
8076
terminateIdleConnections();
8177
}
@@ -96,13 +92,11 @@ public PooledConnection acquire( Supplier<PooledConnection> supplier )
9692
}
9793
acquiredConnections.add( connection );
9894

99-
int poolState = state.get();
100-
if ( poolState != ACTIVE )
95+
if ( terminated.get() )
10196
{
10297
acquiredConnections.remove( connection );
10398
disposeSafely( connection );
104-
throw new IllegalStateException( "Pool is " + (poolState == INACTIVE ? "deactivated" : "terminated") +
105-
", new connections can't be acquired" );
99+
throw new IllegalStateException( "Pool is terminated, new connections can't be acquired" );
106100
}
107101
else
108102
{
@@ -131,24 +125,6 @@ public boolean contains( PooledConnection pooledConnection )
131125
return queue.contains( pooledConnection );
132126
}
133127

134-
public void activate()
135-
{
136-
state.compareAndSet( INACTIVE, ACTIVE );
137-
}
138-
139-
public void deactivate()
140-
{
141-
if ( state.compareAndSet( ACTIVE, INACTIVE ) )
142-
{
143-
terminateIdleConnections();
144-
}
145-
}
146-
147-
public boolean isActive()
148-
{
149-
return state.get() == ACTIVE;
150-
}
151-
152128
/**
153129
* Terminates all connections, both those that are currently in the queue as well
154130
* as those that have been acquired.
@@ -157,7 +133,7 @@ public boolean isActive()
157133
*/
158134
public void terminate()
159135
{
160-
if ( state.getAndSet( TERMINATED ) != TERMINATED )
136+
if ( terminated.compareAndSet( false, true ) )
161137
{
162138
terminateIdleConnections();
163139
terminateAcquiredConnections();

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 18 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal.net.pooling;
2020

21-
import java.util.Collections;
2221
import java.util.Map;
2322
import java.util.Set;
2423
import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +32,7 @@
3332
import org.neo4j.driver.internal.spi.PooledConnection;
3433
import org.neo4j.driver.internal.util.Clock;
3534
import org.neo4j.driver.internal.util.Supplier;
35+
import org.neo4j.driver.v1.Logger;
3636
import org.neo4j.driver.v1.Logging;
3737

3838
/**
@@ -61,6 +61,7 @@ public class SocketConnectionPool implements ConnectionPool
6161
private final ConnectionValidator<PooledConnection> connectionValidator;
6262
private final Clock clock;
6363
private final Logging logging;
64+
private final Logger log;
6465

6566
public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clock clock, Logging logging )
6667
{
@@ -69,6 +70,7 @@ public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clo
6970
this.connectionValidator = new PooledConnectionValidator( this );
7071
this.clock = clock;
7172
this.logging = logging;
73+
this.log = logging.getLog( getClass().getSimpleName() );
7274
}
7375

7476
@Override
@@ -93,46 +95,33 @@ public void purge( BoltServerAddress address )
9395
}
9496

9597
@Override
96-
public void activate( BoltServerAddress address )
97-
{
98-
BlockingPooledConnectionQueue connectionQueue = pools.get( address );
99-
if ( connectionQueue != null )
100-
{
101-
connectionQueue.activate();
102-
}
103-
}
104-
105-
@Override
106-
public void deactivate( BoltServerAddress address )
107-
{
108-
BlockingPooledConnectionQueue connections = pools.get( address );
109-
if ( connections != null )
110-
{
111-
connections.deactivate();
112-
}
113-
}
114-
115-
@Override
116-
public void compact()
98+
public void retainAll( Set<BoltServerAddress> addressesToRetain )
11799
{
118100
for ( Map.Entry<BoltServerAddress,BlockingPooledConnectionQueue> entry : pools.entrySet() )
119101
{
120102
BoltServerAddress address = entry.getKey();
121-
BlockingPooledConnectionQueue queue = entry.getValue();
103+
BlockingPooledConnectionQueue pool = entry.getValue();
122104

123-
if ( !queue.isActive() && queue.activeConnections() == 0 )
105+
if ( !addressesToRetain.contains( address ) && pool.activeConnections() == 0 )
124106
{
125-
// queue has been in deactivated state and has no open connections by now
126-
pools.remove( address );
107+
// address is not present in the updated routing table and has no active connections
108+
// it's now safe to terminate corresponding connection pool and forget about it
109+
110+
BlockingPooledConnectionQueue removedPool = pools.remove( address );
111+
if ( removedPool != null )
112+
{
113+
log.info( "Closing connection pool towards %s, it has no active connections " +
114+
"and is not in the routing table", address );
115+
removedPool.terminate();
116+
}
127117
}
128118
}
129119
}
130120

131121
@Override
132122
public boolean hasAddress( BoltServerAddress address )
133123
{
134-
BlockingPooledConnectionQueue connectionQueue = pools.get( address );
135-
return connectionQueue != null && connectionQueue.isActive();
124+
return pools.containsKey( address );
136125
}
137126

138127
@Override
@@ -152,17 +141,7 @@ public void close()
152141
public int activeConnections( BoltServerAddress address )
153142
{
154143
BlockingPooledConnectionQueue connectionQueue = pools.get( address );
155-
if ( connectionQueue == null || !connectionQueue.isActive() )
156-
{
157-
return 0;
158-
}
159-
return connectionQueue.activeConnections();
160-
}
161-
162-
// test-only accessor
163-
Set<BoltServerAddress> addresses()
164-
{
165-
return Collections.unmodifiableSet( pools.keySet() );
144+
return connectionQueue == null ? 0 : connectionQueue.activeConnections();
166145
}
167146

168147
private BlockingPooledConnectionQueue pool( BoltServerAddress address )

driver/src/main/java/org/neo4j/driver/internal/security/TLSSocketChannel.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,8 @@ private HandshakeStatus wrap( ByteBuffer buffer ) throws IOException, ClientExce
372372
cipherOut.compact();
373373
}
374374
break;
375+
case CLOSED:
376+
throw new IOException( "TLS socket channel is closed" );
375377
default:
376378
throw new ClientException( "Got unexpected status " + status );
377379
}

driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal.spi;
2020

21+
import java.util.Set;
22+
2123
import org.neo4j.driver.internal.net.BoltServerAddress;
2224

2325
public interface ConnectionPool extends AutoCloseable
@@ -36,11 +38,7 @@ public interface ConnectionPool extends AutoCloseable
3638
*/
3739
void purge( BoltServerAddress address );
3840

39-
void activate( BoltServerAddress address );
40-
41-
void deactivate( BoltServerAddress address );
42-
43-
void compact();
41+
void retainAll( Set<BoltServerAddress> addressesToRetain );
4442

4543
boolean hasAddress( BoltServerAddress address );
4644
}

driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21-
import java.util.ArrayList;
21+
import java.util.Collections;
2222
import java.util.LinkedHashSet;
2323
import java.util.List;
2424
import java.util.Set;
@@ -41,7 +41,7 @@ private ClusterCompositionUtil() {}
4141
public static final BoltServerAddress E = new BoltServerAddress( "5555:55" );
4242
public static final BoltServerAddress F = new BoltServerAddress( "6666:66" );
4343

44-
public static final List<BoltServerAddress> EMPTY = new ArrayList<>();
44+
public static final List<BoltServerAddress> EMPTY = Collections.emptyList();
4545

4646
public static final ClusterComposition VALID_CLUSTER_COMPOSITION =
4747
createClusterComposition( asList( A, B ), asList( C ), asList( D, E ) );

driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import org.junit.Test;
2222

23+
import java.util.HashSet;
2324
import java.util.List;
25+
import java.util.Set;
2426

2527
import org.neo4j.driver.internal.net.BoltServerAddress;
2628
import org.neo4j.driver.internal.util.FakeClock;
@@ -231,4 +233,25 @@ public void shouldNotRemoveServerIfPreWriterNowReader()
231233
assertEquals( 2, change.removed().size() );
232234
assertThat( change.removed(), containsInAnyOrder( A, C ) );
233235
}
236+
237+
@Test
238+
public void shouldReturnNoServersWhenEmpty()
239+
{
240+
ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() );
241+
242+
Set<BoltServerAddress> servers = routingTable.servers();
243+
244+
assertEquals( 0, servers.size() );
245+
}
246+
247+
@Test
248+
public void shouldReturnAllServers()
249+
{
250+
ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() );
251+
routingTable.update( createClusterComposition( asList( A, B, C ), asList( B, C, D ), asList( C, D, E, F ) ) );
252+
253+
Set<BoltServerAddress> servers = routingTable.servers();
254+
255+
assertEquals( new HashSet<>( asList( A, B, C, D, E, F ) ), servers );
256+
}
234257
}

0 commit comments

Comments
 (0)