Skip to content

Commit e3950ba

Browse files
committed
Recover producer and consumer on timeout
Fixes #630
1 parent 9f74095 commit e3950ba

File tree

3 files changed

+103
-13
lines changed

3 files changed

+103
-13
lines changed

src/main/java/com/rabbitmq/stream/impl/Utils.java

+14-13
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,12 @@ static short encodeResponseCode(Short code) {
145145
}
146146

147147
static ClientFactory coordinatorClientFactory(StreamEnvironment environment) {
148+
String messageFormat =
149+
"%s. %s. "
150+
+ "This may be due to the usage of a load balancer that makes topology discovery fail. "
151+
+ "Use a custom AddressResolver or the --load-balancer flag if using StreamPerfTest. "
152+
+ "See https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#understanding-connection-logic "
153+
+ "and https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer.";
148154
return context -> {
149155
ClientParameters parametersCopy = context.parameters().duplicate();
150156
Address address = new Address(parametersCopy.host(), parametersCopy.port());
@@ -159,20 +165,15 @@ static ClientFactory coordinatorClientFactory(StreamEnvironment environment) {
159165
return Utils.connectToAdvertisedNodeClientFactory(
160166
context.key(), context1 -> new Client(context1.parameters()))
161167
.client(Utils.ClientFactoryContext.fromParameters(parametersCopy).key(context.key()));
168+
} catch (TimeoutStreamException e) {
169+
throw new TimeoutStreamException(
170+
format(messageFormat, e.getMessage(), e.getCause().getMessage(), e.getCause()));
162171
} catch (StreamException e) {
163172
if (e.getCause() != null
164173
&& (e.getCause() instanceof UnknownHostException
165174
|| e.getCause() instanceof ConnectTimeoutException)) {
166-
String message =
167-
e.getMessage()
168-
+ ". "
169-
+ e.getCause().getMessage()
170-
+ ". "
171-
+ "This may be due to the usage of a load balancer that makes topology discovery fail. "
172-
+ "Use a custom AddressResolver or the --load-balancer flag if using StreamPerfTest. "
173-
+ "See https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#understanding-connection-logic "
174-
+ "and https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/#with-a-load-balancer.";
175-
throw new StreamException(message, e.getCause());
175+
throw new StreamException(
176+
format(messageFormat, e.getMessage(), e.getCause().getMessage()), e.getCause());
176177
} else {
177178
throw e;
178179
}
@@ -204,11 +205,11 @@ static ClientFactory connectToAdvertisedNodeClientFactory(
204205
}
205206

206207
static Runnable namedRunnable(Runnable task, String format, Object... args) {
207-
return new NamedRunnable(String.format(format, args), task);
208+
return new NamedRunnable(format(format, args), task);
208209
}
209210

210211
static <T, R> Function<T, R> namedFunction(Function<T, R> task, String format, Object... args) {
211-
return new NamedFunction<>(String.format(format, args), task);
212+
return new NamedFunction<>(format(format, args), task);
212213
}
213214

214215
static <T> T callAndMaybeRetry(
@@ -325,7 +326,7 @@ public Client client(ClientFactoryContext context) {
325326
try {
326327
Thread.sleep(this.retryInterval.toMillis());
327328
} catch (InterruptedException e) {
328-
Thread.interrupted();
329+
Thread.currentThread().interrupt();
329330
return null;
330331
}
331332
}

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

+53
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.rabbitmq.stream.impl.Client.Response;
4242
import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerCoordinatorInfo;
4343
import com.rabbitmq.stream.impl.Utils.ClientFactory;
44+
import io.netty.channel.ConnectTimeoutException;
4445
import java.time.Duration;
4546
import java.util.ArrayList;
4647
import java.util.Arrays;
@@ -1711,6 +1712,58 @@ void shouldRetryAssignmentOnRecoveryCandidateLookupFailure() throws Exception {
17111712
verify(locator, times(4)).metadata("stream");
17121713
}
17131714

1715+
@Test
1716+
@SuppressWarnings("unchecked")
1717+
void shouldRetryAssignmentOnRecoveryConnectionTimeout() throws Exception {
1718+
scheduledExecutorService = createScheduledExecutorService(2);
1719+
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
1720+
Duration retryDelay = Duration.ofMillis(100);
1721+
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
1722+
when(environment.topologyUpdateBackOffDelayPolicy())
1723+
.thenReturn(BackOffDelayPolicy.fixed(retryDelay));
1724+
when(consumer.isOpen()).thenReturn(true);
1725+
when(locator.metadata("stream")).thenReturn(metadata("stream", null, replicas()));
1726+
1727+
when(clientFactory.client(any()))
1728+
.thenReturn(client)
1729+
.thenThrow(new TimeoutStreamException("", new ConnectTimeoutException()))
1730+
.thenReturn(client);
1731+
1732+
AtomicInteger subscriptionCount = new AtomicInteger(0);
1733+
when(client.subscribe(
1734+
subscriptionIdCaptor.capture(),
1735+
anyString(),
1736+
any(OffsetSpecification.class),
1737+
anyInt(),
1738+
anyMap()))
1739+
.thenAnswer(
1740+
invocation -> {
1741+
subscriptionCount.incrementAndGet();
1742+
return responseOk();
1743+
});
1744+
1745+
coordinator.subscribe(
1746+
consumer,
1747+
"stream",
1748+
null,
1749+
null,
1750+
NO_OP_SUBSCRIPTION_LISTENER,
1751+
NO_OP_TRACKING_CLOSING_CALLBACK,
1752+
(offset, message) -> {},
1753+
Collections.emptyMap(),
1754+
flowStrategy());
1755+
verify(clientFactory, times(1)).client(any());
1756+
verify(client, times(1))
1757+
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
1758+
1759+
this.shutdownListener.handle(
1760+
new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
1761+
1762+
waitAtMost(() -> subscriptionCount.get() == 1 + 1);
1763+
1764+
verify(locator, times(3)).metadata("stream");
1765+
}
1766+
17141767
@Test
17151768
void subscribeUnsubscribeInDifferentThreadsShouldNotDeadlock() {
17161769
when(locator.metadata("stream")).thenReturn(metadata(null, replicas()));

src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java

+36
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import java.util.concurrent.atomic.AtomicInteger;
4747
import java.util.concurrent.atomic.AtomicReference;
4848
import java.util.stream.IntStream;
49+
50+
import io.netty.channel.ConnectTimeoutException;
4951
import org.junit.jupiter.api.AfterEach;
5052
import org.junit.jupiter.api.BeforeEach;
5153
import org.junit.jupiter.api.Test;
@@ -301,6 +303,40 @@ void shouldRedistributeProducerAndTrackingConsumerIfConnectionIsLost() throws Ex
301303
assertThat(coordinator.clientCount()).isEqualTo(1);
302304
}
303305

306+
@Test
307+
void shouldRecoverOnConnectionTimeout() throws Exception {
308+
scheduledExecutorService = createScheduledExecutorService();
309+
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
310+
Duration retryDelay = Duration.ofMillis(50);
311+
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
312+
when(locator.metadata("stream"))
313+
.thenReturn(metadata(leader(), replicas()));
314+
315+
when(clientFactory.client(any()))
316+
.thenReturn(client)
317+
.thenThrow(new TimeoutStreamException("", new ConnectTimeoutException()))
318+
.thenReturn(client);
319+
320+
when(producer.isOpen()).thenReturn(true);
321+
322+
StreamProducer producer = mock(StreamProducer.class);
323+
when(producer.isOpen()).thenReturn(true);
324+
325+
CountDownLatch runningLatch = new CountDownLatch(1);
326+
doAnswer(answer(runningLatch::countDown)).when(this.producer).running();
327+
328+
coordinator.registerProducer(this.producer, null, "stream");
329+
330+
verify(this.producer, times(1)).setClient(client);
331+
332+
shutdownListener.handle(
333+
new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
334+
335+
assertThat(runningLatch.await(5, TimeUnit.SECONDS)).isTrue();
336+
verify(this.producer, times(1)).unavailable();
337+
verify(this.producer, times(2)).setClient(client);
338+
}
339+
304340
@Test
305341
void shouldDisposeProducerAndNotTrackingConsumerIfRecoveryTimesOut() throws Exception {
306342
scheduledExecutorService = createScheduledExecutorService();

0 commit comments

Comments
 (0)