Skip to content

Commit 07d4c0c

Browse files
committed
Fixed issue with connection pool deactivation
1 parent 915d6a6 commit 07d4c0c

15 files changed

+376
-369
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ public boolean isStaleFor( AccessMode mode )
6464
mode == AccessMode.WRITE && writers.size() == 0;
6565
}
6666

67-
private Set<BoltServerAddress> servers()
67+
@Override
68+
public Set<BoltServerAddress> servers()
6869
{
6970
Set<BoltServerAddress> servers = new HashSet<>();
7071
servers.addAll( readers.servers() );

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -110,16 +110,13 @@ private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSe
110110

111111
private synchronized void forget( BoltServerAddress address )
112112
{
113-
// First remove from the load balancer, to prevent concurrent threads from making connections to them.
113+
// remove from the routing table, to prevent concurrent threads from making connections to this address
114114
routingTable.forget( address );
115+
115116
if ( PURGE_ON_ERROR )
116117
{
117118
connections.purge( address );
118119
}
119-
else
120-
{
121-
connections.deactivate( address );
122-
}
123120
}
124121

125122
synchronized void ensureRouting( AccessMode mode )
@@ -153,15 +150,7 @@ private void updateConnectionPool( RoutingTableChange routingTableChange )
153150
}
154151
else
155152
{
156-
for ( BoltServerAddress addedAddress : routingTableChange.added() )
157-
{
158-
connections.activate( addedAddress );
159-
}
160-
for ( BoltServerAddress removedAddress : routingTableChange.removed() )
161-
{
162-
connections.deactivate( removedAddress );
163-
}
164-
connections.compact();
153+
connections.retainAll( routingTable.servers() );
165154
}
166155
}
167156

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import java.util.Set;
22+
2123
import org.neo4j.driver.internal.net.BoltServerAddress;
2224
import org.neo4j.driver.v1.AccessMode;
2325

@@ -37,5 +39,7 @@ public interface RoutingTable
3739

3840
int routerSize();
3941

42+
Set<BoltServerAddress> servers();
43+
4044
void removeWriter( BoltServerAddress toRemove );
4145
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java

Lines changed: 6 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.util.concurrent.BlockingQueue;
2424
import java.util.concurrent.ConcurrentHashMap;
2525
import java.util.concurrent.LinkedBlockingQueue;
26-
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2727

2828
import org.neo4j.driver.internal.logging.DelegatingLogger;
2929
import org.neo4j.driver.internal.net.BoltServerAddress;
@@ -40,15 +40,11 @@ public class BlockingPooledConnectionQueue
4040
{
4141
public static final String LOG_NAME = "ConnectionQueue";
4242

43-
private static final int ACTIVE = 1;
44-
private static final int INACTIVE = 2;
45-
private static final int TERMINATED = 3;
46-
4743
/** The backing queue, keeps track of connections currently in queue */
4844
private final BlockingQueue<PooledConnection> queue;
4945
private final Logger logger;
5046

51-
private final AtomicInteger state = new AtomicInteger( ACTIVE );
47+
private final AtomicBoolean terminated = new AtomicBoolean();
5248

5349
/** Keeps track of acquired connections */
5450
private final Set<PooledConnection> acquiredConnections =
@@ -75,7 +71,7 @@ public boolean offer( PooledConnection pooledConnection )
7571
{
7672
disposeSafely( pooledConnection );
7773
}
78-
if ( state.get() != ACTIVE )
74+
if ( terminated.get() )
7975
{
8076
terminateIdleConnections();
8177
}
@@ -96,13 +92,11 @@ public PooledConnection acquire( Supplier<PooledConnection> supplier )
9692
}
9793
acquiredConnections.add( connection );
9894

99-
int poolState = state.get();
100-
if ( poolState != ACTIVE )
95+
if ( terminated.get() )
10196
{
10297
acquiredConnections.remove( connection );
10398
disposeSafely( connection );
104-
throw new IllegalStateException( "Pool is " + (poolState == INACTIVE ? "deactivated" : "terminated") +
105-
", new connections can't be acquired" );
99+
throw new IllegalStateException( "Pool is terminated, new connections can't be acquired" );
106100
}
107101
else
108102
{
@@ -131,24 +125,6 @@ public boolean contains( PooledConnection pooledConnection )
131125
return queue.contains( pooledConnection );
132126
}
133127

