Skip to content

Commit 7fa8c17

Browse files
authored
Backport connection pool improvements, routing table handling, InternalSummaryCounters update and Testkit launch fix (neo4j#1052)
* Fix sync and async testkit backends launch (neo4j#1027) * Add toString to InternalSummaryCounters (neo4j#1031) * Make AddressSet retain resolved addresses (neo4j#1034) Routing table address set update may override resolved router address. This leads to routing connection pool closures. This update aims to optimise this. * Improve connection pool concurrent access (neo4j#1035) This update introduces read/write lock for internal pool map access management. For instance, the `retainAll` method is executed with write lock. * Replace AddressSet with standard Java collections (neo4j#1037) * Replace AddressSet with standard Java collections This update also prevents unnecessary connection pool closures by keeping `disused` addresses until next routing table update and reusing them. * Update reader, writer and router addresses handling and change type
1 parent 3741b80 commit 7fa8c17

File tree

19 files changed

+374
-513
lines changed

19 files changed

+374
-513
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 77 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,18 @@
2222
import io.netty.channel.Channel;
2323
import io.netty.channel.EventLoopGroup;
2424

25+
import java.util.HashMap;
26+
import java.util.Iterator;
27+
import java.util.Map;
2528
import java.util.Set;
2629
import java.util.concurrent.CompletableFuture;
2730
import java.util.concurrent.CompletionException;
2831
import java.util.concurrent.CompletionStage;
29-
import java.util.concurrent.ConcurrentHashMap;
30-
import java.util.concurrent.ConcurrentMap;
3132
import java.util.concurrent.TimeUnit;
3233
import java.util.concurrent.TimeoutException;
3334
import java.util.concurrent.atomic.AtomicBoolean;
35+
import java.util.concurrent.locks.ReadWriteLock;
36+
import java.util.concurrent.locks.ReentrantReadWriteLock;
3437

3538
import org.neo4j.driver.Logger;
3639
import org.neo4j.driver.Logging;
@@ -49,6 +52,8 @@
4952
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.setAuthorizationStateListener;
5053
import static org.neo4j.driver.internal.util.Futures.combineErrors;
5154
import static org.neo4j.driver.internal.util.Futures.completeWithNullIfNoError;
55+
import static org.neo4j.driver.internal.util.LockUtil.executeWithLock;
56+
import static org.neo4j.driver.internal.util.LockUtil.executeWithLockAsync;
5257

5358
public class ConnectionPoolImpl implements ConnectionPool
5459
{
@@ -63,7 +68,8 @@ public class ConnectionPoolImpl implements ConnectionPool
6368
private final MetricsListener metricsListener;
6469
private final boolean ownsEventLoopGroup;
6570

66-
private final ConcurrentMap<BoltServerAddress,ExtendedChannelPool> pools = new ConcurrentHashMap<>();
71+
private final ReadWriteLock addressToPoolLock = new ReentrantReadWriteLock();
72+
private final Map<BoltServerAddress,ExtendedChannelPool> addressToPool = new HashMap<>();
6773
private final AtomicBoolean closed = new AtomicBoolean();
6874
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
6975
private final ConnectionFactory connectionFactory;
@@ -126,25 +132,32 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
126132
@Override
127133
public void retainAll( Set<BoltServerAddress> addressesToRetain )
128134
{
129-
for ( BoltServerAddress address : pools.keySet() )
135+
executeWithLock( addressToPoolLock.writeLock(), () ->
130136
{
131-
if ( !addressesToRetain.contains( address ) )
137+
Iterator<Map.Entry<BoltServerAddress,ExtendedChannelPool>> entryIterator = addressToPool.entrySet().iterator();
138+
while ( entryIterator.hasNext() )
132139
{
133-
int activeChannels = nettyChannelTracker.inUseChannelCount( address );
134-
if ( activeChannels == 0 )
140+
Map.Entry<BoltServerAddress,ExtendedChannelPool> entry = entryIterator.next();
141+
BoltServerAddress address = entry.getKey();
142+
if ( !addressesToRetain.contains( address ) )
135143
{
136-
// address is not present in updated routing table and has no active connections
137-
// it's now safe to terminate corresponding connection pool and forget about it
138-
ExtendedChannelPool pool = pools.remove( address );
139-
if ( pool != null )
144+
int activeChannels = nettyChannelTracker.inUseChannelCount( address );
145+
if ( activeChannels == 0 )
140146
{
141-
log.info( "Closing connection pool towards %s, it has no active connections " +
142-
"and is not in the routing table registry.", address );
143-
closePoolInBackground( address, pool );
147+
// address is not present in updated routing table and has no active connections
148+
// it's now safe to terminate corresponding connection pool and forget about it
149+
ExtendedChannelPool pool = entry.getValue();
150+
entryIterator.remove();
151+
if ( pool != null )
152+
{
153+
log.info( "Closing connection pool towards %s, it has no active connections " +
154+
"and is not in the routing table registry.", address );
155+
closePoolInBackground( address, pool );
156+
}
144157
}
145158
}
146159
}
147-
}
160+
} );
148161
}
149162

150163
@Override
@@ -165,35 +178,40 @@ public CompletionStage<Void> close()
165178
if ( closed.compareAndSet( false, true ) )
166179
{
167180
nettyChannelTracker.prepareToCloseChannels();
168-
CompletableFuture<Void> allPoolClosedFuture = closeAllPools();
169181

170-
// We can only shutdown event loop group when all netty pools are fully closed,
171-
// otherwise the netty pools might missing threads (from event loop group) to execute clean ups.
172-
allPoolClosedFuture.whenComplete( ( ignored, pollCloseError ) -> {
173-
pools.clear();
174-
if ( !ownsEventLoopGroup )
175-
{
176-
completeWithNullIfNoError( closeFuture, pollCloseError );
177-
}
178-
else
179-
{
180-
shutdownEventLoopGroup( pollCloseError );
181-
}
182-
} );
182+
executeWithLockAsync( addressToPoolLock.writeLock(),
183+
() ->
184+
{
185+
// We can only shutdown event loop group when all netty pools are fully closed,
186+
// otherwise the netty pools might missing threads (from event loop group) to execute clean ups.
187+
return closeAllPools().whenComplete(
188+
( ignored, pollCloseError ) ->
189+
{
190+
addressToPool.clear();
191+
if ( !ownsEventLoopGroup )
192+
{
193+
completeWithNullIfNoError( closeFuture, pollCloseError );
194+
}
195+
else
196+
{
197+
shutdownEventLoopGroup( pollCloseError );
198+
}
199+
} );
200+
} );
183201
}
184202
return closeFuture;
185203
}
186204

187205
@Override
188206
public boolean isOpen( BoltServerAddress address )
189207
{
190-
return pools.containsKey( address );
208+
return executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.containsKey( address ) );
191209
}
192210

