Skip to content

Commit 7d336c1

Browse files
committed
Improve recovery
Create a locator connection for each provided URI. This way a connection can take over when the locator node goes down. This speeds up recovery. Track scheduled tasks. This is likely to be disabled in a stable release. Useful to track down unfinished tasks. Add retry to operations in the consumer coordinator. Refresh consumer candidate nodes if the re-assignment of a consumer times out. This improvements are based on the feedback from the effects of a rolling restart in K8S using stream-perf-test. Not all producers and consumers are recovered after all nodes have been restarted. The changes in this commit mitigates this problem.
1 parent ac6765a commit 7d336c1

22 files changed

+1304
-458
lines changed

src/main/java/com/rabbitmq/stream/impl/AsyncRetry.java

Lines changed: 45 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
17+
1618
import com.rabbitmq.stream.BackOffDelayPolicy;
1719
import java.time.Duration;
1820
import java.util.concurrent.Callable;
@@ -42,47 +44,56 @@ private AsyncRetry(
4244
AtomicReference<Runnable> retryableTaskReference = new AtomicReference<>();
4345
AtomicInteger attempts = new AtomicInteger(0);
4446
Runnable retryableTask =
45-
() -> {
46-
if (Thread.currentThread().isInterrupted()) {
47-
LOGGER.debug("Task '{}' interrupted, failing future", description);
48-
this.completableFuture.completeExceptionally(new CancellationException());
49-
return;
50-
}
51-
try {
52-
V result = task.call();
53-
LOGGER.debug("Task '{}' succeeded, completing future", description);
54-
completableFuture.complete(result);
55-
} catch (Exception e) {
56-
int attemptCount = attempts.getAndIncrement();
57-
if (retry.test(e)) {
58-
if (delayPolicy.delay(attemptCount).equals(BackOffDelayPolicy.TIMEOUT)) {
59-
LOGGER.debug(
60-
"Retryable attempts for task '{}' timed out, failing future", description);
61-
this.completableFuture.completeExceptionally(new RetryTimeoutException());
62-
} else {
63-
LOGGER.debug(
64-
"Retryable exception ({}) for task '{}', scheduling another attempt",
65-
e.getClass().getSimpleName(),
66-
description);
67-
scheduler.schedule(
68-
retryableTaskReference.get(),
69-
delayPolicy.delay(attemptCount).toMillis(),
70-
TimeUnit.MILLISECONDS);
47+
namedRunnable(
48+
() -> {
49+
if (Thread.currentThread().isInterrupted()) {
50+
LOGGER.debug("Task '{}' interrupted, failing future", description);
51+
this.completableFuture.completeExceptionally(new CancellationException());
52+
return;
53+
}
54+
try {
55+
V result = task.call();
56+
LOGGER.debug("Task '{}' succeeded, completing future", description);
57+
completableFuture.complete(result);
58+
} catch (Exception e) {
59+
int attemptCount = attempts.getAndIncrement();
60+
if (retry.test(e)) {
61+
if (delayPolicy.delay(attemptCount).equals(BackOffDelayPolicy.TIMEOUT)) {
62+
LOGGER.debug(
63+
"Retryable attempts for task '{}' timed out, failing future", description);
64+
this.completableFuture.completeExceptionally(new RetryTimeoutException());
65+
} else {
66+
LOGGER.debug(
67+
"Retryable exception ({}) for task '{}', scheduling another attempt",
68+
e.getClass().getSimpleName(),
69+
description);
70+
schedule(
71+
scheduler, retryableTaskReference.get(), delayPolicy.delay(attemptCount));
72+
}
73+
} else {
74+
LOGGER.debug(
75+
"Non-retryable exception for task '{}', failing future", description);
76+
this.completableFuture.completeExceptionally(e);
77+
}
7178
}
72-
} else {
73-
LOGGER.debug("Non-retryable exception for task '{}', failing future", description);
74-
this.completableFuture.completeExceptionally(e);
75-
}
76-
}
77-
};
79+
},
80+
description);
7881
retryableTaskReference.set(retryableTask);
7982
Duration initialDelay = delayPolicy.delay(attempts.getAndIncrement());
8083
LOGGER.debug("Scheduling task '{}' with policy {}", description, delayPolicy);
8184
if (initialDelay.isZero()) {
8285
retryableTask.run();
8386
} else {
84-
scheduler.schedule(
85-
retryableTaskReference.get(), initialDelay.toMillis(), TimeUnit.MILLISECONDS);
87+
schedule(scheduler, retryableTaskReference.get(), initialDelay);
88+
}
89+
}
90+
91+
private static void schedule(
92+
ScheduledExecutorService scheduler, Runnable command, Duration delay) {
93+
try {
94+
scheduler.schedule(command, delay.toMillis(), TimeUnit.MILLISECONDS);
95+
} catch (RuntimeException e) {
96+
LOGGER.debug("Error while scheduling command", e);
8697
}
8798
}
8899

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,18 @@ public void write(
308308
});
309309

310310
ChannelFuture f;
311+
String clientConnectionName =
312+
parameters.clientProperties == null
313+
? ""
314+
: (parameters.clientProperties.containsKey("connection_name")
315+
? parameters.clientProperties.get("connection_name")
316+
: "");
311317
try {
312-
LOGGER.debug("Trying to create stream connection to {}:{}", parameters.host, parameters.port);
318+
LOGGER.debug(
319+
"Trying to create stream connection to {}:{}, with client connection name '{}'",
320+
parameters.host,
321+
parameters.port,
322+
clientConnectionName);
313323
f = b.connect(parameters.host, parameters.port).sync();
314324
this.host = parameters.host;
315325
this.port = parameters.port;
@@ -1113,6 +1123,9 @@ public QueryOffsetResponse queryOffset(String reference, String stream) {
11131123
request.block();
11141124
QueryOffsetResponse response = request.response.get();
11151125
return response;
1126+
} catch (StreamException e) {
1127+
outstandingRequests.remove(correlationId);
1128+
throw e;
11161129
} catch (RuntimeException e) {
11171130
outstandingRequests.remove(correlationId);
11181131
throw new StreamException(e);
@@ -2284,7 +2297,7 @@ void block() {
22842297
throw new StreamException("Interrupted while waiting for response");
22852298
}
22862299
if (!completed) {
2287-
throw new StreamException("Could not get response in " + timeout.toMillis() + " ms");
2300+
throw new TimeoutStreamException("Could not get response in " + timeout.toMillis() + " ms");
22882301
}
22892302
}
22902303

0 commit comments

Comments
 (0)