Skip to content

Commit 6664215

Browse files
Improve channel tracking. (#741)
This uses a reentrant read/write lock to avoid synchronizing and a mixture of concurrent hashmaps and atomic integers. Co-authored-by: Michael Simons <[email protected]>
1 parent a822166 commit 6664215

File tree

1 file changed

+67
-30
lines changed

1 file changed

+67
-30
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java

Lines changed: 67 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,29 @@
2525
import io.netty.channel.pool.ChannelPoolHandler;
2626
import io.netty.util.concurrent.EventExecutor;
2727

28+
import java.util.HashMap;
2829
import java.util.Map;
29-
import java.util.concurrent.ConcurrentHashMap;
30-
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.concurrent.locks.Lock;
31+
import java.util.concurrent.locks.ReentrantReadWriteLock;
32+
import java.util.function.Supplier;
3133

34+
import org.neo4j.driver.Logger;
35+
import org.neo4j.driver.Logging;
3236
import org.neo4j.driver.internal.BoltServerAddress;
3337
import org.neo4j.driver.internal.messaging.BoltProtocol;
3438
import org.neo4j.driver.internal.metrics.ListenerEvent;
3539
import org.neo4j.driver.internal.metrics.MetricsListener;
36-
import org.neo4j.driver.Logger;
37-
import org.neo4j.driver.Logging;
3840

3941
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.poolId;
4042
import static org.neo4j.driver.internal.async.connection.ChannelAttributes.serverAddress;
4143

4244
public class NettyChannelTracker implements ChannelPoolHandler
4345
{
44-
private final Map<BoltServerAddress,AtomicInteger> addressToInUseChannelCount = new ConcurrentHashMap<>();
45-
private final Map<BoltServerAddress,AtomicInteger> addressToIdleChannelCount = new ConcurrentHashMap<>();
46+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
47+
private final Lock read = lock.readLock();
48+
private final Lock write = lock.writeLock();
49+
private final Map<BoltServerAddress,Integer> addressToInUseChannelCount = new HashMap<>();
50+
private final Map<BoltServerAddress,Integer> addressToIdleChannelCount = new HashMap<>();
4651
private final Logger log;
4752
private final MetricsListener metricsListener;
4853
private final ChannelFutureListener closeListener = future -> channelClosed( future.channel() );
@@ -60,24 +65,57 @@ public NettyChannelTracker( MetricsListener metricsListener, ChannelGroup channe
6065
this.allChannels = channels;
6166
}
6267

68+
private void doInWriteLock( Runnable work )
69+
{
70+
try
71+
{
72+
write.lock();
73+
work.run();
74+
}
75+
finally
76+
{
77+
write.unlock();
78+
}
79+
}
80+
81+
private <T> T retrieveInReadLock( Supplier<T> work )
82+
{
83+
try
84+
{
85+
read.lock();
86+
return work.get();
87+
}
88+
finally
89+
{
90+
read.unlock();
91+
}
92+
}
93+
6394
@Override
6495
public void channelReleased( Channel channel )
6596
{
66-
log.debug( "Channel [0x%s] released back to the pool", channel.id() );
67-
decrementInUse( channel );
68-
incrementIdle( channel );
97+
doInWriteLock( () ->
98+
{
99+
decrementInUse( channel );
100+
incrementIdle( channel );
101+
} );
102+
69103
channel.closeFuture().addListener( closeListener );
104+
log.debug( "Channel [0x%s] released back to the pool", channel.id() );
70105
}
71106

72107
@Override
73108
public void channelAcquired( Channel channel )
74109
{
75-
log.debug( "Channel [0x%s] acquired from the pool. Local address: %s, remote address: %s",
76-
channel.id(), channel.localAddress(), channel.remoteAddress() );
110+
doInWriteLock( () ->
111+
{
112+
incrementInUse( channel );
113+
decrementIdle( channel );
114+
} );
77115

78-
incrementInUse( channel );
79-
decrementIdle( channel );
80116
channel.closeFuture().removeListener( closeListener );
117+
log.debug( "Channel [0x%s] acquired from the pool. Local address: %s, remote address: %s", channel.id(), channel.localAddress(),
118+
channel.remoteAddress() );
81119
}
82120

83121
@Override
@@ -86,14 +124,14 @@ public void channelCreated( Channel channel )
86124
throw new IllegalStateException( "Untraceable channel created." );
87125
}
88126

89-
public synchronized void channelCreated( Channel channel, ListenerEvent creatingEvent )
127+
public void channelCreated( Channel channel, ListenerEvent creatingEvent )
90128
{
91-
log.debug( "Channel [0x%s] created. Local address: %s, remote address: %s",
92-
channel.id(), channel.localAddress(), channel.remoteAddress() );
129+
// when it is created, we count it as idle as it has not been acquired out of the pool
130+
doInWriteLock( () -> incrementIdle( channel ) );
93131

94-
incrementIdle( channel ); // when it is created, we count it as idle as it has not been acquired out of the pool
95132
metricsListener.afterCreated( poolId( channel ), creatingEvent );
96133
allChannels.add( channel );
134+
log.debug( "Channel [0x%s] created. Local address: %s, remote address: %s", channel.id(), channel.localAddress(), channel.remoteAddress() );
97135
}
98136

99137
public ListenerEvent channelCreating( String poolId )
@@ -110,20 +148,18 @@ public void channelFailedToCreate( String poolId )
110148

111149
public void channelClosed( Channel channel )
112150
{
113-
decrementIdle( channel );
151+
doInWriteLock( () -> decrementIdle( channel ) );
114152
metricsListener.afterClosed( poolId( channel ) );
115153
}
116154

117155
public int inUseChannelCount( BoltServerAddress address )
118156
{
119-
AtomicInteger count = addressToInUseChannelCount.get( address );
120-
return count == null ? 0 : count.get();
157+
return retrieveInReadLock( () -> addressToInUseChannelCount.getOrDefault( address, 0 ) );
121158
}
122159

123160
public int idleChannelCount( BoltServerAddress address )
124161
{
125-
AtomicInteger count = addressToIdleChannelCount.get( address );
126-
return count == null ? 0 : count.get();
162+
return retrieveInReadLock( () -> addressToIdleChannelCount.getOrDefault( address, 0 ) );
127163
}
128164

129165
public void prepareToCloseChannels()
@@ -139,7 +175,8 @@ public void prepareToCloseChannels()
139175
{
140176
// only logging it
141177
log.debug( "Failed to prepare to close Channel %s due to error %s. " +
142-
"It is safe to ignore this error as the channel will be closed despite if it is successfully prepared to close or not.", channel, e.getMessage() );
178+
"It is safe to ignore this error as the channel will be closed despite if it is successfully prepared to close or not.", channel,
179+
e.getMessage() );
143180
}
144181
}
145182
}
@@ -164,21 +201,21 @@ private void decrementIdle( Channel channel )
164201
decrement( channel, addressToIdleChannelCount );
165202
}
166203

167-
private void increment( Channel channel, Map<BoltServerAddress,AtomicInteger> countMap )
204+
private void increment( Channel channel, Map<BoltServerAddress,Integer> countMap )
168205
{
169206
BoltServerAddress address = serverAddress( channel );
170-
AtomicInteger count = countMap.computeIfAbsent( address, k -> new AtomicInteger() );
171-
count.incrementAndGet();
207+
Integer count = countMap.computeIfAbsent( address, k -> 0 );
208+
countMap.put( address, count + 1 );
172209
}
173210

174-
private void decrement( Channel channel, Map<BoltServerAddress,AtomicInteger> countMap )
211+
private void decrement( Channel channel, Map<BoltServerAddress,Integer> countMap )
175212
{
176213
BoltServerAddress address = serverAddress( channel );
177-
AtomicInteger count = countMap.get( address );
178-
if ( count == null )
214+
if ( !countMap.containsKey( address ) )
179215
{
180216
throw new IllegalStateException( "No count exist for address '" + address + "'" );
181217
}
182-
count.decrementAndGet();
218+
Integer count = countMap.get( address );
219+
countMap.put( address, count - 1 );
183220
}
184221
}

0 commit comments

Comments
 (0)