Skip to content

Commit 6df1e05

Browse files
committed
Refine producer retryOnRecovery support
References #642
1 parent 48f58c2 commit 6df1e05

File tree

5 files changed

+47
-40
lines changed

5 files changed

+47
-40
lines changed

src/docs/asciidoc/api.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -467,8 +467,8 @@ Set the value to `Duration.ZERO` if there should be no timeout.
467467
|10 seconds
468468

469469
|`retryOnRecovery`
470-
|Re-publish unconfirmed messages after recovering a connection.
471-
Set to false if do not want to re-publish unconfirmed messages after recovering a connection.
470+
|Whether to republish unconfirmed messages after recovery.
471+
Set to `false` to not republish unconfirmed messages and get a negative `ConfirmationStatus` for unconfirmed messages.
472472
|true
473473
|===
474474

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -134,13 +134,17 @@ public interface ProducerBuilder {
134134
ProducerBuilder enqueueTimeout(Duration timeout);
135135

136136
/**
137-
* Re-publish unconfirmed messages after recovering a connection.
137+
* Whether to republish unconfirmed messages after recovery.
138138
*
139-
* <p>Default is true.</p>
139+
* <p>Default is <code>true</code> (unconfirmed messages are republished after recovery).
140140
*
141-
* <p>Set to false if do not want to re-publish unconfirmed messages after recovering a connection.</p>
141+
* <p>Set to <code>false</code> to not republish unconfirmed messages and get a negative {@link
142+
* ConfirmationStatus} for unconfirmed messages.
142143
*
143-
* @param retryOnRecovery
144+
* <p>Note setting this flag to <code>false</code> translates to at-most-once semantics, that is
145+
* published messages may be lost, unless the publishing application retries publishing them.
146+
*
147+
* @param retryOnRecovery retry flag
144148
* @return this builder instance
145149
*/
146150
ProducerBuilder retryOnRecovery(boolean retryOnRecovery);

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

+34-29
Original file line numberDiff line numberDiff line change
@@ -507,34 +507,23 @@ void unavailable() {
507507

508508
void running() {
509509
synchronized (this) {
510-
if (!this.retryOnRecovery) {
511-
LOGGER.debug(
512-
"Skip to republish {} unconfirmed message(s) and re-publishing {} accumulated message(s)",
513-
this.unconfirmedMessages.size(),
514-
this.accumulator.size());
515-
516-
this.unconfirmedMessages.clear();
517-
int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
518-
if (toRelease > 0) {
519-
unconfirmedMessagesSemaphore.release(toRelease);
520-
}
521-
522-
publishBatch(false);
523-
} else {
524-
LOGGER.debug(
525-
"Re-publishing {} unconfirmed message(s) and {} accumulated message(s)",
526-
this.unconfirmedMessages.size(),
527-
this.accumulator.size());
510+
LOGGER.debug(
511+
"Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)",
512+
this.unconfirmedMessages.size(),
513+
this.accumulator.size());
514+
if (this.retryOnRecovery) {
515+
LOGGER.debug("Re-publishing {} unconfirmed message(s)", this.unconfirmedMessages.size());
528516
if (!this.unconfirmedMessages.isEmpty()) {
529517
Map<Long, AccumulatedEntity> messagesToResend = new TreeMap<>(this.unconfirmedMessages);
530518
this.unconfirmedMessages.clear();
531519
Iterator<Entry<Long, AccumulatedEntity>> resendIterator =
532-
messagesToResend.entrySet().iterator();
520+
messagesToResend.entrySet().iterator();
533521
while (resendIterator.hasNext()) {
534522
List<Object> messages = new ArrayList<>(this.batchSize);
535523
int batchCount = 0;
536524
while (batchCount != this.batchSize) {
537-
Object accMessage = resendIterator.hasNext() ? resendIterator.next().getValue() : null;
525+
Object accMessage =
526+
resendIterator.hasNext() ? resendIterator.next().getValue() : null;
538527
if (accMessage == null) {
539528
break;
540529
}
@@ -549,18 +538,34 @@ void running() {
549538
this.publishSequenceFunction);
550539
}
551540
}
552-
publishBatch(false);
553-
554-
int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
555-
if (toRelease > 0) {
556-
unconfirmedMessagesSemaphore.release(toRelease);
557-
if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
558-
LOGGER.debug(
559-
"Could not acquire {} permit(s) for message republishing",
560-
this.unconfirmedMessages.size());
541+
} else {
542+
LOGGER.debug(
543+
"Skipping republishing of {} unconfirmed messages", this.unconfirmedMessages.size());
544+
Map<Long, AccumulatedEntity> messagesToFail = new TreeMap<>(this.unconfirmedMessages);
545+
this.unconfirmedMessages.clear();
546+
for (AccumulatedEntity accumulatedEntity : messagesToFail.values()) {
547+
try {
548+
int permits =
549+
accumulatedEntity
550+
.confirmationCallback()
551+
.handle(false, CODE_PUBLISH_CONFIRM_TIMEOUT);
552+
this.unconfirmedMessagesSemaphore.release(permits);
553+
} catch (Exception e) {
554+
LOGGER.debug("Error while nack-ing outbound message: {}", e.getMessage());
555+
this.unconfirmedMessagesSemaphore.release(1);
561556
}
562557
}
563558
}
559+
publishBatch(false);
560+
int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
561+
if (toRelease > 0) {
562+
unconfirmedMessagesSemaphore.release(toRelease);
563+
if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
564+
LOGGER.debug(
565+
"Could not acquire {} permit(s) for message republishing",
566+
this.unconfirmedMessages.size());
567+
}
568+
}
564569
}
565570
this.status = Status.RUNNING;
566571
}

src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.rabbitmq.stream.StreamDoesNotExistException;
3838
import com.rabbitmq.stream.impl.Client.Response;
3939
import com.rabbitmq.stream.impl.Utils.ClientFactory;
40+
import io.netty.channel.ConnectTimeoutException;
4041
import java.time.Duration;
4142
import java.util.ArrayList;
4243
import java.util.Arrays;
@@ -46,8 +47,6 @@
4647
import java.util.concurrent.atomic.AtomicInteger;
4748
import java.util.concurrent.atomic.AtomicReference;
4849
import java.util.stream.IntStream;
49-
50-
import io.netty.channel.ConnectTimeoutException;
5150
import org.junit.jupiter.api.AfterEach;
5251
import org.junit.jupiter.api.BeforeEach;
5352
import org.junit.jupiter.api.Test;
@@ -309,8 +308,7 @@ void shouldRecoverOnConnectionTimeout() throws Exception {
309308
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
310309
Duration retryDelay = Duration.ofMillis(50);
311310
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
312-
when(locator.metadata("stream"))
313-
.thenReturn(metadata(leader(), replicas()));
311+
when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas()));
314312

315313
when(clientFactory.client(any()))
316314
.thenReturn(client)

src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ void sendToNonExistingStreamShouldReturnUnconfirmedStatus() throws Exception {
243243
@TestUtils.DisabledIfRabbitMqCtlNotSet
244244
void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception {
245245
Producer producer =
246-
environment.producerBuilder().subEntrySize(subEntrySize).retryOnRecovery(true).stream(stream).build();
246+
environment.producerBuilder().subEntrySize(subEntrySize).stream(stream).build();
247247

248248
AtomicInteger published = new AtomicInteger(0);
249249
AtomicInteger confirmed = new AtomicInteger(0);

0 commit comments

Comments
 (0)