From e3950ba9e0772a1a04d7dcaea22935673408c570 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 23 Oct 2024 11:04:46 +0200 Subject: [PATCH] Recover producer and consumer on timeout Fixes #630 --- .../java/com/rabbitmq/stream/impl/Utils.java | 27 +++++----- .../stream/impl/ConsumersCoordinatorTest.java | 53 +++++++++++++++++++ .../stream/impl/ProducersCoordinatorTest.java | 36 +++++++++++++ 3 files changed, 103 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index 43cddc1f1e..745c7778a1 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -145,6 +145,12 @@ static short encodeResponseCode(Short code) { } static ClientFactory coordinatorClientFactory(StreamEnvironment environment) { + String messageFormat = + "%s. %s. " + + "This may be due to the usage of a load balancer that makes topology discovery fail. " + + "Use a custom AddressResolver or the --load-balancer flag if using StreamPerfTest. " + + "See https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#understanding-connection-logic " + + "and https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer."; return context -> { ClientParameters parametersCopy = context.parameters().duplicate(); Address address = new Address(parametersCopy.host(), parametersCopy.port()); @@ -159,20 +165,15 @@ static ClientFactory coordinatorClientFactory(StreamEnvironment environment) { return Utils.connectToAdvertisedNodeClientFactory( context.key(), context1 -> new Client(context1.parameters())) .client(Utils.ClientFactoryContext.fromParameters(parametersCopy).key(context.key())); + } catch (TimeoutStreamException e) { + throw new TimeoutStreamException( + format(messageFormat, e.getMessage(), e.getCause().getMessage(), e.getCause())); } catch (StreamException e) { if (e.getCause() != null && (e.getCause() instanceof UnknownHostException || e.getCause() instanceof ConnectTimeoutException)) { - String message = - e.getMessage() - + ". " - + e.getCause().getMessage() - + ". " - + "This may be due to the usage of a load balancer that makes topology discovery fail. " - + "Use a custom AddressResolver or the --load-balancer flag if using StreamPerfTest. " - + "See https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#understanding-connection-logic " - + "and https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer."; - throw new StreamException(message, e.getCause()); + throw new StreamException( + format(messageFormat, e.getMessage(), e.getCause().getMessage()), e.getCause()); } else { throw e; } @@ -204,11 +205,11 @@ static ClientFactory connectToAdvertisedNodeClientFactory( } static Runnable namedRunnable(Runnable task, String format, Object... args) { - return new NamedRunnable(String.format(format, args), task); + return new NamedRunnable(format(format, args), task); } static Function namedFunction(Function task, String format, Object... args) { - return new NamedFunction<>(String.format(format, args), task); + return new NamedFunction<>(format(format, args), task); } static T callAndMaybeRetry( @@ -325,7 +326,7 @@ public Client client(ClientFactoryContext context) { try { Thread.sleep(this.retryInterval.toMillis()); } catch (InterruptedException e) { - Thread.interrupted(); + Thread.currentThread().interrupt(); return null; } } diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java index 110e5b946b..6b655e9141 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java @@ -41,6 +41,7 @@ import com.rabbitmq.stream.impl.Client.Response; import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerCoordinatorInfo; import com.rabbitmq.stream.impl.Utils.ClientFactory; +import io.netty.channel.ConnectTimeoutException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -1711,6 +1712,58 @@ void shouldRetryAssignmentOnRecoveryCandidateLookupFailure() throws Exception { verify(locator, times(4)).metadata("stream"); } + @Test + @SuppressWarnings("unchecked") + void shouldRetryAssignmentOnRecoveryConnectionTimeout() throws Exception { + scheduledExecutorService = createScheduledExecutorService(2); + when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService); + Duration retryDelay = Duration.ofMillis(100); + when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay)); + when(environment.topologyUpdateBackOffDelayPolicy()) + .thenReturn(BackOffDelayPolicy.fixed(retryDelay)); + when(consumer.isOpen()).thenReturn(true); + when(locator.metadata("stream")).thenReturn(metadata("stream", null, replicas())); + + when(clientFactory.client(any())) + .thenReturn(client) + .thenThrow(new TimeoutStreamException("", new ConnectTimeoutException())) + .thenReturn(client); + + AtomicInteger subscriptionCount = new AtomicInteger(0); + when(client.subscribe( + subscriptionIdCaptor.capture(), + anyString(), + any(OffsetSpecification.class), + anyInt(), + anyMap())) + .thenAnswer( + invocation -> { + subscriptionCount.incrementAndGet(); + return responseOk(); + }); + + coordinator.subscribe( + consumer, + "stream", + null, + null, + NO_OP_SUBSCRIPTION_LISTENER, + NO_OP_TRACKING_CLOSING_CALLBACK, + (offset, message) -> {}, + Collections.emptyMap(), + flowStrategy()); + verify(clientFactory, times(1)).client(any()); + verify(client, times(1)) + .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); + + this.shutdownListener.handle( + new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN)); + + waitAtMost(() -> subscriptionCount.get() == 1 + 1); + + verify(locator, times(3)).metadata("stream"); + } + @Test void subscribeUnsubscribeInDifferentThreadsShouldNotDeadlock() { when(locator.metadata("stream")).thenReturn(metadata(null, replicas())); diff --git a/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java index 34817e6752..ca8c550b26 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java @@ -46,6 +46,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; + +import io.netty.channel.ConnectTimeoutException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -301,6 +303,40 @@ void shouldRedistributeProducerAndTrackingConsumerIfConnectionIsLost() throws Ex assertThat(coordinator.clientCount()).isEqualTo(1); } + @Test + void shouldRecoverOnConnectionTimeout() throws Exception { + scheduledExecutorService = createScheduledExecutorService(); + when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService); + Duration retryDelay = Duration.ofMillis(50); + when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay)); + when(locator.metadata("stream")) + .thenReturn(metadata(leader(), replicas())); + + when(clientFactory.client(any())) + .thenReturn(client) + .thenThrow(new TimeoutStreamException("", new ConnectTimeoutException())) + .thenReturn(client); + + when(producer.isOpen()).thenReturn(true); + + StreamProducer producer = mock(StreamProducer.class); + when(producer.isOpen()).thenReturn(true); + + CountDownLatch runningLatch = new CountDownLatch(1); + doAnswer(answer(runningLatch::countDown)).when(this.producer).running(); + + coordinator.registerProducer(this.producer, null, "stream"); + + verify(this.producer, times(1)).setClient(client); + + shutdownListener.handle( + new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN)); + + assertThat(runningLatch.await(5, TimeUnit.SECONDS)).isTrue(); + verify(this.producer, times(1)).unavailable(); + verify(this.producer, times(2)).setClient(client); + } + @Test void shouldDisposeProducerAndNotTrackingConsumerIfRecoveryTimesOut() throws Exception { scheduledExecutorService = createScheduledExecutorService();