Skip to content

Commit 065fd49

Browse files
committed
Prevent concurrent consumer recovery
Another recovery can kick in while a first one is in progress. As we added retry, the first one can retry and refresh its connection, so both will succeed and we end up with more consumers than expected.
1 parent 854850a commit 065fd49

File tree

9 files changed

+293
-122
lines changed

9 files changed

+293
-122
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import io.netty.channel.ChannelOption;
8787
import io.netty.channel.ChannelOutboundHandlerAdapter;
8888
import io.netty.channel.ChannelPromise;
89+
import io.netty.channel.ConnectTimeoutException;
8990
import io.netty.channel.EventLoopGroup;
9091
import io.netty.channel.nio.NioEventLoopGroup;
9192
import io.netty.channel.socket.SocketChannel;
@@ -324,6 +325,13 @@ public void write(
324325
this.host = parameters.host;
325326
this.port = parameters.port;
326327
} catch (Exception e) {
328+
if (e instanceof ConnectTimeoutException) {
329+
throw new TimeoutStreamException(
330+
String.format(
331+
"Error while creating stream connection to %s:%d",
332+
parameters.host, parameters.port),
333+
e);
334+
}
327335
throw new StreamException(e);
328336
}
329337

src/main/java/com/rabbitmq/stream/impl/ClientProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ final class ClientProperties {
4444
put("product", "RabbitMQ Stream");
4545
put("version", ClientProperties.VERSION);
4646
put("platform", "Java");
47-
put("copyright", "Copyright (c) 2020-2021 VMware, Inc. or its affiliates.");
47+
put("copyright", "Copyright (c) 2020-2022 VMware, Inc. or its affiliates.");
4848
put("information", "Licensed under the MPL 2.0. See https://www.rabbitmq.com/");
4949
}
5050
});

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 171 additions & 88 deletions
Large diffs are not rendered by default.

