Skip to content

Commit 05477a1

Browse files
committed
Retry locator operation if locator no available
1 parent 5fb0f47 commit 05477a1

13 files changed

+167
-38
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -115,14 +115,11 @@ Runnable subscribe(
115115
return subscriptionTracker::cancel;
116116
}
117117

118-
private Client locator() {
119-
return environment.locator();
120-
}
121-
122118
// package protected for testing
123119
List<Client.Broker> findBrokersForStream(String stream) {
124120
// FIXME make sure locator is not null (retry)
125-
Map<String, Client.StreamMetadata> metadata = locator().metadata(stream);
121+
Map<String, Client.StreamMetadata> metadata =
122+
this.environment.locatorOperation(c -> c.metadata(stream));
126123
if (metadata.size() == 0 || metadata.get(stream) == null) {
127124
// this is not supposed to happen
128125
throw new StreamDoesNotExistException(stream);

src/main/java/com/rabbitmq/stream/impl/HashRoutingStrategy.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ class HashRoutingStrategy implements RoutingStrategy {
3838
ToIntFunction<String> hash) {
3939
this.routingKeyExtractor = routingKeyExtractor;
4040
this.env = env;
41-
// TODO use async retry to get locator
42-
List<String> ps = this.env.locator().partitions(superStream);
41+
List<String> ps = this.env.locatorOperation(c -> c.partitions(superStream));
4342
this.partitions =
4443
new CopyOnWriteArrayList<>(
4544
ps.stream().map(Collections::singletonList).collect(Collectors.toList()));

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ private Runnable registerAgentTracker(AgentTracker tracker, String stream) {
9595
}
9696

9797
private Client.Broker getBrokerForProducer(String stream) {
98-
Map<String, Client.StreamMetadata> metadata = this.environment.locator().metadata(stream);
98+
Map<String, Client.StreamMetadata> metadata =
99+
this.environment.locatorOperation(c -> c.metadata(stream));
99100
if (metadata.size() == 0 || metadata.get(stream) == null) {
100101
throw new StreamDoesNotExistException(stream);
101102
}

src/main/java/com/rabbitmq/stream/impl/RoutingKeyRoutingStrategy.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public List<String> route(Message message) {
4444
routingKey,
4545
routingKey1 -> {
4646
// TODO retry on locator lookup
47-
return env.locator().route(routingKey1, superStream);
47+
return env.locatorOperation(c -> c.route(routingKey1, superStream));
4848
});
4949
return streams;
5050
}

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+49-3
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.function.Consumer;
5656
import java.util.function.Function;
5757
import java.util.function.LongConsumer;
58+
import java.util.function.Supplier;
5859
import java.util.stream.Collectors;
5960
import javax.net.ssl.SSLException;
6061
import org.slf4j.Logger;
@@ -477,15 +478,53 @@ Runnable registerProducer(StreamProducer producer, String reference, String stre
477478
return producersCoordinator.registerProducer(producer, reference, stream);
478479
}
479480

480-
// FIXME make the locator available as a completable future (with retry)
481-
// this would make client code more robust
482481
Client locator() {
483482
if (this.locator == null) {
484-
throw new StreamException("No connection available");
483+
throw new LocatorNotAvailableException();
485484
}
486485
return this.locator;
487486
}
488487

488+
<T> T locatorOperation(Function<Client, T> operation) {
489+
return locatorOperation(operation, () -> locator(), this.recoveryBackOffDelayPolicy);
490+
}
491+
492+
static <T> T locatorOperation(
493+
Function<Client, T> operation,
494+
Supplier<Client> clientSupplier,
495+
BackOffDelayPolicy backOffDelayPolicy) {
496+
int maxAttempt = 3;
497+
int attempt = 0;
498+
boolean executed = false;
499+
Exception lastException = null;
500+
T result = null;
501+
while (attempt < maxAttempt) {
502+
try {
503+
result = operation.apply(clientSupplier.get());
504+
executed = true;
505+
break;
506+
} catch (LocatorNotAvailableException e) {
507+
attempt++;
508+
try {
509+
Thread.sleep(backOffDelayPolicy.delay(attempt).toMillis());
510+
} catch (InterruptedException ex) {
511+
lastException = ex;
512+
Thread.currentThread().interrupt();
513+
break;
514+
}
515+
}
516+
}
517+
if (!executed) {
518+
if (lastException == null) {
519+
throw new LocatorNotAvailableException();
520+
} else {
521+
throw new StreamException(
522+
"Could not execute operation after " + maxAttempt + " attempts", lastException);
523+
}
524+
}
525+
return result;
526+
}
527+
489528
Clock clock() {
490529
return this.clock;
491530
}
@@ -561,4 +600,11 @@ public Consumer<Context> postMessageProcessingCallback() {
561600
return postMessageProcessingCallback;
562601
}
563602
}
603+
604+
static class LocatorNotAvailableException extends StreamException {
605+
606+
public LocatorNotAvailableException() {
607+
super("Locator not available");
608+
}
609+
}
564610
}

src/main/java/com/rabbitmq/stream/impl/StreamStreamCreator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void create() {
6969
}
7070
this.environment.maybeInitializeLocator();
7171
Client.Response response =
72-
environment.locator().create(stream, streamParametersBuilder.build());
72+
environment.locatorOperation(c -> c.create(stream, streamParametersBuilder.build()));
7373
if (!response.isOk()
7474
&& response.getResponseCode() != Constants.RESPONSE_CODE_STREAM_ALREADY_EXISTS) {
7575
throw new StreamException(

src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class SuperStreamConsumer implements Consumer {
2929
SuperStreamConsumer(
3030
StreamConsumerBuilder builder, String superStream, StreamEnvironment environment) {
3131
this.superStream = superStream;
32-
for (String partition : environment.locator().partitions(superStream)) {
32+
for (String partition : environment.locatorOperation(c -> c.partitions(superStream))) {
3333
Consumer consumer = builder.duplicate().superStream(null).stream(partition).build();
3434
consumers.put(partition, consumer);
3535
LOGGER.debug("Created consumer on stream '{}' for super stream '{}'", partition, superStream);

src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,12 @@ public MessageBuilder messageBuilder() {
6161
@Override
6262
public long getLastPublishingId() {
6363
if (this.name != null && !this.name.isEmpty()) {
64-
List<String> streams = this.environment.locator().partitions(superStream);
64+
List<String> streams = this.environment.locatorOperation(c -> c.partitions(superStream));
6565
long publishingId = 0;
6666
boolean first = true;
6767
for (String partition : streams) {
68-
long pubId = this.environment.locator().queryPublisherSequence(this.name, partition);
68+
long pubId =
69+
this.environment.locatorOperation(c -> c.queryPublisherSequence(this.name, partition));
6970
if (first) {
7071
publishingId = pubId;
7172
first = false;

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public Client.ClientParameters shutdownListener(
130130
};
131131
mocks = MockitoAnnotations.openMocks(this);
132132
when(environment.locator()).thenReturn(locator);
133+
when(environment.locatorOperation(any())).thenCallRealMethod();
133134
when(environment.clientParametersCopy()).thenReturn(clientParameters);
134135
when(environment.addressResolver()).thenReturn(address -> address);
135136

src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public Client.ClientParameters metadataListener(
105105
};
106106
mocks = MockitoAnnotations.openMocks(this);
107107
when(environment.locator()).thenReturn(locator);
108+
when(environment.locatorOperation(any())).thenCallRealMethod();
108109
when(environment.clientParametersCopy()).thenReturn(clientParameters);
109110
when(environment.addressResolver()).thenReturn(address -> address);
110111
when(trackingConsumer.stream()).thenReturn("stream");

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

+9-22
Original file line numberDiff line numberDiff line change
@@ -373,34 +373,21 @@ void environmentPublishersConsumersShouldCloseSuccessfullyWhenBrokerIsDown() thr
373373
void locatorShouldReconnectIfConnectionIsLost(TestInfo info) throws Exception {
374374
try (Environment environment =
375375
environmentBuilder
376-
.recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2)))
376+
.recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(1)))
377377
.build()) {
378378
String s = streamName(info);
379379
environment.streamCreator().stream(s).create();
380380
environment.deleteStream(s);
381381
Host.killConnection("rabbitmq-stream-locator");
382-
assertThatThrownBy(() -> environment.streamCreator().stream("whatever").create())
383-
.isInstanceOf(StreamException.class);
384-
assertThatThrownBy(() -> environment.deleteStream("whatever"))
385-
.isInstanceOf(StreamException.class);
386-
assertThatThrownBy(() -> environment.producerBuilder().stream(stream).build())
387-
.isInstanceOf(StreamException.class);
388-
assertThatThrownBy(() -> environment.consumerBuilder().stream(stream).build())
389-
.isInstanceOf(StreamException.class);
390-
391-
Producer producer = null;
392-
int timeout = 10_000;
393-
int waited = 0;
394-
int interval = 1_000;
395-
while (producer == null && waited < timeout) {
396-
try {
397-
Thread.sleep(interval);
398-
waited += interval;
399-
producer = environment.producerBuilder().stream(stream).build();
400-
} catch (StreamException e) {
401-
}
382+
environment.streamCreator().stream(s).create();
383+
try {
384+
Producer producer = environment.producerBuilder().stream(s).build();
385+
Consumer consumer = environment.consumerBuilder().stream(s).build();
386+
producer.close();
387+
consumer.close();
388+
} finally {
389+
environment.deleteStream(s);
402390
}
403-
assertThat(producer).isNotNull();
404391
}
405392
}
406393

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java

+95
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,25 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1619
import static org.mockito.ArgumentMatchers.any;
1720
import static org.mockito.Mockito.*;
1821

1922
import com.rabbitmq.stream.BackOffDelayPolicy;
23+
import com.rabbitmq.stream.StreamException;
2024
import com.rabbitmq.stream.impl.Client.ClientParameters;
25+
import com.rabbitmq.stream.impl.StreamEnvironment.LocatorNotAvailableException;
2126
import io.netty.buffer.ByteBufAllocator;
2227
import java.net.URI;
2328
import java.time.Duration;
2429
import java.util.Arrays;
2530
import java.util.Collections;
31+
import java.util.concurrent.CountDownLatch;
2632
import java.util.concurrent.Executors;
2733
import java.util.concurrent.ScheduledExecutorService;
34+
import java.util.concurrent.atomic.AtomicInteger;
2835
import java.util.concurrent.atomic.AtomicReference;
2936
import java.util.function.Function;
3037
import org.junit.jupiter.api.AfterEach;
@@ -173,4 +180,92 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled(
173180
cf);
174181
verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class));
175182
}
183+
184+
@Test
185+
void locatorOperationShouldReturnOperationResultIfNoProblem() {
186+
AtomicInteger counter = new AtomicInteger();
187+
int result =
188+
StreamEnvironment.locatorOperation(
189+
c -> counter.incrementAndGet(), () -> null, BackOffDelayPolicy.fixed(Duration.ZERO));
190+
assertThat(result).isEqualTo(1);
191+
}
192+
193+
@Test
194+
void locatorOperationShouldRetryAndReturnResultIfLocatorException() {
195+
AtomicInteger counter = new AtomicInteger();
196+
int result =
197+
StreamEnvironment.locatorOperation(
198+
c -> {
199+
if (counter.incrementAndGet() < 2) {
200+
throw new LocatorNotAvailableException();
201+
} else {
202+
return counter.get();
203+
}
204+
},
205+
() -> null,
206+
BackOffDelayPolicy.fixed(Duration.ofMillis(10)));
207+
assertThat(result).isEqualTo(2);
208+
}
209+
210+
@Test
211+
void locatorOperationShouldThrowLocatorExceptionWhenRetryExhausts() {
212+
AtomicInteger counter = new AtomicInteger();
213+
assertThatThrownBy(
214+
() ->
215+
StreamEnvironment.locatorOperation(
216+
c -> {
217+
counter.incrementAndGet();
218+
throw new LocatorNotAvailableException();
219+
},
220+
() -> null,
221+
BackOffDelayPolicy.fixed(Duration.ofMillis(10))))
222+
.isInstanceOf(LocatorNotAvailableException.class);
223+
assertThat(counter).hasValue(3);
224+
}
225+
226+
@Test
227+
void locatorOperationShouldThrowInterruptedExceptionAsCauseIfInterrupted()
228+
throws InterruptedException {
229+
CountDownLatch latch = new CountDownLatch(1);
230+
AtomicReference<Exception> exception = new AtomicReference<>();
231+
Thread thread =
232+
new Thread(
233+
() -> {
234+
try {
235+
StreamEnvironment.locatorOperation(
236+
c -> {
237+
latch.countDown();
238+
throw new LocatorNotAvailableException();
239+
},
240+
() -> null,
241+
BackOffDelayPolicy.fixed(Duration.ofMinutes(10)));
242+
} catch (StreamException e) {
243+
exception.set(e);
244+
}
245+
});
246+
thread.start();
247+
latchAssert(latch).completes();
248+
Thread.sleep(100);
249+
thread.interrupt();
250+
Thread.sleep(100);
251+
assertThat(exception.get())
252+
.isInstanceOf(StreamException.class)
253+
.hasCauseInstanceOf(InterruptedException.class);
254+
}
255+
256+
@Test
257+
void locatorOperationShouldNotRetryAndReThrowUnexpectedException() {
258+
AtomicInteger counter = new AtomicInteger();
259+
assertThatThrownBy(
260+
() ->
261+
StreamEnvironment.locatorOperation(
262+
c -> {
263+
counter.incrementAndGet();
264+
throw new RuntimeException();
265+
},
266+
() -> null,
267+
BackOffDelayPolicy.fixed(Duration.ofMillis(10))))
268+
.isInstanceOf(RuntimeException.class);
269+
assertThat(counter).hasValue(1);
270+
}
176271
}

src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ void init() {
111111
.thenCallRealMethod();
112112
when(env.scheduledExecutorService()).thenReturn(executorService);
113113
when(env.locator()).thenReturn(client);
114+
when(env.locatorOperation(any())).thenCallRealMethod();
114115
when(env.clock()).thenReturn(clock);
115116
when(env.codec()).thenReturn(new SimpleCodec());
116117
doAnswer(

0 commit comments

Comments
 (0)