21
21
import io .netty .bootstrap .Bootstrap ;
22
22
import io .netty .channel .Channel ;
23
23
import io .netty .channel .EventLoopGroup ;
24
- import io .netty .channel .pool .ChannelPool ;
25
- import io .netty .util .concurrent .Future ;
26
24
27
25
import java .util .Set ;
28
26
import java .util .concurrent .CompletableFuture ;
49
47
50
48
import static java .lang .String .format ;
51
49
import static org .neo4j .driver .internal .util .Futures .combineErrors ;
52
- import static org .neo4j .driver .internal .util .Futures .completedWithNullIfNonError ;
50
+ import static org .neo4j .driver .internal .util .Futures .completeWithNullIfNoError ;
53
51
54
52
public class ConnectionPoolImpl implements ConnectionPool
55
53
{
@@ -98,9 +96,9 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
98
96
99
97
ListenerEvent acquireEvent = metricsListener .createListenerEvent ();
100
98
metricsListener .beforeAcquiringOrCreating ( pool .id (), acquireEvent );
101
- Future <Channel > connectionFuture = pool .acquire ();
99
+ CompletionStage <Channel > channelFuture = pool .acquire ();
102
100
103
- return Futures . asCompletionStage ( connectionFuture ) .handle ( ( channel , error ) ->
101
+ return channelFuture .handle ( ( channel , error ) ->
104
102
{
105
103
try
106
104
{
@@ -135,13 +133,7 @@ public void retainAll( Set<BoltServerAddress> addressesToRetain )
135
133
{
136
134
log .info ( "Closing connection pool towards %s, it has no active connections " +
137
135
"and is not in the routing table registry." , address );
138
- // Close in the background
139
- closePool ( pool ).whenComplete ( ( ignored , error ) -> {
140
- if ( error != null )
141
- {
142
- log .warn ( format ( "An error occurred while closing connection pool towards %s." , address ), error );
143
- }
144
- } );
136
+ closePoolInBackground ( address , pool );
145
137
}
146
138
}
147
139
}
@@ -166,35 +158,19 @@ public CompletionStage<Void> close()
166
158
if ( closed .compareAndSet ( false , true ) )
167
159
{
168
160
nettyChannelTracker .prepareToCloseChannels ();
169
-
170
- CompletableFuture <Void > allPoolClosedFuture = CompletableFuture .allOf (
171
- pools .entrySet ().stream ().map ( entry -> {
172
- BoltServerAddress address = entry .getKey ();
173
- ExtendedChannelPool pool = entry .getValue ();
174
- log .info ( "Closing connection pool towards %s" , address );
175
- // Wait for all pools to be closed.
176
- return closePool ( pool ).toCompletableFuture ();
177
- } ).toArray ( CompletableFuture []::new ) );
161
+ CompletableFuture <Void > allPoolClosedFuture = closeAllPools ();
178
162
179
163
// We can only shutdown event loop group when all netty pools are fully closed,
180
164
// otherwise the netty pools might missing threads (from event loop group) to execute clean ups.
181
165
allPoolClosedFuture .whenComplete ( ( ignored , pollCloseError ) -> {
182
166
pools .clear ();
183
167
if ( !ownsEventLoopGroup )
184
168
{
185
- completedWithNullIfNonError ( closeFuture , pollCloseError );
169
+ completeWithNullIfNoError ( closeFuture , pollCloseError );
186
170
}
187
171
else
188
172
{
189
- // This is an attempt to speed up the shut down procedure of the driver
190
- // Feel free return this back to shutdownGracefully() method with default values
191
- // if this proves troublesome!!!
192
- eventLoopGroup ().shutdownGracefully ( 200 , 15_000 , TimeUnit .MILLISECONDS );
193
-
194
- Futures .asCompletionStage ( eventLoopGroup ().terminationFuture () ).whenComplete ( ( ignore , eventLoopGroupTerminationError ) -> {
195
- CompletionException combinedErrors = combineErrors ( pollCloseError , eventLoopGroupTerminationError );
196
- completedWithNullIfNonError ( closeFuture , combinedErrors );
197
- } );
173
+ shutdownEventLoopGroup ( pollCloseError );
198
174
}
199
175
} );
200
176
}
@@ -207,31 +183,10 @@ public boolean isOpen( BoltServerAddress address )
207
183
return pools .containsKey ( address );
208
184
}
209
185
210
- private ExtendedChannelPool getOrCreatePool ( BoltServerAddress address )
211
- {
212
- return pools .computeIfAbsent ( address , this ::newPool );
213
- }
214
-
215
- private CompletionStage <Void > closePool ( ExtendedChannelPool pool )
216
- {
217
- return pool .repeatableCloseAsync ().whenComplete ( ( ignored , error ) ->
218
- // after the connection pool is removed/close, I can remove its metrics.
219
- metricsListener .removePoolMetrics ( pool .id () ) );
220
- }
221
-
222
- ExtendedChannelPool newPool ( BoltServerAddress address )
223
- {
224
- NettyChannelPool pool =
225
- new NettyChannelPool ( address , connector , bootstrap , nettyChannelTracker , channelHealthChecker , settings .connectionAcquisitionTimeout (),
226
- settings .maxConnectionPoolSize () );
227
- // before the connection pool is added I can add the metrics for the pool.
228
- metricsListener .putPoolMetrics ( pool .id (), address , this );
229
- return pool ;
230
- }
231
-
232
- private EventLoopGroup eventLoopGroup ()
186
+ @ Override
187
+ public String toString ()
233
188
{
234
- return bootstrap . config (). group () ;
189
+ return "ConnectionPoolImpl{" + "pools=" + pools + '}' ;
235
190
}
236
191
237
192
private void processAcquisitionError ( ExtendedChannelPool pool , BoltServerAddress serverAddress , Throwable error )
@@ -271,26 +226,84 @@ private void assertNotClosed()
271
226
}
272
227
}
273
228
274
- private void assertNotClosed ( BoltServerAddress address , Channel channel , ChannelPool pool )
229
+ private void assertNotClosed ( BoltServerAddress address , Channel channel , ExtendedChannelPool pool )
275
230
{
276
231
if ( closed .get () )
277
232
{
278
233
pool .release ( channel );
279
- pool . close ( );
234
+ closePoolInBackground ( address , pool );
280
235
pools .remove ( address );
281
236
assertNotClosed ();
282
237
}
283
238
}
284
239
285
- @ Override
286
- public String toString ()
287
- {
288
- return "ConnectionPoolImpl{" + "pools=" + pools + '}' ;
289
- }
290
-
291
240
// for testing only
292
241
ExtendedChannelPool getPool ( BoltServerAddress address )
293
242
{
294
243
return pools .get ( address );
295
244
}
245
+
246
+ ExtendedChannelPool newPool ( BoltServerAddress address )
247
+ {
248
+ return new NettyChannelPool ( address , connector , bootstrap , nettyChannelTracker , channelHealthChecker , settings .connectionAcquisitionTimeout (),
249
+ settings .maxConnectionPoolSize () );
250
+ }
251
+
252
+ private ExtendedChannelPool getOrCreatePool ( BoltServerAddress address )
253
+ {
254
+ return pools .computeIfAbsent ( address , ignored -> {
255
+ ExtendedChannelPool pool = newPool ( address );
256
+ // before the connection pool is added I can add the metrics for the pool.
257
+ metricsListener .putPoolMetrics ( pool .id (), address , this );
258
+ return pool ;
259
+ } );
260
+ }
261
+
262
+ private CompletionStage <Void > closePool ( ExtendedChannelPool pool )
263
+ {
264
+ return pool .close ().whenComplete ( ( ignored , error ) ->
265
+ // after the connection pool is removed/close, I can remove its metrics.
266
+ metricsListener .removePoolMetrics ( pool .id () ) );
267
+ }
268
+
269
+ private void closePoolInBackground ( BoltServerAddress address , ExtendedChannelPool pool )
270
+ {
271
+ // Close in the background
272
+ closePool ( pool ).whenComplete ( ( ignored , error ) -> {
273
+ if ( error != null )
274
+ {
275
+ log .warn ( format ( "An error occurred while closing connection pool towards %s." , address ), error );
276
+ }
277
+ } );
278
+ }
279
+
280
+ private EventLoopGroup eventLoopGroup ()
281
+ {
282
+ return bootstrap .config ().group ();
283
+ }
284
+
285
+ private void shutdownEventLoopGroup ( Throwable pollCloseError )
286
+ {
287
+ // This is an attempt to speed up the shut down procedure of the driver
288
+ // This timeout is needed for `closePoolInBackground` to finish background job, especially for races between `acquire` and `close`.
289
+ eventLoopGroup ().shutdownGracefully ( 200 , 15_000 , TimeUnit .MILLISECONDS );
290
+
291
+ Futures .asCompletionStage ( eventLoopGroup ().terminationFuture () )
292
+ .whenComplete ( ( ignore , eventLoopGroupTerminationError ) -> {
293
+ CompletionException combinedErrors = combineErrors ( pollCloseError , eventLoopGroupTerminationError );
294
+ completeWithNullIfNoError ( closeFuture , combinedErrors );
295
+ } );
296
+ }
297
+
298
+ private CompletableFuture <Void > closeAllPools ()
299
+ {
300
+ return CompletableFuture .allOf (
301
+ pools .entrySet ().stream ().map ( entry -> {
302
+ BoltServerAddress address = entry .getKey ();
303
+ ExtendedChannelPool pool = entry .getValue ();
304
+ log .info ( "Closing connection pool towards %s" , address );
305
+ // Wait for all pools to be closed.
306
+ return closePool ( pool ).toCompletableFuture ();
307
+ } ).toArray ( CompletableFuture []::new ) );
308
+ }
296
309
}
0 commit comments