134-
public void activate()
135-
{
136-
state.compareAndSet( INACTIVE, ACTIVE );
137-
}
138-
139-
public void deactivate()
140-
{
141-
if ( state.compareAndSet( ACTIVE, INACTIVE ) )
142-
{
143-
terminateIdleConnections();
144-
}
145-
}
146-
147-
public boolean isActive()
148-
{
149-
return state.get() == ACTIVE;
150-
}
151-
152128
/**
153129
* Terminates all connections, both those that are currently in the queue as well
154130
* as those that have been acquired.
@@ -157,7 +133,7 @@ public boolean isActive()
157133
*/
158134
public void terminate()
159135
{
160-
if ( state.getAndSet( TERMINATED ) != TERMINATED )
136+
if ( terminated.compareAndSet( false, true ) )
161137
{
162138
terminateIdleConnections();
163139
terminateAcquiredConnections();

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 18 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal.net.pooling;
2020

21-
import java.util.Collections;
2221
import java.util.Map;
2322
import java.util.Set;
2423
import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +32,7 @@
3332
import org.neo4j.driver.internal.spi.PooledConnection;
3433
import org.neo4j.driver.internal.util.Clock;
3534
import org.neo4j.driver.internal.util.Supplier;
35+
import org.neo4j.driver.v1.Logger;
3636
import org.neo4j.driver.v1.Logging;
3737

3838
/**
@@ -61,6 +61,7 @@ public class SocketConnectionPool implements ConnectionPool
6161
private final ConnectionValidator<PooledConnection> connectionValidator;
6262
private final Clock clock;
6363
private final Logging logging;
64+
private final Logger log;
6465

6566
public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clock clock, Logging logging )
6667
{
@@ -69,6 +70,7 @@ public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clo
6970
this.connectionValidator = new PooledConnectionValidator( this );
7071
this.clock = clock;
7172
this.logging = logging;
73+
this.log = logging.getLog( getClass().getSimpleName() );
7274
}
7375

7476
@Override
@@ -93,46 +95,33 @@ public void purge( BoltServerAddress address )
9395
}
9496

9597
@Override
96-
public void activate( BoltServerAddress address )
97-
{
98-
BlockingPooledConnectionQueue connectionQueue = pools.get( address );
99-
if ( connectionQueue != null )
100-
{
101-
connectionQueue.activate();
102-
}
103-
}
104-
105-
@Override
106-
public void deactivate( BoltServerAddress address )
107-
{
108-
BlockingPooledConnectionQueue connections = pools.get( address );
109-
if ( connections != null )
110-
{
111-
connections.deactivate();
112-
}
113-
}
114-
115-
@Override
116-
public void compact()
98+
public void retainAll( Set<BoltServerAddress> addressesToRetain )
11799
{
118100
for ( Map.Entry<BoltServerAddress,BlockingPooledConnectionQueue> entry : pools.entrySet() )
119101
{
120102
BoltServerAddress address = entry.getKey();
121-
BlockingPooledConnectionQueue queue = entry.getValue();
103+
BlockingPooledConnectionQueue pool = entry.getValue();
122104

123-
if ( !queue.isActive() && queue.activeConnections() == 0 )
105+
if ( !addressesToRetain.contains( address ) && pool.activeConnections() == 0 )
124106
{
125-
// queue has been in deactivated state and has no open connections by now
126-
pools.remove( address );
107+
// address is not present in the updated routing table and has no active connections
108+
// it's now safe to terminate corresponding connection pool and forget about it
109+
110+
BlockingPooledConnectionQueue removedPool = pools.remove( address );
111+
if ( removedPool != null )
112+
{
113+
log.info( "Closing connection pool towards %s, it has no active connections " +
114+
"and is not in the routing table", address );
115+
removedPool.terminate();
116+
}
127117
}
128118
}
129119
}
130120

131121
@Override
132122
public boolean hasAddress( BoltServerAddress address )
133123
{
134-
BlockingPooledConnectionQueue connectionQueue = pools.get( address );
135-
return connectionQueue != null && connectionQueue.isActive();
124+
return pools.containsKey( address );
136125
}
137126

138127
@Override
@@ -152,17 +141,7 @@ public void close()
152141
public int activeConnections( BoltServerAddress address )
153142
{
154143
BlockingPooledConnectionQueue connectionQueue = pools.get( address );
155-
if ( connectionQueue == null || !connectionQueue.isActive() )
156-
{
157-
return 0;
158-
}
159-
return connectionQueue.activeConnections();
160-
}
161-
162-
// test-only accessor
163-
Set<BoltServerAddress> addresses()
164-
{
165-
return Collections.unmodifiableSet( pools.keySet() );
144+
return connectionQueue == null ? 0 : connectionQueue.activeConnections();
166145
}
167146

168147
private BlockingPooledConnectionQueue pool( BoltServerAddress address )

driver/src/main/java/org/neo4j/driver/internal/security/TLSSocketChannel.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,8 @@ private HandshakeStatus wrap( ByteBuffer buffer ) throws IOException, ClientExce
372372
cipherOut.compact();
373373
}
374374
break;
375+
case CLOSED:
376+
throw new IOException( "TLS socket channel is closed" );
375377
default:
376378
throw new ClientException( "Got unexpected status " + status );
377379
}

driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.neo4j.driver.internal.spi;
2020

21+
import java.util.Set;
22+
2123
import org.neo4j.driver.internal.net.BoltServerAddress;
2224

2325
public interface ConnectionPool extends AutoCloseable
@@ -36,11 +38,7 @@ public interface ConnectionPool extends AutoCloseable
3638
*/
3739
void purge( BoltServerAddress address );
3840

39-
void activate( BoltServerAddress address );
40-
41-
void deactivate( BoltServerAddress address );
42-
43-
void compact();
41+
void retainAll( Set<BoltServerAddress> addressesToRetain );
4442

4543
boolean hasAddress( BoltServerAddress address );
4644
}

driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21-
import java.util.ArrayList;
21+
import java.util.Collections;
2222
import java.util.LinkedHashSet;
2323
import java.util.List;
2424
import java.util.Set;
@@ -41,7 +41,7 @@ private ClusterCompositionUtil() {}
4141
public static final BoltServerAddress E = new BoltServerAddress( "5555:55" );
4242
public static final BoltServerAddress F = new BoltServerAddress( "6666:66" );
4343

44-
public static final List<BoltServerAddress> EMPTY = new ArrayList<>();
44+
public static final List<BoltServerAddress> EMPTY = Collections.emptyList();
4545

4646
public static final ClusterComposition VALID_CLUSTER_COMPOSITION =
4747
createClusterComposition( asList( A, B ), asList( C ), asList( D, E ) );

driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import org.junit.Test;
2222

23+
import java.util.HashSet;
2324
import java.util.List;
25+
import java.util.Set;
2426

2527
import org.neo4j.driver.internal.net.BoltServerAddress;
2628
import org.neo4j.driver.internal.util.FakeClock;
@@ -231,4 +233,25 @@ public void shouldNotRemoveServerIfPreWriterNowReader()
231233
assertEquals( 2, change.removed().size() );
232234
assertThat( change.removed(), containsInAnyOrder( A, C ) );
233235
}
236+
237+
@Test
238+
public void shouldReturnNoServersWhenEmpty()
239+
{
240+
ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() );
241+
242+
Set<BoltServerAddress> servers = routingTable.servers();
243+
244+
assertEquals( 0, servers.size() );
245+
}
246+
247+
@Test
248+
public void shouldReturnAllServers()
249+
{
250+
ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() );
251+
routingTable.update( createClusterComposition( asList( A, B, C ), asList( B, C, D ), asList( C, D, E, F ) ) );
252+
253+
Set<BoltServerAddress> servers = routingTable.servers();
254+
255+
assertEquals( new HashSet<>( asList( A, B, C, D, E, F ) ), servers );
256+
}
234257
}

0 commit comments

Comments
 (0)