diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index 0af64477fa..7543f2f175 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -24,9 +24,17 @@ jobs: cache: 'maven' - name: Start broker run: ci/start-broker.sh - - name: Test + - name: Test (no dynamic-batch publishing) run: | ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=false \ + -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ + -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ + -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem + - name: Test (dynamic-batch publishing) + run: | + ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=true \ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem diff --git a/.github/workflows/test-rabbitmq-alphas.yml b/.github/workflows/test-rabbitmq-alphas.yml index 4409947e27..2a89ab39ff 100644 --- a/.github/workflows/test-rabbitmq-alphas.yml +++ b/.github/workflows/test-rabbitmq-alphas.yml @@ -32,11 +32,19 @@ jobs: run: ci/start-broker.sh env: RABBITMQ_IMAGE: ${{ matrix.rabbitmq-image }} - - name: Test + - name: Test (no dynamic-batch publishing) run: | ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=false \ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem + - name: Test (dynamic-batch publishing) + run: | + ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=true \ + -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ + -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ + -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem - name: Stop broker run: docker stop rabbitmq && docker rm rabbitmq diff --git a/.github/workflows/test-supported-java-versions.yml b/.github/workflows/test-supported-java-versions.yml index 05983059c4..8e66bcb89d 100644 --- a/.github/workflows/test-supported-java-versions.yml +++ b/.github/workflows/test-supported-java-versions.yml @@ -33,12 +33,21 @@ jobs: run: ci/start-broker.sh - name: Display Java version run: ./mvnw --version - - name: Test + - name: Test (no dynamic-batch publishing) run: | ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=false \ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \ -Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true + - name: Test (dynamic-batch publishing) + run: | + ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=true \ + -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ + -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ + -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \ + -Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true - name: Stop broker run: docker stop rabbitmq && docker rm rabbitmq diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7081a78ee5..91a432ef39 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -33,12 +33,20 @@ jobs: gpg-passphrase: MAVEN_GPG_PASSPHRASE - name: Start broker run: ci/start-broker.sh - - name: Test + - name: Test (no dynamic-batch publishing) run: | ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=false \ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem + - name: Test (dynamic-batch publishing) + run: | + ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=true \ + -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ + -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ + -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem - name: Stop broker run: docker stop rabbitmq && docker rm rabbitmq - name: Upload Codecov report diff --git a/pom.xml b/pom.xml index b4e60445f3..e8f3751d30 100644 --- a/pom.xml +++ b/pom.xml @@ -533,7 +533,7 @@ - + origin/main // Copyright (c) $YEAR Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 28eb063bc6..f7edd8fffd 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -455,6 +455,10 @@ blocking when the limit is reached. |Period to send a batch of messages. |100 ms +|`dynamicBatch` +|Adapt batch size depending on ingress rate. +|false + |`confirmTimeout` |[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal outstanding unconfirmed messages timed out. diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index 5686a370cf..79bc9905d6 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -97,6 +97,28 @@ public interface ProducerBuilder { */ ProducerBuilder batchPublishingDelay(Duration batchPublishingDelay); + /** + * Adapt batch size depending on ingress rate. + * + *

A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive + * for sustained high ingress rates. + * + *

