20
20
21
21
import io .netty .channel .Channel ;
22
22
import io .netty .channel .pool .ChannelPoolHandler ;
23
- import io .netty .util .internal .ConcurrentSet ;
24
23
24
+ import java .util .Map ;
25
25
import java .util .concurrent .ConcurrentHashMap ;
26
- import java .util .concurrent .ConcurrentMap ;
26
+ import java .util .concurrent .atomic . AtomicInteger ;
27
27
28
28
import org .neo4j .driver .internal .BoltServerAddress ;
29
29
import org .neo4j .driver .v1 .Logger ;
33
33
34
34
public class ActiveChannelTracker implements ChannelPoolHandler
35
35
{
36
- private final ConcurrentMap <BoltServerAddress ,ConcurrentSet < Channel >> addressToActiveChannelCount ;
36
+ private final Map <BoltServerAddress ,AtomicInteger > addressToActiveChannelCount = new ConcurrentHashMap <>() ;
37
37
private final Logger log ;
38
38
39
39
public ActiveChannelTracker ( Logging logging )
40
40
{
41
- this .addressToActiveChannelCount = new ConcurrentHashMap <>();
42
41
this .log = logging .getLog ( getClass ().getSimpleName () );
43
42
}
44
43
@@ -65,52 +64,25 @@ public void channelCreated( Channel channel )
65
64
66
65
public int activeChannelCount ( BoltServerAddress address )
67
66
{
68
- ConcurrentSet <Channel > activeChannels = addressToActiveChannelCount .get ( address );
69
- return activeChannels == null ? 0 : activeChannels .size ();
70
- }
71
-
72
- public void purge ( BoltServerAddress address )
73
- {
74
- ConcurrentSet <Channel > activeChannels = addressToActiveChannelCount .remove ( address );
75
- if ( activeChannels != null )
76
- {
77
- for ( Channel channel : activeChannels )
78
- {
79
- channel .close ();
80
- }
81
- }
67
+ AtomicInteger count = addressToActiveChannelCount .get ( address );
68
+ return count == null ? 0 : count .get ();
82
69
}
83
70
84
71
private void channelActive ( Channel channel )
85
72
{
86
73
BoltServerAddress address = serverAddress ( channel );
87
- ConcurrentSet <Channel > activeChannels = addressToActiveChannelCount .get ( address );
88
- if ( activeChannels == null )
89
- {
90
- ConcurrentSet <Channel > newActiveChannels = new ConcurrentSet <>();
91
- ConcurrentSet <Channel > existingActiveChannels = addressToActiveChannelCount .putIfAbsent ( address ,
92
- newActiveChannels );
93
- if ( existingActiveChannels == null )
94
- {
95
- activeChannels = newActiveChannels ;
96
- }
97
- else
98
- {
99
- activeChannels = existingActiveChannels ;
100
- }
101
- }
102
-
103
- activeChannels .add ( channel );
74
+ AtomicInteger count = addressToActiveChannelCount .computeIfAbsent ( address , k -> new AtomicInteger () );
75
+ count .incrementAndGet ();
104
76
}
105
77
106
78
private void channelInactive ( Channel channel )
107
79
{
108
80
BoltServerAddress address = serverAddress ( channel );
109
- ConcurrentSet < Channel > activeChannels = addressToActiveChannelCount .get ( address );
110
- if ( activeChannels == null )
81
+ AtomicInteger count = addressToActiveChannelCount .get ( address );
82
+ if ( count == null )
111
83
{
112
- throw new IllegalStateException ( "No channels exist for address '" + address + "'" );
84
+ throw new IllegalStateException ( "No count exist for address '" + address + "'" );
113
85
}
114
- activeChannels . remove ( channel );
86
+ count . decrementAndGet ( );
115
87
}
116
88
}
0 commit comments