Skip to content

Add flag to opt-out republishing unconfirmed messages on producer recovery #642

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,12 @@ outstanding unconfirmed messages timed out.
|Time before enqueueing of a message fail when the maximum number of unconfirmed
is reached. The callback of the message will be called with a negative status.
Set the value to `Duration.ZERO` if there should be no timeout.
|10 seconds.
|10 seconds

|`retryOnRecovery`
|Re-publish unconfirmed messages after recovering a connection.
Set to false if do not want to re-publish unconfirmed messages after recovering a connection.
|true
|===

==== Sending Messages
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/rabbitmq/stream/ProducerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,18 @@ public interface ProducerBuilder {
*/
ProducerBuilder enqueueTimeout(Duration timeout);

/**
* Re-publish unconfirmed messages after recovering a connection.
*
* <p>Default is true.</p>
*
* <p>Set to false if do not want to re-publish unconfirmed messages after recovering a connection.</p>
*
* @param retryOnRecovery
* @return this builder instance
*/
ProducerBuilder retryOnRecovery(boolean retryOnRecovery);

/**
* Logic to extract a filter value from a message.
*
Expand Down
82 changes: 50 additions & 32 deletions src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class StreamProducer implements Producer {
entity -> ((AccumulatedEntity) entity).publishingId();
private final long enqueueTimeoutMs;
private final boolean blockOnMaxUnconfirmed;
private final boolean retryOnRecovery;
private volatile Client client;
private volatile byte publisherId;
private volatile Status status;
Expand All @@ -95,6 +96,7 @@ class StreamProducer implements Producer {
int maxUnconfirmedMessages,
Duration confirmTimeout,
Duration enqueueTimeout,
boolean retryOnRecovery,
Function<Message, String> filterValueExtractor,
StreamEnvironment environment) {
if (filterValueExtractor != null && !environment.filteringSupported()) {
Expand All @@ -107,6 +109,7 @@ class StreamProducer implements Producer {
this.name = name;
this.stream = stream;
this.enqueueTimeoutMs = enqueueTimeout.toMillis();
this.retryOnRecovery = retryOnRecovery;
this.blockOnMaxUnconfirmed = enqueueTimeout.isZero();
this.closingCallback = environment.registerProducer(this, name, this.stream);
final Client.OutboundEntityWriteCallback delegateWriteCallback;
Expand Down Expand Up @@ -504,43 +507,58 @@ void unavailable() {

void running() {
synchronized (this) {
LOGGER.debug(
"Re-publishing {} unconfirmed message(s) and {} accumulated message(s)",
this.unconfirmedMessages.size(),
this.accumulator.size());
if (!this.unconfirmedMessages.isEmpty()) {
Map<Long, AccumulatedEntity> messagesToResend = new TreeMap<>(this.unconfirmedMessages);
if (!this.retryOnRecovery) {
LOGGER.debug(
"Skip to republish {} unconfirmed message(s) and re-publishing {} accumulated message(s)",
this.unconfirmedMessages.size(),
this.accumulator.size());

this.unconfirmedMessages.clear();
Iterator<Entry<Long, AccumulatedEntity>> resendIterator =
int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
if (toRelease > 0) {
unconfirmedMessagesSemaphore.release(toRelease);
}

publishBatch(false);
} else {
LOGGER.debug(
"Re-publishing {} unconfirmed message(s) and {} accumulated message(s)",
this.unconfirmedMessages.size(),
this.accumulator.size());
if (!this.unconfirmedMessages.isEmpty()) {
Map<Long, AccumulatedEntity> messagesToResend = new TreeMap<>(this.unconfirmedMessages);
this.unconfirmedMessages.clear();
Iterator<Entry<Long, AccumulatedEntity>> resendIterator =
messagesToResend.entrySet().iterator();
while (resendIterator.hasNext()) {
List<Object> messages = new ArrayList<>(this.batchSize);
int batchCount = 0;
while (batchCount != this.batchSize) {
Object accMessage = resendIterator.hasNext() ? resendIterator.next().getValue() : null;
if (accMessage == null) {
break;
while (resendIterator.hasNext()) {
List<Object> messages = new ArrayList<>(this.batchSize);
int batchCount = 0;
while (batchCount != this.batchSize) {
Object accMessage = resendIterator.hasNext() ? resendIterator.next().getValue() : null;
if (accMessage == null) {
break;
}
messages.add(accMessage);
batchCount++;
}
messages.add(accMessage);
batchCount++;
client.publishInternal(
this.publishVersion,
this.publisherId,
messages,
this.writeCallback,
this.publishSequenceFunction);
}
client.publishInternal(
this.publishVersion,
this.publisherId,
messages,
this.writeCallback,
this.publishSequenceFunction);
}
}
publishBatch(false);

int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
if (toRelease > 0) {
unconfirmedMessagesSemaphore.release(toRelease);
if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
LOGGER.debug(
"Could not acquire {} permit(s) for message republishing",
this.unconfirmedMessages.size());
publishBatch(false);

int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
if (toRelease > 0) {
unconfirmedMessagesSemaphore.release(toRelease);
if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
LOGGER.debug(
"Could not acquire {} permit(s) for message republishing",
this.unconfirmedMessages.size());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class StreamProducerBuilder implements ProducerBuilder {

private Duration enqueueTimeout = Duration.ofSeconds(10);

private boolean retryOnRecovery = true;

private DefaultRoutingConfiguration routingConfiguration;

private Function<Message, String> filterValueExtractor;
Expand Down Expand Up @@ -131,6 +133,12 @@ public ProducerBuilder enqueueTimeout(Duration timeout) {
return this;
}

@Override
public ProducerBuilder retryOnRecovery(boolean retryOnRecovery) {
this.retryOnRecovery = retryOnRecovery;
return this;
}

@Override
public ProducerBuilder filterValue(Function<Message, String> filterValueExtractor) {
this.filterValueExtractor = filterValueExtractor;
Expand Down Expand Up @@ -195,6 +203,7 @@ public Producer build() {
maxUnconfirmedMessages,
confirmTimeout,
enqueueTimeout,
retryOnRecovery,
filterValueExtractor,
environment);
this.environment.addProducer((StreamProducer) producer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void sendToNonExistingStreamShouldReturnUnconfirmedStatus() throws Exception {
@TestUtils.DisabledIfRabbitMqCtlNotSet
void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception {
Producer producer =
environment.producerBuilder().subEntrySize(subEntrySize).stream(stream).build();
environment.producerBuilder().subEntrySize(subEntrySize).retryOnRecovery(true).stream(stream).build();

AtomicInteger published = new AtomicInteger(0);
AtomicInteger confirmed = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout(
messageCount * 10,
confirmTimeout,
Duration.ofSeconds(10),
true,
null,
env);

Expand Down Expand Up @@ -219,6 +220,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry
2,
Duration.ofMinutes(1),
enqueueTimeout,
true,
null,
env);

Expand Down Expand Up @@ -258,6 +260,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize)
2,
Duration.ofMinutes(1),
enqueueTimeout,
true,
null,
env);

Expand Down