Skip to content

Improve recovery #245

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Dec 14, 2022
Merged
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
<spotless.check.skip>true</spotless.check.skip>
<slf4j.version>1.7.36</slf4j.version>
<logback.version>1.2.11</logback.version>
<slf4j.perftool.version>2.0.3</slf4j.perftool.version>
<logback.perftool.version>1.3.3</logback.perftool.version>
<slf4j.perftool.version>2.0.6</slf4j.perftool.version>
<logback.perftool.version>1.3.5</logback.perftool.version>
<netty.version>4.1.86.Final</netty.version>
<proton-j.version>0.34.0</proton-j.version>
<metrics.version>4.2.13</metrics.version>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/stream/Environment.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ static EnvironmentBuilder builder() {
.getConstructor()
.newInstance();
} catch (Exception e) {
throw new StreamException(e);
throw new StreamException("Error while creating stream environment builder", e);
}
}

Expand Down
79 changes: 45 additions & 34 deletions src/main/java/com/rabbitmq/stream/impl/AsyncRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// [email protected].
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.Utils.namedRunnable;

import com.rabbitmq.stream.BackOffDelayPolicy;
import java.time.Duration;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -42,47 +44,56 @@ private AsyncRetry(
AtomicReference<Runnable> retryableTaskReference = new AtomicReference<>();
AtomicInteger attempts = new AtomicInteger(0);
Runnable retryableTask =
() -> {
if (Thread.currentThread().isInterrupted()) {
LOGGER.debug("Task '{}' interrupted, failing future", description);
this.completableFuture.completeExceptionally(new CancellationException());
return;
}
try {
V result = task.call();
LOGGER.debug("Task '{}' succeeded, completing future", description);
completableFuture.complete(result);
} catch (Exception e) {
int attemptCount = attempts.getAndIncrement();
if (retry.test(e)) {
if (delayPolicy.delay(attemptCount).equals(BackOffDelayPolicy.TIMEOUT)) {
LOGGER.debug(
"Retryable attempts for task '{}' timed out, failing future", description);
this.completableFuture.completeExceptionally(new RetryTimeoutException());
} else {
LOGGER.debug(
"Retryable exception ({}) for task '{}', scheduling another attempt",
e.getClass().getSimpleName(),
description);
scheduler.schedule(
retryableTaskReference.get(),
delayPolicy.delay(attemptCount).toMillis(),
TimeUnit.MILLISECONDS);
namedRunnable(
() -> {
if (Thread.currentThread().isInterrupted()) {
LOGGER.debug("Task '{}' interrupted, failing future", description);
this.completableFuture.completeExceptionally(new CancellationException());
return;
}
try {
V result = task.call();
LOGGER.debug("Task '{}' succeeded, completing future", description);
completableFuture.complete(result);
} catch (Exception e) {
int attemptCount = attempts.getAndIncrement();
if (retry.test(e)) {
if (delayPolicy.delay(attemptCount).equals(BackOffDelayPolicy.TIMEOUT)) {
LOGGER.debug(
"Retryable attempts for task '{}' timed out, failing future", description);
this.completableFuture.completeExceptionally(new RetryTimeoutException());
} else {
LOGGER.debug(
"Retryable exception ({}) for task '{}', scheduling another attempt",
e.getClass().getSimpleName(),
description);
schedule(
scheduler, retryableTaskReference.get(), delayPolicy.delay(attemptCount));
}
} else {
LOGGER.debug(
"Non-retryable exception for task '{}', failing future", description);
this.completableFuture.completeExceptionally(e);
}
}
} else {
LOGGER.debug("Non-retryable exception for task '{}', failing future", description);
this.completableFuture.completeExceptionally(e);
}
}
};
},
description);
retryableTaskReference.set(retryableTask);
Duration initialDelay = delayPolicy.delay(attempts.getAndIncrement());
LOGGER.debug("Scheduling task '{}' with policy {}", description, delayPolicy);
if (initialDelay.isZero()) {
retryableTask.run();
} else {
scheduler.schedule(
retryableTaskReference.get(), initialDelay.toMillis(), TimeUnit.MILLISECONDS);
schedule(scheduler, retryableTaskReference.get(), initialDelay);
}
}

private static void schedule(
ScheduledExecutorService scheduler, Runnable command, Duration delay) {
try {
scheduler.schedule(command, delay.toMillis(), TimeUnit.MILLISECONDS);
} catch (RuntimeException e) {
LOGGER.debug("Error while scheduling command", e);
}
}

Expand Down
Loading