Skip to content

Improve channel tracking. #741

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 11, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,29 @@
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.util.concurrent.EventExecutor;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.messaging.BoltProtocol;
import org.neo4j.driver.internal.metrics.ListenerEvent;
import org.neo4j.driver.internal.metrics.MetricsListener;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;

import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId;
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverAddress;

public class NettyChannelTracker implements ChannelPoolHandler
{
private final Map<BoltServerAddress,AtomicInteger> addressToInUseChannelCount = new ConcurrentHashMap<>();
private final Map<BoltServerAddress,AtomicInteger> addressToIdleChannelCount = new ConcurrentHashMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock read = lock.readLock();
private final Lock write = lock.writeLock();
private final Map<BoltServerAddress,Integer> addressToInUseChannelCount = new HashMap<>();
private final Map<BoltServerAddress,Integer> addressToIdleChannelCount = new HashMap<>();
private final Logger log;
private final MetricsListener metricsListener;
private final ChannelFutureListener closeListener = future -> channelClosed( future.channel() );
Expand All @@ -60,24 +65,57 @@ public NettyChannelTracker( MetricsListener metricsListener, ChannelGroup channe
this.allChannels = channels;
}

private void doInWriteLock( Runnable work )
{
try
{
write.lock();
work.run();
}
finally
{
write.unlock();
}
}

private <T> T retrieveInReadLock( Supplier<T> work )
{
try
{
read.lock();
return work.get();
}
finally
{
read.unlock();
}
}

@Override
public void channelReleased( Channel channel )
{
log.debug( "Channel [0x%s] released back to the pool", channel.id() );
decrementInUse( channel );
incrementIdle( channel );
doInWriteLock( () ->
{
decrementInUse( channel );
incrementIdle( channel );
} );

channel.closeFuture().addListener( closeListener );
log.debug( "Channel [0x%s] released back to the pool", channel.id() );
}

@Override
public void channelAcquired( Channel channel )
{
log.debug( "Channel [0x%s] acquired from the pool. Local address: %s, remote address: %s",
channel.id(), channel.localAddress(), channel.remoteAddress() );
doInWriteLock( () ->
{
incrementInUse( channel );
decrementIdle( channel );
} );

incrementInUse( channel );
decrementIdle( channel );
channel.closeFuture().removeListener( closeListener );
log.debug( "Channel [0x%s] acquired from the pool. Local address: %s, remote address: %s", channel.id(), channel.localAddress(),
channel.remoteAddress() );
}

@Override
Expand All @@ -86,14 +124,14 @@ public void channelCreated( Channel channel )
throw new IllegalStateException( "Untraceable channel created." );
}

public synchronized void channelCreated( Channel channel, ListenerEvent creatingEvent )
public void channelCreated( Channel channel, ListenerEvent creatingEvent )
{
log.debug( "Channel [0x%s] created. Local address: %s, remote address: %s",
channel.id(), channel.localAddress(), channel.remoteAddress() );
// when it is created, we count it as idle as it has not been acquired out of the pool
doInWriteLock( () -> incrementIdle( channel ) );

incrementIdle( channel ); // when it is created, we count it as idle as it has not been acquired out of the pool
metricsListener.afterCreated( poolId( channel ), creatingEvent );
allChannels.add( channel );
log.debug( "Channel [0x%s] created. Local address: %s, remote address: %s", channel.id(), channel.localAddress(), channel.remoteAddress() );
}

public ListenerEvent channelCreating( String poolId )
Expand All @@ -110,20 +148,18 @@ public void channelFailedToCreate( String poolId )

public void channelClosed( Channel channel )
{
decrementIdle( channel );
doInWriteLock( () -> decrementIdle( channel ) );
metricsListener.afterClosed( poolId( channel ) );
}

public int inUseChannelCount( BoltServerAddress address )
{
AtomicInteger count = addressToInUseChannelCount.get( address );
return count == null ? 0 : count.get();
return retrieveInReadLock( () -> addressToInUseChannelCount.getOrDefault( address, 0 ) );
}

public int idleChannelCount( BoltServerAddress address )
{
AtomicInteger count = addressToIdleChannelCount.get( address );
return count == null ? 0 : count.get();
return retrieveInReadLock( () -> addressToIdleChannelCount.getOrDefault( address, 0 ) );
}

public void prepareToCloseChannels()
Expand All @@ -139,7 +175,8 @@ public void prepareToCloseChannels()
{
// only logging it
log.debug( "Failed to prepare to close Channel %s due to error %s. " +
"It is safe to ignore this error as the channel will be closed despite if it is successfully prepared to close or not.", channel, e.getMessage() );
"It is safe to ignore this error as the channel will be closed despite if it is successfully prepared to close or not.", channel,
e.getMessage() );
}
}
}
Expand All @@ -164,21 +201,21 @@ private void decrementIdle( Channel channel )
decrement( channel, addressToIdleChannelCount );
}

private void increment( Channel channel, Map<BoltServerAddress,AtomicInteger> countMap )
private void increment( Channel channel, Map<BoltServerAddress,Integer> countMap )
{
BoltServerAddress address = serverAddress( channel );
AtomicInteger count = countMap.computeIfAbsent( address, k -> new AtomicInteger() );
count.incrementAndGet();
Integer count = countMap.computeIfAbsent( address, k -> 0 );
countMap.put( address, count + 1 );
}

private void decrement( Channel channel, Map<BoltServerAddress,AtomicInteger> countMap )
private void decrement( Channel channel, Map<BoltServerAddress,Integer> countMap )
{
BoltServerAddress address = serverAddress( channel );
AtomicInteger count = countMap.get( address );
if ( count == null )
if ( !countMap.containsKey( address ) )
{
throw new IllegalStateException( "No count exist for address '" + address + "'" );
}
count.decrementAndGet();
Integer count = countMap.get( address );
countMap.put( address, count - 1 );
}
}