Set this flag to true if you want as little delay as possible before calling + * {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker. + * + *

Set this flag to false if latency is not critical for your use case and you + * want the highest throughput possible for both publishing and consuming. + * + *

Dynamic batch is not activated by default (dynamicBatch = false). + * + *

Dynamic batch is experimental. + * + * @param dynamicBatch + * @return this builder instance + * @since 0.20.0 + */ + ProducerBuilder dynamicBatch(boolean dynamicBatch); + /** * The maximum number of unconfirmed outbound messages. * diff --git a/src/main/java/com/rabbitmq/stream/impl/ConcurrencyUtils.java b/src/main/java/com/rabbitmq/stream/impl/ConcurrencyUtils.java new file mode 100644 index 0000000000..daf877f20b --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/ConcurrencyUtils.java @@ -0,0 +1,60 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ConcurrencyUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrencyUtils.class); + + private static final ThreadFactory THREAD_FACTORY; + + static { + if (isJava21OrMore()) { + LOGGER.debug("Running Java 21 or more, using virtual threads"); + Class builderClass = + Arrays.stream(Thread.class.getDeclaredClasses()) + .filter(c -> "Builder".equals(c.getSimpleName())) + .findFirst() + .get(); + // Reflection code is the same as: + // Thread.ofVirtual().factory(); + try { + Object builder = Thread.class.getDeclaredMethod("ofVirtual").invoke(null); + THREAD_FACTORY = (ThreadFactory) builderClass.getDeclaredMethod("factory").invoke(builder); + } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new RuntimeException(e); + } + } else { + THREAD_FACTORY = Executors.defaultThreadFactory(); + } + } + + private ConcurrencyUtils() {} + + static ThreadFactory defaultThreadFactory() { + return THREAD_FACTORY; + } + + private static boolean isJava21OrMore() { + return Utils.versionCompare(System.getProperty("java.version"), "21.0") >= 0; + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java new file mode 100644 index 0000000000..ba52d23950 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -0,0 +1,121 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static java.lang.Math.max; +import static java.lang.Math.min; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.BiPredicate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class DynamicBatch implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class); + private static final int MIN_BATCH_SIZE = 32; + private static final int MAX_BATCH_SIZE = 8192; + + private final BlockingQueue requests = new LinkedBlockingQueue<>(); + private final BiPredicate, Boolean> consumer; + private final int configuredBatchSize; + private final Thread thread; + + DynamicBatch(BiPredicate, Boolean> consumer, int batchSize) { + this.consumer = consumer; + this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE); + this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop); + this.thread.start(); + } + + void add(T item) { + try { + requests.put(item); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void loop() { + State state = new State<>(); + state.batchSize = this.configuredBatchSize; + state.items = new ArrayList<>(state.batchSize); + state.retry = false; + Thread currentThread = Thread.currentThread(); + T item; + while (!currentThread.isInterrupted()) { + try { + item = this.requests.poll(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + currentThread.interrupt(); + return; + } + if (item != null) { + state.items.add(item); + if (state.items.size() >= state.batchSize) { + this.maybeCompleteBatch(state, true); + } else { + item = this.requests.poll(); + if (item == null) { + this.maybeCompleteBatch(state, false); + } else { + state.items.add(item); + if (state.items.size() >= state.batchSize) { + this.maybeCompleteBatch(state, true); + } + } + } + } else { + this.maybeCompleteBatch(state, false); + } + } + } + + private static final class State { + + int batchSize; + List items; + boolean retry; + } + + private void maybeCompleteBatch(State state, boolean increaseIfCompleted) { + try { + boolean completed = this.consumer.test(state.items, state.retry); + if (completed) { + if (increaseIfCompleted) { + state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE); + } else { + state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE); + } + state.items = new ArrayList<>(state.batchSize); + state.retry = false; + } else { + state.retry = true; + } + } catch (Exception e) { + LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage()); + state.retry = true; + } + } + + @Override + public void close() { + this.thread.interrupt(); + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java new file mode 100644 index 0000000000..2b8d6241e9 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java @@ -0,0 +1,152 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import com.rabbitmq.stream.Codec; +import com.rabbitmq.stream.ConfirmationHandler; +import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.ObservationCollector; +import com.rabbitmq.stream.compression.Compression; +import com.rabbitmq.stream.compression.CompressionCodec; +import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity; +import io.netty.buffer.ByteBufAllocator; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.function.ToLongFunction; + +final class DynamicBatchMessageAccumulator implements MessageAccumulator { + + private final DynamicBatch dynamicBatch; + private final ObservationCollector observationCollector; + private final StreamProducer producer; + private final ProducerUtils.MessageAccumulatorHelper helper; + + @SuppressWarnings("unchecked") + DynamicBatchMessageAccumulator( + int subEntrySize, + int batchSize, + Codec codec, + int maxFrameSize, + ToLongFunction publishSequenceFunction, + Function filterValueExtractor, + Clock clock, + String stream, + CompressionCodec compressionCodec, + ByteBufAllocator byteBufAllocator, + ObservationCollector observationCollector, + StreamProducer producer) { + this.helper = + new ProducerUtils.MessageAccumulatorHelper( + codec, + maxFrameSize, + publishSequenceFunction, + filterValueExtractor, + clock, + stream, + observationCollector); + this.producer = producer; + this.observationCollector = (ObservationCollector) observationCollector; + if (subEntrySize <= 1) { + this.dynamicBatch = + new DynamicBatch<>( + (items, replay) -> { + if (!replay) { + items.forEach( + i -> { + AccumulatedEntity entity = (AccumulatedEntity) i; + this.observationCollector.published( + entity.observationContext(), entity.confirmationCallback().message()); + }); + } + return this.publish(items); + }, + batchSize); + } else { + byte compressionCode = + compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); + this.dynamicBatch = + new DynamicBatch<>( + (items, replay) -> { + List subBatches = new ArrayList<>(); + int count = 0; + ProducerUtils.Batch batch = + this.helper.batch( + byteBufAllocator, compressionCode, compressionCodec, subEntrySize); + AccumulatedEntity lastMessageInBatch = null; + for (Object msg : items) { + AccumulatedEntity message = (AccumulatedEntity) msg; + if (!replay) { + this.observationCollector.published( + message.observationContext(), message.confirmationCallback().message()); + } + lastMessageInBatch = message; + batch.add( + (Codec.EncodedMessage) message.encodedEntity(), + message.confirmationCallback()); + count++; + if (count == subEntrySize) { + batch.time = lastMessageInBatch.time(); + batch.publishingId = lastMessageInBatch.publishingId(); + batch.encodedMessageBatch.close(); + subBatches.add(batch); + lastMessageInBatch = null; + batch = + this.helper.batch( + byteBufAllocator, compressionCode, compressionCodec, subEntrySize); + count = 0; + } + } + + if (!batch.isEmpty() && count < subEntrySize) { + batch.time = lastMessageInBatch.time(); + batch.publishingId = lastMessageInBatch.publishingId(); + batch.encodedMessageBatch.close(); + subBatches.add(batch); + } + return this.publish(subBatches); + }, + batchSize * subEntrySize); + } + } + + @Override + public void add(Message message, ConfirmationHandler confirmationHandler) { + this.dynamicBatch.add(helper.entity(message, confirmationHandler)); + } + + @Override + public int size() { + // TODO compute dynamic batch message accumulator pending message count + return 0; + } + + @Override + public void flush(boolean force) {} + + private boolean publish(List entities) { + if (this.producer.canSend()) { + this.producer.publishInternal(entities); + return true; + } else { + return false; + } + } + + @Override + public void close() { + this.dynamicBatch.close(); + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java index c35271f29f..80e3bb516e 100644 --- a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -17,28 +17,14 @@ import com.rabbitmq.stream.ConfirmationHandler; import com.rabbitmq.stream.Message; -interface MessageAccumulator { +interface MessageAccumulator extends AutoCloseable { - boolean add(Message message, ConfirmationHandler confirmationHandler); - - AccumulatedEntity get(); - - boolean isEmpty(); + void add(Message message, ConfirmationHandler confirmationHandler); int size(); - interface AccumulatedEntity { - - long time(); - - long publishingId(); - - String filterValue(); - - Object encodedEntity(); - - StreamProducer.ConfirmationCallback confirmationCallback(); + void flush(boolean force); - Object observationContext(); - } + @Override + void close(); } diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java new file mode 100644 index 0000000000..c74fa0102b --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java @@ -0,0 +1,322 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import com.rabbitmq.stream.*; +import com.rabbitmq.stream.compression.CompressionCodec; +import io.netty.buffer.ByteBufAllocator; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.function.ToLongFunction; + +final class ProducerUtils { + + private ProducerUtils() {} + + static MessageAccumulator createMessageAccumulator( + boolean dynamicBatch, + int subEntrySize, + int batchSize, + CompressionCodec compressionCodec, + Codec codec, + ByteBufAllocator byteBufAllocator, + int maxFrameSize, + ToLongFunction publishSequenceFunction, + Function filterValueExtractor, + Clock clock, + String stream, + ObservationCollector observationCollector, + StreamProducer producer) { + if (dynamicBatch) { + return new DynamicBatchMessageAccumulator( + subEntrySize, + batchSize, + codec, + maxFrameSize, + publishSequenceFunction, + filterValueExtractor, + clock, + stream, + compressionCodec, + byteBufAllocator, + observationCollector, + producer); + } else { + if (subEntrySize <= 1) { + return new SimpleMessageAccumulator( + batchSize, + codec, + maxFrameSize, + publishSequenceFunction, + filterValueExtractor, + clock, + stream, + observationCollector, + producer); + } else { + return new SubEntryMessageAccumulator( + subEntrySize, + batchSize, + compressionCodec, + codec, + byteBufAllocator, + maxFrameSize, + publishSequenceFunction, + clock, + stream, + observationCollector, + producer); + } + } + } + + interface ConfirmationCallback { + + int handle(boolean confirmed, short code); + + Message message(); + } + + interface AccumulatedEntity { + + long time(); + + long publishingId(); + + String filterValue(); + + Object encodedEntity(); + + ConfirmationCallback confirmationCallback(); + + Object observationContext(); + } + + static final class SimpleConfirmationCallback implements ConfirmationCallback { + + private final Message message; + private final ConfirmationHandler confirmationHandler; + + SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) { + this.message = message; + this.confirmationHandler = confirmationHandler; + } + + @Override + public int handle(boolean confirmed, short code) { + confirmationHandler.handle(new ConfirmationStatus(message, confirmed, code)); + return 1; + } + + @Override + public Message message() { + return this.message; + } + } + + static final class SimpleAccumulatedEntity implements AccumulatedEntity { + + private final long time; + private final long publishingId; + private final String filterValue; + private final Codec.EncodedMessage encodedMessage; + private final ConfirmationCallback confirmationCallback; + private final Object observationContext; + + SimpleAccumulatedEntity( + long time, + long publishingId, + String filterValue, + Codec.EncodedMessage encodedMessage, + ConfirmationCallback confirmationCallback, + Object observationContext) { + this.time = time; + this.publishingId = publishingId; + this.encodedMessage = encodedMessage; + this.filterValue = filterValue; + this.confirmationCallback = confirmationCallback; + this.observationContext = observationContext; + } + + @Override + public long publishingId() { + return publishingId; + } + + @Override + public String filterValue() { + return filterValue; + } + + @Override + public Object encodedEntity() { + return encodedMessage; + } + + @Override + public long time() { + return time; + } + + @Override + public ConfirmationCallback confirmationCallback() { + return confirmationCallback; + } + + @Override + public Object observationContext() { + return this.observationContext; + } + } + + static final class CompositeConfirmationCallback implements ConfirmationCallback { + + private final List callbacks; + + CompositeConfirmationCallback(List callbacks) { + this.callbacks = callbacks; + } + + private void add(ConfirmationCallback confirmationCallback) { + this.callbacks.add(confirmationCallback); + } + + @Override + public int handle(boolean confirmed, short code) { + for (ConfirmationCallback callback : callbacks) { + callback.handle(confirmed, code); + } + return callbacks.size(); + } + + @Override + public Message message() { + throw new UnsupportedOperationException( + "composite confirmation callback does not contain just one message"); + } + } + + static final class Batch implements AccumulatedEntity { + + final Client.EncodedMessageBatch encodedMessageBatch; + private final CompositeConfirmationCallback confirmationCallback; + volatile long publishingId; + volatile long time; + + Batch( + Client.EncodedMessageBatch encodedMessageBatch, + CompositeConfirmationCallback confirmationCallback) { + this.encodedMessageBatch = encodedMessageBatch; + this.confirmationCallback = confirmationCallback; + } + + void add(Codec.EncodedMessage encodedMessage, ConfirmationCallback confirmationCallback) { + this.encodedMessageBatch.add(encodedMessage); + this.confirmationCallback.add(confirmationCallback); + } + + boolean isEmpty() { + return this.confirmationCallback.callbacks.isEmpty(); + } + + @Override + public long publishingId() { + return publishingId; + } + + @Override + public String filterValue() { + return null; + } + + @Override + public Object encodedEntity() { + return encodedMessageBatch; + } + + @Override + public long time() { + return time; + } + + @Override + public ConfirmationCallback confirmationCallback() { + return confirmationCallback; + } + + @Override + public Object observationContext() { + throw new UnsupportedOperationException( + "batch entity does not contain only one observation context"); + } + } + + static final class MessageAccumulatorHelper { + + private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null; + + private final ObservationCollector observationCollector; + private final ToLongFunction publishSequenceFunction; + private final String stream; + private final Codec codec; + private final int maxFrameSize; + private final Clock clock; + private final Function filterValueExtractor; + + @SuppressWarnings("unchecked") + MessageAccumulatorHelper( + Codec codec, + int maxFrameSize, + ToLongFunction publishSequenceFunction, + Function filterValueExtractor, + Clock clock, + String stream, + ObservationCollector observationCollector) { + this.publishSequenceFunction = publishSequenceFunction; + this.codec = codec; + this.clock = clock; + this.maxFrameSize = maxFrameSize; + this.filterValueExtractor = + filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor; + this.observationCollector = (ObservationCollector) observationCollector; + this.stream = stream; + } + + AccumulatedEntity entity(Message message, ConfirmationHandler confirmationHandler) { + Object observationContext = this.observationCollector.prePublish(this.stream, message); + Codec.EncodedMessage encodedMessage = this.codec.encode(message); + Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage); + long publishingId = this.publishSequenceFunction.applyAsLong(message); + return new ProducerUtils.SimpleAccumulatedEntity( + this.clock.time(), + publishingId, + this.filterValueExtractor.apply(message), + this.codec.encode(message), + new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler), + observationContext); + } + + Batch batch( + ByteBufAllocator bba, + byte compressionCode, + CompressionCodec compressionCodec, + int subEntrySize) { + return new ProducerUtils.Batch( + Client.EncodedMessageBatch.create(bba, compressionCode, compressionCodec, subEntrySize), + new ProducerUtils.CompositeConfirmationCallback(new ArrayList<>(subEntrySize))); + } + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java index 718f253da7..97e99e4b9f 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -15,6 +15,9 @@ package com.rabbitmq.stream.impl; import com.rabbitmq.stream.*; +import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -23,17 +26,11 @@ class SimpleMessageAccumulator implements MessageAccumulator { - private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null; - protected final BlockingQueue messages; - protected final Clock clock; private final int capacity; - protected final Codec codec; - private final int maxFrameSize; - private final ToLongFunction publishSequenceFunction; - private final Function filterValueExtractor; - final String stream; final ObservationCollector observationCollector; + private final StreamProducer producer; + final ProducerUtils.MessageAccumulatorHelper helper; @SuppressWarnings("unchecked") SimpleMessageAccumulator( @@ -44,47 +41,39 @@ class SimpleMessageAccumulator implements MessageAccumulator { Function filterValueExtractor, Clock clock, String stream, - ObservationCollector observationCollector) { + ObservationCollector observationCollector, + StreamProducer producer) { + this.helper = + new ProducerUtils.MessageAccumulatorHelper( + codec, + maxFrameSize, + publishSequenceFunction, + filterValueExtractor, + clock, + stream, + observationCollector); this.capacity = capacity; - this.messages = new LinkedBlockingQueue<>(capacity); - this.codec = codec; - this.maxFrameSize = maxFrameSize; - this.publishSequenceFunction = publishSequenceFunction; - this.filterValueExtractor = - filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor; - this.clock = clock; - this.stream = stream; + this.messages = new LinkedBlockingQueue<>(this.capacity); this.observationCollector = (ObservationCollector) observationCollector; + this.producer = producer; } - public boolean add(Message message, ConfirmationHandler confirmationHandler) { - Object observationContext = this.observationCollector.prePublish(this.stream, message); - Codec.EncodedMessage encodedMessage = this.codec.encode(message); - Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage); - long publishingId = this.publishSequenceFunction.applyAsLong(message); + public void add(Message message, ConfirmationHandler confirmationHandler) { + AccumulatedEntity entity = this.helper.entity(message, confirmationHandler); try { - boolean offered = - messages.offer( - new SimpleAccumulatedEntity( - clock.time(), - publishingId, - this.filterValueExtractor.apply(message), - encodedMessage, - new SimpleConfirmationCallback(message, confirmationHandler), - observationContext), - 60, - TimeUnit.SECONDS); + boolean offered = messages.offer(entity, 60, TimeUnit.SECONDS); if (!offered) { throw new StreamException("Could not accumulate outbound message"); } } catch (InterruptedException e) { throw new StreamException("Error while accumulating outbound message", e); } - return this.messages.size() == this.capacity; + if (this.messages.size() == this.capacity) { + publishBatch(true); + } } - @Override - public AccumulatedEntity get() { + AccumulatedEntity get() { AccumulatedEntity entity = this.messages.poll(); if (entity != null) { this.observationCollector.published( @@ -93,91 +82,38 @@ public AccumulatedEntity get() { return entity; } - @Override - public boolean isEmpty() { - return messages.isEmpty(); - } - @Override public int size() { return messages.size(); } - private static final class SimpleAccumulatedEntity implements AccumulatedEntity { - - private final long time; - private final long publishingId; - private final String filterValue; - private final Codec.EncodedMessage encodedMessage; - private final StreamProducer.ConfirmationCallback confirmationCallback; - private final Object observationContext; - - private SimpleAccumulatedEntity( - long time, - long publishingId, - String filterValue, - Codec.EncodedMessage encodedMessage, - StreamProducer.ConfirmationCallback confirmationCallback, - Object observationContext) { - this.time = time; - this.publishingId = publishingId; - this.encodedMessage = encodedMessage; - this.filterValue = filterValue; - this.confirmationCallback = confirmationCallback; - this.observationContext = observationContext; - } - - @Override - public long publishingId() { - return publishingId; - } - - @Override - public String filterValue() { - return filterValue; - } - - @Override - public Object encodedEntity() { - return encodedMessage; - } - - @Override - public long time() { - return time; - } - - @Override - public StreamProducer.ConfirmationCallback confirmationCallback() { - return confirmationCallback; - } - - @Override - public Object observationContext() { - return this.observationContext; - } + @Override + public void flush(boolean force) { + boolean stateCheck = !force; + publishBatch(stateCheck); } - private static final class SimpleConfirmationCallback - implements StreamProducer.ConfirmationCallback { - - private final Message message; - private final ConfirmationHandler confirmationHandler; - - private SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) { - this.message = message; - this.confirmationHandler = confirmationHandler; - } - - @Override - public int handle(boolean confirmed, short code) { - confirmationHandler.handle(new ConfirmationStatus(message, confirmed, code)); - return 1; - } - - @Override - public Message message() { - return this.message; + private void publishBatch(boolean stateCheck) { + this.producer.lock(); + try { + if ((!stateCheck || this.producer.canSend()) && !this.messages.isEmpty()) { + List entities = new ArrayList<>(this.capacity); + int batchCount = 0; + while (batchCount != this.capacity) { + AccumulatedEntity entity = this.get(); + if (entity == null) { + break; + } + entities.add(entity); + batchCount++; + } + producer.publishInternal(entities); + } + } finally { + this.producer.unlock(); } } + + @Override + public void close() {} } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 4050fd0af7..e7e67be0be 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -18,17 +18,11 @@ import static com.rabbitmq.stream.impl.Utils.formatConstant; import static com.rabbitmq.stream.impl.Utils.namedRunnable; -import com.rabbitmq.stream.Codec; -import com.rabbitmq.stream.ConfirmationHandler; -import com.rabbitmq.stream.ConfirmationStatus; -import com.rabbitmq.stream.Constants; -import com.rabbitmq.stream.Message; -import com.rabbitmq.stream.MessageBuilder; -import com.rabbitmq.stream.Producer; -import com.rabbitmq.stream.StreamException; +import com.rabbitmq.stream.*; import com.rabbitmq.stream.compression.Compression; +import com.rabbitmq.stream.compression.CompressionCodec; import com.rabbitmq.stream.impl.Client.Response; -import com.rabbitmq.stream.impl.MessageAccumulator.AccumulatedEntity; +import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.buffer.ByteBuf; import java.nio.charset.StandardCharsets; @@ -49,6 +43,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.function.ToLongFunction; import org.slf4j.Logger; @@ -84,6 +80,7 @@ class StreamProducer implements Producer { private volatile Status status; private volatile ScheduledFuture confirmTimeoutFuture; private final short publishVersion; + private final Lock lock = new ReentrantLock(); @SuppressFBWarnings("CT_CONSTRUCTOR_THROW") StreamProducer( @@ -91,6 +88,7 @@ class StreamProducer implements Producer { String stream, int subEntrySize, int batchSize, + boolean dynamicBatch, Compression compression, Duration batchPublishingDelay, int maxUnconfirmedMessages, @@ -122,37 +120,14 @@ class StreamProducer implements Producer { return publishingSequence.getAndIncrement(); } }; + if (subEntrySize <= 1) { - this.accumulator = - new SimpleMessageAccumulator( - batchSize, - environment.codec(), - client.maxFrameSize(), - accumulatorPublishSequenceFunction, - filterValueExtractor, - this.environment.clock(), - stream, - this.environment.observationCollector()); if (filterValueExtractor == null) { delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK; } else { delegateWriteCallback = OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK; } } else { - this.accumulator = - new SubEntryMessageAccumulator( - subEntrySize, - batchSize, - compression == Compression.NONE - ? null - : environment.compressionCodecFactory().get(compression), - environment.codec(), - this.environment.byteBufAllocator(), - client.maxFrameSize(), - accumulatorPublishSequenceFunction, - this.environment.clock(), - stream, - environment.observationCollector()); delegateWriteCallback = Client.OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK; } @@ -166,8 +141,7 @@ class StreamProducer implements Producer { new Client.OutboundEntityWriteCallback() { @Override public int write(ByteBuf bb, Object entity, long publishingId) { - MessageAccumulator.AccumulatedEntity accumulatedEntity = - (MessageAccumulator.AccumulatedEntity) entity; + AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity; unconfirmedMessages.put(publishingId, accumulatedEntity); return delegateWriteCallback.write( bb, accumulatedEntity.encodedEntity(), publishingId); @@ -176,7 +150,7 @@ public int write(ByteBuf bb, Object entity, long publishingId) { @Override public int fragmentLength(Object entity) { return delegateWriteCallback.fragmentLength( - ((MessageAccumulator.AccumulatedEntity) entity).encodedEntity()); + ((AccumulatedEntity) entity).encodedEntity()); } }; } else { @@ -185,8 +159,7 @@ public int fragmentLength(Object entity) { new Client.OutboundEntityWriteCallback() { @Override public int write(ByteBuf bb, Object entity, long publishingId) { - MessageAccumulator.AccumulatedEntity accumulatedEntity = - (MessageAccumulator.AccumulatedEntity) entity; + AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity; unconfirmedMessages.put(publishingId, accumulatedEntity); return delegateWriteCallback.write(bb, accumulatedEntity, publishingId); } @@ -198,14 +171,36 @@ public int fragmentLength(Object entity) { }; } - if (!batchPublishingDelay.isNegative() && !batchPublishingDelay.isZero()) { + CompressionCodec compressionCodec = null; + if (compression != null) { + compressionCodec = environment.compressionCodecFactory().get(compression); + } + this.accumulator = + ProducerUtils.createMessageAccumulator( + dynamicBatch, + subEntrySize, + batchSize, + compressionCodec, + environment.codec(), + environment.byteBufAllocator(), + client.maxFrameSize(), + accumulatorPublishSequenceFunction, + filterValueExtractor, + environment.clock(), + stream, + environment.observationCollector(), + this); + + boolean backgroundBatchPublishingTaskRequired = + !dynamicBatch && batchPublishingDelay.toMillis() > 0; + LOGGER.debug( + "Background batch publishing task required? {}", backgroundBatchPublishingTaskRequired); + if (backgroundBatchPublishingTaskRequired) { AtomicReference taskReference = new AtomicReference<>(); Runnable task = () -> { if (canSend()) { - synchronized (StreamProducer.this) { - publishBatch(true); - } + this.accumulator.flush(false); } if (status != Status.CLOSED) { environment @@ -289,7 +284,7 @@ private Runnable confirmTimeoutTask(Duration confirmTimeout) { error(unconfirmedEntry.getKey(), Constants.CODE_PUBLISH_CONFIRM_TIMEOUT); count++; } else { - // everything else is after, so we can stop + // everything else is after, we can stop break; } } @@ -318,8 +313,10 @@ private long computeFirstValueOfPublishingSequence() { } } + // visible for testing void confirm(long publishingId) { AccumulatedEntity accumulatedEntity = this.unconfirmedMessages.remove(publishingId); + if (accumulatedEntity != null) { int confirmedCount = accumulatedEntity.confirmationCallback().handle(true, Constants.RESPONSE_CODE_OK); @@ -329,6 +326,11 @@ void confirm(long publishingId) { } } + // for testing + int unconfirmedCount() { + return this.unconfirmedMessages.size(); + } + void error(long publishingId, short errorCode) { AccumulatedEntity accumulatedEntity = unconfirmedMessages.remove(publishingId); if (accumulatedEntity != null) { @@ -397,11 +399,7 @@ public void send(Message message, ConfirmationHandler confirmationHandler) { private void doSend(Message message, ConfirmationHandler confirmationHandler) { if (canSend()) { - if (accumulator.add(message, confirmationHandler)) { - synchronized (this) { - publishBatch(true); - } - } + this.accumulator.add(message, confirmationHandler); } else { failPublishing(message, confirmationHandler); } @@ -419,7 +417,7 @@ private void failPublishing(Message message, ConfirmationHandler confirmationHan } } - private boolean canSend() { + boolean canSend() { return this.status == Status.RUNNING; } @@ -445,6 +443,7 @@ public void close() { } void closeFromEnvironment() { + this.accumulator.close(); this.closingCallback.run(); cancelConfirmTimeoutTask(); this.closed.set(true); @@ -476,25 +475,13 @@ private void cancelConfirmTimeoutTask() { } } - private void publishBatch(boolean stateCheck) { - if ((!stateCheck || canSend()) && !accumulator.isEmpty()) { - List messages = new ArrayList<>(this.batchSize); - int batchCount = 0; - while (batchCount != this.batchSize) { - Object accMessage = accumulator.get(); - if (accMessage == null) { - break; - } - messages.add(accMessage); - batchCount++; - } - client.publishInternal( - this.publishVersion, - this.publisherId, - messages, - this.writeCallback, - this.publishSequenceFunction); - } + void publishInternal(List messages) { + client.publishInternal( + this.publishVersion, + this.publisherId, + messages, + this.writeCallback, + this.publishSequenceFunction); } boolean isOpen() { @@ -506,76 +493,80 @@ void unavailable() { } void running() { - synchronized (this) { - LOGGER.debug( - "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)", - this.unconfirmedMessages.size(), - this.accumulator.size()); - if (this.retryOnRecovery) { - LOGGER.debug("Re-publishing {} unconfirmed message(s)", this.unconfirmedMessages.size()); - if (!this.unconfirmedMessages.isEmpty()) { - Map messagesToResend = new TreeMap<>(this.unconfirmedMessages); - this.unconfirmedMessages.clear(); - Iterator> resendIterator = - messagesToResend.entrySet().iterator(); - while (resendIterator.hasNext()) { - List messages = new ArrayList<>(this.batchSize); - int batchCount = 0; - while (batchCount != this.batchSize) { - Object accMessage = - resendIterator.hasNext() ? resendIterator.next().getValue() : null; - if (accMessage == null) { - break; + this.executeInLock( + () -> { + LOGGER.debug( + "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)", + this.unconfirmedMessages.size(), + this.accumulator.size()); + if (this.retryOnRecovery) { + LOGGER.debug( + "Re-publishing {} unconfirmed message(s)", this.unconfirmedMessages.size()); + if (!this.unconfirmedMessages.isEmpty()) { + Map messagesToResend = + new TreeMap<>(this.unconfirmedMessages); + this.unconfirmedMessages.clear(); + Iterator> resendIterator = + messagesToResend.entrySet().iterator(); + while (resendIterator.hasNext()) { + List messages = new ArrayList<>(this.batchSize); + int batchCount = 0; + while (batchCount != this.batchSize) { + Object accMessage = + resendIterator.hasNext() ? resendIterator.next().getValue() : null; + if (accMessage == null) { + break; + } + messages.add(accMessage); + batchCount++; + } + client.publishInternal( + this.publishVersion, + this.publisherId, + messages, + this.writeCallback, + this.publishSequenceFunction); + } + } + } else { + LOGGER.debug( + "Skipping republishing of {} unconfirmed messages", + this.unconfirmedMessages.size()); + Map messagesToFail = new TreeMap<>(this.unconfirmedMessages); + this.unconfirmedMessages.clear(); + for (AccumulatedEntity accumulatedEntity : messagesToFail.values()) { + try { + int permits = + accumulatedEntity + .confirmationCallback() + .handle(false, CODE_PUBLISH_CONFIRM_TIMEOUT); + this.unconfirmedMessagesSemaphore.release(permits); + } catch (Exception e) { + LOGGER.debug("Error while nack-ing outbound message: {}", e.getMessage()); + this.unconfirmedMessagesSemaphore.release(1); } - messages.add(accMessage); - batchCount++; } - client.publishInternal( - this.publishVersion, - this.publisherId, - messages, - this.writeCallback, - this.publishSequenceFunction); } - } - } else { - LOGGER.debug( - "Skipping republishing of {} unconfirmed messages", this.unconfirmedMessages.size()); - Map messagesToFail = new TreeMap<>(this.unconfirmedMessages); - this.unconfirmedMessages.clear(); - for (AccumulatedEntity accumulatedEntity : messagesToFail.values()) { - try { - int permits = - accumulatedEntity - .confirmationCallback() - .handle(false, CODE_PUBLISH_CONFIRM_TIMEOUT); - this.unconfirmedMessagesSemaphore.release(permits); - } catch (Exception e) { - LOGGER.debug("Error while nack-ing outbound message: {}", e.getMessage()); - this.unconfirmedMessagesSemaphore.release(1); + this.accumulator.flush(true); + int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits(); + if (toRelease > 0) { + unconfirmedMessagesSemaphore.release(toRelease); + if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) { + LOGGER.debug( + "Could not acquire {} permit(s) for message republishing", + this.unconfirmedMessages.size()); + } } - } - } - publishBatch(false); - int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits(); - if (toRelease > 0) { - unconfirmedMessagesSemaphore.release(toRelease); - if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) { - LOGGER.debug( - "Could not acquire {} permit(s) for message republishing", - this.unconfirmedMessages.size()); - } - } - } + }); this.status = Status.RUNNING; } - synchronized void setClient(Client client) { - this.client = client; + void setClient(Client client) { + this.executeInLock(() -> this.client = client); } - synchronized void setPublisherId(byte publisherId) { - this.publisherId = publisherId; + void setPublisherId(byte publisherId) { + this.executeInLock(() -> this.publisherId = publisherId); } Status status() { @@ -588,13 +579,6 @@ enum Status { CLOSED } - interface ConfirmationCallback { - - int handle(boolean confirmed, short code); - - Message message(); - } - @Override public boolean equals(Object o) { if (this == o) { @@ -668,4 +652,21 @@ public int fragmentLength(Object entity) { } } } + + void lock() { + this.lock.lock(); + } + + void unlock() { + this.lock.unlock(); + } + + private void executeInLock(Runnable action) { + this.lock(); + try { + action.run(); + } finally { + this.unlock(); + } + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 493426c67f..54807489e2 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -21,12 +21,16 @@ import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.compression.Compression; import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.time.Duration; import java.util.function.Function; import java.util.function.ToIntFunction; class StreamProducerBuilder implements ProducerBuilder { + static final boolean DEFAULT_DYNAMIC_BATCH = + Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "false")); + private final StreamEnvironment environment; private String name; @@ -53,6 +57,8 @@ class StreamProducerBuilder implements ProducerBuilder { private Function filterValueExtractor; + private boolean dynamicBatch = DEFAULT_DYNAMIC_BATCH; + StreamProducerBuilder(StreamEnvironment environment) { this.environment = environment; } @@ -97,11 +103,18 @@ public ProducerBuilder compression(Compression compression) { return this; } + @Override public StreamProducerBuilder batchPublishingDelay(Duration batchPublishingDelay) { this.batchPublishingDelay = batchPublishingDelay; return this; } + @Override + public ProducerBuilder dynamicBatch(boolean dynamicBatch) { + this.dynamicBatch = dynamicBatch; + return this; + } + @Override public ProducerBuilder maxUnconfirmedMessages(int maxUnconfirmedMessages) { if (maxUnconfirmedMessages <= 0) { @@ -198,6 +211,7 @@ public Producer build() { stream, subEntrySize, batchSize, + dynamicBatch, compression, batchPublishingDelay, maxUnconfirmedMessages, @@ -229,11 +243,13 @@ public Producer build() { StreamProducerBuilder duplicate() { StreamProducerBuilder duplicate = new StreamProducerBuilder(this.environment); for (Field field : StreamProducerBuilder.class.getDeclaredFields()) { - field.setAccessible(true); - try { - field.set(duplicate, field.get(this)); - } catch (IllegalAccessException e) { - throw new StreamException("Error while duplicating stream producer builder", e); + if (!Modifier.isStatic(field.getModifiers())) { + field.setAccessible(true); + try { + field.set(duplicate, field.get(this)); + } catch (IllegalAccessException e) { + throw new StreamException("Error while duplicating stream producer builder", e); + } } } return duplicate; diff --git a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java index 9693aea50c..14645074e6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -20,18 +20,15 @@ import com.rabbitmq.stream.ObservationCollector; import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.compression.CompressionCodec; -import com.rabbitmq.stream.impl.Client.EncodedMessageBatch; import io.netty.buffer.ByteBufAllocator; -import java.util.ArrayList; -import java.util.List; import java.util.function.ToLongFunction; -class SubEntryMessageAccumulator extends SimpleMessageAccumulator { +final class SubEntryMessageAccumulator extends SimpleMessageAccumulator { private final int subEntrySize; private final CompressionCodec compressionCodec; private final ByteBufAllocator byteBufAllocator; - private final byte compression; + private final byte compressionCode; public SubEntryMessageAccumulator( int subEntrySize, @@ -43,7 +40,8 @@ public SubEntryMessageAccumulator( ToLongFunction publishSequenceFunction, Clock clock, String stream, - ObservationCollector observationCollector) { + ObservationCollector observationCollector, + StreamProducer producer) { super( subEntrySize * batchSize, codec, @@ -52,30 +50,30 @@ public SubEntryMessageAccumulator( null, clock, stream, - observationCollector); + observationCollector, + producer); this.subEntrySize = subEntrySize; this.compressionCodec = compressionCodec; - this.compression = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); + this.compressionCode = + compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); this.byteBufAllocator = byteBufAllocator; } - private Batch createBatch() { - return new Batch( - EncodedMessageBatch.create( - byteBufAllocator, compression, compressionCodec, this.subEntrySize), - new CompositeConfirmationCallback(new ArrayList<>(this.subEntrySize))); + private ProducerUtils.Batch createBatch() { + return this.helper.batch( + this.byteBufAllocator, this.compressionCode, this.compressionCodec, this.subEntrySize); } @Override - public AccumulatedEntity get() { + protected ProducerUtils.AccumulatedEntity get() { if (this.messages.isEmpty()) { return null; } int count = 0; - Batch batch = createBatch(); - AccumulatedEntity lastMessageInBatch = null; + ProducerUtils.Batch batch = this.createBatch(); + ProducerUtils.AccumulatedEntity lastMessageInBatch = null; while (count != this.subEntrySize) { - AccumulatedEntity message = messages.poll(); + ProducerUtils.AccumulatedEntity message = messages.poll(); if (message == null) { break; } @@ -94,89 +92,4 @@ public AccumulatedEntity get() { return batch; } } - - private static class Batch implements AccumulatedEntity { - - private final EncodedMessageBatch encodedMessageBatch; - private final CompositeConfirmationCallback confirmationCallback; - private volatile long publishingId; - private volatile long time; - - private Batch( - EncodedMessageBatch encodedMessageBatch, - CompositeConfirmationCallback confirmationCallback) { - this.encodedMessageBatch = encodedMessageBatch; - this.confirmationCallback = confirmationCallback; - } - - void add( - Codec.EncodedMessage encodedMessage, - StreamProducer.ConfirmationCallback confirmationCallback) { - this.encodedMessageBatch.add(encodedMessage); - this.confirmationCallback.add(confirmationCallback); - } - - boolean isEmpty() { - return this.confirmationCallback.callbacks.isEmpty(); - } - - @Override - public long publishingId() { - return publishingId; - } - - @Override - public String filterValue() { - return null; - } - - @Override - public Object encodedEntity() { - return encodedMessageBatch; - } - - @Override - public long time() { - return time; - } - - @Override - public StreamProducer.ConfirmationCallback confirmationCallback() { - return confirmationCallback; - } - - @Override - public Object observationContext() { - throw new UnsupportedOperationException( - "batch entity does not contain only one observation context"); - } - } - - private static class CompositeConfirmationCallback - implements StreamProducer.ConfirmationCallback { - - private final List callbacks; - - private CompositeConfirmationCallback(List callbacks) { - this.callbacks = callbacks; - } - - private void add(StreamProducer.ConfirmationCallback confirmationCallback) { - this.callbacks.add(confirmationCallback); - } - - @Override - public int handle(boolean confirmed, short code) { - for (StreamProducer.ConfirmationCallback callback : callbacks) { - callback.handle(confirmed, code); - } - return callbacks.size(); - } - - @Override - public Message message() { - throw new UnsupportedOperationException( - "composite confirmation callback does not contain just one message"); - } - } } diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java new file mode 100644 index 0000000000..2049915457 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java @@ -0,0 +1,71 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import com.codahale.metrics.ConsoleReporter; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.google.common.util.concurrent.RateLimiter; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; + +public class DynamicBatchTest { + + @Test + void test() { + MetricRegistry metrics = new MetricRegistry(); + Histogram batchSizeMetrics = metrics.histogram("batch-size"); + ConsoleReporter reporter = + ConsoleReporter.forRegistry(metrics) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(); + + int itemCount = 3000; + TestUtils.Sync sync = TestUtils.sync(itemCount); + Random random = new Random(); + DynamicBatch batch = + new DynamicBatch<>( + (items, replay) -> { + batchSizeMetrics.update(items.size()); + try { + Thread.sleep(random.nextInt(10) + 1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + sync.down(items.size()); + return true; + }, + 100); + try { + RateLimiter rateLimiter = RateLimiter.create(3000); + long start = System.nanoTime(); + IntStream.range(0, itemCount) + .forEach( + i -> { + rateLimiter.acquire(); + batch.add(String.valueOf(i)); + }); + Assertions.assertThat(sync).completes(); + long end = System.nanoTime(); + // System.out.println("Done in " + Duration.ofNanos(end - start)); + // reporter.report(); + } finally { + batch.close(); + } + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java index 6cf06356d0..e900fb4990 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -14,19 +14,17 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.TestUtils.latchAssert; -import static com.rabbitmq.stream.impl.TestUtils.localhost; -import static com.rabbitmq.stream.impl.TestUtils.streamName; -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static com.rabbitmq.stream.impl.Assertions.assertThat; +import static com.rabbitmq.stream.impl.TestUtils.*; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import ch.qos.logback.classic.Level; import com.rabbitmq.stream.*; import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.impl.MonitoringTestUtils.ProducerInfo; import com.rabbitmq.stream.impl.StreamProducer.Status; +import com.rabbitmq.stream.impl.TestUtils.Sync; import io.netty.channel.ChannelOption; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoopGroup; @@ -94,7 +92,7 @@ void tearDown() { void send() throws Exception { int batchSize = 10; int messageCount = 10 * batchSize + 1; // don't want a multiple of batch size - CountDownLatch publishLatch = new CountDownLatch(messageCount); + Sync confirmSync = sync(messageCount); Producer producer = environment.producerBuilder().stream(stream).batchSize(batchSize).build(); AtomicLong count = new AtomicLong(0); AtomicLong sequence = new AtomicLong(0); @@ -117,13 +115,12 @@ void send() throws Exception { idsConfirmed.add( confirmationStatus.getMessage().getProperties().getMessageIdAsLong()); count.incrementAndGet(); - publishLatch.countDown(); + confirmSync.down(); }); }); - boolean completed = publishLatch.await(10, TimeUnit.SECONDS); + assertThat(confirmSync).completes(); assertThat(idsSent).hasSameSizeAs(idsConfirmed); idsSent.forEach(idSent -> assertThat(idsConfirmed).contains(idSent)); - assertThat(completed).isTrue(); ProducerInfo info = MonitoringTestUtils.extract(producer); assertThat(info.getId()).isGreaterThanOrEqualTo(0); @@ -349,63 +346,59 @@ void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception { @ParameterizedTest @ValueSource(ints = {1, 7}) void producerShouldBeClosedWhenStreamIsDeleted(int subEntrySize, TestInfo info) throws Exception { - Level initialLogLevel = TestUtils.newLoggerLevel(ProducersCoordinator.class, Level.DEBUG); - try { - String s = streamName(info); - environment.streamCreator().stream(s).create(); - - StreamProducer producer = - (StreamProducer) - environment.producerBuilder().subEntrySize(subEntrySize).stream(s).build(); - - AtomicInteger published = new AtomicInteger(0); - AtomicInteger confirmed = new AtomicInteger(0); - AtomicInteger errored = new AtomicInteger(0); - Set errorCodes = ConcurrentHashMap.newKeySet(); - - AtomicBoolean continuePublishing = new AtomicBoolean(true); - Thread publishThread = - new Thread( - () -> { - ConfirmationHandler confirmationHandler = - confirmationStatus -> { - if (confirmationStatus.isConfirmed()) { - confirmed.incrementAndGet(); - } else { - errored.incrementAndGet(); - errorCodes.add(confirmationStatus.getCode()); - } - }; - while (continuePublishing.get()) { - try { - producer.send( - producer - .messageBuilder() - .addData("".getBytes(StandardCharsets.UTF_8)) - .build(), - confirmationHandler); - published.incrementAndGet(); - } catch (StreamException e) { - // OK - } + String s = streamName(info); + environment.streamCreator().stream(s).create(); + + StreamProducer producer = + (StreamProducer) environment.producerBuilder().subEntrySize(subEntrySize).stream(s).build(); + + AtomicInteger published = new AtomicInteger(0); + AtomicInteger confirmed = new AtomicInteger(0); + AtomicInteger errored = new AtomicInteger(0); + Set errorCodes = ConcurrentHashMap.newKeySet(); + + AtomicBoolean continuePublishing = new AtomicBoolean(true); + Thread publishThread = + new Thread( + () -> { + ConfirmationHandler confirmationHandler = + confirmationStatus -> { + if (confirmationStatus.isConfirmed()) { + confirmed.incrementAndGet(); + } else { + errored.incrementAndGet(); + errorCodes.add(confirmationStatus.getCode()); + } + }; + while (continuePublishing.get()) { + try { + producer.send( + producer + .messageBuilder() + .addData("".getBytes(StandardCharsets.UTF_8)) + .build(), + confirmationHandler); + published.incrementAndGet(); + } catch (StreamException e) { + // OK } - }); - publishThread.start(); + } + }); + publishThread.start(); - Thread.sleep(1000L); + waitAtMost(() -> confirmed.get() > 100); + int confirmedNow = confirmed.get(); + waitAtMost(() -> confirmed.get() > confirmedNow + 1000); - assertThat(producer.isOpen()).isTrue(); + assertThat(producer.isOpen()).isTrue(); - environment.deleteStream(s); + environment.deleteStream(s); - waitAtMost(() -> !producer.isOpen()); - continuePublishing.set(false); - waitAtMost( - () -> !errorCodes.isEmpty(), - () -> "The producer should have received negative publish confirms"); - } finally { - TestUtils.newLoggerLevel(ProducersCoordinator.class, initialLogLevel); - } + waitAtMost(() -> !producer.isOpen()); + continuePublishing.set(false); + waitAtMost( + () -> !errorCodes.isEmpty(), + () -> "The producer should have received negative publish confirms"); } @ParameterizedTest @@ -415,15 +408,14 @@ void messagesShouldBeDeDuplicatedWhenUsingNameAndPublishingId(int subEntrySize) int firstWaveLineCount = lineCount / 5; int backwardCount = firstWaveLineCount / 10; SortedSet document = new TreeSet<>(); - IntStream.range(0, lineCount).forEach(i -> document.add(i)); + IntStream.range(0, lineCount).forEach(document::add); Producer producer = environment.producerBuilder().name("producer-1").stream(stream) .subEntrySize(subEntrySize) .build(); - AtomicReference latch = - new AtomicReference<>(new CountDownLatch(firstWaveLineCount)); - ConfirmationHandler confirmationHandler = confirmationStatus -> latch.get().countDown(); + Sync confirmSync = sync(firstWaveLineCount); + ConfirmationHandler confirmationHandler = confirmationStatus -> confirmSync.down(); Consumer publishMessage = i -> producer.send( @@ -433,15 +425,17 @@ void messagesShouldBeDeDuplicatedWhenUsingNameAndPublishingId(int subEntrySize) .addData(String.valueOf(i).getBytes()) .build(), confirmationHandler); + // publish the first wave document.headSet(firstWaveLineCount).forEach(publishMessage); - assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(confirmSync).completes(); - latch.set(new CountDownLatch(lineCount - firstWaveLineCount + backwardCount)); + confirmSync.reset(lineCount - firstWaveLineCount + backwardCount); + // publish the rest, but with some overlap from the first wave document.tailSet(firstWaveLineCount - backwardCount).forEach(publishMessage); - assertThat(latch.get().await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(confirmSync).completes(); CountDownLatch consumeLatch = new CountDownLatch(lineCount); AtomicInteger consumed = new AtomicInteger(); @@ -453,14 +447,17 @@ void messagesShouldBeDeDuplicatedWhenUsingNameAndPublishingId(int subEntrySize) consumeLatch.countDown(); }) .build(); - assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); - Thread.sleep(1000); - // if we are using sub-entries, we cannot avoid duplicates. - // here, a sub-entry in the second wave, right at the end of the re-submitted - // values will contain those duplicates, because its publishing ID will be - // the one of its last message, so the server will accept the whole sub-entry, - // including the duplicates. - assertThat(consumed.get()).isEqualTo(lineCount + backwardCount % subEntrySize); + assertThat(consumeLatch.await(5, TimeUnit.SECONDS)).isTrue(); + if (subEntrySize == 1) { + assertThat(consumed.get()).isEqualTo(lineCount); + } else { + // if we are using sub-entries, we cannot avoid duplicates. + // here, a sub-entry in the second wave, right at the end of the re-submitted + // values will contain those duplicates, because its publishing ID will be + // the one of its last message, so the server will accept the whole sub-entry, + // including the duplicates. + assertThat(consumed.get()).isBetween(lineCount, lineCount + subEntrySize); + } } @ParameterizedTest @@ -636,11 +633,10 @@ void subEntryBatchesSentCompressedShouldBeConsumedProperly() { } @Test - void methodsShouldThrowExceptionWhenProducerIsClosed() throws InterruptedException { + void methodsShouldThrowExceptionWhenProducerIsClosed() { Producer producer = environment.producerBuilder().stream(stream).build(); producer.close(); - assertThatThrownBy(() -> producer.getLastPublishingId()) - .isInstanceOf(IllegalStateException.class); + assertThatThrownBy(producer::getLastPublishingId).isInstanceOf(IllegalStateException.class); } @Test diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 28c87bfbc2..19390e6877 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2021-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -15,6 +15,7 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.*; @@ -27,6 +28,7 @@ import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.codec.SimpleCodec; import com.rabbitmq.stream.compression.Compression; +import com.rabbitmq.stream.compression.DefaultCompressionCodecFactory; import com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -42,8 +44,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.ToLongFunction; -import java.util.stream.IntStream; import org.assertj.core.api.ThrowableAssert.ThrowingCallable; +import org.assertj.core.data.Offset; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; @@ -115,6 +117,8 @@ void init() { when(env.clock()).thenReturn(clock); when(env.codec()).thenReturn(new SimpleCodec()); when(env.observationCollector()).thenAnswer(invocation -> ObservationCollector.NO_OP); + DefaultCompressionCodecFactory ccf = new DefaultCompressionCodecFactory(); + when(env.compressionCodecFactory()).thenReturn(ccf); doAnswer( (Answer) invocationOnMock -> { @@ -172,6 +176,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout( "stream", subEntrySize, 10, + StreamProducerBuilder.DEFAULT_DYNAMIC_BATCH, Compression.NONE, Duration.ofMillis(100), messageCount * 10, @@ -181,26 +186,33 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout( null, env); - IntStream.range(0, messageCount) + range(0, messageCount) .forEach( i -> producer.send( producer.messageBuilder().addData("".getBytes()).build(), confirmationHandler)); - IntStream.range(0, confirmedPart).forEach(publishingId -> producer.confirm(publishingId)); - assertThat(confirmedCount.get()).isEqualTo(expectedConfirmed); + waitAtMost(() -> producer.unconfirmedCount() >= messageCount / subEntrySize); + range(0, confirmedPart).forEach(producer::confirm); + if (subEntrySize == 1) { + assertThat(confirmedCount.get()).isEqualTo(expectedConfirmed); + } else { + assertThat(confirmedCount.get()).isCloseTo(confirmedCount.get(), Offset.offset(subEntrySize)); + } assertThat(erroredCount.get()).isZero(); + int confirmedPreviously = confirmedCount.get(); executorService.scheduleAtFixedRate(() -> clock.refresh(), 100, 100, TimeUnit.MILLISECONDS); Thread.sleep(waitTime.toMillis()); - assertThat(confirmedCount.get()).isEqualTo(expectedConfirmed); + assertThat(confirmedCount.get()).isEqualTo(confirmedPreviously); if (confirmTimeout.isZero()) { assertThat(erroredCount.get()).isZero(); assertThat(responseCodes).isEmpty(); } else { waitAtMost( - waitTime.multipliedBy(2), () -> erroredCount.get() == (messageCount - expectedConfirmed)); + waitTime.multipliedBy(2), + () -> erroredCount.get() == (messageCount - confirmedPreviously)); assertThat(responseCodes).hasSize(1).contains(Constants.CODE_PUBLISH_CONFIRM_TIMEOUT); } } @@ -215,6 +227,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry "stream", subEntrySize, 10, + StreamProducerBuilder.DEFAULT_DYNAMIC_BATCH, Compression.NONE, Duration.ZERO, 2, @@ -255,6 +268,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize) "stream", subEntrySize, 10, + StreamProducerBuilder.DEFAULT_DYNAMIC_BATCH, Compression.NONE, Duration.ZERO, 2, diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 9ab50aadf7..76c1418d36 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -1138,6 +1138,10 @@ void down() { this.latch.get().countDown(); } + void down(int count) { + IntStream.range(0, count).forEach(ignored -> this.latch.get().countDown()); + } + boolean await(Duration timeout) { try { return this.latch.get().await(timeout.toMillis(), TimeUnit.MILLISECONDS); diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 6931d5f21a..4bec720537 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -6,7 +6,6 @@ -