21
21
import io .netty .bootstrap .Bootstrap ;
22
22
import io .netty .channel .Channel ;
23
23
import io .netty .channel .EventLoopGroup ;
24
- import io .netty .channel .pool .ChannelPool ;
25
- import io .netty .util .concurrent .Future ;
26
24
27
- import java .util .Map ;
28
25
import java .util .Set ;
26
+ import java .util .concurrent .CompletableFuture ;
29
27
import java .util .concurrent .CompletionException ;
30
28
import java .util .concurrent .CompletionStage ;
31
29
import java .util .concurrent .ConcurrentHashMap ;
48
46
import org .neo4j .driver .internal .util .Futures ;
49
47
50
48
import static java .lang .String .format ;
49
+ import static org .neo4j .driver .internal .util .Futures .combineErrors ;
50
+ import static org .neo4j .driver .internal .util .Futures .completeWithNullIfNoError ;
51
51
52
52
public class ConnectionPoolImpl implements ConnectionPool
53
53
{
@@ -62,6 +62,7 @@ public class ConnectionPoolImpl implements ConnectionPool
62
62
63
63
private final ConcurrentMap <BoltServerAddress ,ExtendedChannelPool > pools = new ConcurrentHashMap <>();
64
64
private final AtomicBoolean closed = new AtomicBoolean ();
65
+ private final CompletableFuture <Void > closeFuture = new CompletableFuture <>();
65
66
private final ConnectionFactory connectionFactory ;
66
67
67
68
public ConnectionPoolImpl ( ChannelConnector connector , Bootstrap bootstrap , PoolSettings settings , MetricsListener metricsListener , Logging logging ,
@@ -95,9 +96,9 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
95
96
96
97
ListenerEvent acquireEvent = metricsListener .createListenerEvent ();
97
98
metricsListener .beforeAcquiringOrCreating ( pool .id (), acquireEvent );
98
- Future <Channel > connectionFuture = pool .acquire ();
99
+ CompletionStage <Channel > channelFuture = pool .acquire ();
99
100
100
- return Futures . asCompletionStage ( connectionFuture ) .handle ( ( channel , error ) ->
101
+ return channelFuture .handle ( ( channel , error ) ->
101
102
{
102
103
try
103
104
{
@@ -131,8 +132,8 @@ public void retainAll( Set<BoltServerAddress> addressesToRetain )
131
132
if ( pool != null )
132
133
{
133
134
log .info ( "Closing connection pool towards %s, it has no active connections " +
134
- "and is not in the routing table" , address );
135
- closePool ( pool );
135
+ "and is not in the routing table registry. " , address );
136
+ closePoolInBackground ( address , pool );
136
137
}
137
138
}
138
139
}
@@ -156,37 +157,24 @@ public CompletionStage<Void> close()
156
157
{
157
158
if ( closed .compareAndSet ( false , true ) )
158
159
{
159
- try
160
- {
161
- nettyChannelTracker .prepareToCloseChannels ();
162
- for ( Map .Entry <BoltServerAddress ,ExtendedChannelPool > entry : pools .entrySet () )
163
- {
164
- BoltServerAddress address = entry .getKey ();
165
- ExtendedChannelPool pool = entry .getValue ();
166
- log .info ( "Closing connection pool towards %s" , address );
167
- closePool ( pool );
168
- }
160
+ nettyChannelTracker .prepareToCloseChannels ();
161
+ CompletableFuture <Void > allPoolClosedFuture = closeAllPools ();
169
162
163
+ // We can only shutdown event loop group when all netty pools are fully closed,
164
+ // otherwise the netty pools might missing threads (from event loop group) to execute clean ups.
165
+ allPoolClosedFuture .whenComplete ( ( ignored , pollCloseError ) -> {
170
166
pools .clear ();
171
- }
172
- finally
173
- {
174
-
175
- if (ownsEventLoopGroup ) {
176
- // This is an attempt to speed up the shut down procedure of the driver
177
- // Feel free return this back to shutdownGracefully() method with default values
178
- // if this proves troublesome!!!
179
- eventLoopGroup ().shutdownGracefully (200 , 15_000 , TimeUnit .MILLISECONDS );
167
+ if ( !ownsEventLoopGroup )
168
+ {
169
+ completeWithNullIfNoError ( closeFuture , pollCloseError );
180
170
}
181
- }
182
- }
183
- if (! ownsEventLoopGroup )
184
- {
185
- return Futures . completedWithNull ( );
171
+ else
172
+ {
173
+ shutdownEventLoopGroup ( pollCloseError );
174
+ }
175
+ } );
186
176
}
187
-
188
- return Futures .asCompletionStage ( eventLoopGroup ().terminationFuture () )
189
- .thenApply ( ignore -> null );
177
+ return closeFuture ;
190
178
}
191
179
192
180
@ Override
@@ -195,31 +183,10 @@ public boolean isOpen( BoltServerAddress address )
195
183
return pools .containsKey ( address );
196
184
}
197
185
198
- private ExtendedChannelPool getOrCreatePool ( BoltServerAddress address )
199
- {
200
- return pools .computeIfAbsent ( address , this ::newPool );
201
- }
202
-
203
- private void closePool ( ExtendedChannelPool pool )
204
- {
205
- pool .close ();
206
- // after the connection pool is removed/close, I can remove its metrics.
207
- metricsListener .removePoolMetrics ( pool .id () );
208
- }
209
-
210
- ExtendedChannelPool newPool ( BoltServerAddress address )
211
- {
212
- NettyChannelPool pool =
213
- new NettyChannelPool ( address , connector , bootstrap , nettyChannelTracker , channelHealthChecker , settings .connectionAcquisitionTimeout (),
214
- settings .maxConnectionPoolSize () );
215
- // before the connection pool is added I can add the metrics for the pool.
216
- metricsListener .putPoolMetrics ( pool .id (), address , this );
217
- return pool ;
218
- }
219
-
220
- private EventLoopGroup eventLoopGroup ()
186
+ @ Override
187
+ public String toString ()
221
188
{
222
- return bootstrap . config (). group () ;
189
+ return "ConnectionPoolImpl{" + "pools=" + pools + '}' ;
223
190
}
224
191
225
192
private void processAcquisitionError ( ExtendedChannelPool pool , BoltServerAddress serverAddress , Throwable error )
@@ -259,26 +226,84 @@ private void assertNotClosed()
259
226
}
260
227
}
261
228
262
- private void assertNotClosed ( BoltServerAddress address , Channel channel , ChannelPool pool )
229
+ private void assertNotClosed ( BoltServerAddress address , Channel channel , ExtendedChannelPool pool )
263
230
{
264
231
if ( closed .get () )
265
232
{
266
233
pool .release ( channel );
267
- pool . close ( );
234
+ closePoolInBackground ( address , pool );
268
235
pools .remove ( address );
269
236
assertNotClosed ();
270
237
}
271
238
}
272
239
273
- @ Override
274
- public String toString ()
275
- {
276
- return "ConnectionPoolImpl{" + "pools=" + pools + '}' ;
277
- }
278
-
279
240
// for testing only
280
241
ExtendedChannelPool getPool ( BoltServerAddress address )
281
242
{
282
243
return pools .get ( address );
283
244
}
245
+
246
+ ExtendedChannelPool newPool ( BoltServerAddress address )
247
+ {
248
+ return new NettyChannelPool ( address , connector , bootstrap , nettyChannelTracker , channelHealthChecker , settings .connectionAcquisitionTimeout (),
249
+ settings .maxConnectionPoolSize () );
250
+ }
251
+
252
+ private ExtendedChannelPool getOrCreatePool ( BoltServerAddress address )
253
+ {
254
+ return pools .computeIfAbsent ( address , ignored -> {
255
+ ExtendedChannelPool pool = newPool ( address );
256
+ // before the connection pool is added I can add the metrics for the pool.
257
+ metricsListener .putPoolMetrics ( pool .id (), address , this );
258
+ return pool ;
259
+ } );
260
+ }
261
+
262
+ private CompletionStage <Void > closePool ( ExtendedChannelPool pool )
263
+ {
264
+ return pool .close ().whenComplete ( ( ignored , error ) ->
265
+ // after the connection pool is removed/close, I can remove its metrics.
266
+ metricsListener .removePoolMetrics ( pool .id () ) );
267
+ }
268
+
269
+ private void closePoolInBackground ( BoltServerAddress address , ExtendedChannelPool pool )
270
+ {
271
+ // Close in the background
272
+ closePool ( pool ).whenComplete ( ( ignored , error ) -> {
273
+ if ( error != null )
274
+ {
275
+ log .warn ( format ( "An error occurred while closing connection pool towards %s." , address ), error );
276
+ }
277
+ } );
278
+ }
279
+
280
+ private EventLoopGroup eventLoopGroup ()
281
+ {
282
+ return bootstrap .config ().group ();
283
+ }
284
+
285
+ private void shutdownEventLoopGroup ( Throwable pollCloseError )
286
+ {
287
+ // This is an attempt to speed up the shut down procedure of the driver
288
+ // This timeout is needed for `closePoolInBackground` to finish background job, especially for races between `acquire` and `close`.
289
+ eventLoopGroup ().shutdownGracefully ( 200 , 15_000 , TimeUnit .MILLISECONDS );
290
+
291
+ Futures .asCompletionStage ( eventLoopGroup ().terminationFuture () )
292
+ .whenComplete ( ( ignore , eventLoopGroupTerminationError ) -> {
293
+ CompletionException combinedErrors = combineErrors ( pollCloseError , eventLoopGroupTerminationError );
294
+ completeWithNullIfNoError ( closeFuture , combinedErrors );
295
+ } );
296
+ }
297
+
298
+ private CompletableFuture <Void > closeAllPools ()
299
+ {
300
+ return CompletableFuture .allOf (
301
+ pools .entrySet ().stream ().map ( entry -> {
302
+ BoltServerAddress address = entry .getKey ();
303
+ ExtendedChannelPool pool = entry .getValue ();
304
+ log .info ( "Closing connection pool towards %s" , address );
305
+ // Wait for all pools to be closed.
306
+ return closePool ( pool ).toCompletableFuture ();
307
+ } ).toArray ( CompletableFuture []::new ) );
308
+ }
284
309
}
0 commit comments