From 3fa5a50724294be25924f49089f495acdfdab54b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 4 Oct 2023 15:17:38 +0200 Subject: [PATCH] Do not unsubscribe with closed client Check the client is open before trying to unsubscribe. --- .../java/com/rabbitmq/stream/impl/Client.java | 6 +++ .../stream/impl/ConsumersCoordinator.java | 12 +++++- .../java/com/rabbitmq/stream/impl/Utils.java | 1 + .../stream/impl/ConsumersCoordinatorTest.java | 41 +++++++++++++++++-- .../stream/impl/StreamConsumerTest.java | 1 - src/test/resources/logback-test.xml | 2 - 6 files changed, 55 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index a99580cfd6..86d3a518a4 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -2029,8 +2029,14 @@ public void done() { } } + static Response responseOk() { + return Response.OK; + } + public static class Response { + private static final Response OK = new Response(RESPONSE_CODE_OK); + private final short responseCode; public Response(short responseCode) { diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 2314434ad3..858643d1ff 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -814,6 +814,10 @@ private void assignConsumersToStream( boolean maybeCloseClient) { Runnable consumersClosingCallback = () -> { + LOGGER.debug( + "Running consumer closing callback after recovery failure, " + + "closing {} subscription(s)", + subscriptions.size()); for (SubscriptionTracker affectedSubscription : subscriptions) { try { affectedSubscription.consumer.closeAfterStreamDeletion(); @@ -1078,7 +1082,13 @@ synchronized void remove(SubscriptionTracker subscriptionTracker) { try { Client.Response unsubscribeResponse = Utils.callAndMaybeRetry( - () -> client.unsubscribe(subscriptionIdInClient), + () -> { + if (client.isOpen()) { + return client.unsubscribe(subscriptionIdInClient); + } else { + return Client.responseOk(); + } + }, RETRY_ON_TIMEOUT, "Unsubscribe request for consumer %d on stream '%s'", subscriptionTracker.consumer.id(), diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index 6dfa23c719..1177c7544c 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -230,6 +230,7 @@ static T callAndMaybeRetry( while (keepTrying) { try { attempt++; + LOGGER.debug("Starting attempt #{} for operation '{}'", attempt, description); T result = operation.call(); Duration operationDuration = Duration.ofNanos(System.nanoTime() - startTime); LOGGER.debug( diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java index 06851a55c7..3d1603054f 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java @@ -29,10 +29,7 @@ import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import com.rabbitmq.stream.*; import com.rabbitmq.stream.codec.WrapperMessageBuilder; @@ -459,6 +456,42 @@ void subscribeShouldSubscribeToStreamAndDispatchMessage_UnsubscribeShouldUnsubsc assertThat(messageHandlerCalls.get()).isEqualTo(1); } + @Test + void shouldNotUnsubscribeIfClientIsClosed() { + when(locator.metadata("stream")).thenReturn(metadata(null, replicas())); + + when(clientFactory.client(any())).thenReturn(client); + when(client.subscribe( + subscriptionIdCaptor.capture(), + anyString(), + any(OffsetSpecification.class), + anyInt(), + anyMap())) + .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK)); + + Runnable closingRunnable = + coordinator.subscribe( + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + () -> {}, + (offset, message) -> {}, + Collections.emptyMap(), + flowStrategy()); + verify(clientFactory, times(1)).client(any()); + verify(client, times(1)) + .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); + + when(client.isOpen()).thenReturn(false); + when(client.unsubscribe(subscriptionIdCaptor.getValue())) + .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK)); + + closingRunnable.run(); + verify(client, never()).unsubscribe(subscriptionIdCaptor.getValue()); + } + @Test void subscribeShouldSubscribeToStreamAndDispatchMessageWithManySubscriptions() { when(locator.metadata("stream")).thenReturn(metadata(leader(), null)); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index eefd70a7c6..a67e57d569 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -357,7 +357,6 @@ void consumerShouldBeClosedWhenStreamGetsDeleted(TestInfo info) throws Exception environment.deleteStream(s); TestUtils.waitAtMost(10, () -> !consumer.isOpen()); - assertThat(consumer.isOpen()).isFalse(); } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 52f101159a..4bec720537 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -6,10 +6,8 @@ - -