// Copyright (c) $YEAR Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc
index 28eb063bc6..f7edd8fffd 100644
--- a/src/docs/asciidoc/api.adoc
+++ b/src/docs/asciidoc/api.adoc
@@ -455,6 +455,10 @@ blocking when the limit is reached.
|Period to send a batch of messages.
|100 ms
+|`dynamicBatch`
+|Adapt batch size depending on ingress rate.
+|false
+
|`confirmTimeout`
|[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal
outstanding unconfirmed messages timed out.
diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
index 5686a370cf..79bc9905d6 100644
--- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -97,6 +97,28 @@ public interface ProducerBuilder {
*/
ProducerBuilder batchPublishingDelay(Duration batchPublishingDelay);
+ /**
+ * Adapt batch size depending on ingress rate.
+ *
+ * A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive
+ * for sustained high ingress rates.
+ *
+ *
Set this flag to true
if you want as little delay as possible before calling
+ * {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker.
+ *
+ *
Set this flag to false
if latency is not critical for your use case and you
+ * want the highest throughput possible for both publishing and consuming.
+ *
+ *
Dynamic batch is not activated by default (dynamicBatch = false
).
+ *
+ *
Dynamic batch is experimental.
+ *
+ * @param dynamicBatch
+ * @return this builder instance
+ * @since 0.20.0
+ */
+ ProducerBuilder dynamicBatch(boolean dynamicBatch);
+
/**
* The maximum number of unconfirmed outbound messages.
*
diff --git a/src/main/java/com/rabbitmq/stream/impl/ConcurrencyUtils.java b/src/main/java/com/rabbitmq/stream/impl/ConcurrencyUtils.java
new file mode 100644
index 0000000000..daf877f20b
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/impl/ConcurrencyUtils.java
@@ -0,0 +1,60 @@
+// Copyright (c) 2024 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.impl;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ConcurrencyUtils {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrencyUtils.class);
+
+ private static final ThreadFactory THREAD_FACTORY;
+
+ static {
+ if (isJava21OrMore()) {
+ LOGGER.debug("Running Java 21 or more, using virtual threads");
+ Class> builderClass =
+ Arrays.stream(Thread.class.getDeclaredClasses())
+ .filter(c -> "Builder".equals(c.getSimpleName()))
+ .findFirst()
+ .get();
+ // Reflection code is the same as:
+ // Thread.ofVirtual().factory();
+ try {
+ Object builder = Thread.class.getDeclaredMethod("ofVirtual").invoke(null);
+ THREAD_FACTORY = (ThreadFactory) builderClass.getDeclaredMethod("factory").invoke(builder);
+ } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ THREAD_FACTORY = Executors.defaultThreadFactory();
+ }
+ }
+
+ private ConcurrencyUtils() {}
+
+ static ThreadFactory defaultThreadFactory() {
+ return THREAD_FACTORY;
+ }
+
+ private static boolean isJava21OrMore() {
+ return Utils.versionCompare(System.getProperty("java.version"), "21.0") >= 0;
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java
new file mode 100644
index 0000000000..ba52d23950
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java
@@ -0,0 +1,121 @@
+// Copyright (c) 2024 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.impl;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DynamicBatch implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class);
+ private static final int MIN_BATCH_SIZE = 32;
+ private static final int MAX_BATCH_SIZE = 8192;
+
+ private final BlockingQueue requests = new LinkedBlockingQueue<>();
+ private final BiPredicate, Boolean> consumer;
+ private final int configuredBatchSize;
+ private final Thread thread;
+
+ DynamicBatch(BiPredicate, Boolean> consumer, int batchSize) {
+ this.consumer = consumer;
+ this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
+ this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop);
+ this.thread.start();
+ }
+
+ void add(T item) {
+ try {
+ requests.put(item);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void loop() {
+ State state = new State<>();
+ state.batchSize = this.configuredBatchSize;
+ state.items = new ArrayList<>(state.batchSize);
+ state.retry = false;
+ Thread currentThread = Thread.currentThread();
+ T item;
+ while (!currentThread.isInterrupted()) {
+ try {
+ item = this.requests.poll(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ currentThread.interrupt();
+ return;
+ }
+ if (item != null) {
+ state.items.add(item);
+ if (state.items.size() >= state.batchSize) {
+ this.maybeCompleteBatch(state, true);
+ } else {
+ item = this.requests.poll();
+ if (item == null) {
+ this.maybeCompleteBatch(state, false);
+ } else {
+ state.items.add(item);
+ if (state.items.size() >= state.batchSize) {
+ this.maybeCompleteBatch(state, true);
+ }
+ }
+ }
+ } else {
+ this.maybeCompleteBatch(state, false);
+ }
+ }
+ }
+
+ private static final class State {
+
+ int batchSize;
+ List items;
+ boolean retry;
+ }
+
+ private void maybeCompleteBatch(State state, boolean increaseIfCompleted) {
+ try {
+ boolean completed = this.consumer.test(state.items, state.retry);
+ if (completed) {
+ if (increaseIfCompleted) {
+ state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE);
+ } else {
+ state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE);
+ }
+ state.items = new ArrayList<>(state.batchSize);
+ state.retry = false;
+ } else {
+ state.retry = true;
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage());
+ state.retry = true;
+ }
+ }
+
+ @Override
+ public void close() {
+ this.thread.interrupt();
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java
new file mode 100644
index 0000000000..2b8d6241e9
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java
@@ -0,0 +1,152 @@
+// Copyright (c) 2024 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.impl;
+
+import com.rabbitmq.stream.Codec;
+import com.rabbitmq.stream.ConfirmationHandler;
+import com.rabbitmq.stream.Message;
+import com.rabbitmq.stream.ObservationCollector;
+import com.rabbitmq.stream.compression.Compression;
+import com.rabbitmq.stream.compression.CompressionCodec;
+import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity;
+import io.netty.buffer.ByteBufAllocator;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.ToLongFunction;
+
+final class DynamicBatchMessageAccumulator implements MessageAccumulator {
+
+ private final DynamicBatch dynamicBatch;
+ private final ObservationCollector observationCollector;
+ private final StreamProducer producer;
+ private final ProducerUtils.MessageAccumulatorHelper helper;
+
+ @SuppressWarnings("unchecked")
+ DynamicBatchMessageAccumulator(
+ int subEntrySize,
+ int batchSize,
+ Codec codec,
+ int maxFrameSize,
+ ToLongFunction publishSequenceFunction,
+ Function filterValueExtractor,
+ Clock clock,
+ String stream,
+ CompressionCodec compressionCodec,
+ ByteBufAllocator byteBufAllocator,
+ ObservationCollector> observationCollector,
+ StreamProducer producer) {
+ this.helper =
+ new ProducerUtils.MessageAccumulatorHelper(
+ codec,
+ maxFrameSize,
+ publishSequenceFunction,
+ filterValueExtractor,
+ clock,
+ stream,
+ observationCollector);
+ this.producer = producer;
+ this.observationCollector = (ObservationCollector) observationCollector;
+ if (subEntrySize <= 1) {
+ this.dynamicBatch =
+ new DynamicBatch<>(
+ (items, replay) -> {
+ if (!replay) {
+ items.forEach(
+ i -> {
+ AccumulatedEntity entity = (AccumulatedEntity) i;
+ this.observationCollector.published(
+ entity.observationContext(), entity.confirmationCallback().message());
+ });
+ }
+ return this.publish(items);
+ },
+ batchSize);
+ } else {
+ byte compressionCode =
+ compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
+ this.dynamicBatch =
+ new DynamicBatch<>(
+ (items, replay) -> {
+ List subBatches = new ArrayList<>();
+ int count = 0;
+ ProducerUtils.Batch batch =
+ this.helper.batch(
+ byteBufAllocator, compressionCode, compressionCodec, subEntrySize);
+ AccumulatedEntity lastMessageInBatch = null;
+ for (Object msg : items) {
+ AccumulatedEntity message = (AccumulatedEntity) msg;
+ if (!replay) {
+ this.observationCollector.published(
+ message.observationContext(), message.confirmationCallback().message());
+ }
+ lastMessageInBatch = message;
+ batch.add(
+ (Codec.EncodedMessage) message.encodedEntity(),
+ message.confirmationCallback());
+ count++;
+ if (count == subEntrySize) {
+ batch.time = lastMessageInBatch.time();
+ batch.publishingId = lastMessageInBatch.publishingId();
+ batch.encodedMessageBatch.close();
+ subBatches.add(batch);
+ lastMessageInBatch = null;
+ batch =
+ this.helper.batch(
+ byteBufAllocator, compressionCode, compressionCodec, subEntrySize);
+ count = 0;
+ }
+ }
+
+ if (!batch.isEmpty() && count < subEntrySize) {
+ batch.time = lastMessageInBatch.time();
+ batch.publishingId = lastMessageInBatch.publishingId();
+ batch.encodedMessageBatch.close();
+ subBatches.add(batch);
+ }
+ return this.publish(subBatches);
+ },
+ batchSize * subEntrySize);
+ }
+ }
+
+ @Override
+ public void add(Message message, ConfirmationHandler confirmationHandler) {
+ this.dynamicBatch.add(helper.entity(message, confirmationHandler));
+ }
+
+ @Override
+ public int size() {
+ // TODO compute dynamic batch message accumulator pending message count
+ return 0;
+ }
+
+ @Override
+ public void flush(boolean force) {}
+
+ private boolean publish(List entities) {
+ if (this.producer.canSend()) {
+ this.producer.publishInternal(entities);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void close() {
+ this.dynamicBatch.close();
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java
index c35271f29f..80e3bb516e 100644
--- a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -17,28 +17,14 @@
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Message;
-interface MessageAccumulator {
+interface MessageAccumulator extends AutoCloseable {
- boolean add(Message message, ConfirmationHandler confirmationHandler);
-
- AccumulatedEntity get();
-
- boolean isEmpty();
+ void add(Message message, ConfirmationHandler confirmationHandler);
int size();
- interface AccumulatedEntity {
-
- long time();
-
- long publishingId();
-
- String filterValue();
-
- Object encodedEntity();
-
- StreamProducer.ConfirmationCallback confirmationCallback();
+ void flush(boolean force);
- Object observationContext();
- }
+ @Override
+ void close();
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java
new file mode 100644
index 0000000000..c74fa0102b
--- /dev/null
+++ b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java
@@ -0,0 +1,322 @@
+// Copyright (c) 2024 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.impl;
+
+import com.rabbitmq.stream.*;
+import com.rabbitmq.stream.compression.CompressionCodec;
+import io.netty.buffer.ByteBufAllocator;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.ToLongFunction;
+
+final class ProducerUtils {
+
+ private ProducerUtils() {}
+
+ static MessageAccumulator createMessageAccumulator(
+ boolean dynamicBatch,
+ int subEntrySize,
+ int batchSize,
+ CompressionCodec compressionCodec,
+ Codec codec,
+ ByteBufAllocator byteBufAllocator,
+ int maxFrameSize,
+ ToLongFunction publishSequenceFunction,
+ Function filterValueExtractor,
+ Clock clock,
+ String stream,
+ ObservationCollector> observationCollector,
+ StreamProducer producer) {
+ if (dynamicBatch) {
+ return new DynamicBatchMessageAccumulator(
+ subEntrySize,
+ batchSize,
+ codec,
+ maxFrameSize,
+ publishSequenceFunction,
+ filterValueExtractor,
+ clock,
+ stream,
+ compressionCodec,
+ byteBufAllocator,
+ observationCollector,
+ producer);
+ } else {
+ if (subEntrySize <= 1) {
+ return new SimpleMessageAccumulator(
+ batchSize,
+ codec,
+ maxFrameSize,
+ publishSequenceFunction,
+ filterValueExtractor,
+ clock,
+ stream,
+ observationCollector,
+ producer);
+ } else {
+ return new SubEntryMessageAccumulator(
+ subEntrySize,
+ batchSize,
+ compressionCodec,
+ codec,
+ byteBufAllocator,
+ maxFrameSize,
+ publishSequenceFunction,
+ clock,
+ stream,
+ observationCollector,
+ producer);
+ }
+ }
+ }
+
+ interface ConfirmationCallback {
+
+ int handle(boolean confirmed, short code);
+
+ Message message();
+ }
+
+ interface AccumulatedEntity {
+
+ long time();
+
+ long publishingId();
+
+ String filterValue();
+
+ Object encodedEntity();
+
+ ConfirmationCallback confirmationCallback();
+
+ Object observationContext();
+ }
+
+ static final class SimpleConfirmationCallback implements ConfirmationCallback {
+
+ private final Message message;
+ private final ConfirmationHandler confirmationHandler;
+
+ SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) {
+ this.message = message;
+ this.confirmationHandler = confirmationHandler;
+ }
+
+ @Override
+ public int handle(boolean confirmed, short code) {
+ confirmationHandler.handle(new ConfirmationStatus(message, confirmed, code));
+ return 1;
+ }
+
+ @Override
+ public Message message() {
+ return this.message;
+ }
+ }
+
+ static final class SimpleAccumulatedEntity implements AccumulatedEntity {
+
+ private final long time;
+ private final long publishingId;
+ private final String filterValue;
+ private final Codec.EncodedMessage encodedMessage;
+ private final ConfirmationCallback confirmationCallback;
+ private final Object observationContext;
+
+ SimpleAccumulatedEntity(
+ long time,
+ long publishingId,
+ String filterValue,
+ Codec.EncodedMessage encodedMessage,
+ ConfirmationCallback confirmationCallback,
+ Object observationContext) {
+ this.time = time;
+ this.publishingId = publishingId;
+ this.encodedMessage = encodedMessage;
+ this.filterValue = filterValue;
+ this.confirmationCallback = confirmationCallback;
+ this.observationContext = observationContext;
+ }
+
+ @Override
+ public long publishingId() {
+ return publishingId;
+ }
+
+ @Override
+ public String filterValue() {
+ return filterValue;
+ }
+
+ @Override
+ public Object encodedEntity() {
+ return encodedMessage;
+ }
+
+ @Override
+ public long time() {
+ return time;
+ }
+
+ @Override
+ public ConfirmationCallback confirmationCallback() {
+ return confirmationCallback;
+ }
+
+ @Override
+ public Object observationContext() {
+ return this.observationContext;
+ }
+ }
+
+ static final class CompositeConfirmationCallback implements ConfirmationCallback {
+
+ private final List callbacks;
+
+ CompositeConfirmationCallback(List callbacks) {
+ this.callbacks = callbacks;
+ }
+
+ private void add(ConfirmationCallback confirmationCallback) {
+ this.callbacks.add(confirmationCallback);
+ }
+
+ @Override
+ public int handle(boolean confirmed, short code) {
+ for (ConfirmationCallback callback : callbacks) {
+ callback.handle(confirmed, code);
+ }
+ return callbacks.size();
+ }
+
+ @Override
+ public Message message() {
+ throw new UnsupportedOperationException(
+ "composite confirmation callback does not contain just one message");
+ }
+ }
+
+ static final class Batch implements AccumulatedEntity {
+
+ final Client.EncodedMessageBatch encodedMessageBatch;
+ private final CompositeConfirmationCallback confirmationCallback;
+ volatile long publishingId;
+ volatile long time;
+
+ Batch(
+ Client.EncodedMessageBatch encodedMessageBatch,
+ CompositeConfirmationCallback confirmationCallback) {
+ this.encodedMessageBatch = encodedMessageBatch;
+ this.confirmationCallback = confirmationCallback;
+ }
+
+ void add(Codec.EncodedMessage encodedMessage, ConfirmationCallback confirmationCallback) {
+ this.encodedMessageBatch.add(encodedMessage);
+ this.confirmationCallback.add(confirmationCallback);
+ }
+
+ boolean isEmpty() {
+ return this.confirmationCallback.callbacks.isEmpty();
+ }
+
+ @Override
+ public long publishingId() {
+ return publishingId;
+ }
+
+ @Override
+ public String filterValue() {
+ return null;
+ }
+
+ @Override
+ public Object encodedEntity() {
+ return encodedMessageBatch;
+ }
+
+ @Override
+ public long time() {
+ return time;
+ }
+
+ @Override
+ public ConfirmationCallback confirmationCallback() {
+ return confirmationCallback;
+ }
+
+ @Override
+ public Object observationContext() {
+ throw new UnsupportedOperationException(
+ "batch entity does not contain only one observation context");
+ }
+ }
+
+ static final class MessageAccumulatorHelper {
+
+ private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null;
+
+ private final ObservationCollector observationCollector;
+ private final ToLongFunction publishSequenceFunction;
+ private final String stream;
+ private final Codec codec;
+ private final int maxFrameSize;
+ private final Clock clock;
+ private final Function filterValueExtractor;
+
+ @SuppressWarnings("unchecked")
+ MessageAccumulatorHelper(
+ Codec codec,
+ int maxFrameSize,
+ ToLongFunction publishSequenceFunction,
+ Function filterValueExtractor,
+ Clock clock,
+ String stream,
+ ObservationCollector> observationCollector) {
+ this.publishSequenceFunction = publishSequenceFunction;
+ this.codec = codec;
+ this.clock = clock;
+ this.maxFrameSize = maxFrameSize;
+ this.filterValueExtractor =
+ filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor;
+ this.observationCollector = (ObservationCollector) observationCollector;
+ this.stream = stream;
+ }
+
+ AccumulatedEntity entity(Message message, ConfirmationHandler confirmationHandler) {
+ Object observationContext = this.observationCollector.prePublish(this.stream, message);
+ Codec.EncodedMessage encodedMessage = this.codec.encode(message);
+ Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage);
+ long publishingId = this.publishSequenceFunction.applyAsLong(message);
+ return new ProducerUtils.SimpleAccumulatedEntity(
+ this.clock.time(),
+ publishingId,
+ this.filterValueExtractor.apply(message),
+ this.codec.encode(message),
+ new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler),
+ observationContext);
+ }
+
+ Batch batch(
+ ByteBufAllocator bba,
+ byte compressionCode,
+ CompressionCodec compressionCodec,
+ int subEntrySize) {
+ return new ProducerUtils.Batch(
+ Client.EncodedMessageBatch.create(bba, compressionCode, compressionCodec, subEntrySize),
+ new ProducerUtils.CompositeConfirmationCallback(new ArrayList<>(subEntrySize)));
+ }
+ }
+}
diff --git a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java
index 718f253da7..97e99e4b9f 100644
--- a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -15,6 +15,9 @@
package com.rabbitmq.stream.impl;
import com.rabbitmq.stream.*;
+import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -23,17 +26,11 @@
class SimpleMessageAccumulator implements MessageAccumulator {
- private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null;
-
protected final BlockingQueue messages;
- protected final Clock clock;
private final int capacity;
- protected final Codec codec;
- private final int maxFrameSize;
- private final ToLongFunction publishSequenceFunction;
- private final Function filterValueExtractor;
- final String stream;
final ObservationCollector observationCollector;
+ private final StreamProducer producer;
+ final ProducerUtils.MessageAccumulatorHelper helper;
@SuppressWarnings("unchecked")
SimpleMessageAccumulator(
@@ -44,47 +41,39 @@ class SimpleMessageAccumulator implements MessageAccumulator {
Function filterValueExtractor,
Clock clock,
String stream,
- ObservationCollector> observationCollector) {
+ ObservationCollector> observationCollector,
+ StreamProducer producer) {
+ this.helper =
+ new ProducerUtils.MessageAccumulatorHelper(
+ codec,
+ maxFrameSize,
+ publishSequenceFunction,
+ filterValueExtractor,
+ clock,
+ stream,
+ observationCollector);
this.capacity = capacity;
- this.messages = new LinkedBlockingQueue<>(capacity);
- this.codec = codec;
- this.maxFrameSize = maxFrameSize;
- this.publishSequenceFunction = publishSequenceFunction;
- this.filterValueExtractor =
- filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor;
- this.clock = clock;
- this.stream = stream;
+ this.messages = new LinkedBlockingQueue<>(this.capacity);
this.observationCollector = (ObservationCollector) observationCollector;
+ this.producer = producer;
}
- public boolean add(Message message, ConfirmationHandler confirmationHandler) {
- Object observationContext = this.observationCollector.prePublish(this.stream, message);
- Codec.EncodedMessage encodedMessage = this.codec.encode(message);
- Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage);
- long publishingId = this.publishSequenceFunction.applyAsLong(message);
+ public void add(Message message, ConfirmationHandler confirmationHandler) {
+ AccumulatedEntity entity = this.helper.entity(message, confirmationHandler);
try {
- boolean offered =
- messages.offer(
- new SimpleAccumulatedEntity(
- clock.time(),
- publishingId,
- this.filterValueExtractor.apply(message),
- encodedMessage,
- new SimpleConfirmationCallback(message, confirmationHandler),
- observationContext),
- 60,
- TimeUnit.SECONDS);
+ boolean offered = messages.offer(entity, 60, TimeUnit.SECONDS);
if (!offered) {
throw new StreamException("Could not accumulate outbound message");
}
} catch (InterruptedException e) {
throw new StreamException("Error while accumulating outbound message", e);
}
- return this.messages.size() == this.capacity;
+ if (this.messages.size() == this.capacity) {
+ publishBatch(true);
+ }
}
- @Override
- public AccumulatedEntity get() {
+ AccumulatedEntity get() {
AccumulatedEntity entity = this.messages.poll();
if (entity != null) {
this.observationCollector.published(
@@ -93,91 +82,38 @@ public AccumulatedEntity get() {
return entity;
}
- @Override
- public boolean isEmpty() {
- return messages.isEmpty();
- }
-
@Override
public int size() {
return messages.size();
}
- private static final class SimpleAccumulatedEntity implements AccumulatedEntity {
-
- private final long time;
- private final long publishingId;
- private final String filterValue;
- private final Codec.EncodedMessage encodedMessage;
- private final StreamProducer.ConfirmationCallback confirmationCallback;
- private final Object observationContext;
-
- private SimpleAccumulatedEntity(
- long time,
- long publishingId,
- String filterValue,
- Codec.EncodedMessage encodedMessage,
- StreamProducer.ConfirmationCallback confirmationCallback,
- Object observationContext) {
- this.time = time;
- this.publishingId = publishingId;
- this.encodedMessage = encodedMessage;
- this.filterValue = filterValue;
- this.confirmationCallback = confirmationCallback;
- this.observationContext = observationContext;
- }
-
- @Override
- public long publishingId() {
- return publishingId;
- }
-
- @Override
- public String filterValue() {
- return filterValue;
- }
-
- @Override
- public Object encodedEntity() {
- return encodedMessage;
- }
-
- @Override
- public long time() {
- return time;
- }
-
- @Override
- public StreamProducer.ConfirmationCallback confirmationCallback() {
- return confirmationCallback;
- }
-
- @Override
- public Object observationContext() {
- return this.observationContext;
- }
+ @Override
+ public void flush(boolean force) {
+ boolean stateCheck = !force;
+ publishBatch(stateCheck);
}
- private static final class SimpleConfirmationCallback
- implements StreamProducer.ConfirmationCallback {
-
- private final Message message;
- private final ConfirmationHandler confirmationHandler;
-
- private SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) {
- this.message = message;
- this.confirmationHandler = confirmationHandler;
- }
-
- @Override
- public int handle(boolean confirmed, short code) {
- confirmationHandler.handle(new ConfirmationStatus(message, confirmed, code));
- return 1;
- }
-
- @Override
- public Message message() {
- return this.message;
+ private void publishBatch(boolean stateCheck) {
+ this.producer.lock();
+ try {
+ if ((!stateCheck || this.producer.canSend()) && !this.messages.isEmpty()) {
+ List entities = new ArrayList<>(this.capacity);
+ int batchCount = 0;
+ while (batchCount != this.capacity) {
+ AccumulatedEntity entity = this.get();
+ if (entity == null) {
+ break;
+ }
+ entities.add(entity);
+ batchCount++;
+ }
+ producer.publishInternal(entities);
+ }
+ } finally {
+ this.producer.unlock();
}
}
+
+ @Override
+ public void close() {}
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
index 4050fd0af7..e7e67be0be 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -18,17 +18,11 @@
import static com.rabbitmq.stream.impl.Utils.formatConstant;
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
-import com.rabbitmq.stream.Codec;
-import com.rabbitmq.stream.ConfirmationHandler;
-import com.rabbitmq.stream.ConfirmationStatus;
-import com.rabbitmq.stream.Constants;
-import com.rabbitmq.stream.Message;
-import com.rabbitmq.stream.MessageBuilder;
-import com.rabbitmq.stream.Producer;
-import com.rabbitmq.stream.StreamException;
+import com.rabbitmq.stream.*;
import com.rabbitmq.stream.compression.Compression;
+import com.rabbitmq.stream.compression.CompressionCodec;
import com.rabbitmq.stream.impl.Client.Response;
-import com.rabbitmq.stream.impl.MessageAccumulator.AccumulatedEntity;
+import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
@@ -49,6 +43,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import org.slf4j.Logger;
@@ -84,6 +80,7 @@ class StreamProducer implements Producer {
private volatile Status status;
private volatile ScheduledFuture> confirmTimeoutFuture;
private final short publishVersion;
+ private final Lock lock = new ReentrantLock();
@SuppressFBWarnings("CT_CONSTRUCTOR_THROW")
StreamProducer(
@@ -91,6 +88,7 @@ class StreamProducer implements Producer {
String stream,
int subEntrySize,
int batchSize,
+ boolean dynamicBatch,
Compression compression,
Duration batchPublishingDelay,
int maxUnconfirmedMessages,
@@ -122,37 +120,14 @@ class StreamProducer implements Producer {
return publishingSequence.getAndIncrement();
}
};
+
if (subEntrySize <= 1) {
- this.accumulator =
- new SimpleMessageAccumulator(
- batchSize,
- environment.codec(),
- client.maxFrameSize(),
- accumulatorPublishSequenceFunction,
- filterValueExtractor,
- this.environment.clock(),
- stream,
- this.environment.observationCollector());
if (filterValueExtractor == null) {
delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK;
} else {
delegateWriteCallback = OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK;
}
} else {
- this.accumulator =
- new SubEntryMessageAccumulator(
- subEntrySize,
- batchSize,
- compression == Compression.NONE
- ? null
- : environment.compressionCodecFactory().get(compression),
- environment.codec(),
- this.environment.byteBufAllocator(),
- client.maxFrameSize(),
- accumulatorPublishSequenceFunction,
- this.environment.clock(),
- stream,
- environment.observationCollector());
delegateWriteCallback = Client.OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK;
}
@@ -166,8 +141,7 @@ class StreamProducer implements Producer {
new Client.OutboundEntityWriteCallback() {
@Override
public int write(ByteBuf bb, Object entity, long publishingId) {
- MessageAccumulator.AccumulatedEntity accumulatedEntity =
- (MessageAccumulator.AccumulatedEntity) entity;
+ AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity;
unconfirmedMessages.put(publishingId, accumulatedEntity);
return delegateWriteCallback.write(
bb, accumulatedEntity.encodedEntity(), publishingId);
@@ -176,7 +150,7 @@ public int write(ByteBuf bb, Object entity, long publishingId) {
@Override
public int fragmentLength(Object entity) {
return delegateWriteCallback.fragmentLength(
- ((MessageAccumulator.AccumulatedEntity) entity).encodedEntity());
+ ((AccumulatedEntity) entity).encodedEntity());
}
};
} else {
@@ -185,8 +159,7 @@ public int fragmentLength(Object entity) {
new Client.OutboundEntityWriteCallback() {
@Override
public int write(ByteBuf bb, Object entity, long publishingId) {
- MessageAccumulator.AccumulatedEntity accumulatedEntity =
- (MessageAccumulator.AccumulatedEntity) entity;
+ AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity;
unconfirmedMessages.put(publishingId, accumulatedEntity);
return delegateWriteCallback.write(bb, accumulatedEntity, publishingId);
}
@@ -198,14 +171,36 @@ public int fragmentLength(Object entity) {
};
}
- if (!batchPublishingDelay.isNegative() && !batchPublishingDelay.isZero()) {
+ CompressionCodec compressionCodec = null;
+ if (compression != null) {
+ compressionCodec = environment.compressionCodecFactory().get(compression);
+ }
+ this.accumulator =
+ ProducerUtils.createMessageAccumulator(
+ dynamicBatch,
+ subEntrySize,
+ batchSize,
+ compressionCodec,
+ environment.codec(),
+ environment.byteBufAllocator(),
+ client.maxFrameSize(),
+ accumulatorPublishSequenceFunction,
+ filterValueExtractor,
+ environment.clock(),
+ stream,
+ environment.observationCollector(),
+ this);
+
+ boolean backgroundBatchPublishingTaskRequired =
+ !dynamicBatch && batchPublishingDelay.toMillis() > 0;
+ LOGGER.debug(
+ "Background batch publishing task required? {}", backgroundBatchPublishingTaskRequired);
+ if (backgroundBatchPublishingTaskRequired) {
AtomicReference taskReference = new AtomicReference<>();
Runnable task =
() -> {
if (canSend()) {
- synchronized (StreamProducer.this) {
- publishBatch(true);
- }
+ this.accumulator.flush(false);
}
if (status != Status.CLOSED) {
environment
@@ -289,7 +284,7 @@ private Runnable confirmTimeoutTask(Duration confirmTimeout) {
error(unconfirmedEntry.getKey(), Constants.CODE_PUBLISH_CONFIRM_TIMEOUT);
count++;
} else {
- // everything else is after, so we can stop
+ // everything else is after, we can stop
break;
}
}
@@ -318,8 +313,10 @@ private long computeFirstValueOfPublishingSequence() {
}
}
+ // visible for testing
void confirm(long publishingId) {
AccumulatedEntity accumulatedEntity = this.unconfirmedMessages.remove(publishingId);
+
if (accumulatedEntity != null) {
int confirmedCount =
accumulatedEntity.confirmationCallback().handle(true, Constants.RESPONSE_CODE_OK);
@@ -329,6 +326,11 @@ void confirm(long publishingId) {
}
}
+ // for testing
+ int unconfirmedCount() {
+ return this.unconfirmedMessages.size();
+ }
+
void error(long publishingId, short errorCode) {
AccumulatedEntity accumulatedEntity = unconfirmedMessages.remove(publishingId);
if (accumulatedEntity != null) {
@@ -397,11 +399,7 @@ public void send(Message message, ConfirmationHandler confirmationHandler) {
private void doSend(Message message, ConfirmationHandler confirmationHandler) {
if (canSend()) {
- if (accumulator.add(message, confirmationHandler)) {
- synchronized (this) {
- publishBatch(true);
- }
- }
+ this.accumulator.add(message, confirmationHandler);
} else {
failPublishing(message, confirmationHandler);
}
@@ -419,7 +417,7 @@ private void failPublishing(Message message, ConfirmationHandler confirmationHan
}
}
- private boolean canSend() {
+ boolean canSend() {
return this.status == Status.RUNNING;
}
@@ -445,6 +443,7 @@ public void close() {
}
void closeFromEnvironment() {
+ this.accumulator.close();
this.closingCallback.run();
cancelConfirmTimeoutTask();
this.closed.set(true);
@@ -476,25 +475,13 @@ private void cancelConfirmTimeoutTask() {
}
}
- private void publishBatch(boolean stateCheck) {
- if ((!stateCheck || canSend()) && !accumulator.isEmpty()) {
- List messages = new ArrayList<>(this.batchSize);
- int batchCount = 0;
- while (batchCount != this.batchSize) {
- Object accMessage = accumulator.get();
- if (accMessage == null) {
- break;
- }
- messages.add(accMessage);
- batchCount++;
- }
- client.publishInternal(
- this.publishVersion,
- this.publisherId,
- messages,
- this.writeCallback,
- this.publishSequenceFunction);
- }
+ void publishInternal(List messages) {
+ client.publishInternal(
+ this.publishVersion,
+ this.publisherId,
+ messages,
+ this.writeCallback,
+ this.publishSequenceFunction);
}
boolean isOpen() {
@@ -506,76 +493,80 @@ void unavailable() {
}
void running() {
- synchronized (this) {
- LOGGER.debug(
- "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)",
- this.unconfirmedMessages.size(),
- this.accumulator.size());
- if (this.retryOnRecovery) {
- LOGGER.debug("Re-publishing {} unconfirmed message(s)", this.unconfirmedMessages.size());
- if (!this.unconfirmedMessages.isEmpty()) {
- Map messagesToResend = new TreeMap<>(this.unconfirmedMessages);
- this.unconfirmedMessages.clear();
- Iterator> resendIterator =
- messagesToResend.entrySet().iterator();
- while (resendIterator.hasNext()) {
- List messages = new ArrayList<>(this.batchSize);
- int batchCount = 0;
- while (batchCount != this.batchSize) {
- Object accMessage =
- resendIterator.hasNext() ? resendIterator.next().getValue() : null;
- if (accMessage == null) {
- break;
+ this.executeInLock(
+ () -> {
+ LOGGER.debug(
+ "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)",
+ this.unconfirmedMessages.size(),
+ this.accumulator.size());
+ if (this.retryOnRecovery) {
+ LOGGER.debug(
+ "Re-publishing {} unconfirmed message(s)", this.unconfirmedMessages.size());
+ if (!this.unconfirmedMessages.isEmpty()) {
+ Map messagesToResend =
+ new TreeMap<>(this.unconfirmedMessages);
+ this.unconfirmedMessages.clear();
+ Iterator> resendIterator =
+ messagesToResend.entrySet().iterator();
+ while (resendIterator.hasNext()) {
+ List messages = new ArrayList<>(this.batchSize);
+ int batchCount = 0;
+ while (batchCount != this.batchSize) {
+ Object accMessage =
+ resendIterator.hasNext() ? resendIterator.next().getValue() : null;
+ if (accMessage == null) {
+ break;
+ }
+ messages.add(accMessage);
+ batchCount++;
+ }
+ client.publishInternal(
+ this.publishVersion,
+ this.publisherId,
+ messages,
+ this.writeCallback,
+ this.publishSequenceFunction);
+ }
+ }
+ } else {
+ LOGGER.debug(
+ "Skipping republishing of {} unconfirmed messages",
+ this.unconfirmedMessages.size());
+ Map messagesToFail = new TreeMap<>(this.unconfirmedMessages);
+ this.unconfirmedMessages.clear();
+ for (AccumulatedEntity accumulatedEntity : messagesToFail.values()) {
+ try {
+ int permits =
+ accumulatedEntity
+ .confirmationCallback()
+ .handle(false, CODE_PUBLISH_CONFIRM_TIMEOUT);
+ this.unconfirmedMessagesSemaphore.release(permits);
+ } catch (Exception e) {
+ LOGGER.debug("Error while nack-ing outbound message: {}", e.getMessage());
+ this.unconfirmedMessagesSemaphore.release(1);
}
- messages.add(accMessage);
- batchCount++;
}
- client.publishInternal(
- this.publishVersion,
- this.publisherId,
- messages,
- this.writeCallback,
- this.publishSequenceFunction);
}
- }
- } else {
- LOGGER.debug(
- "Skipping republishing of {} unconfirmed messages", this.unconfirmedMessages.size());
- Map messagesToFail = new TreeMap<>(this.unconfirmedMessages);
- this.unconfirmedMessages.clear();
- for (AccumulatedEntity accumulatedEntity : messagesToFail.values()) {
- try {
- int permits =
- accumulatedEntity
- .confirmationCallback()
- .handle(false, CODE_PUBLISH_CONFIRM_TIMEOUT);
- this.unconfirmedMessagesSemaphore.release(permits);
- } catch (Exception e) {
- LOGGER.debug("Error while nack-ing outbound message: {}", e.getMessage());
- this.unconfirmedMessagesSemaphore.release(1);
+ this.accumulator.flush(true);
+ int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
+ if (toRelease > 0) {
+ unconfirmedMessagesSemaphore.release(toRelease);
+ if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
+ LOGGER.debug(
+ "Could not acquire {} permit(s) for message republishing",
+ this.unconfirmedMessages.size());
+ }
}
- }
- }
- publishBatch(false);
- int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
- if (toRelease > 0) {
- unconfirmedMessagesSemaphore.release(toRelease);
- if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
- LOGGER.debug(
- "Could not acquire {} permit(s) for message republishing",
- this.unconfirmedMessages.size());
- }
- }
- }
+ });
this.status = Status.RUNNING;
}
- synchronized void setClient(Client client) {
- this.client = client;
+ void setClient(Client client) {
+ this.executeInLock(() -> this.client = client);
}
- synchronized void setPublisherId(byte publisherId) {
- this.publisherId = publisherId;
+ void setPublisherId(byte publisherId) {
+ this.executeInLock(() -> this.publisherId = publisherId);
}
Status status() {
@@ -588,13 +579,6 @@ enum Status {
CLOSED
}
- interface ConfirmationCallback {
-
- int handle(boolean confirmed, short code);
-
- Message message();
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -668,4 +652,21 @@ public int fragmentLength(Object entity) {
}
}
}
+
+ void lock() {
+ this.lock.lock();
+ }
+
+ void unlock() {
+ this.lock.unlock();
+ }
+
+ private void executeInLock(Runnable action) {
+ this.lock();
+ try {
+ action.run();
+ } finally {
+ this.unlock();
+ }
+ }
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
index 493426c67f..54807489e2 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -21,12 +21,16 @@
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.compression.Compression;
import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.time.Duration;
import java.util.function.Function;
import java.util.function.ToIntFunction;
class StreamProducerBuilder implements ProducerBuilder {
+ static final boolean DEFAULT_DYNAMIC_BATCH =
+ Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "false"));
+
private final StreamEnvironment environment;
private String name;
@@ -53,6 +57,8 @@ class StreamProducerBuilder implements ProducerBuilder {
private Function filterValueExtractor;
+ private boolean dynamicBatch = DEFAULT_DYNAMIC_BATCH;
+
StreamProducerBuilder(StreamEnvironment environment) {
this.environment = environment;
}
@@ -97,11 +103,18 @@ public ProducerBuilder compression(Compression compression) {
return this;
}
+ @Override
public StreamProducerBuilder batchPublishingDelay(Duration batchPublishingDelay) {
this.batchPublishingDelay = batchPublishingDelay;
return this;
}
+ @Override
+ public ProducerBuilder dynamicBatch(boolean dynamicBatch) {
+ this.dynamicBatch = dynamicBatch;
+ return this;
+ }
+
@Override
public ProducerBuilder maxUnconfirmedMessages(int maxUnconfirmedMessages) {
if (maxUnconfirmedMessages <= 0) {
@@ -198,6 +211,7 @@ public Producer build() {
stream,
subEntrySize,
batchSize,
+ dynamicBatch,
compression,
batchPublishingDelay,
maxUnconfirmedMessages,
@@ -229,11 +243,13 @@ public Producer build() {
StreamProducerBuilder duplicate() {
StreamProducerBuilder duplicate = new StreamProducerBuilder(this.environment);
for (Field field : StreamProducerBuilder.class.getDeclaredFields()) {
- field.setAccessible(true);
- try {
- field.set(duplicate, field.get(this));
- } catch (IllegalAccessException e) {
- throw new StreamException("Error while duplicating stream producer builder", e);
+ if (!Modifier.isStatic(field.getModifiers())) {
+ field.setAccessible(true);
+ try {
+ field.set(duplicate, field.get(this));
+ } catch (IllegalAccessException e) {
+ throw new StreamException("Error while duplicating stream producer builder", e);
+ }
}
}
return duplicate;
diff --git a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java
index 9693aea50c..14645074e6 100644
--- a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -20,18 +20,15 @@
import com.rabbitmq.stream.ObservationCollector;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.compression.CompressionCodec;
-import com.rabbitmq.stream.impl.Client.EncodedMessageBatch;
import io.netty.buffer.ByteBufAllocator;
-import java.util.ArrayList;
-import java.util.List;
import java.util.function.ToLongFunction;
-class SubEntryMessageAccumulator extends SimpleMessageAccumulator {
+final class SubEntryMessageAccumulator extends SimpleMessageAccumulator {
private final int subEntrySize;
private final CompressionCodec compressionCodec;
private final ByteBufAllocator byteBufAllocator;
- private final byte compression;
+ private final byte compressionCode;
public SubEntryMessageAccumulator(
int subEntrySize,
@@ -43,7 +40,8 @@ public SubEntryMessageAccumulator(
ToLongFunction publishSequenceFunction,
Clock clock,
String stream,
- ObservationCollector observationCollector) {
+ ObservationCollector> observationCollector,
+ StreamProducer producer) {
super(
subEntrySize * batchSize,
codec,
@@ -52,30 +50,30 @@ public SubEntryMessageAccumulator(
null,
clock,
stream,
- observationCollector);
+ observationCollector,
+ producer);
this.subEntrySize = subEntrySize;
this.compressionCodec = compressionCodec;
- this.compression = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
+ this.compressionCode =
+ compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
this.byteBufAllocator = byteBufAllocator;
}
- private Batch createBatch() {
- return new Batch(
- EncodedMessageBatch.create(
- byteBufAllocator, compression, compressionCodec, this.subEntrySize),
- new CompositeConfirmationCallback(new ArrayList<>(this.subEntrySize)));
+ private ProducerUtils.Batch createBatch() {
+ return this.helper.batch(
+ this.byteBufAllocator, this.compressionCode, this.compressionCodec, this.subEntrySize);
}
@Override
- public AccumulatedEntity get() {
+ protected ProducerUtils.AccumulatedEntity get() {
if (this.messages.isEmpty()) {
return null;
}
int count = 0;
- Batch batch = createBatch();
- AccumulatedEntity lastMessageInBatch = null;
+ ProducerUtils.Batch batch = this.createBatch();
+ ProducerUtils.AccumulatedEntity lastMessageInBatch = null;
while (count != this.subEntrySize) {
- AccumulatedEntity message = messages.poll();
+ ProducerUtils.AccumulatedEntity message = messages.poll();
if (message == null) {
break;
}
@@ -94,89 +92,4 @@ public AccumulatedEntity get() {
return batch;
}
}
-
- private static class Batch implements AccumulatedEntity {
-
- private final EncodedMessageBatch encodedMessageBatch;
- private final CompositeConfirmationCallback confirmationCallback;
- private volatile long publishingId;
- private volatile long time;
-
- private Batch(
- EncodedMessageBatch encodedMessageBatch,
- CompositeConfirmationCallback confirmationCallback) {
- this.encodedMessageBatch = encodedMessageBatch;
- this.confirmationCallback = confirmationCallback;
- }
-
- void add(
- Codec.EncodedMessage encodedMessage,
- StreamProducer.ConfirmationCallback confirmationCallback) {
- this.encodedMessageBatch.add(encodedMessage);
- this.confirmationCallback.add(confirmationCallback);
- }
-
- boolean isEmpty() {
- return this.confirmationCallback.callbacks.isEmpty();
- }
-
- @Override
- public long publishingId() {
- return publishingId;
- }
-
- @Override
- public String filterValue() {
- return null;
- }
-
- @Override
- public Object encodedEntity() {
- return encodedMessageBatch;
- }
-
- @Override
- public long time() {
- return time;
- }
-
- @Override
- public StreamProducer.ConfirmationCallback confirmationCallback() {
- return confirmationCallback;
- }
-
- @Override
- public Object observationContext() {
- throw new UnsupportedOperationException(
- "batch entity does not contain only one observation context");
- }
- }
-
- private static class CompositeConfirmationCallback
- implements StreamProducer.ConfirmationCallback {
-
- private final List callbacks;
-
- private CompositeConfirmationCallback(List callbacks) {
- this.callbacks = callbacks;
- }
-
- private void add(StreamProducer.ConfirmationCallback confirmationCallback) {
- this.callbacks.add(confirmationCallback);
- }
-
- @Override
- public int handle(boolean confirmed, short code) {
- for (StreamProducer.ConfirmationCallback callback : callbacks) {
- callback.handle(confirmed, code);
- }
- return callbacks.size();
- }
-
- @Override
- public Message message() {
- throw new UnsupportedOperationException(
- "composite confirmation callback does not contain just one message");
- }
- }
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java
new file mode 100644
index 0000000000..2049915457
--- /dev/null
+++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java
@@ -0,0 +1,71 @@
+// Copyright (c) 2024 Broadcom. All Rights Reserved.
+// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+//
+// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
+// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
+// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+package com.rabbitmq.stream.impl;
+
+import com.codahale.metrics.ConsoleReporter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.Test;
+
+public class DynamicBatchTest {
+
+ @Test
+ void test() {
+ MetricRegistry metrics = new MetricRegistry();
+ Histogram batchSizeMetrics = metrics.histogram("batch-size");
+ ConsoleReporter reporter =
+ ConsoleReporter.forRegistry(metrics)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .build();
+
+ int itemCount = 3000;
+ TestUtils.Sync sync = TestUtils.sync(itemCount);
+ Random random = new Random();
+ DynamicBatch batch =
+ new DynamicBatch<>(
+ (items, replay) -> {
+ batchSizeMetrics.update(items.size());
+ try {
+ Thread.sleep(random.nextInt(10) + 1);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ sync.down(items.size());
+ return true;
+ },
+ 100);
+ try {
+ RateLimiter rateLimiter = RateLimiter.create(3000);
+ long start = System.nanoTime();
+ IntStream.range(0, itemCount)
+ .forEach(
+ i -> {
+ rateLimiter.acquire();
+ batch.add(String.valueOf(i));
+ });
+ Assertions.assertThat(sync).completes();
+ long end = System.nanoTime();
+ // System.out.println("Done in " + Duration.ofNanos(end - start));
+ // reporter.report();
+ } finally {
+ batch.close();
+ }
+ }
+}
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java
index 6cf06356d0..e900fb4990 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -14,19 +14,17 @@
// info@rabbitmq.com.
package com.rabbitmq.stream.impl;
-import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
-import static com.rabbitmq.stream.impl.TestUtils.localhost;
-import static com.rabbitmq.stream.impl.TestUtils.streamName;
-import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
+import static com.rabbitmq.stream.impl.Assertions.assertThat;
+import static com.rabbitmq.stream.impl.TestUtils.*;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import ch.qos.logback.classic.Level;
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.impl.MonitoringTestUtils.ProducerInfo;
import com.rabbitmq.stream.impl.StreamProducer.Status;
+import com.rabbitmq.stream.impl.TestUtils.Sync;
import io.netty.channel.ChannelOption;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
@@ -94,7 +92,7 @@ void tearDown() {
void send() throws Exception {
int batchSize = 10;
int messageCount = 10 * batchSize + 1; // don't want a multiple of batch size
- CountDownLatch publishLatch = new CountDownLatch(messageCount);
+ Sync confirmSync = sync(messageCount);
Producer producer = environment.producerBuilder().stream(stream).batchSize(batchSize).build();
AtomicLong count = new AtomicLong(0);
AtomicLong sequence = new AtomicLong(0);
@@ -117,13 +115,12 @@ void send() throws Exception {
idsConfirmed.add(
confirmationStatus.getMessage().getProperties().getMessageIdAsLong());
count.incrementAndGet();
- publishLatch.countDown();
+ confirmSync.down();
});
});
- boolean completed = publishLatch.await(10, TimeUnit.SECONDS);
+ assertThat(confirmSync).completes();
assertThat(idsSent).hasSameSizeAs(idsConfirmed);
idsSent.forEach(idSent -> assertThat(idsConfirmed).contains(idSent));
- assertThat(completed).isTrue();
ProducerInfo info = MonitoringTestUtils.extract(producer);
assertThat(info.getId()).isGreaterThanOrEqualTo(0);
@@ -349,63 +346,59 @@ void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception {
@ParameterizedTest
@ValueSource(ints = {1, 7})
void producerShouldBeClosedWhenStreamIsDeleted(int subEntrySize, TestInfo info) throws Exception {
- Level initialLogLevel = TestUtils.newLoggerLevel(ProducersCoordinator.class, Level.DEBUG);
- try {
- String s = streamName(info);
- environment.streamCreator().stream(s).create();
-
- StreamProducer producer =
- (StreamProducer)
- environment.producerBuilder().subEntrySize(subEntrySize).stream(s).build();
-
- AtomicInteger published = new AtomicInteger(0);
- AtomicInteger confirmed = new AtomicInteger(0);
- AtomicInteger errored = new AtomicInteger(0);
- Set errorCodes = ConcurrentHashMap.newKeySet();
-
- AtomicBoolean continuePublishing = new AtomicBoolean(true);
- Thread publishThread =
- new Thread(
- () -> {
- ConfirmationHandler confirmationHandler =
- confirmationStatus -> {
- if (confirmationStatus.isConfirmed()) {
- confirmed.incrementAndGet();
- } else {
- errored.incrementAndGet();
- errorCodes.add(confirmationStatus.getCode());
- }
- };
- while (continuePublishing.get()) {
- try {
- producer.send(
- producer
- .messageBuilder()
- .addData("".getBytes(StandardCharsets.UTF_8))
- .build(),
- confirmationHandler);
- published.incrementAndGet();
- } catch (StreamException e) {
- // OK
- }
+ String s = streamName(info);
+ environment.streamCreator().stream(s).create();
+
+ StreamProducer producer =
+ (StreamProducer) environment.producerBuilder().subEntrySize(subEntrySize).stream(s).build();
+
+ AtomicInteger published = new AtomicInteger(0);
+ AtomicInteger confirmed = new AtomicInteger(0);
+ AtomicInteger errored = new AtomicInteger(0);
+ Set errorCodes = ConcurrentHashMap.newKeySet();
+
+ AtomicBoolean continuePublishing = new AtomicBoolean(true);
+ Thread publishThread =
+ new Thread(
+ () -> {
+ ConfirmationHandler confirmationHandler =
+ confirmationStatus -> {
+ if (confirmationStatus.isConfirmed()) {
+ confirmed.incrementAndGet();
+ } else {
+ errored.incrementAndGet();
+ errorCodes.add(confirmationStatus.getCode());
+ }
+ };
+ while (continuePublishing.get()) {
+ try {
+ producer.send(
+ producer
+ .messageBuilder()
+ .addData("".getBytes(StandardCharsets.UTF_8))
+ .build(),
+ confirmationHandler);
+ published.incrementAndGet();
+ } catch (StreamException e) {
+ // OK
}
- });
- publishThread.start();
+ }
+ });
+ publishThread.start();
- Thread.sleep(1000L);
+ waitAtMost(() -> confirmed.get() > 100);
+ int confirmedNow = confirmed.get();
+ waitAtMost(() -> confirmed.get() > confirmedNow + 1000);
- assertThat(producer.isOpen()).isTrue();
+ assertThat(producer.isOpen()).isTrue();
- environment.deleteStream(s);
+ environment.deleteStream(s);
- waitAtMost(() -> !producer.isOpen());
- continuePublishing.set(false);
- waitAtMost(
- () -> !errorCodes.isEmpty(),
- () -> "The producer should have received negative publish confirms");
- } finally {
- TestUtils.newLoggerLevel(ProducersCoordinator.class, initialLogLevel);
- }
+ waitAtMost(() -> !producer.isOpen());
+ continuePublishing.set(false);
+ waitAtMost(
+ () -> !errorCodes.isEmpty(),
+ () -> "The producer should have received negative publish confirms");
}
@ParameterizedTest
@@ -415,15 +408,14 @@ void messagesShouldBeDeDuplicatedWhenUsingNameAndPublishingId(int subEntrySize)
int firstWaveLineCount = lineCount / 5;
int backwardCount = firstWaveLineCount / 10;
SortedSet document = new TreeSet<>();
- IntStream.range(0, lineCount).forEach(i -> document.add(i));
+ IntStream.range(0, lineCount).forEach(document::add);
Producer producer =
environment.producerBuilder().name("producer-1").stream(stream)
.subEntrySize(subEntrySize)
.build();
- AtomicReference latch =
- new AtomicReference<>(new CountDownLatch(firstWaveLineCount));
- ConfirmationHandler confirmationHandler = confirmationStatus -> latch.get().countDown();
+ Sync confirmSync = sync(firstWaveLineCount);
+ ConfirmationHandler confirmationHandler = confirmationStatus -> confirmSync.down();
Consumer publishMessage =
i ->
producer.send(
@@ -433,15 +425,17 @@ void messagesShouldBeDeDuplicatedWhenUsingNameAndPublishingId(int subEntrySize)
.addData(String.valueOf(i).getBytes())
.build(),
confirmationHandler);
+ // publish the first wave
document.headSet(firstWaveLineCount).forEach(publishMessage);
- assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue();
+ assertThat(confirmSync).completes();
- latch.set(new CountDownLatch(lineCount - firstWaveLineCount + backwardCount));
+ confirmSync.reset(lineCount - firstWaveLineCount + backwardCount);
+ // publish the rest, but with some overlap from the first wave
document.tailSet(firstWaveLineCount - backwardCount).forEach(publishMessage);
- assertThat(latch.get().await(5, TimeUnit.SECONDS)).isTrue();
+ assertThat(confirmSync).completes();
CountDownLatch consumeLatch = new CountDownLatch(lineCount);
AtomicInteger consumed = new AtomicInteger();
@@ -453,14 +447,17 @@ void messagesShouldBeDeDuplicatedWhenUsingNameAndPublishingId(int subEntrySize)
consumeLatch.countDown();
})
.build();
- assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
- Thread.sleep(1000);
- // if we are using sub-entries, we cannot avoid duplicates.
- // here, a sub-entry in the second wave, right at the end of the re-submitted
- // values will contain those duplicates, because its publishing ID will be
- // the one of its last message, so the server will accept the whole sub-entry,
- // including the duplicates.
- assertThat(consumed.get()).isEqualTo(lineCount + backwardCount % subEntrySize);
+ assertThat(consumeLatch.await(5, TimeUnit.SECONDS)).isTrue();
+ if (subEntrySize == 1) {
+ assertThat(consumed.get()).isEqualTo(lineCount);
+ } else {
+ // if we are using sub-entries, we cannot avoid duplicates.
+ // here, a sub-entry in the second wave, right at the end of the re-submitted
+ // values will contain those duplicates, because its publishing ID will be
+ // the one of its last message, so the server will accept the whole sub-entry,
+ // including the duplicates.
+ assertThat(consumed.get()).isBetween(lineCount, lineCount + subEntrySize);
+ }
}
@ParameterizedTest
@@ -636,11 +633,10 @@ void subEntryBatchesSentCompressedShouldBeConsumedProperly() {
}
@Test
- void methodsShouldThrowExceptionWhenProducerIsClosed() throws InterruptedException {
+ void methodsShouldThrowExceptionWhenProducerIsClosed() {
Producer producer = environment.producerBuilder().stream(stream).build();
producer.close();
- assertThatThrownBy(() -> producer.getLastPublishingId())
- .isInstanceOf(IllegalStateException.class);
+ assertThatThrownBy(producer::getLastPublishingId).isInstanceOf(IllegalStateException.class);
}
@Test
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java
index 28c87bfbc2..19390e6877 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2021-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2021-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -15,6 +15,7 @@
package com.rabbitmq.stream.impl;
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
+import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.*;
@@ -27,6 +28,7 @@
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.codec.SimpleCodec;
import com.rabbitmq.stream.compression.Compression;
+import com.rabbitmq.stream.compression.DefaultCompressionCodecFactory;
import com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -42,8 +44,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToLongFunction;
-import java.util.stream.IntStream;
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
+import org.assertj.core.data.Offset;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
@@ -115,6 +117,8 @@ void init() {
when(env.clock()).thenReturn(clock);
when(env.codec()).thenReturn(new SimpleCodec());
when(env.observationCollector()).thenAnswer(invocation -> ObservationCollector.NO_OP);
+ DefaultCompressionCodecFactory ccf = new DefaultCompressionCodecFactory();
+ when(env.compressionCodecFactory()).thenReturn(ccf);
doAnswer(
(Answer)
invocationOnMock -> {
@@ -172,6 +176,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout(
"stream",
subEntrySize,
10,
+ StreamProducerBuilder.DEFAULT_DYNAMIC_BATCH,
Compression.NONE,
Duration.ofMillis(100),
messageCount * 10,
@@ -181,26 +186,33 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout(
null,
env);
- IntStream.range(0, messageCount)
+ range(0, messageCount)
.forEach(
i ->
producer.send(
producer.messageBuilder().addData("".getBytes()).build(), confirmationHandler));
- IntStream.range(0, confirmedPart).forEach(publishingId -> producer.confirm(publishingId));
- assertThat(confirmedCount.get()).isEqualTo(expectedConfirmed);
+ waitAtMost(() -> producer.unconfirmedCount() >= messageCount / subEntrySize);
+ range(0, confirmedPart).forEach(producer::confirm);
+ if (subEntrySize == 1) {
+ assertThat(confirmedCount.get()).isEqualTo(expectedConfirmed);
+ } else {
+ assertThat(confirmedCount.get()).isCloseTo(confirmedCount.get(), Offset.offset(subEntrySize));
+ }
assertThat(erroredCount.get()).isZero();
+ int confirmedPreviously = confirmedCount.get();
executorService.scheduleAtFixedRate(() -> clock.refresh(), 100, 100, TimeUnit.MILLISECONDS);
Thread.sleep(waitTime.toMillis());
- assertThat(confirmedCount.get()).isEqualTo(expectedConfirmed);
+ assertThat(confirmedCount.get()).isEqualTo(confirmedPreviously);
if (confirmTimeout.isZero()) {
assertThat(erroredCount.get()).isZero();
assertThat(responseCodes).isEmpty();
} else {
waitAtMost(
- waitTime.multipliedBy(2), () -> erroredCount.get() == (messageCount - expectedConfirmed));
+ waitTime.multipliedBy(2),
+ () -> erroredCount.get() == (messageCount - confirmedPreviously));
assertThat(responseCodes).hasSize(1).contains(Constants.CODE_PUBLISH_CONFIRM_TIMEOUT);
}
}
@@ -215,6 +227,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry
"stream",
subEntrySize,
10,
+ StreamProducerBuilder.DEFAULT_DYNAMIC_BATCH,
Compression.NONE,
Duration.ZERO,
2,
@@ -255,6 +268,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize)
"stream",
subEntrySize,
10,
+ StreamProducerBuilder.DEFAULT_DYNAMIC_BATCH,
Compression.NONE,
Duration.ZERO,
2,
diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
index 9ab50aadf7..76c1418d36 100644
--- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
+++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -1138,6 +1138,10 @@ void down() {
this.latch.get().countDown();
}
+ void down(int count) {
+ IntStream.range(0, count).forEach(ignored -> this.latch.get().countDown());
+ }
+
boolean await(Duration timeout) {
try {
return this.latch.get().await(timeout.toMillis(), TimeUnit.MILLISECONDS);
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
index 6931d5f21a..4bec720537 100644
--- a/src/test/resources/logback-test.xml
+++ b/src/test/resources/logback-test.xml
@@ -6,7 +6,6 @@
-