193211
@Override
194212
public String toString()
195213
{
196-
return "ConnectionPoolImpl{" + "pools=" + pools + '}';
214+
return executeWithLock( addressToPoolLock.readLock(), () -> "ConnectionPoolImpl{" + "pools=" + addressToPool + '}' );
197215
}
198216

199217
private void processAcquisitionError( ExtendedChannelPool pool, BoltServerAddress serverAddress, Throwable error )
@@ -239,15 +257,15 @@ private void assertNotClosed( BoltServerAddress address, Channel channel, Extend
239257
{
240258
pool.release( channel );
241259
closePoolInBackground( address, pool );
242-
pools.remove( address );
260+
executeWithLock( addressToPoolLock.writeLock(), () -> addressToPool.remove( address ) );
243261
assertNotClosed();
244262
}
245263
}
246264

247265
// for testing only
248266
ExtendedChannelPool getPool( BoltServerAddress address )
249267
{
250-
return pools.get( address );
268+
return executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.get( address ) );
251269
}
252270

253271
ExtendedChannelPool newPool( BoltServerAddress address )
@@ -258,12 +276,22 @@ ExtendedChannelPool newPool( BoltServerAddress address )
258276

259277
private ExtendedChannelPool getOrCreatePool( BoltServerAddress address )
260278
{
261-
return pools.computeIfAbsent( address, ignored -> {
262-
ExtendedChannelPool pool = newPool( address );
263-
// before the connection pool is added I can add the metrics for the pool.
264-
metricsListener.putPoolMetrics( pool.id(), address, this );
265-
return pool;
266-
} );
279+
ExtendedChannelPool existingPool = executeWithLock( addressToPoolLock.readLock(), () -> addressToPool.get( address ) );
280+
return existingPool != null
281+
? existingPool
282+
: executeWithLock( addressToPoolLock.writeLock(),
283+
() ->
284+
{
285+
ExtendedChannelPool pool = addressToPool.get( address );
286+
if ( pool == null )
287+
{
288+
pool = newPool( address );
289+
// before the connection pool is added I can add the metrics for the pool.
290+
metricsListener.putPoolMetrics( pool.id(), address, this );
291+
addressToPool.put( address, pool );
292+
}
293+
return pool;
294+
} );
267295
}
268296

269297
private CompletionStage<Void> closePool( ExtendedChannelPool pool )
@@ -305,12 +333,15 @@ private void shutdownEventLoopGroup( Throwable pollCloseError )
305333
private CompletableFuture<Void> closeAllPools()
306334
{
307335
return CompletableFuture.allOf(
308-
pools.entrySet().stream().map( entry -> {
309-
BoltServerAddress address = entry.getKey();
310-
ExtendedChannelPool pool = entry.getValue();
311-
log.info( "Closing connection pool towards %s", address );
312-
// Wait for all pools to be closed.
313-
return closePool( pool ).toCompletableFuture();
314-
} ).toArray( CompletableFuture[]::new ) );
336+
addressToPool.entrySet().stream()
337+
.map( entry ->
338+
{
339+
BoltServerAddress address = entry.getKey();
340+
ExtendedChannelPool pool = entry.getValue();
341+
log.info( "Closing connection pool towards %s", address );
342+
// Wait for all pools to be closed.
343+
return closePool( pool ).toCompletableFuture();
344+
} )
345+
.toArray( CompletableFuture[]::new ) );
315346
}
316347
}

driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java

Lines changed: 0 additions & 103 deletions
This file was deleted.

0 commit comments

Comments
 (0)