src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ Registration registerTrackingConsumer(
9090
try {
9191
this.clock.setTime(System.nanoTime());
9292
LOGGER.debug(
93-
"Background offset tracking flushing, {} tracker(s) to check");
93+
"Background offset tracking flushing, {} tracker(s) to check",
94+
this.trackers.size());
9495
Iterator<Tracker> iterator = trackers.iterator();
9596
while (iterator.hasNext()) {
9697
if (Thread.currentThread().isInterrupted()) {

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,13 @@ static <T> T locatorOperation(
695695
long start = System.nanoTime();
696696
while (attempt < maxAttempt) {
697697
try {
698-
result = operation.apply(clientSupplier.get());
698+
Client client = clientSupplier.get();
699+
LOGGER.debug(
700+
"Using locator on {}:{} to run operation '{}'",
701+
client.getHost(),
702+
client.getPort(),
703+
operation);
704+
result = operation.apply(client);
699705
LOGGER.debug(
700706
"Locator operation '{}' succeeded in {}",
701707
operation,

src/main/java/com/rabbitmq/stream/impl/TimeoutStreamException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,8 @@ class TimeoutStreamException extends StreamException {
2020
public TimeoutStreamException(String message) {
2121
super(message);
2222
}
23+
24+
public TimeoutStreamException(String message, Throwable cause) {
25+
super(message, cause);
26+
}
2327
}

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

Lines changed: 83 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.rabbitmq.stream.impl.TestUtils.metadata;
2020
import static com.rabbitmq.stream.impl.TestUtils.namedConsumer;
2121
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
22+
import static java.lang.String.format;
2223
import static org.assertj.core.api.Assertions.assertThat;
2324
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2425
import static org.mockito.ArgumentMatchers.any;
@@ -146,6 +147,7 @@ public Client.ClientParameters shutdownListener(
146147
when(environment.clientParametersCopy()).thenReturn(clientParameters);
147148
when(environment.addressResolver()).thenReturn(address -> address);
148149
when(client.brokerVersion()).thenReturn("3.11.0");
150+
when(client.isOpen()).thenReturn(true);
149151

150152
coordinator =
151153
new ConsumersCoordinator(
@@ -586,6 +588,69 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception {
586588
assertThat(messageHandlerCalls.get()).isEqualTo(2);
587589
}
588590

591+
@Test
592+
void shouldSkipRecoveryIfRecoveryIsAlreadyInProgress() throws Exception {
593+
scheduledExecutorService = createScheduledExecutorService(2);
594+
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
595+
Duration retryDelay = Duration.ofMillis(100);
596+
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
597+
when(consumer.isOpen()).thenReturn(true);
598+
when(locator.metadata("stream")).thenReturn(metadata(null, replica()));
599+
600+
when(clientFactory.client(any())).thenReturn(client);
601+
AtomicInteger subscriptionCount = new AtomicInteger(0);
602+
when(client.subscribe(
603+
subscriptionIdCaptor.capture(),
604+
anyString(),
605+
any(OffsetSpecification.class),
606+
anyInt(),
607+
anyMap()))
608+
.thenAnswer(
609+
invocation -> {
610+
subscriptionCount.incrementAndGet();
611+
return new Client.Response(Constants.RESPONSE_CODE_OK);
612+
});
613+
614+
String trackingReference = "reference";
615+
616+
when(client.queryOffset(trackingReference, "stream"))
617+
.thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, 0L)) // first subscription
618+
.thenAnswer(
619+
invocation -> {
620+
// during recovery, we trigger another disconnection
621+
shutdownListener.handle(
622+
new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
623+
Thread.sleep(retryDelay.multipliedBy(3).toMillis());
624+
throw new TimeoutStreamException("");
625+
})
626+
.thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, 0L));
627+
628+
AtomicInteger messageHandlerCalls = new AtomicInteger();
629+
coordinator.subscribe(
630+
consumer,
631+
"stream",
632+
OffsetSpecification.first(),
633+
trackingReference,
634+
NO_OP_SUBSCRIPTION_LISTENER,
635+
NO_OP_TRACKING_CLOSING_CALLBACK,
636+
(offset, message) -> messageHandlerCalls.incrementAndGet(),
637+
Collections.emptyMap());
638+
verify(clientFactory, times(1)).client(any());
639+
verify(client, times(1))
640+
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
641+
642+
shutdownListener.handle(
643+
new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
644+
645+
waitAtMost(
646+
() -> subscriptionCount.get() == 1 + 1,
647+
() -> format("Subscription count is %s", subscriptionCount.get()));
648+
649+
verify(consumer, times(1)).setSubscriptionClient(isNull());
650+
verify(client, times(1 + 1))
651+
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
652+
}
653+
589654
@Test
590655
void shouldRedistributeConsumerOnMetadataUpdate() throws Exception {
591656
BackOffDelayPolicy delayPolicy = fixedWithInitialDelay(ms(100), ms(100));
@@ -1317,39 +1382,35 @@ void shouldRetryAssignmentOnManagerClientTimeout() throws Exception {
13171382
return new Client.Response(Constants.RESPONSE_CODE_OK);
13181383
});
13191384

1320-
Runnable closingRunnable1 =
1321-
coordinator.subscribe(
1322-
consumer,
1323-
"stream-1",
1324-
null,
1325-
consumerName,
1326-
NO_OP_SUBSCRIPTION_LISTENER,
1327-
NO_OP_TRACKING_CLOSING_CALLBACK,
1328-
(offset, message) -> {},
1329-
Collections.emptyMap());
1385+
coordinator.subscribe(
1386+
consumer,
1387+
"stream-1",
1388+
null,
1389+
consumerName,
1390+
NO_OP_SUBSCRIPTION_LISTENER,
1391+
NO_OP_TRACKING_CLOSING_CALLBACK,
1392+
(offset, message) -> {},
1393+
Collections.emptyMap());
13301394
verify(clientFactory, times(1)).client(any());
13311395
verify(client, times(1))
13321396
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
13331397

1334-
Runnable closingRunnable2 =
1335-
coordinator.subscribe(
1336-
consumer,
1337-
"stream-2",
1338-
null,
1339-
consumerName,
1340-
NO_OP_SUBSCRIPTION_LISTENER,
1341-
NO_OP_TRACKING_CLOSING_CALLBACK,
1342-
(offset, message) -> {},
1343-
Collections.emptyMap());
1398+
coordinator.subscribe(
1399+
consumer,
1400+
"stream-2",
1401+
null,
1402+
consumerName,
1403+
NO_OP_SUBSCRIPTION_LISTENER,
1404+
NO_OP_TRACKING_CLOSING_CALLBACK,
1405+
(offset, message) -> {},
1406+
Collections.emptyMap());
13441407
verify(clientFactory, times(1)).client(any());
13451408
verify(client, times(1 + 1))
13461409
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
13471410

13481411
this.shutdownListener.handle(
13491412
new Client.ShutdownContext(Client.ShutdownContext.ShutdownReason.UNKNOWN));
13501413

1351-
Thread.sleep(retryDelay.toMillis() * 5);
1352-
13531414
waitAtMost(() -> subscriptionCount.get() == (1 + 1) * 2);
13541415

13551416
verify(locator, times(2)).metadata("stream-1");

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@ void consumerShouldKeepConsumingAfterDisruption(
482482
java.util.function.Consumer<Object> disruption, TestInfo info) throws Exception {
483483
String s = streamName(info);
484484
environment.streamCreator().stream(s).create();
485+
StreamConsumer consumer = null;
485486
try {
486487
int messageCount = 10_000;
487488
CountDownLatch publishLatch = new CountDownLatch(messageCount);
@@ -499,7 +500,7 @@ void consumerShouldKeepConsumingAfterDisruption(
499500
AtomicInteger receivedMessageCount = new AtomicInteger(0);
500501
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
501502
CountDownLatch consumeLatchSecondWave = new CountDownLatch(messageCount * 2);
502-
StreamConsumer consumer =
503+
consumer =
503504
(StreamConsumer)
504505
environment.consumerBuilder().stream(s)
505506
.offset(OffsetSpecification.first())
@@ -519,7 +520,7 @@ void consumerShouldKeepConsumingAfterDisruption(
519520

520521
Client client = cf.get();
521522
TestUtils.waitAtMost(
522-
10,
523+
recoveryInitialDelay.plusSeconds(2),
523524
() -> {
524525
Client.StreamMetadata metadata = client.metadata(s).get(s);
525526
return metadata.getLeader() != null || !metadata.getReplicas().isEmpty();
@@ -537,13 +538,15 @@ void consumerShouldKeepConsumingAfterDisruption(
537538
assertThat(publishLatchSecondWave.await(10, TimeUnit.SECONDS)).isTrue();
538539
producerSecondWave.close();
539540

540-
assertThat(consumeLatchSecondWave.await(10, TimeUnit.SECONDS)).isTrue();
541+
latchAssert(consumeLatchSecondWave).completes(recoveryInitialDelay.plusSeconds(2));
541542
assertThat(receivedMessageCount.get())
542543
.isBetween(messageCount * 2, messageCount * 2 + 1); // there can be a duplicate
543544
assertThat(consumer.isOpen()).isTrue();
544545

545-
consumer.close();
546546
} finally {
547+
if (consumer != null) {
548+
consumer.close();
549+
}
547550
environment.deleteStream(s);
548551
}
549552
}

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.atomic.AtomicInteger;
3535
import java.util.concurrent.atomic.AtomicReference;
3636
import java.util.function.Function;
37+
import java.util.function.Supplier;
3738
import org.junit.jupiter.api.AfterEach;
3839
import org.junit.jupiter.api.BeforeEach;
3940
import org.junit.jupiter.api.Test;
@@ -191,7 +192,9 @@ void locatorOperationShouldReturnOperationResultIfNoProblem() {
191192
AtomicInteger counter = new AtomicInteger();
192193
int result =
193194
StreamEnvironment.locatorOperation(
194-
c -> counter.incrementAndGet(), () -> null, BackOffDelayPolicy.fixed(Duration.ZERO));
195+
c -> counter.incrementAndGet(),
196+
CLIENT_SUPPLIER,
197+
BackOffDelayPolicy.fixed(Duration.ZERO));
195198
assertThat(result).isEqualTo(1);
196199
}
197200

@@ -207,7 +210,7 @@ void locatorOperationShouldRetryAndReturnResultIfLocatorException() {
207210
return counter.get();
208211
}
209212
},
210-
() -> null,
213+
CLIENT_SUPPLIER,
211214
BackOffDelayPolicy.fixed(Duration.ofMillis(10)));
212215
assertThat(result).isEqualTo(2);
213216
}
@@ -222,7 +225,7 @@ void locatorOperationShouldThrowLocatorExceptionWhenRetryExhausts() {
222225
counter.incrementAndGet();
223226
throw new LocatorNotAvailableException();
224227
},
225-
() -> null,
228+
CLIENT_SUPPLIER,
226229
BackOffDelayPolicy.fixed(Duration.ofMillis(10))))
227230
.isInstanceOf(LocatorNotAvailableException.class);
228231
assertThat(counter).hasValue(3);
@@ -242,7 +245,7 @@ void locatorOperationShouldThrowInterruptedExceptionAsCauseIfInterrupted()
242245
latch.countDown();
243246
throw new LocatorNotAvailableException();
244247
},
245-
() -> null,
248+
CLIENT_SUPPLIER,
246249
BackOffDelayPolicy.fixed(Duration.ofMinutes(10)));
247250
} catch (StreamException e) {
248251
exception.set(e);
@@ -268,9 +271,11 @@ void locatorOperationShouldNotRetryAndReThrowUnexpectedException() {
268271
counter.incrementAndGet();
269272
throw new RuntimeException();
270273
},
271-
() -> null,
274+
CLIENT_SUPPLIER,
272275
BackOffDelayPolicy.fixed(Duration.ofMillis(10))))
273276
.isInstanceOf(RuntimeException.class);
274277
assertThat(counter).hasValue(1);
275278
}
279+
280+
private static final Supplier<Client> CLIENT_SUPPLIER = () -> mock(Client.class);
276281
}

0 commit comments

Comments
 (0)