Skip to content

Commit b329976

Browse files
authored
Merge pull request #245 from rabbitmq/recovery-improvements
Improve recovery
2 parents 2fa0482 + 53cda39 commit b329976

30 files changed

+2631
-984
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@
5151
<spotless.check.skip>true</spotless.check.skip>
5252
<slf4j.version>1.7.36</slf4j.version>
5353
<logback.version>1.2.11</logback.version>
54-
<slf4j.perftool.version>2.0.3</slf4j.perftool.version>
55-
<logback.perftool.version>1.3.3</logback.perftool.version>
54+
<slf4j.perftool.version>2.0.6</slf4j.perftool.version>
55+
<logback.perftool.version>1.3.5</logback.perftool.version>
5656
<netty.version>4.1.86.Final</netty.version>
5757
<proton-j.version>0.34.0</proton-j.version>
5858
<metrics.version>4.2.13</metrics.version>

src/main/java/com/rabbitmq/stream/Environment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ static EnvironmentBuilder builder() {
4242
.getConstructor()
4343
.newInstance();
4444
} catch (Exception e) {
45-
throw new StreamException(e);
45+
throw new StreamException("Error while creating stream environment builder", e);
4646
}
4747
}
4848

src/main/java/com/rabbitmq/stream/impl/AsyncRetry.java

Lines changed: 45 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
17+
1618
import com.rabbitmq.stream.BackOffDelayPolicy;
1719
import java.time.Duration;
1820
import java.util.concurrent.Callable;
@@ -42,47 +44,56 @@ private AsyncRetry(
4244
AtomicReference<Runnable> retryableTaskReference = new AtomicReference<>();
4345
AtomicInteger attempts = new AtomicInteger(0);
4446
Runnable retryableTask =
45-
() -> {
46-
if (Thread.currentThread().isInterrupted()) {
47-
LOGGER.debug("Task '{}' interrupted, failing future", description);
48-
this.completableFuture.completeExceptionally(new CancellationException());
49-
return;
50-
}
51-
try {
52-
V result = task.call();
53-
LOGGER.debug("Task '{}' succeeded, completing future", description);
54-
completableFuture.complete(result);
55-
} catch (Exception e) {
56-
int attemptCount = attempts.getAndIncrement();
57-
if (retry.test(e)) {
58-
if (delayPolicy.delay(attemptCount).equals(BackOffDelayPolicy.TIMEOUT)) {
59-
LOGGER.debug(
60-
"Retryable attempts for task '{}' timed out, failing future", description);
61-
this.completableFuture.completeExceptionally(new RetryTimeoutException());
62-
} else {
63-
LOGGER.debug(
64-
"Retryable exception ({}) for task '{}', scheduling another attempt",
65-
e.getClass().getSimpleName(),
66-
description);
67-
scheduler.schedule(
68-
retryableTaskReference.get(),
69-
delayPolicy.delay(attemptCount).toMillis(),
70-
TimeUnit.MILLISECONDS);
47+
namedRunnable(
48+
() -> {
49+
if (Thread.currentThread().isInterrupted()) {
50+
LOGGER.debug("Task '{}' interrupted, failing future", description);
51+
this.completableFuture.completeExceptionally(new CancellationException());
52+
return;
53+
}
54+
try {
55+
V result = task.call();
56+
LOGGER.debug("Task '{}' succeeded, completing future", description);
57+
completableFuture.complete(result);
58+
} catch (Exception e) {
59+
int attemptCount = attempts.getAndIncrement();
60+
if (retry.test(e)) {
61+
if (delayPolicy.delay(attemptCount).equals(BackOffDelayPolicy.TIMEOUT)) {
62+
LOGGER.debug(
63+
"Retryable attempts for task '{}' timed out, failing future", description);
64+
this.completableFuture.completeExceptionally(new RetryTimeoutException());
65+
} else {
66+
LOGGER.debug(
67+
"Retryable exception ({}) for task '{}', scheduling another attempt",
68+
e.getClass().getSimpleName(),
69+
description);
70+
schedule(
71+
scheduler, retryableTaskReference.get(), delayPolicy.delay(attemptCount));
72+
}
73+
} else {
74+
LOGGER.debug(
75+
"Non-retryable exception for task '{}', failing future", description);
76+
this.completableFuture.completeExceptionally(e);
77+
}
7178
}
72-
} else {
73-
LOGGER.debug("Non-retryable exception for task '{}', failing future", description);
74-
this.completableFuture.completeExceptionally(e);
75-
}
76-
}
77-
};
79+
},
80+
description);
7881
retryableTaskReference.set(retryableTask);
7982
Duration initialDelay = delayPolicy.delay(attempts.getAndIncrement());
8083
LOGGER.debug("Scheduling task '{}' with policy {}", description, delayPolicy);
8184
if (initialDelay.isZero()) {
8285
retryableTask.run();
8386
} else {
84-
scheduler.schedule(
85-
retryableTaskReference.get(), initialDelay.toMillis(), TimeUnit.MILLISECONDS);
87+
schedule(scheduler, retryableTaskReference.get(), initialDelay);
88+
}
89+
}
90+
91+
private static void schedule(
92+
ScheduledExecutorService scheduler, Runnable command, Duration delay) {
93+
try {
94+
scheduler.schedule(command, delay.toMillis(), TimeUnit.MILLISECONDS);
95+
} catch (RuntimeException e) {
96+
LOGGER.debug("Error while scheduling command", e);
8697
}
8798
}
8899

0 commit comments

Comments
 (0)