From c1545f6b38927a98ed4bd6d7263887046483c1e2 Mon Sep 17 00:00:00 2001 From: laststem Date: Thu, 24 Oct 2024 11:14:53 +0900 Subject: [PATCH 1/2] optional retry on recovery --- src/docs/asciidoc/api.adoc | 7 +- .../com/rabbitmq/stream/ProducerBuilder.java | 12 +++ .../rabbitmq/stream/impl/StreamProducer.java | 82 +++++++++++-------- .../stream/impl/StreamProducerBuilder.java | 9 ++ .../stream/impl/StreamProducerTest.java | 2 +- .../stream/impl/StreamProducerUnitTest.java | 3 + 6 files changed, 81 insertions(+), 34 deletions(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 66f678c0d3..951c0b0cfd 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -464,7 +464,12 @@ outstanding unconfirmed messages timed out. |Time before enqueueing of a message fail when the maximum number of unconfirmed is reached. The callback of the message will be called with a negative status. Set the value to `Duration.ZERO` if there should be no timeout. -|10 seconds. +|10 seconds + +|`retryOnRecovery` +|Re-publish unconfirmed messages when restoring a connection. +Set to false if do not want to re-publish unconfirmed messages when restoring a connection. +|true |=== ==== Sending Messages diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index 1d340090b5..9c8b6c145d 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -133,6 +133,18 @@ public interface ProducerBuilder { */ ProducerBuilder enqueueTimeout(Duration timeout); + /** + * Re-publish unconfirmed messages when restoring a connection. + * + *

Default is true.

+ * + *

Set to false if do not want to re-publish unconfirmed messages when restoring a connection.

+ * + * @param retryOnRecovery + * @return this builder instance + */ + ProducerBuilder retryOnRecovery(boolean retryOnRecovery); + /** * Logic to extract a filter value from a message. * diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 7553e2c838..2d84a45a8d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -78,6 +78,7 @@ class StreamProducer implements Producer { entity -> ((AccumulatedEntity) entity).publishingId(); private final long enqueueTimeoutMs; private final boolean blockOnMaxUnconfirmed; + private final boolean retryOnRecovery; private volatile Client client; private volatile byte publisherId; private volatile Status status; @@ -95,6 +96,7 @@ class StreamProducer implements Producer { int maxUnconfirmedMessages, Duration confirmTimeout, Duration enqueueTimeout, + boolean retryOnRecovery, Function filterValueExtractor, StreamEnvironment environment) { if (filterValueExtractor != null && !environment.filteringSupported()) { @@ -107,6 +109,7 @@ class StreamProducer implements Producer { this.name = name; this.stream = stream; this.enqueueTimeoutMs = enqueueTimeout.toMillis(); + this.retryOnRecovery = retryOnRecovery; this.blockOnMaxUnconfirmed = enqueueTimeout.isZero(); this.closingCallback = environment.registerProducer(this, name, this.stream); final Client.OutboundEntityWriteCallback delegateWriteCallback; @@ -504,43 +507,58 @@ void unavailable() { void running() { synchronized (this) { - LOGGER.debug( - "Re-publishing {} unconfirmed message(s) and {} accumulated message(s)", - this.unconfirmedMessages.size(), - this.accumulator.size()); - if (!this.unconfirmedMessages.isEmpty()) { - Map messagesToResend = new TreeMap<>(this.unconfirmedMessages); + if (!this.retryOnRecovery) { + LOGGER.debug( + "Skip to republish {} unconfirmed message(s) and re-publishing {} accumulated message(s)", + this.unconfirmedMessages.size(), + this.accumulator.size()); + this.unconfirmedMessages.clear(); - Iterator> resendIterator = + int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits(); + if (toRelease > 0) { + unconfirmedMessagesSemaphore.release(toRelease); + } + + publishBatch(false); + } else { + LOGGER.debug( + "Re-publishing {} unconfirmed message(s) and {} accumulated message(s)", + this.unconfirmedMessages.size(), + this.accumulator.size()); + if (!this.unconfirmedMessages.isEmpty()) { + Map messagesToResend = new TreeMap<>(this.unconfirmedMessages); + this.unconfirmedMessages.clear(); + Iterator> resendIterator = messagesToResend.entrySet().iterator(); - while (resendIterator.hasNext()) { - List messages = new ArrayList<>(this.batchSize); - int batchCount = 0; - while (batchCount != this.batchSize) { - Object accMessage = resendIterator.hasNext() ? resendIterator.next().getValue() : null; - if (accMessage == null) { - break; + while (resendIterator.hasNext()) { + List messages = new ArrayList<>(this.batchSize); + int batchCount = 0; + while (batchCount != this.batchSize) { + Object accMessage = resendIterator.hasNext() ? resendIterator.next().getValue() : null; + if (accMessage == null) { + break; + } + messages.add(accMessage); + batchCount++; } - messages.add(accMessage); - batchCount++; + client.publishInternal( + this.publishVersion, + this.publisherId, + messages, + this.writeCallback, + this.publishSequenceFunction); } - client.publishInternal( - this.publishVersion, - this.publisherId, - messages, - this.writeCallback, - this.publishSequenceFunction); } - } - publishBatch(false); - - int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits(); - if (toRelease > 0) { - unconfirmedMessagesSemaphore.release(toRelease); - if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) { - LOGGER.debug( - "Could not acquire {} permit(s) for message republishing", - this.unconfirmedMessages.size()); + publishBatch(false); + + int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits(); + if (toRelease > 0) { + unconfirmedMessagesSemaphore.release(toRelease); + if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) { + LOGGER.debug( + "Could not acquire {} permit(s) for message republishing", + this.unconfirmedMessages.size()); + } } } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 0497c53e64..493426c67f 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -47,6 +47,8 @@ class StreamProducerBuilder implements ProducerBuilder { private Duration enqueueTimeout = Duration.ofSeconds(10); + private boolean retryOnRecovery = true; + private DefaultRoutingConfiguration routingConfiguration; private Function filterValueExtractor; @@ -131,6 +133,12 @@ public ProducerBuilder enqueueTimeout(Duration timeout) { return this; } + @Override + public ProducerBuilder retryOnRecovery(boolean retryOnRecovery) { + this.retryOnRecovery = retryOnRecovery; + return this; + } + @Override public ProducerBuilder filterValue(Function filterValueExtractor) { this.filterValueExtractor = filterValueExtractor; @@ -195,6 +203,7 @@ public Producer build() { maxUnconfirmedMessages, confirmTimeout, enqueueTimeout, + retryOnRecovery, filterValueExtractor, environment); this.environment.addProducer((StreamProducer) producer); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java index 6cf06356d0..07a3132eca 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java @@ -243,7 +243,7 @@ void sendToNonExistingStreamShouldReturnUnconfirmedStatus() throws Exception { @TestUtils.DisabledIfRabbitMqCtlNotSet void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception { Producer producer = - environment.producerBuilder().subEntrySize(subEntrySize).stream(stream).build(); + environment.producerBuilder().subEntrySize(subEntrySize).retryOnRecovery(true).stream(stream).build(); AtomicInteger published = new AtomicInteger(0); AtomicInteger confirmed = new AtomicInteger(0); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 92fa41d124..28c87bfbc2 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -177,6 +177,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout( messageCount * 10, confirmTimeout, Duration.ofSeconds(10), + true, null, env); @@ -219,6 +220,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry 2, Duration.ofMinutes(1), enqueueTimeout, + true, null, env); @@ -258,6 +260,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize) 2, Duration.ofMinutes(1), enqueueTimeout, + true, null, env); From ce5b04b391c4b5f0cd6ffbab28a140e21ce22a93 Mon Sep 17 00:00:00 2001 From: laststem Date: Thu, 24 Oct 2024 12:01:18 +0900 Subject: [PATCH 2/2] fix description --- src/docs/asciidoc/api.adoc | 4 ++-- src/main/java/com/rabbitmq/stream/ProducerBuilder.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 951c0b0cfd..cef10e7bf6 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -467,8 +467,8 @@ Set the value to `Duration.ZERO` if there should be no timeout. |10 seconds |`retryOnRecovery` -|Re-publish unconfirmed messages when restoring a connection. -Set to false if do not want to re-publish unconfirmed messages when restoring a connection. +|Re-publish unconfirmed messages after recovering a connection. +Set to false if do not want to re-publish unconfirmed messages after recovering a connection. |true |=== diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index 9c8b6c145d..853d01ddc5 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -134,11 +134,11 @@ public interface ProducerBuilder { ProducerBuilder enqueueTimeout(Duration timeout); /** - * Re-publish unconfirmed messages when restoring a connection. + * Re-publish unconfirmed messages after recovering a connection. * *

Default is true.

* - *

Set to false if do not want to re-publish unconfirmed messages when restoring a connection.

+ *

Set to false if do not want to re-publish unconfirmed messages after recovering a connection.

* * @param retryOnRecovery * @return this builder instance