Skip to content

Commit a9fa430

Browse files
gjmwoodsmichael-simons
authored andcommitted
Improve channel tracking. (neo4j#741)
This uses a reentrant read/write lock to avoid synchronizing and a mixture of concurrent hashmaps and atomic integers. Co-authored-by: Michael Simons <[email protected]>
1 parent a86eca9 commit a9fa430

File tree

1 file changed

+63
-28
lines changed

1 file changed

+63
-28
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelTracker.java

Lines changed: 63 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
import io.netty.channel.pool.ChannelPoolHandler;
2626
import io.netty.util.concurrent.EventExecutor;
2727

28+
import java.util.HashMap;
2829
import java.util.Map;
29-
import java.util.concurrent.ConcurrentHashMap;
30-
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.concurrent.locks.Lock;
31+
import java.util.concurrent.locks.ReentrantReadWriteLock;
32+
import java.util.function.Supplier;
3133

3234
import org.neo4j.driver.internal.BoltServerAddress;
3335
import org.neo4j.driver.internal.messaging.BoltProtocol;
@@ -40,8 +42,11 @@
4042

4143
public class NettyChannelTracker implements ChannelPoolHandler
4244
{
43-
private final Map<BoltServerAddress,AtomicInteger> addressToInUseChannelCount = new ConcurrentHashMap<>();
44-
private final Map<BoltServerAddress,AtomicInteger> addressToIdleChannelCount = new ConcurrentHashMap<>();
45+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
46+
private final Lock read = lock.readLock();
47+
private final Lock write = lock.writeLock();
48+
private final Map<BoltServerAddress,Integer> addressToInUseChannelCount = new HashMap<>();
49+
private final Map<BoltServerAddress,Integer> addressToIdleChannelCount = new HashMap<>();
4550
private final Logger log;
4651
private final MetricsListener metricsListener;
4752
private final ChannelFutureListener closeListener = future -> channelClosed( future.channel() );
@@ -62,21 +67,28 @@ public NettyChannelTracker( MetricsListener metricsListener, ChannelGroup channe
6267
@Override
6368
public void channelReleased( Channel channel )
6469
{
65-
log.debug( "Channel [0x%s] released back to the pool", channel.id() );
66-
decrementInUse( channel );
67-
incrementIdle( channel );
70+
doInWriteLock( () ->
71+
{
72+
decrementInUse( channel );
73+
incrementIdle( channel );
74+
} );
75+
6876
channel.closeFuture().addListener( closeListener );
77+
log.debug( "Channel [0x%s] released back to the pool", channel.id() );
6978
}
7079

7180
@Override
7281
public void channelAcquired( Channel channel )
7382
{
74-
log.debug( "Channel [%s] acquired from the pool. Local address: %s, remote address: %s",
75-
channel.id(), channel.localAddress(), channel.remoteAddress() );
83+
doInWriteLock( () ->
84+
{
85+
incrementInUse( channel );
86+
decrementIdle( channel );
87+
} );
7688

77-
incrementInUse( channel );
78-
decrementIdle( channel );
7989
channel.closeFuture().removeListener( closeListener );
90+
log.debug( "Channel [0x%s] acquired from the pool. Local address: %s, remote address: %s", channel.id(), channel.localAddress(),
91+
channel.remoteAddress() );
8092
}
8193

8294
@Override
@@ -87,13 +99,12 @@ public void channelCreated( Channel channel )
8799

88100
public void channelCreated( Channel channel, ListenerEvent creatingEvent )
89101
{
90-
log.debug( "Channel [%s] created. Local address: %s, remote address: %s",
91-
channel.id(), channel.localAddress(), channel.remoteAddress() );
102+
doInWriteLock( () -> incrementInUse( channel ) );
92103

93-
incrementInUse( channel );
94104
metricsListener.afterCreated( serverAddress( channel ), creatingEvent );
95105

96106
allChannels.add( channel );
107+
log.debug( "Channel [0x%s] created. Local address: %s, remote address: %s", channel.id(), channel.localAddress(), channel.remoteAddress() );
97108
}
98109

99110
public ListenerEvent channelCreating( BoltServerAddress address )
@@ -110,20 +121,18 @@ public void channelFailedToCreate( BoltServerAddress address )
110121

111122
public void channelClosed( Channel channel )
112123
{
113-
decrementIdle( channel );
124+
doInWriteLock( () -> decrementIdle( channel ) );
114125
metricsListener.afterClosed( serverAddress( channel ) );
115126
}
116127

117128
public int inUseChannelCount( BoltServerAddress address )
118129
{
119-
AtomicInteger count = addressToInUseChannelCount.get( address );
120-
return count == null ? 0 : count.get();
130+
return retrieveInReadLock( () -> addressToInUseChannelCount.getOrDefault( address, 0 ) );
121131
}
122132

123133
public int idleChannelCount( BoltServerAddress address )
124134
{
125-
AtomicInteger count = addressToIdleChannelCount.get( address );
126-
return count == null ? 0 : count.get();
135+
return retrieveInReadLock( () -> addressToIdleChannelCount.getOrDefault( address, 0 ) );
127136
}
128137

129138
public void prepareToCloseChannels()
@@ -139,7 +148,8 @@ public void prepareToCloseChannels()
139148
{
140149
// only logging it
141150
log.debug( "Failed to prepare to close Channel %s due to error %s. " +
142-
"It is safe to ignore this error as the channel will be closed despite if it is successfully prepared to close or not.", channel, e.getMessage() );
151+
"It is safe to ignore this error as the channel will be closed despite if it is successfully prepared to close or not.", channel,
152+
e.getMessage() );
143153
}
144154
}
145155
}
@@ -164,22 +174,47 @@ private void decrementIdle( Channel channel )
164174
decrement( channel, addressToIdleChannelCount );
165175
}
166176

167-
private void increment( Channel channel, Map<BoltServerAddress,AtomicInteger> countMap )
177+
private void increment( Channel channel, Map<BoltServerAddress,Integer> countMap )
168178
{
169179
BoltServerAddress address = serverAddress( channel );
170-
AtomicInteger count = countMap.computeIfAbsent( address, k -> new AtomicInteger() );
171-
count.incrementAndGet();
180+
Integer count = countMap.computeIfAbsent( address, k -> 0 );
181+
countMap.put( address, count + 1 );
172182
}
173183

174-
private void decrement( Channel channel, Map<BoltServerAddress,AtomicInteger> countMap )
184+
private void decrement( Channel channel, Map<BoltServerAddress,Integer> countMap )
175185
{
176186
BoltServerAddress address = serverAddress( channel );
177-
AtomicInteger count = countMap.get( address );
178-
if ( count == null )
187+
if ( !countMap.containsKey( address ) )
179188
{
180-
System.out.println( countMap );
181189
throw new IllegalStateException( "No count exist for address '" + address + "'" );
182190
}
183-
count.decrementAndGet();
191+
Integer count = countMap.get( address );
192+
countMap.put( address, count - 1 );
193+
}
194+
195+
private void doInWriteLock( Runnable work )
196+
{
197+
try
198+
{
199+
write.lock();
200+
work.run();
201+
}
202+
finally
203+
{
204+
write.unlock();
205+
}
206+
}
207+
208+
private <T> T retrieveInReadLock( Supplier<T> work )
209+
{
210+
try
211+
{
212+
read.lock();
213+
return work.get();
214+
}
215+
finally
216+
{
217+
read.unlock();
218+
}
184219
}
185220
}

0 commit comments

Comments
 (0)