diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc
index 66f678c0d3..cef10e7bf6 100644
--- a/src/docs/asciidoc/api.adoc
+++ b/src/docs/asciidoc/api.adoc
@@ -464,7 +464,12 @@ outstanding unconfirmed messages timed out.
|Time before enqueueing of a message fail when the maximum number of unconfirmed
is reached. The callback of the message will be called with a negative status.
Set the value to `Duration.ZERO` if there should be no timeout.
-|10 seconds.
+|10 seconds
+
+|`retryOnRecovery`
+|Re-publish unconfirmed messages after recovering a connection.
+Set to false if do not want to re-publish unconfirmed messages after recovering a connection.
+|true
|===
==== Sending Messages
diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
index 1d340090b5..853d01ddc5 100644
--- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
@@ -133,6 +133,18 @@ public interface ProducerBuilder {
*/
ProducerBuilder enqueueTimeout(Duration timeout);
+ /**
+ * Re-publish unconfirmed messages after recovering a connection.
+ *
+ *
Default is true.
+ *
+ *
Set to false if do not want to re-publish unconfirmed messages after recovering a connection.
+ *
+ * @param retryOnRecovery
+ * @return this builder instance
+ */
+ ProducerBuilder retryOnRecovery(boolean retryOnRecovery);
+
/**
* Logic to extract a filter value from a message.
*
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
index 7553e2c838..2d84a45a8d 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
@@ -78,6 +78,7 @@ class StreamProducer implements Producer {
entity -> ((AccumulatedEntity) entity).publishingId();
private final long enqueueTimeoutMs;
private final boolean blockOnMaxUnconfirmed;
+ private final boolean retryOnRecovery;
private volatile Client client;
private volatile byte publisherId;
private volatile Status status;
@@ -95,6 +96,7 @@ class StreamProducer implements Producer {
int maxUnconfirmedMessages,
Duration confirmTimeout,
Duration enqueueTimeout,
+ boolean retryOnRecovery,
Function filterValueExtractor,
StreamEnvironment environment) {
if (filterValueExtractor != null && !environment.filteringSupported()) {
@@ -107,6 +109,7 @@ class StreamProducer implements Producer {
this.name = name;
this.stream = stream;
this.enqueueTimeoutMs = enqueueTimeout.toMillis();
+ this.retryOnRecovery = retryOnRecovery;
this.blockOnMaxUnconfirmed = enqueueTimeout.isZero();
this.closingCallback = environment.registerProducer(this, name, this.stream);
final Client.OutboundEntityWriteCallback delegateWriteCallback;
@@ -504,43 +507,58 @@ void unavailable() {
void running() {
synchronized (this) {
- LOGGER.debug(
- "Re-publishing {} unconfirmed message(s) and {} accumulated message(s)",
- this.unconfirmedMessages.size(),
- this.accumulator.size());
- if (!this.unconfirmedMessages.isEmpty()) {
- Map messagesToResend = new TreeMap<>(this.unconfirmedMessages);
+ if (!this.retryOnRecovery) {
+ LOGGER.debug(
+ "Skip to republish {} unconfirmed message(s) and re-publishing {} accumulated message(s)",
+ this.unconfirmedMessages.size(),
+ this.accumulator.size());
+
this.unconfirmedMessages.clear();
- Iterator> resendIterator =
+ int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
+ if (toRelease > 0) {
+ unconfirmedMessagesSemaphore.release(toRelease);
+ }
+
+ publishBatch(false);
+ } else {
+ LOGGER.debug(
+ "Re-publishing {} unconfirmed message(s) and {} accumulated message(s)",
+ this.unconfirmedMessages.size(),
+ this.accumulator.size());
+ if (!this.unconfirmedMessages.isEmpty()) {
+ Map messagesToResend = new TreeMap<>(this.unconfirmedMessages);
+ this.unconfirmedMessages.clear();
+ Iterator> resendIterator =
messagesToResend.entrySet().iterator();
- while (resendIterator.hasNext()) {
- List