Skip to content

Recover producer and consumer on timeout #641

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions src/main/java/com/rabbitmq/stream/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ static short encodeResponseCode(Short code) {
}

static ClientFactory coordinatorClientFactory(StreamEnvironment environment) {
String messageFormat =
"%s. %s. "
+ "This may be due to the usage of a load balancer that makes topology discovery fail. "
+ "Use a custom AddressResolver or the --load-balancer flag if using StreamPerfTest. "
+ "See https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#understanding-connection-logic "
+ "and https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer.";
return context -> {
ClientParameters parametersCopy = context.parameters().duplicate();
Address address = new Address(parametersCopy.host(), parametersCopy.port());
Expand All @@ -159,20 +165,15 @@ static ClientFactory coordinatorClientFactory(StreamEnvironment environment) {
return Utils.connectToAdvertisedNodeClientFactory(
context.key(), context1 -> new Client(context1.parameters()))
.client(Utils.ClientFactoryContext.fromParameters(parametersCopy).key(context.key()));
} catch (TimeoutStreamException e) {
throw new TimeoutStreamException(
format(messageFormat, e.getMessage(), e.getCause().getMessage(), e.getCause()));
} catch (StreamException e) {
if (e.getCause() != null
&& (e.getCause() instanceof UnknownHostException
|| e.getCause() instanceof ConnectTimeoutException)) {
String message =
e.getMessage()
+ ". "
+ e.getCause().getMessage()
+ ". "
+ "This may be due to the usage of a load balancer that makes topology discovery fail. "
+ "Use a custom AddressResolver or the --load-balancer flag if using StreamPerfTest. "
+ "See https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#understanding-connection-logic "
+ "and https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer.";
throw new StreamException(message, e.getCause());
throw new StreamException(
format(messageFormat, e.getMessage(), e.getCause().getMessage()), e.getCause());
} else {
throw e;
}
Expand Down Expand Up @@ -204,11 +205,11 @@ static ClientFactory connectToAdvertisedNodeClientFactory(
}

static Runnable namedRunnable(Runnable task, String format, Object... args) {
return new NamedRunnable(String.format(format, args), task);
return new NamedRunnable(format(format, args), task);
}

static <T, R> Function<T, R> namedFunction(Function<T, R> task, String format, Object... args) {
return new NamedFunction<>(String.format(format, args), task);
return new NamedFunction<>(format(format, args), task);
}

static <T> T callAndMaybeRetry(
Expand Down Expand Up @@ -325,7 +326,7 @@ public Client client(ClientFactoryContext context) {
try {
Thread.sleep(this.retryInterval.toMillis());
} catch (InterruptedException e) {
Thread.interrupted();
Thread.currentThread().interrupt();
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.rabbitmq.stream.impl.Client.Response;
import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerCoordinatorInfo;
import com.rabbitmq.stream.impl.Utils.ClientFactory;
import io.netty.channel.ConnectTimeoutException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -1711,6 +1712,58 @@ void shouldRetryAssignmentOnRecoveryCandidateLookupFailure() throws Exception {
verify(locator, times(4)).metadata("stream");
}

@Test
@SuppressWarnings("unchecked")
void shouldRetryAssignmentOnRecoveryConnectionTimeout() throws Exception {
scheduledExecutorService = createScheduledExecutorService(2);
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
Duration retryDelay = Duration.ofMillis(100);
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
when(environment.topologyUpdateBackOffDelayPolicy())
.thenReturn(BackOffDelayPolicy.fixed(retryDelay));
when(consumer.isOpen()).thenReturn(true);
when(locator.metadata("stream")).thenReturn(metadata("stream", null, replicas()));

when(clientFactory.client(any()))
.thenReturn(client)
.thenThrow(new TimeoutStreamException("", new ConnectTimeoutException()))
.thenReturn(client);

AtomicInteger subscriptionCount = new AtomicInteger(0);
when(client.subscribe(
subscriptionIdCaptor.capture(),
anyString(),
any(OffsetSpecification.class),
anyInt(),
anyMap()))
.thenAnswer(
invocation -> {
subscriptionCount.incrementAndGet();
return responseOk();
});

coordinator.subscribe(
consumer,
"stream",
null,
null,
NO_OP_SUBSCRIPTION_LISTENER,
NO_OP_TRACKING_CLOSING_CALLBACK,
(offset, message) -> {},
Collections.emptyMap(),
flowStrategy());
verify(clientFactory, times(1)).client(any());
verify(client, times(1))
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());

this.shutdownListener.handle(
new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));

waitAtMost(() -> subscriptionCount.get() == 1 + 1);

verify(locator, times(3)).metadata("stream");
}

@Test
void subscribeUnsubscribeInDifferentThreadsShouldNotDeadlock() {
when(locator.metadata("stream")).thenReturn(metadata(null, replicas()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

import io.netty.channel.ConnectTimeoutException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -301,6 +303,40 @@ void shouldRedistributeProducerAndTrackingConsumerIfConnectionIsLost() throws Ex
assertThat(coordinator.clientCount()).isEqualTo(1);
}

@Test
void shouldRecoverOnConnectionTimeout() throws Exception {
scheduledExecutorService = createScheduledExecutorService();
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
Duration retryDelay = Duration.ofMillis(50);
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
when(locator.metadata("stream"))
.thenReturn(metadata(leader(), replicas()));

when(clientFactory.client(any()))
.thenReturn(client)
.thenThrow(new TimeoutStreamException("", new ConnectTimeoutException()))
.thenReturn(client);

when(producer.isOpen()).thenReturn(true);

StreamProducer producer = mock(StreamProducer.class);
when(producer.isOpen()).thenReturn(true);

CountDownLatch runningLatch = new CountDownLatch(1);
doAnswer(answer(runningLatch::countDown)).when(this.producer).running();

coordinator.registerProducer(this.producer, null, "stream");

verify(this.producer, times(1)).setClient(client);

shutdownListener.handle(
new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));

assertThat(runningLatch.await(5, TimeUnit.SECONDS)).isTrue();
verify(this.producer, times(1)).unavailable();
verify(this.producer, times(2)).setClient(client);
}

@Test
void shouldDisposeProducerAndNotTrackingConsumerIfRecoveryTimesOut() throws Exception {
scheduledExecutorService = createScheduledExecutorService();
Expand Down