File tree 1 file changed +5
-3
lines changed
src/test/java/com/rabbitmq/stream/impl
1 file changed +5
-3
lines changed Original file line number Diff line number Diff line change @@ -243,9 +243,9 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
243
243
void asynchronousProcessingWithFlowControl () {
244
244
int messageCount = 100_000 ;
245
245
publishAndWaitForConfirms (cf , messageCount , stream );
246
-
247
- try ( ExecutorService executorService =
248
- Executors . newFixedThreadPool ( getRuntime (). availableProcessors ())) {
246
+ ExecutorService executorService =
247
+ Executors . newFixedThreadPool ( getRuntime (). availableProcessors ());
248
+ try {
249
249
CountDownLatch latch = new CountDownLatch (messageCount );
250
250
environment .consumerBuilder ().stream (stream )
251
251
.offset (OffsetSpecification .first ())
@@ -261,6 +261,8 @@ void asynchronousProcessingWithFlowControl() {
261
261
}))
262
262
.build ();
263
263
assertThat (latch ).is (completed ());
264
+ } finally {
265
+ executorService .shutdownNow ();
264
266
}
265
267
}
266
268
You can’t perform that action at this time.
0 commit comments