Skip to content

Commit 93a6c35

Browse files
committed
Don't check consumer is open in coordinator
It's already done in a decorating message handler, in the consumer itself. References #142
1 parent 4b897c5 commit 93a6c35

File tree

2 files changed

+1
-5
lines changed

2 files changed

+1
-5
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ private ClientSubscriptionsManager(
462462
SubscriptionTracker subscriptionTracker =
463463
subscriptionTrackers.get(subscriptionId & 0xFF);
464464

465-
if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) {
465+
if (subscriptionTracker != null) {
466466
subscriptionTracker.offset = offset;
467467
subscriptionTracker.hasReceivedSomething = true;
468468
subscriptionTracker.messageHandler.handle(

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

-4
Original file line numberDiff line numberDiff line change
@@ -397,8 +397,6 @@ void subscribeShouldSubscribeToStreamAndDispatchMessage_UnsubscribeShouldUnsubsc
397397
anyMap()))
398398
.thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
399399

400-
when(consumer.isOpen()).thenReturn(true);
401-
402400
AtomicInteger messageHandlerCalls = new AtomicInteger();
403401
AtomicInteger trackingClosingCallbackCalls = new AtomicInteger();
404402
Runnable closingRunnable =
@@ -445,8 +443,6 @@ void subscribeShouldSubscribeToStreamAndDispatchMessageWithManySubscriptions() {
445443
anyMap()))
446444
.thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
447445

448-
when(consumer.isOpen()).thenReturn(true);
449-
450446
Map<Byte, Integer> messageHandlerCalls = new ConcurrentHashMap<>();
451447
List<Runnable> closingRunnables = new ArrayList<>();
452448
for (int i = 0; i < ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT; i++) {

0 commit comments

Comments
 (0)