From 60c1169b20249ecf21a88db32b2f1984c9527f33 Mon Sep 17 00:00:00 2001 From: Michael Simons Date: Wed, 10 Jun 2020 21:52:02 +0200 Subject: [PATCH] Improve channel tracking. This uses a reentrant read/write lock to avoid synchronizing and a mixture of concurrent hashmaps and atomic integers. --- .../async/pool/NettyChannelTracker.java | 97 +++++++++++++------ 1 file changed, 67 insertions(+), 30 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java index f8a8c76bc9..f58765e2c0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java @@ -25,24 +25,29 @@ import io.netty.channel.pool.ChannelPoolHandler; import io.netty.util.concurrent.EventExecutor; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; +import org.neo4j.driver.Logger; +import org.neo4j.driver.Logging; import org.neo4j.driver.internal.BoltServerAddress; import org.neo4j.driver.internal.messaging.BoltProtocol; import org.neo4j.driver.internal.metrics.ListenerEvent; import org.neo4j.driver.internal.metrics.MetricsListener; -import org.neo4j.driver.Logger; -import org.neo4j.driver.Logging; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverAddress; public class NettyChannelTracker implements ChannelPoolHandler { - private final Map addressToInUseChannelCount = new ConcurrentHashMap<>(); - private final Map addressToIdleChannelCount = new ConcurrentHashMap<>(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final Lock read = lock.readLock(); + private final Lock write = lock.writeLock(); + private final Map addressToInUseChannelCount = new HashMap<>(); + private final Map addressToIdleChannelCount = new HashMap<>(); private final Logger log; private final MetricsListener metricsListener; private final ChannelFutureListener closeListener = future -> channelClosed( future.channel() ); @@ -60,24 +65,57 @@ public NettyChannelTracker( MetricsListener metricsListener, ChannelGroup channe this.allChannels = channels; } + private void doInWriteLock( Runnable work ) + { + try + { + write.lock(); + work.run(); + } + finally + { + write.unlock(); + } + } + + private T retrieveInReadLock( Supplier work ) + { + try + { + read.lock(); + return work.get(); + } + finally + { + read.unlock(); + } + } + @Override public void channelReleased( Channel channel ) { - log.debug( "Channel [0x%s] released back to the pool", channel.id() ); - decrementInUse( channel ); - incrementIdle( channel ); + doInWriteLock( () -> + { + decrementInUse( channel ); + incrementIdle( channel ); + } ); + channel.closeFuture().addListener( closeListener ); + log.debug( "Channel [0x%s] released back to the pool", channel.id() ); } @Override public void channelAcquired( Channel channel ) { - log.debug( "Channel [0x%s] acquired from the pool. Local address: %s, remote address: %s", - channel.id(), channel.localAddress(), channel.remoteAddress() ); + doInWriteLock( () -> + { + incrementInUse( channel ); + decrementIdle( channel ); + } ); - incrementInUse( channel ); - decrementIdle( channel ); channel.closeFuture().removeListener( closeListener ); + log.debug( "Channel [0x%s] acquired from the pool. Local address: %s, remote address: %s", channel.id(), channel.localAddress(), + channel.remoteAddress() ); } @Override @@ -86,14 +124,14 @@ public void channelCreated( Channel channel ) throw new IllegalStateException( "Untraceable channel created." ); } - public synchronized void channelCreated( Channel channel, ListenerEvent creatingEvent ) + public void channelCreated( Channel channel, ListenerEvent creatingEvent ) { - log.debug( "Channel [0x%s] created. Local address: %s, remote address: %s", - channel.id(), channel.localAddress(), channel.remoteAddress() ); + // when it is created, we count it as idle as it has not been acquired out of the pool + doInWriteLock( () -> incrementIdle( channel ) ); - incrementIdle( channel ); // when it is created, we count it as idle as it has not been acquired out of the pool metricsListener.afterCreated( poolId( channel ), creatingEvent ); allChannels.add( channel ); + log.debug( "Channel [0x%s] created. Local address: %s, remote address: %s", channel.id(), channel.localAddress(), channel.remoteAddress() ); } public ListenerEvent channelCreating( String poolId ) @@ -110,20 +148,18 @@ public void channelFailedToCreate( String poolId ) public void channelClosed( Channel channel ) { - decrementIdle( channel ); + doInWriteLock( () -> decrementIdle( channel ) ); metricsListener.afterClosed( poolId( channel ) ); } public int inUseChannelCount( BoltServerAddress address ) { - AtomicInteger count = addressToInUseChannelCount.get( address ); - return count == null ? 0 : count.get(); + return retrieveInReadLock( () -> addressToInUseChannelCount.getOrDefault( address, 0 ) ); } public int idleChannelCount( BoltServerAddress address ) { - AtomicInteger count = addressToIdleChannelCount.get( address ); - return count == null ? 0 : count.get(); + return retrieveInReadLock( () -> addressToIdleChannelCount.getOrDefault( address, 0 ) ); } public void prepareToCloseChannels() @@ -139,7 +175,8 @@ public void prepareToCloseChannels() { // only logging it log.debug( "Failed to prepare to close Channel %s due to error %s. " + - "It is safe to ignore this error as the channel will be closed despite if it is successfully prepared to close or not.", channel, e.getMessage() ); + "It is safe to ignore this error as the channel will be closed despite if it is successfully prepared to close or not.", channel, + e.getMessage() ); } } } @@ -164,21 +201,21 @@ private void decrementIdle( Channel channel ) decrement( channel, addressToIdleChannelCount ); } - private void increment( Channel channel, Map countMap ) + private void increment( Channel channel, Map countMap ) { BoltServerAddress address = serverAddress( channel ); - AtomicInteger count = countMap.computeIfAbsent( address, k -> new AtomicInteger() ); - count.incrementAndGet(); + Integer count = countMap.computeIfAbsent( address, k -> 0 ); + countMap.put( address, count + 1 ); } - private void decrement( Channel channel, Map countMap ) + private void decrement( Channel channel, Map countMap ) { BoltServerAddress address = serverAddress( channel ); - AtomicInteger count = countMap.get( address ); - if ( count == null ) + if ( !countMap.containsKey( address ) ) { throw new IllegalStateException( "No count exist for address '" + address + "'" ); } - count.decrementAndGet(); + Integer count = countMap.get( address ); + countMap.put( address, count - 1 ); } }