Skip to content

Commit 48f58c2

Browse files
authored
Merge pull request #642 from laststem/producer-republish-optional
optional retry on recovery
2 parents 9293501 + ce5b04b commit 48f58c2

File tree

6 files changed

+81
-34
lines changed

6 files changed

+81
-34
lines changed

src/docs/asciidoc/api.adoc

+6-1
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,12 @@ outstanding unconfirmed messages timed out.
464464
|Time before enqueueing of a message fail when the maximum number of unconfirmed
465465
is reached. The callback of the message will be called with a negative status.
466466
Set the value to `Duration.ZERO` if there should be no timeout.
467-
|10 seconds.
467+
|10 seconds
468+
469+
|`retryOnRecovery`
470+
|Re-publish unconfirmed messages after recovering a connection.
471+
Set to false if do not want to re-publish unconfirmed messages after recovering a connection.
472+
|true
468473
|===
469474

470475
==== Sending Messages

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+12
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,18 @@ public interface ProducerBuilder {
133133
*/
134134
ProducerBuilder enqueueTimeout(Duration timeout);
135135

136+
/**
137+
* Re-publish unconfirmed messages after recovering a connection.
138+
*
139+
* <p>Default is true.</p>
140+
*
141+
* <p>Set to false if do not want to re-publish unconfirmed messages after recovering a connection.</p>
142+
*
143+
* @param retryOnRecovery
144+
* @return this builder instance
145+
*/
146+
ProducerBuilder retryOnRecovery(boolean retryOnRecovery);
147+
136148
/**
137149
* Logic to extract a filter value from a message.
138150
*

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

+50-32
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class StreamProducer implements Producer {
7878
entity -> ((AccumulatedEntity) entity).publishingId();
7979
private final long enqueueTimeoutMs;
8080
private final boolean blockOnMaxUnconfirmed;
81+
private final boolean retryOnRecovery;
8182
private volatile Client client;
8283
private volatile byte publisherId;
8384
private volatile Status status;
@@ -95,6 +96,7 @@ class StreamProducer implements Producer {
9596
int maxUnconfirmedMessages,
9697
Duration confirmTimeout,
9798
Duration enqueueTimeout,
99+
boolean retryOnRecovery,
98100
Function<Message, String> filterValueExtractor,
99101
StreamEnvironment environment) {
100102
if (filterValueExtractor != null && !environment.filteringSupported()) {
@@ -107,6 +109,7 @@ class StreamProducer implements Producer {
107109
this.name = name;
108110
this.stream = stream;
109111
this.enqueueTimeoutMs = enqueueTimeout.toMillis();
112+
this.retryOnRecovery = retryOnRecovery;
110113
this.blockOnMaxUnconfirmed = enqueueTimeout.isZero();
111114
this.closingCallback = environment.registerProducer(this, name, this.stream);
112115
final Client.OutboundEntityWriteCallback delegateWriteCallback;
@@ -504,43 +507,58 @@ void unavailable() {
504507

505508
void running() {
506509
synchronized (this) {
507-
LOGGER.debug(
508-
"Re-publishing {} unconfirmed message(s) and {} accumulated message(s)",
509-
this.unconfirmedMessages.size(),
510-
this.accumulator.size());
511-
if (!this.unconfirmedMessages.isEmpty()) {
512-
Map<Long, AccumulatedEntity> messagesToResend = new TreeMap<>(this.unconfirmedMessages);
510+
if (!this.retryOnRecovery) {
511+
LOGGER.debug(
512+
"Skip to republish {} unconfirmed message(s) and re-publishing {} accumulated message(s)",
513+
this.unconfirmedMessages.size(),
514+
this.accumulator.size());
515+
513516
this.unconfirmedMessages.clear();
514-
Iterator<Entry<Long, AccumulatedEntity>> resendIterator =
517+
int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
518+
if (toRelease > 0) {
519+
unconfirmedMessagesSemaphore.release(toRelease);
520+
}
521+
522+
publishBatch(false);
523+
} else {
524+
LOGGER.debug(
525+
"Re-publishing {} unconfirmed message(s) and {} accumulated message(s)",
526+
this.unconfirmedMessages.size(),
527+
this.accumulator.size());
528+
if (!this.unconfirmedMessages.isEmpty()) {
529+
Map<Long, AccumulatedEntity> messagesToResend = new TreeMap<>(this.unconfirmedMessages);
530+
this.unconfirmedMessages.clear();
531+
Iterator<Entry<Long, AccumulatedEntity>> resendIterator =
515532
messagesToResend.entrySet().iterator();
516-
while (resendIterator.hasNext()) {
517-
List<Object> messages = new ArrayList<>(this.batchSize);
518-
int batchCount = 0;
519-
while (batchCount != this.batchSize) {
520-
Object accMessage = resendIterator.hasNext() ? resendIterator.next().getValue() : null;
521-
if (accMessage == null) {
522-
break;
533+
while (resendIterator.hasNext()) {
534+
List<Object> messages = new ArrayList<>(this.batchSize);
535+
int batchCount = 0;
536+
while (batchCount != this.batchSize) {
537+
Object accMessage = resendIterator.hasNext() ? resendIterator.next().getValue() : null;
538+
if (accMessage == null) {
539+
break;
540+
}
541+
messages.add(accMessage);
542+
batchCount++;
523543
}
524-
messages.add(accMessage);
525-
batchCount++;
544+
client.publishInternal(
545+
this.publishVersion,
546+
this.publisherId,
547+
messages,
548+
this.writeCallback,
549+
this.publishSequenceFunction);
526550
}
527-
client.publishInternal(
528-
this.publishVersion,
529-
this.publisherId,
530-
messages,
531-
this.writeCallback,
532-
this.publishSequenceFunction);
533551
}
534-
}
535-
publishBatch(false);
536-
537-
int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
538-
if (toRelease > 0) {
539-
unconfirmedMessagesSemaphore.release(toRelease);
540-
if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
541-
LOGGER.debug(
542-
"Could not acquire {} permit(s) for message republishing",
543-
this.unconfirmedMessages.size());
552+
publishBatch(false);
553+
554+
int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
555+
if (toRelease > 0) {
556+
unconfirmedMessagesSemaphore.release(toRelease);
557+
if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
558+
LOGGER.debug(
559+
"Could not acquire {} permit(s) for message republishing",
560+
this.unconfirmedMessages.size());
561+
}
544562
}
545563
}
546564
}

src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java

+9
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ class StreamProducerBuilder implements ProducerBuilder {
4747

4848
private Duration enqueueTimeout = Duration.ofSeconds(10);
4949

50+
private boolean retryOnRecovery = true;
51+
5052
private DefaultRoutingConfiguration routingConfiguration;
5153

5254
private Function<Message, String> filterValueExtractor;
@@ -131,6 +133,12 @@ public ProducerBuilder enqueueTimeout(Duration timeout) {
131133
return this;
132134
}
133135

136+
@Override
137+
public ProducerBuilder retryOnRecovery(boolean retryOnRecovery) {
138+
this.retryOnRecovery = retryOnRecovery;
139+
return this;
140+
}
141+
134142
@Override
135143
public ProducerBuilder filterValue(Function<Message, String> filterValueExtractor) {
136144
this.filterValueExtractor = filterValueExtractor;
@@ -195,6 +203,7 @@ public Producer build() {
195203
maxUnconfirmedMessages,
196204
confirmTimeout,
197205
enqueueTimeout,
206+
retryOnRecovery,
198207
filterValueExtractor,
199208
environment);
200209
this.environment.addProducer((StreamProducer) producer);

src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ void sendToNonExistingStreamShouldReturnUnconfirmedStatus() throws Exception {
243243
@TestUtils.DisabledIfRabbitMqCtlNotSet
244244
void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception {
245245
Producer producer =
246-
environment.producerBuilder().subEntrySize(subEntrySize).stream(stream).build();
246+
environment.producerBuilder().subEntrySize(subEntrySize).retryOnRecovery(true).stream(stream).build();
247247

248248
AtomicInteger published = new AtomicInteger(0);
249249
AtomicInteger confirmed = new AtomicInteger(0);

src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java

+3
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout(
177177
messageCount * 10,
178178
confirmTimeout,
179179
Duration.ofSeconds(10),
180+
true,
180181
null,
181182
env);
182183

@@ -219,6 +220,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry
219220
2,
220221
Duration.ofMinutes(1),
221222
enqueueTimeout,
223+
true,
222224
null,
223225
env);
224226

@@ -258,6 +260,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize)
258260
2,
259261
Duration.ofMinutes(1),
260262
enqueueTimeout,
263+
true,
261264
null,
262265
env);
263266

0 commit comments

Comments
 (0)