File tree 1 file changed +4
-7
lines changed
src/test/java/com/rabbitmq/stream/impl
1 file changed +4
-7
lines changed Original file line number Diff line number Diff line change @@ -203,7 +203,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
203
203
environment .consumerBuilder ().stream (stream )
204
204
.offset (OffsetSpecification .first ())
205
205
.flow ()
206
- .strategy (creditWhenHalfMessagesProcessed ())
206
+ .strategy (creditWhenHalfMessagesProcessed (1 ))
207
207
.builder ();
208
208
209
209
List <MessageHandler .Context > messageContexts = synchronizedList (new ArrayList <>());
@@ -244,14 +244,13 @@ void asynchronousProcessingWithFlowControl() {
244
244
int messageCount = 100_000 ;
245
245
publishAndWaitForConfirms (cf , messageCount , stream );
246
246
247
- ExecutorService executorService =
248
- Executors .newFixedThreadPool (getRuntime ().availableProcessors ());
249
- try {
247
+ try (ExecutorService executorService =
248
+ Executors .newFixedThreadPool (getRuntime ().availableProcessors ())) {
250
249
CountDownLatch latch = new CountDownLatch (messageCount );
251
250
environment .consumerBuilder ().stream (stream )
252
251
.offset (OffsetSpecification .first ())
253
252
.flow ()
254
- .strategy (creditWhenHalfMessagesProcessed ())
253
+ .strategy (creditWhenHalfMessagesProcessed (1 ))
255
254
.builder ()
256
255
.messageHandler (
257
256
(ctx , message ) ->
@@ -262,8 +261,6 @@ void asynchronousProcessingWithFlowControl() {
262
261
}))
263
262
.build ();
264
263
assertThat (latch ).is (completed ());
265
- } finally {
266
- executorService .shutdownNow ();
267
264
}
268
265
}
269
266
You can’t perform that action at this time.
0 commit comments