@@ -155,8 +155,12 @@ public class Client implements AutoCloseable {
155
155
final ConcurrentMap <Integer , OutstandingRequest <?>> outstandingRequests =
156
156
new ConcurrentHashMap <>();
157
157
final List <SubscriptionOffset > subscriptionOffsets = new CopyOnWriteArrayList <>();
158
+ // dispatches broker frames, except for delivery frames
158
159
final ExecutorService executorService ;
160
+ private final Consumer <ExecutorService > closeExecutorService ;
161
+ // dispatches delivery frames only
159
162
final ExecutorService dispatchingExecutorService ;
163
+ private final Consumer <ExecutorService > closeDispatchingExecutorService ;
160
164
final TuneState tuneState ;
161
165
final AtomicBoolean closing = new AtomicBoolean (false );
162
166
final AtomicBoolean shuttingDownDispatching = new AtomicBoolean (false );
@@ -174,7 +178,6 @@ public long applyAsLong(Object value) {
174
178
}
175
179
};
176
180
private final AtomicInteger correlationSequence = new AtomicInteger (0 );
177
- private final Runnable executorServiceClosing ;
178
181
private final SaslConfiguration saslConfiguration ;
179
182
private final CredentialsProvider credentialsProvider ;
180
183
private final Runnable nettyClosing ;
@@ -331,44 +334,58 @@ public void initChannel(SocketChannel ch) {
331
334
this .channel = f .channel ();
332
335
ExecutorServiceFactory executorServiceFactory = parameters .executorServiceFactory ;
333
336
if (executorServiceFactory == null ) {
337
+ this .closeExecutorService =
338
+ Utils .makeIdempotent (
339
+ es -> {
340
+ if (es != null ) {
341
+ es .shutdownNow ();
342
+ }
343
+ });
334
344
this .executorService =
335
345
Executors .newSingleThreadExecutor (threadFactory (clientConnectionName + "-" ));
336
346
} else {
347
+ this .closeExecutorService =
348
+ Utils .makeIdempotent (
349
+ es -> {
350
+ if (es != null ) {
351
+ executorServiceFactory .clientClosed (es );
352
+ }
353
+ });
337
354
this .executorService = executorServiceFactory .get ();
338
355
}
339
356
ExecutorServiceFactory dispatchingExecutorServiceFactory =
340
357
parameters .dispatchingExecutorServiceFactory ;
341
358
if (dispatchingExecutorServiceFactory == null ) {
359
+ this .closeDispatchingExecutorService =
360
+ Utils .makeIdempotent (
361
+ es -> {
362
+ if (es != null ) {
363
+ List <Runnable > outstandingTasks = es .shutdownNow ();
364
+ this .shuttingDownDispatching .set (true );
365
+ for (Runnable outstandingTask : outstandingTasks ) {
366
+ try {
367
+ outstandingTask .run ();
368
+ } catch (Exception e ) {
369
+ LOGGER .info (
370
+ "Error while releasing buffer in outstanding connection tasks: {}" ,
371
+ e .getMessage ());
372
+ }
373
+ }
374
+ }
375
+ });
342
376
this .dispatchingExecutorService =
343
377
Executors .newSingleThreadExecutor (
344
378
threadFactory ("dispatching-" + clientConnectionName + "-" ));
345
379
} else {
380
+ this .closeDispatchingExecutorService =
381
+ Utils .makeIdempotent (
382
+ es -> {
383
+ if (es != null ) {
384
+ dispatchingExecutorServiceFactory .clientClosed (es );
385
+ }
386
+ });
346
387
this .dispatchingExecutorService = dispatchingExecutorServiceFactory .get ();
347
388
}
348
- this .executorServiceClosing =
349
- Utils .makeIdempotent (
350
- () -> {
351
- if (dispatchingExecutorServiceFactory == null ) {
352
- List <Runnable > outstandingTasks = this .dispatchingExecutorService .shutdownNow ();
353
- this .shuttingDownDispatching .set (true );
354
- for (Runnable outstandingTask : outstandingTasks ) {
355
- try {
356
- outstandingTask .run ();
357
- } catch (Exception e ) {
358
- LOGGER .info (
359
- "Error while releasing buffer in outstanding connection tasks: {}" ,
360
- e .getMessage ());
361
- }
362
- }
363
- } else {
364
- dispatchingExecutorServiceFactory .clientClosed (this .dispatchingExecutorService );
365
- }
366
- if (executorServiceFactory == null ) {
367
- this .executorService .shutdownNow ();
368
- } else {
369
- executorServiceFactory .clientClosed (this .executorService );
370
- }
371
- });
372
389
try {
373
390
this .tuneState =
374
391
new TuneState (
@@ -1451,7 +1468,12 @@ void closingSequence(ShutdownContext.ShutdownReason reason) {
1451
1468
this .shutdownListenerCallback .accept (reason );
1452
1469
}
1453
1470
this .nettyClosing .run ();
1454
- this .executorServiceClosing .run ();
1471
+ if (this .closeDispatchingExecutorService != null ) {
1472
+ this .closeDispatchingExecutorService .accept (this .dispatchingExecutorService );
1473
+ }
1474
+ if (this .closeExecutorService != null ) {
1475
+ this .closeExecutorService .accept (this .executorService );
1476
+ }
1455
1477
}
1456
1478
1457
1479
private void closeNetty () {
0 commit comments