From 16396dfab333f2e0ec7db8bec5ee401bd73e3a0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 5 Nov 2024 10:08:34 +0100 Subject: [PATCH 01/13] Experiment with dynamic-batch approach for publishing --- .../rabbitmq/stream/impl/DynamicBatch.java | 97 ++++++++ .../stream/impl/SimpleMessageAccumulator.java | 9 +- .../rabbitmq/stream/impl/StreamProducer.java | 217 ++++++++++++++---- .../impl/SubEntryMessageAccumulator.java | 15 +- .../stream/impl/DynamicBatchTest.java | 69 ++++++ .../stream/impl/StreamProducerTest.java | 32 +-- .../com/rabbitmq/stream/impl/TestUtils.java | 4 + 7 files changed, 370 insertions(+), 73 deletions(-) create mode 100644 src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java create mode 100644 src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java new file mode 100644 index 0000000000..070f1702e1 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -0,0 +1,97 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static java.lang.Math.max; +import static java.lang.Math.min; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +class DynamicBatch { + + private static final int MIN_BATCH_SIZE = 32; + private static final int MAX_BATCH_SIZE = 8192; + + final BlockingQueue requests = new LinkedBlockingQueue<>(); + final Consumer> consumer; + final int configuredBatchSize; + private final AtomicLong count = new AtomicLong(0); + + DynamicBatch(Consumer> consumer, int batchSize) { + this.consumer = consumer; + this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE); + new Thread(this::loop).start(); + } + + void add(T item) { + try { + requests.put(item); + this.count.incrementAndGet(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void loop() { + int batchSize = this.configuredBatchSize; + List batch = new ArrayList<>(batchSize); + Thread currentThread = Thread.currentThread(); + T item; + while (!currentThread.isInterrupted()) { + try { + item = this.requests.poll(100, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + currentThread.interrupt(); + return; + } + if (item != null) { + batch.add(item); + if (batch.size() >= batchSize) { + this.completeBatch(batch); + batchSize = min(batchSize * 2, MAX_BATCH_SIZE); + batch = new ArrayList<>(batchSize); + } else { + item = this.requests.poll(); + if (item == null) { + this.completeBatch(batch); + batchSize = max(batchSize / 2, MIN_BATCH_SIZE); + batch = new ArrayList<>(batchSize); + } else { + batch.add(item); + if (batch.size() >= batchSize) { + this.completeBatch(batch); + batchSize = min(batchSize * 2, MAX_BATCH_SIZE); + batch = new ArrayList<>(batchSize); + } + } + } + } else { + this.completeBatch(batch); + batchSize = min(batchSize * 2, MAX_BATCH_SIZE); + batch = new ArrayList<>(batchSize); + } + } + } + + private void completeBatch(List items) { + this.consumer.accept(items); + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java index 718f253da7..72b0323ae7 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java @@ -103,7 +103,7 @@ public int size() { return messages.size(); } - private static final class SimpleAccumulatedEntity implements AccumulatedEntity { + static final class SimpleAccumulatedEntity implements AccumulatedEntity { private final long time; private final long publishingId; @@ -112,7 +112,7 @@ private static final class SimpleAccumulatedEntity implements AccumulatedEntity private final StreamProducer.ConfirmationCallback confirmationCallback; private final Object observationContext; - private SimpleAccumulatedEntity( + SimpleAccumulatedEntity( long time, long publishingId, String filterValue, @@ -158,13 +158,12 @@ public Object observationContext() { } } - private static final class SimpleConfirmationCallback - implements StreamProducer.ConfirmationCallback { + static final class SimpleConfirmationCallback implements StreamProducer.ConfirmationCallback { private final Message message; private final ConfirmationHandler confirmationHandler; - private SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) { + SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) { this.message = message; this.confirmationHandler = confirmationHandler; } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 4050fd0af7..e648b94dac 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -18,15 +18,9 @@ import static com.rabbitmq.stream.impl.Utils.formatConstant; import static com.rabbitmq.stream.impl.Utils.namedRunnable; -import com.rabbitmq.stream.Codec; -import com.rabbitmq.stream.ConfirmationHandler; -import com.rabbitmq.stream.ConfirmationStatus; -import com.rabbitmq.stream.Constants; -import com.rabbitmq.stream.Message; -import com.rabbitmq.stream.MessageBuilder; -import com.rabbitmq.stream.Producer; -import com.rabbitmq.stream.StreamException; +import com.rabbitmq.stream.*; import com.rabbitmq.stream.compression.Compression; +import com.rabbitmq.stream.compression.CompressionCodec; import com.rabbitmq.stream.impl.Client.Response; import com.rabbitmq.stream.impl.MessageAccumulator.AccumulatedEntity; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -61,7 +55,10 @@ class StreamProducer implements Producer { private static final Logger LOGGER = LoggerFactory.getLogger(StreamProducer.class); private static final ConfirmationHandler NO_OP_CONFIRMATION_HANDLER = confirmationStatus -> {}; private final long id; - private final MessageAccumulator accumulator; + // private final MessageAccumulator accumulator; + private final DynamicBatch dynamicBatch; + private final Clock clock; + private final ToLongFunction accumulatorPublishSequenceFunction; // FIXME investigate a more optimized data structure to handle pending messages private final ConcurrentMap unconfirmedMessages; private final int batchSize; @@ -76,6 +73,8 @@ class StreamProducer implements Producer { private final Codec codec; private final ToLongFunction publishSequenceFunction = entity -> ((AccumulatedEntity) entity).publishingId(); + private final Function filterValueExtractor; + private final ObservationCollector observationCollector; private final long enqueueTimeoutMs; private final boolean blockOnMaxUnconfirmed; private final boolean retryOnRecovery; @@ -114,7 +113,7 @@ class StreamProducer implements Producer { this.closingCallback = environment.registerProducer(this, name, this.stream); final Client.OutboundEntityWriteCallback delegateWriteCallback; AtomicLong publishingSequence = new AtomicLong(computeFirstValueOfPublishingSequence()); - ToLongFunction accumulatorPublishSequenceFunction = + this.accumulatorPublishSequenceFunction = msg -> { if (msg.hasPublishingId()) { return msg.getPublishingId(); @@ -122,37 +121,42 @@ class StreamProducer implements Producer { return publishingSequence.getAndIncrement(); } }; + this.filterValueExtractor = filterValueExtractor == null ? m -> null : filterValueExtractor; + this.clock = environment.clock(); + this.observationCollector = + (ObservationCollector) this.environment.observationCollector(); + if (subEntrySize <= 1) { - this.accumulator = - new SimpleMessageAccumulator( - batchSize, - environment.codec(), - client.maxFrameSize(), - accumulatorPublishSequenceFunction, - filterValueExtractor, - this.environment.clock(), - stream, - this.environment.observationCollector()); + // this.accumulator = + // new SimpleMessageAccumulator( + // batchSize, + // environment.codec(), + // client.maxFrameSize(), + // accumulatorPublishSequenceFunction, + // filterValueExtractor, + // this.environment.clock(), + // stream, + // this.environment.observationCollector()); if (filterValueExtractor == null) { delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK; } else { delegateWriteCallback = OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK; } } else { - this.accumulator = - new SubEntryMessageAccumulator( - subEntrySize, - batchSize, - compression == Compression.NONE - ? null - : environment.compressionCodecFactory().get(compression), - environment.codec(), - this.environment.byteBufAllocator(), - client.maxFrameSize(), - accumulatorPublishSequenceFunction, - this.environment.clock(), - stream, - environment.observationCollector()); + // this.accumulator = + // new SubEntryMessageAccumulator( + // subEntrySize, + // batchSize, + // compression == Compression.NONE + // ? null + // : environment.compressionCodecFactory().get(compression), + // environment.codec(), + // this.environment.byteBufAllocator(), + // client.maxFrameSize(), + // accumulatorPublishSequenceFunction, + // this.environment.clock(), + // stream, + // environment.observationCollector()); delegateWriteCallback = Client.OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK; } @@ -198,14 +202,122 @@ public int fragmentLength(Object entity) { }; } + if (subEntrySize <= 1) { + this.dynamicBatch = + new DynamicBatch<>( + items -> { + client.publishInternal( + this.publishVersion, + this.publisherId, + items, + this.writeCallback, + this.publishSequenceFunction); + }, + batchSize); + } else { + CompressionCodec compressionCodec = + compression == Compression.NONE + ? null + : environment.compressionCodecFactory().get(compression); + byte compressionCode = + compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); + this.dynamicBatch = + new DynamicBatch<>( + items -> { + List subBatches = new ArrayList<>(); + int count = 0; + SubEntryMessageAccumulator.Batch batch = + new SubEntryMessageAccumulator.Batch( + Client.EncodedMessageBatch.create( + this.environment.byteBufAllocator(), + compressionCode, + compressionCodec, + subEntrySize), + new SubEntryMessageAccumulator.CompositeConfirmationCallback( + new ArrayList<>(subEntrySize))); + AccumulatedEntity lastMessageInBatch = null; + for (Object msg : items) { + AccumulatedEntity message = (AccumulatedEntity) msg; + this.observationCollector.published( + message.observationContext(), message.confirmationCallback().message()); + lastMessageInBatch = message; + batch.add( + (Codec.EncodedMessage) message.encodedEntity(), + message.confirmationCallback()); + count++; + if (count == subEntrySize) { + batch.time = lastMessageInBatch.time(); + batch.publishingId = lastMessageInBatch.publishingId(); + batch.encodedMessageBatch.close(); + subBatches.add(batch); + lastMessageInBatch = null; + batch = + new SubEntryMessageAccumulator.Batch( + Client.EncodedMessageBatch.create( + this.environment.byteBufAllocator(), + compressionCode, + compressionCodec, + subEntrySize), + new SubEntryMessageAccumulator.CompositeConfirmationCallback( + new ArrayList<>(subEntrySize))); + count = 0; + } + } + + if (!batch.isEmpty() && count < subEntrySize) { + batch.time = lastMessageInBatch.time(); + batch.publishingId = lastMessageInBatch.publishingId(); + batch.encodedMessageBatch.close(); + subBatches.add(batch); + } + + client.publishInternal( + this.publishVersion, + this.publisherId, + subBatches, + this.writeCallback, + this.publishSequenceFunction); + + /* + while (count != subEntrySize) { + AccumulatedEntity message = items.poll(); + if (message == null) { + break; + } + this.observationCollector.published( + message.observationContext(), message.confirmationCallback().message()); + lastMessageInBatch = message; + batch.add((Codec.EncodedMessage) message.encodedEntity(), message.confirmationCallback()); + count++; + } + if (batch.isEmpty()) { + return null; + } else { + batch.time = lastMessageInBatch.time(); + batch.publishingId = lastMessageInBatch.publishingId(); + batch.encodedMessageBatch.close(); + return batch; + } + client.publishInternal( + this.publishVersion, + this.publisherId, + subBatches, + this.writeCallback, + this.publishSequenceFunction); + + */ + }, + batchSize * subEntrySize); + } + if (!batchPublishingDelay.isNegative() && !batchPublishingDelay.isZero()) { AtomicReference taskReference = new AtomicReference<>(); Runnable task = () -> { if (canSend()) { - synchronized (StreamProducer.this) { - publishBatch(true); - } + // synchronized (StreamProducer.this) { + // publishBatch(true); + // } } if (status != Status.CLOSED) { environment @@ -397,11 +509,22 @@ public void send(Message message, ConfirmationHandler confirmationHandler) { private void doSend(Message message, ConfirmationHandler confirmationHandler) { if (canSend()) { - if (accumulator.add(message, confirmationHandler)) { - synchronized (this) { - publishBatch(true); - } - } + long publishingId = this.accumulatorPublishSequenceFunction.applyAsLong(message); + Object observationContext = this.observationCollector.prePublish(this.stream, message); + this.dynamicBatch.add( + new SimpleMessageAccumulator.SimpleAccumulatedEntity( + this.clock.time(), + publishingId, + this.filterValueExtractor.apply(message), + this.codec.encode(message), + new SimpleMessageAccumulator.SimpleConfirmationCallback(message, confirmationHandler), + observationContext)); + + // if (accumulator.add(message, confirmationHandler)) { + // synchronized (this) { + // publishBatch(true); + // } + // } } else { failPublishing(message, confirmationHandler); } @@ -476,12 +599,13 @@ private void cancelConfirmTimeoutTask() { } } + /* private void publishBatch(boolean stateCheck) { if ((!stateCheck || canSend()) && !accumulator.isEmpty()) { List messages = new ArrayList<>(this.batchSize); int batchCount = 0; while (batchCount != this.batchSize) { - Object accMessage = accumulator.get(); + AccumulatedEntity accMessage = accumulator.get(); if (accMessage == null) { break; } @@ -497,6 +621,8 @@ private void publishBatch(boolean stateCheck) { } } + */ + boolean isOpen() { return !this.closed.get(); } @@ -510,7 +636,8 @@ void running() { LOGGER.debug( "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)", this.unconfirmedMessages.size(), - this.accumulator.size()); + 0); + // this.accumulator.size()); if (this.retryOnRecovery) { LOGGER.debug("Re-publishing {} unconfirmed message(s)", this.unconfirmedMessages.size()); if (!this.unconfirmedMessages.isEmpty()) { @@ -556,7 +683,7 @@ void running() { } } } - publishBatch(false); + // publishBatch(false); int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits(); if (toRelease > 0) { unconfirmedMessagesSemaphore.release(toRelease); diff --git a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java index 9693aea50c..d2043310b9 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java @@ -95,14 +95,14 @@ public AccumulatedEntity get() { } } - private static class Batch implements AccumulatedEntity { + static class Batch implements AccumulatedEntity { - private final EncodedMessageBatch encodedMessageBatch; + final EncodedMessageBatch encodedMessageBatch; private final CompositeConfirmationCallback confirmationCallback; - private volatile long publishingId; - private volatile long time; + volatile long publishingId; + volatile long time; - private Batch( + Batch( EncodedMessageBatch encodedMessageBatch, CompositeConfirmationCallback confirmationCallback) { this.encodedMessageBatch = encodedMessageBatch; @@ -152,12 +152,11 @@ public Object observationContext() { } } - private static class CompositeConfirmationCallback - implements StreamProducer.ConfirmationCallback { + static class CompositeConfirmationCallback implements StreamProducer.ConfirmationCallback { private final List callbacks; - private CompositeConfirmationCallback(List callbacks) { + CompositeConfirmationCallback(List callbacks) { this.callbacks = callbacks; } diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java new file mode 100644 index 0000000000..9680845842 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java @@ -0,0 +1,69 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import com.codahale.metrics.ConsoleReporter; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import com.google.common.util.concurrent.RateLimiter; +import java.time.Duration; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Test; + +public class DynamicBatchTest { + + @Test + void test() { + MetricRegistry metrics = new MetricRegistry(); + Histogram batchSizeMetrics = metrics.histogram("batch-size"); + ConsoleReporter reporter = + ConsoleReporter.forRegistry(metrics) + .convertRatesTo(TimeUnit.SECONDS) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .build(); + + int itemCount = 3000; + TestUtils.Sync sync = TestUtils.sync(itemCount); + Random random = new Random(); + DynamicBatch batch = + new DynamicBatch<>( + items -> { + // System.out.println(System.currentTimeMillis()); + // System.out.println(items.size()); + batchSizeMetrics.update(items.size()); + sync.down(items.size()); + try { + Thread.sleep(random.nextInt(10) + 1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + 100); + RateLimiter rateLimiter = RateLimiter.create(3000); + long start = System.nanoTime(); + IntStream.range(0, itemCount) + .forEach( + i -> { + rateLimiter.acquire(); + batch.add(String.valueOf(i)); + }); + Assertions.assertThat(sync).completes(); + long end = System.nanoTime(); + System.out.println("Done in " + Duration.ofNanos(end - start)); + reporter.report(); + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java index 6cf06356d0..9c08a29332 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java @@ -14,10 +14,8 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.TestUtils.latchAssert; -import static com.rabbitmq.stream.impl.TestUtils.localhost; -import static com.rabbitmq.stream.impl.TestUtils.streamName; -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static com.rabbitmq.stream.impl.Assertions.assertThat; +import static com.rabbitmq.stream.impl.TestUtils.*; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -27,6 +25,7 @@ import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.impl.MonitoringTestUtils.ProducerInfo; import com.rabbitmq.stream.impl.StreamProducer.Status; +import com.rabbitmq.stream.impl.TestUtils.Sync; import io.netty.channel.ChannelOption; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoopGroup; @@ -94,7 +93,7 @@ void tearDown() { void send() throws Exception { int batchSize = 10; int messageCount = 10 * batchSize + 1; // don't want a multiple of batch size - CountDownLatch publishLatch = new CountDownLatch(messageCount); + Sync confirmSync = sync(messageCount); Producer producer = environment.producerBuilder().stream(stream).batchSize(batchSize).build(); AtomicLong count = new AtomicLong(0); AtomicLong sequence = new AtomicLong(0); @@ -117,13 +116,12 @@ void send() throws Exception { idsConfirmed.add( confirmationStatus.getMessage().getProperties().getMessageIdAsLong()); count.incrementAndGet(); - publishLatch.countDown(); + confirmSync.down(); }); }); - boolean completed = publishLatch.await(10, TimeUnit.SECONDS); + assertThat(confirmSync).completes(); assertThat(idsSent).hasSameSizeAs(idsConfirmed); idsSent.forEach(idSent -> assertThat(idsConfirmed).contains(idSent)); - assertThat(completed).isTrue(); ProducerInfo info = MonitoringTestUtils.extract(producer); assertThat(info.getId()).isGreaterThanOrEqualTo(0); @@ -455,12 +453,16 @@ void messagesShouldBeDeDuplicatedWhenUsingNameAndPublishingId(int subEntrySize) .build(); assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); Thread.sleep(1000); - // if we are using sub-entries, we cannot avoid duplicates. - // here, a sub-entry in the second wave, right at the end of the re-submitted - // values will contain those duplicates, because its publishing ID will be - // the one of its last message, so the server will accept the whole sub-entry, - // including the duplicates. - assertThat(consumed.get()).isEqualTo(lineCount + backwardCount % subEntrySize); + if (subEntrySize == 1) { + assertThat(consumed.get()).isEqualTo(lineCount); + } else { + // if we are using sub-entries, we cannot avoid duplicates. + // here, a sub-entry in the second wave, right at the end of the re-submitted + // values will contain those duplicates, because its publishing ID will be + // the one of its last message, so the server will accept the whole sub-entry, + // including the duplicates. + assertThat(consumed.get()).isBetween(lineCount, lineCount + subEntrySize); + } } @ParameterizedTest @@ -636,7 +638,7 @@ void subEntryBatchesSentCompressedShouldBeConsumedProperly() { } @Test - void methodsShouldThrowExceptionWhenProducerIsClosed() throws InterruptedException { + void methodsShouldThrowExceptionWhenProducerIsClosed() { Producer producer = environment.producerBuilder().stream(stream).build(); producer.close(); assertThatThrownBy(() -> producer.getLastPublishingId()) diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 9ab50aadf7..8700987f0f 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -1138,6 +1138,10 @@ void down() { this.latch.get().countDown(); } + void down(int count) { + IntStream.range(0, count).forEach(ignored -> this.latch.get().countDown()); + } + boolean await(Duration timeout) { try { return this.latch.get().await(timeout.toMillis(), TimeUnit.MILLISECONDS); From 36c264da4440ba8d3933a0a4f2a5dcb70230a2fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 5 Nov 2024 16:43:31 +0100 Subject: [PATCH 02/13] Refactor message accumulator abstraction for dynamic batching --- .../stream/impl/MessageAccumulator.java | 21 +- .../rabbitmq/stream/impl/ProducerUtils.java | 207 ++++++++++++++++++ .../stream/impl/SimpleMessageAccumulator.java | 125 ++++------- .../rabbitmq/stream/impl/StreamProducer.java | 160 ++++++-------- .../impl/SubEntryMessageAccumulator.java | 107 +-------- .../stream/impl/StreamProducerTest.java | 21 +- 6 files changed, 335 insertions(+), 306 deletions(-) create mode 100644 src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java diff --git a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java index c35271f29f..ff0c592317 100644 --- a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java @@ -19,26 +19,9 @@ interface MessageAccumulator { - boolean add(Message message, ConfirmationHandler confirmationHandler); - - AccumulatedEntity get(); - - boolean isEmpty(); + void add(Message message, ConfirmationHandler confirmationHandler); int size(); - interface AccumulatedEntity { - - long time(); - - long publishingId(); - - String filterValue(); - - Object encodedEntity(); - - StreamProducer.ConfirmationCallback confirmationCallback(); - - Object observationContext(); - } + void flush(boolean force); } diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java new file mode 100644 index 0000000000..4933932755 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java @@ -0,0 +1,207 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import com.rabbitmq.stream.Codec; +import com.rabbitmq.stream.ConfirmationHandler; +import com.rabbitmq.stream.ConfirmationStatus; +import com.rabbitmq.stream.Message; +import java.util.List; + +final class ProducerUtils { + + private ProducerUtils() {} + + interface ConfirmationCallback { + + int handle(boolean confirmed, short code); + + Message message(); + } + + interface AccumulatedEntity { + + long time(); + + long publishingId(); + + String filterValue(); + + Object encodedEntity(); + + ConfirmationCallback confirmationCallback(); + + Object observationContext(); + } + + static final class SimpleConfirmationCallback implements ConfirmationCallback { + + private final Message message; + private final ConfirmationHandler confirmationHandler; + + SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) { + this.message = message; + this.confirmationHandler = confirmationHandler; + } + + @Override + public int handle(boolean confirmed, short code) { + confirmationHandler.handle(new ConfirmationStatus(message, confirmed, code)); + return 1; + } + + @Override + public Message message() { + return this.message; + } + } + + static final class SimpleAccumulatedEntity implements AccumulatedEntity { + + private final long time; + private final long publishingId; + private final String filterValue; + private final Codec.EncodedMessage encodedMessage; + private final ConfirmationCallback confirmationCallback; + private final Object observationContext; + + SimpleAccumulatedEntity( + long time, + long publishingId, + String filterValue, + Codec.EncodedMessage encodedMessage, + ConfirmationCallback confirmationCallback, + Object observationContext) { + this.time = time; + this.publishingId = publishingId; + this.encodedMessage = encodedMessage; + this.filterValue = filterValue; + this.confirmationCallback = confirmationCallback; + this.observationContext = observationContext; + } + + @Override + public long publishingId() { + return publishingId; + } + + @Override + public String filterValue() { + return filterValue; + } + + @Override + public Object encodedEntity() { + return encodedMessage; + } + + @Override + public long time() { + return time; + } + + @Override + public ConfirmationCallback confirmationCallback() { + return confirmationCallback; + } + + @Override + public Object observationContext() { + return this.observationContext; + } + } + + static final class CompositeConfirmationCallback implements ConfirmationCallback { + + private final List callbacks; + + CompositeConfirmationCallback(List callbacks) { + this.callbacks = callbacks; + } + + private void add(ConfirmationCallback confirmationCallback) { + this.callbacks.add(confirmationCallback); + } + + @Override + public int handle(boolean confirmed, short code) { + for (ConfirmationCallback callback : callbacks) { + callback.handle(confirmed, code); + } + return callbacks.size(); + } + + @Override + public Message message() { + throw new UnsupportedOperationException( + "composite confirmation callback does not contain just one message"); + } + } + + static final class Batch implements AccumulatedEntity { + + final Client.EncodedMessageBatch encodedMessageBatch; + private final CompositeConfirmationCallback confirmationCallback; + volatile long publishingId; + volatile long time; + + Batch( + Client.EncodedMessageBatch encodedMessageBatch, + CompositeConfirmationCallback confirmationCallback) { + this.encodedMessageBatch = encodedMessageBatch; + this.confirmationCallback = confirmationCallback; + } + + void add(Codec.EncodedMessage encodedMessage, ConfirmationCallback confirmationCallback) { + this.encodedMessageBatch.add(encodedMessage); + this.confirmationCallback.add(confirmationCallback); + } + + boolean isEmpty() { + return this.confirmationCallback.callbacks.isEmpty(); + } + + @Override + public long publishingId() { + return publishingId; + } + + @Override + public String filterValue() { + return null; + } + + @Override + public Object encodedEntity() { + return encodedMessageBatch; + } + + @Override + public long time() { + return time; + } + + @Override + public ConfirmationCallback confirmationCallback() { + return confirmationCallback; + } + + @Override + public Object observationContext() { + throw new UnsupportedOperationException( + "batch entity does not contain only one observation context"); + } + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java index 72b0323ae7..90a70e43a3 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java @@ -15,9 +15,12 @@ package com.rabbitmq.stream.impl; import com.rabbitmq.stream.*; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.ToLongFunction; @@ -25,7 +28,7 @@ class SimpleMessageAccumulator implements MessageAccumulator { private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null; - protected final BlockingQueue messages; + protected final BlockingQueue messages; protected final Clock clock; private final int capacity; protected final Codec codec; @@ -34,6 +37,7 @@ class SimpleMessageAccumulator implements MessageAccumulator { private final Function filterValueExtractor; final String stream; final ObservationCollector observationCollector; + private final StreamProducer producer; @SuppressWarnings("unchecked") SimpleMessageAccumulator( @@ -44,7 +48,8 @@ class SimpleMessageAccumulator implements MessageAccumulator { Function filterValueExtractor, Clock clock, String stream, - ObservationCollector observationCollector) { + ObservationCollector observationCollector, + StreamProducer producer) { this.capacity = capacity; this.messages = new LinkedBlockingQueue<>(capacity); this.codec = codec; @@ -55,9 +60,10 @@ class SimpleMessageAccumulator implements MessageAccumulator { this.clock = clock; this.stream = stream; this.observationCollector = (ObservationCollector) observationCollector; + this.producer = producer; } - public boolean add(Message message, ConfirmationHandler confirmationHandler) { + public void add(Message message, ConfirmationHandler confirmationHandler) { Object observationContext = this.observationCollector.prePublish(this.stream, message); Codec.EncodedMessage encodedMessage = this.codec.encode(message); Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage); @@ -65,12 +71,12 @@ public boolean add(Message message, ConfirmationHandler confirmationHandler) { try { boolean offered = messages.offer( - new SimpleAccumulatedEntity( + new ProducerUtils.SimpleAccumulatedEntity( clock.time(), publishingId, this.filterValueExtractor.apply(message), encodedMessage, - new SimpleConfirmationCallback(message, confirmationHandler), + new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler), observationContext), 60, TimeUnit.SECONDS); @@ -80,12 +86,15 @@ public boolean add(Message message, ConfirmationHandler confirmationHandler) { } catch (InterruptedException e) { throw new StreamException("Error while accumulating outbound message", e); } - return this.messages.size() == this.capacity; + if (this.messages.size() == this.capacity) { + synchronized (this.producer) { + publishBatch(true); + } + } } - @Override - public AccumulatedEntity get() { - AccumulatedEntity entity = this.messages.poll(); + ProducerUtils.AccumulatedEntity get() { + ProducerUtils.AccumulatedEntity entity = this.messages.poll(); if (entity != null) { this.observationCollector.published( entity.observationContext(), entity.confirmationCallback().message()); @@ -93,90 +102,36 @@ public AccumulatedEntity get() { return entity; } - @Override - public boolean isEmpty() { - return messages.isEmpty(); - } - @Override public int size() { return messages.size(); } - static final class SimpleAccumulatedEntity implements AccumulatedEntity { - - private final long time; - private final long publishingId; - private final String filterValue; - private final Codec.EncodedMessage encodedMessage; - private final StreamProducer.ConfirmationCallback confirmationCallback; - private final Object observationContext; - - SimpleAccumulatedEntity( - long time, - long publishingId, - String filterValue, - Codec.EncodedMessage encodedMessage, - StreamProducer.ConfirmationCallback confirmationCallback, - Object observationContext) { - this.time = time; - this.publishingId = publishingId; - this.encodedMessage = encodedMessage; - this.filterValue = filterValue; - this.confirmationCallback = confirmationCallback; - this.observationContext = observationContext; - } - - @Override - public long publishingId() { - return publishingId; - } - - @Override - public String filterValue() { - return filterValue; - } - - @Override - public Object encodedEntity() { - return encodedMessage; - } - - @Override - public long time() { - return time; - } - - @Override - public StreamProducer.ConfirmationCallback confirmationCallback() { - return confirmationCallback; - } - - @Override - public Object observationContext() { - return this.observationContext; + @Override + public void flush(boolean force) { + boolean stateCheck = !force; + synchronized (this.producer) { + publishBatch(stateCheck); } + // System.out.println(sent.get()); } - static final class SimpleConfirmationCallback implements StreamProducer.ConfirmationCallback { - - private final Message message; - private final ConfirmationHandler confirmationHandler; - - SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) { - this.message = message; - this.confirmationHandler = confirmationHandler; - } - - @Override - public int handle(boolean confirmed, short code) { - confirmationHandler.handle(new ConfirmationStatus(message, confirmed, code)); - return 1; - } - - @Override - public Message message() { - return this.message; + AtomicInteger sent = new AtomicInteger(); + + private void publishBatch(boolean stateCheck) { + if ((!stateCheck || this.producer.canSend()) && !this.messages.isEmpty()) { + List entities = new ArrayList<>(this.capacity); + int batchCount = 0; + while (batchCount != this.capacity) { + ProducerUtils.AccumulatedEntity entity = this.get(); + if (entity == null) { + break; + } + entities.add(entity); + batchCount++; + } + this.sent.addAndGet(entities.size()); + producer.publishInternal(entities); } } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index e648b94dac..8c1de81345 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -20,9 +20,8 @@ import com.rabbitmq.stream.*; import com.rabbitmq.stream.compression.Compression; -import com.rabbitmq.stream.compression.CompressionCodec; import com.rabbitmq.stream.impl.Client.Response; -import com.rabbitmq.stream.impl.MessageAccumulator.AccumulatedEntity; +import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.buffer.ByteBuf; import java.nio.charset.StandardCharsets; @@ -55,8 +54,8 @@ class StreamProducer implements Producer { private static final Logger LOGGER = LoggerFactory.getLogger(StreamProducer.class); private static final ConfirmationHandler NO_OP_CONFIRMATION_HANDLER = confirmationStatus -> {}; private final long id; - // private final MessageAccumulator accumulator; - private final DynamicBatch dynamicBatch; + private final MessageAccumulator accumulator; + // private final DynamicBatch dynamicBatch; private final Clock clock; private final ToLongFunction accumulatorPublishSequenceFunction; // FIXME investigate a more optimized data structure to handle pending messages @@ -127,36 +126,38 @@ class StreamProducer implements Producer { (ObservationCollector) this.environment.observationCollector(); if (subEntrySize <= 1) { - // this.accumulator = - // new SimpleMessageAccumulator( - // batchSize, - // environment.codec(), - // client.maxFrameSize(), - // accumulatorPublishSequenceFunction, - // filterValueExtractor, - // this.environment.clock(), - // stream, - // this.environment.observationCollector()); + this.accumulator = + new SimpleMessageAccumulator( + batchSize, + environment.codec(), + client.maxFrameSize(), + accumulatorPublishSequenceFunction, + filterValueExtractor, + this.environment.clock(), + stream, + this.environment.observationCollector(), + this); if (filterValueExtractor == null) { delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK; } else { delegateWriteCallback = OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK; } } else { - // this.accumulator = - // new SubEntryMessageAccumulator( - // subEntrySize, - // batchSize, - // compression == Compression.NONE - // ? null - // : environment.compressionCodecFactory().get(compression), - // environment.codec(), - // this.environment.byteBufAllocator(), - // client.maxFrameSize(), - // accumulatorPublishSequenceFunction, - // this.environment.clock(), - // stream, - // environment.observationCollector()); + this.accumulator = + new SubEntryMessageAccumulator( + subEntrySize, + batchSize, + compression == Compression.NONE + ? null + : environment.compressionCodecFactory().get(compression), + environment.codec(), + this.environment.byteBufAllocator(), + client.maxFrameSize(), + accumulatorPublishSequenceFunction, + this.environment.clock(), + stream, + environment.observationCollector(), + this); delegateWriteCallback = Client.OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK; } @@ -170,8 +171,7 @@ class StreamProducer implements Producer { new Client.OutboundEntityWriteCallback() { @Override public int write(ByteBuf bb, Object entity, long publishingId) { - MessageAccumulator.AccumulatedEntity accumulatedEntity = - (MessageAccumulator.AccumulatedEntity) entity; + AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity; unconfirmedMessages.put(publishingId, accumulatedEntity); return delegateWriteCallback.write( bb, accumulatedEntity.encodedEntity(), publishingId); @@ -180,7 +180,7 @@ public int write(ByteBuf bb, Object entity, long publishingId) { @Override public int fragmentLength(Object entity) { return delegateWriteCallback.fragmentLength( - ((MessageAccumulator.AccumulatedEntity) entity).encodedEntity()); + ((AccumulatedEntity) entity).encodedEntity()); } }; } else { @@ -189,8 +189,7 @@ public int fragmentLength(Object entity) { new Client.OutboundEntityWriteCallback() { @Override public int write(ByteBuf bb, Object entity, long publishingId) { - MessageAccumulator.AccumulatedEntity accumulatedEntity = - (MessageAccumulator.AccumulatedEntity) entity; + AccumulatedEntity accumulatedEntity = (AccumulatedEntity) entity; unconfirmedMessages.put(publishingId, accumulatedEntity); return delegateWriteCallback.write(bb, accumulatedEntity, publishingId); } @@ -202,6 +201,7 @@ public int fragmentLength(Object entity) { }; } + /* if (subEntrySize <= 1) { this.dynamicBatch = new DynamicBatch<>( @@ -226,14 +226,14 @@ public int fragmentLength(Object entity) { items -> { List subBatches = new ArrayList<>(); int count = 0; - SubEntryMessageAccumulator.Batch batch = - new SubEntryMessageAccumulator.Batch( + ProducerUtils.Batch batch = + new ProducerUtils.Batch( Client.EncodedMessageBatch.create( this.environment.byteBufAllocator(), compressionCode, compressionCodec, subEntrySize), - new SubEntryMessageAccumulator.CompositeConfirmationCallback( + new ProducerUtils.CompositeConfirmationCallback( new ArrayList<>(subEntrySize))); AccumulatedEntity lastMessageInBatch = null; for (Object msg : items) { @@ -252,13 +252,13 @@ public int fragmentLength(Object entity) { subBatches.add(batch); lastMessageInBatch = null; batch = - new SubEntryMessageAccumulator.Batch( + new ProducerUtils.Batch( Client.EncodedMessageBatch.create( this.environment.byteBufAllocator(), compressionCode, compressionCodec, subEntrySize), - new SubEntryMessageAccumulator.CompositeConfirmationCallback( + new ProducerUtils.CompositeConfirmationCallback( new ArrayList<>(subEntrySize))); count = 0; } @@ -278,46 +278,17 @@ public int fragmentLength(Object entity) { this.writeCallback, this.publishSequenceFunction); - /* - while (count != subEntrySize) { - AccumulatedEntity message = items.poll(); - if (message == null) { - break; - } - this.observationCollector.published( - message.observationContext(), message.confirmationCallback().message()); - lastMessageInBatch = message; - batch.add((Codec.EncodedMessage) message.encodedEntity(), message.confirmationCallback()); - count++; - } - if (batch.isEmpty()) { - return null; - } else { - batch.time = lastMessageInBatch.time(); - batch.publishingId = lastMessageInBatch.publishingId(); - batch.encodedMessageBatch.close(); - return batch; - } - client.publishInternal( - this.publishVersion, - this.publisherId, - subBatches, - this.writeCallback, - this.publishSequenceFunction); - - */ }, batchSize * subEntrySize); } + */ if (!batchPublishingDelay.isNegative() && !batchPublishingDelay.isZero()) { AtomicReference taskReference = new AtomicReference<>(); Runnable task = () -> { if (canSend()) { - // synchronized (StreamProducer.this) { - // publishBatch(true); - // } + this.accumulator.flush(false); } if (status != Status.CLOSED) { environment @@ -509,22 +480,18 @@ public void send(Message message, ConfirmationHandler confirmationHandler) { private void doSend(Message message, ConfirmationHandler confirmationHandler) { if (canSend()) { - long publishingId = this.accumulatorPublishSequenceFunction.applyAsLong(message); - Object observationContext = this.observationCollector.prePublish(this.stream, message); - this.dynamicBatch.add( - new SimpleMessageAccumulator.SimpleAccumulatedEntity( - this.clock.time(), - publishingId, - this.filterValueExtractor.apply(message), - this.codec.encode(message), - new SimpleMessageAccumulator.SimpleConfirmationCallback(message, confirmationHandler), - observationContext)); - - // if (accumulator.add(message, confirmationHandler)) { - // synchronized (this) { - // publishBatch(true); - // } - // } + // long publishingId = this.accumulatorPublishSequenceFunction.applyAsLong(message); + // Object observationContext = this.observationCollector.prePublish(this.stream, + // message); + // this.dynamicBatch.add( + // new ProducerUtils.SimpleAccumulatedEntity( + // this.clock.time(), + // publishingId, + // this.filterValueExtractor.apply(message), + // this.codec.encode(message), + // new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler), + // observationContext)); + this.accumulator.add(message, confirmationHandler); } else { failPublishing(message, confirmationHandler); } @@ -542,7 +509,7 @@ private void failPublishing(Message message, ConfirmationHandler confirmationHan } } - private boolean canSend() { + boolean canSend() { return this.status == Status.RUNNING; } @@ -623,6 +590,15 @@ private void publishBatch(boolean stateCheck) { */ + void publishInternal(List messages) { + client.publishInternal( + this.publishVersion, + this.publisherId, + messages, + this.writeCallback, + this.publishSequenceFunction); + } + boolean isOpen() { return !this.closed.get(); } @@ -636,8 +612,7 @@ void running() { LOGGER.debug( "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)", this.unconfirmedMessages.size(), - 0); - // this.accumulator.size()); + this.accumulator.size()); if (this.retryOnRecovery) { LOGGER.debug("Re-publishing {} unconfirmed message(s)", this.unconfirmedMessages.size()); if (!this.unconfirmedMessages.isEmpty()) { @@ -683,7 +658,7 @@ void running() { } } } - // publishBatch(false); + this.accumulator.flush(true); int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits(); if (toRelease > 0) { unconfirmedMessagesSemaphore.release(toRelease); @@ -715,13 +690,6 @@ enum Status { CLOSED } - interface ConfirmationCallback { - - int handle(boolean confirmed, short code); - - Message message(); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java index d2043310b9..dd15601e10 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java @@ -23,10 +23,9 @@ import com.rabbitmq.stream.impl.Client.EncodedMessageBatch; import io.netty.buffer.ByteBufAllocator; import java.util.ArrayList; -import java.util.List; import java.util.function.ToLongFunction; -class SubEntryMessageAccumulator extends SimpleMessageAccumulator { +final class SubEntryMessageAccumulator extends SimpleMessageAccumulator { private final int subEntrySize; private final CompressionCodec compressionCodec; @@ -43,7 +42,8 @@ public SubEntryMessageAccumulator( ToLongFunction publishSequenceFunction, Clock clock, String stream, - ObservationCollector observationCollector) { + ObservationCollector observationCollector, + StreamProducer producer) { super( subEntrySize * batchSize, codec, @@ -52,30 +52,31 @@ public SubEntryMessageAccumulator( null, clock, stream, - observationCollector); + observationCollector, + producer); this.subEntrySize = subEntrySize; this.compressionCodec = compressionCodec; this.compression = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); this.byteBufAllocator = byteBufAllocator; } - private Batch createBatch() { - return new Batch( + private ProducerUtils.Batch createBatch() { + return new ProducerUtils.Batch( EncodedMessageBatch.create( byteBufAllocator, compression, compressionCodec, this.subEntrySize), - new CompositeConfirmationCallback(new ArrayList<>(this.subEntrySize))); + new ProducerUtils.CompositeConfirmationCallback(new ArrayList<>(this.subEntrySize))); } @Override - public AccumulatedEntity get() { + protected ProducerUtils.AccumulatedEntity get() { if (this.messages.isEmpty()) { return null; } int count = 0; - Batch batch = createBatch(); - AccumulatedEntity lastMessageInBatch = null; + ProducerUtils.Batch batch = createBatch(); + ProducerUtils.AccumulatedEntity lastMessageInBatch = null; while (count != this.subEntrySize) { - AccumulatedEntity message = messages.poll(); + ProducerUtils.AccumulatedEntity message = messages.poll(); if (message == null) { break; } @@ -94,88 +95,4 @@ public AccumulatedEntity get() { return batch; } } - - static class Batch implements AccumulatedEntity { - - final EncodedMessageBatch encodedMessageBatch; - private final CompositeConfirmationCallback confirmationCallback; - volatile long publishingId; - volatile long time; - - Batch( - EncodedMessageBatch encodedMessageBatch, - CompositeConfirmationCallback confirmationCallback) { - this.encodedMessageBatch = encodedMessageBatch; - this.confirmationCallback = confirmationCallback; - } - - void add( - Codec.EncodedMessage encodedMessage, - StreamProducer.ConfirmationCallback confirmationCallback) { - this.encodedMessageBatch.add(encodedMessage); - this.confirmationCallback.add(confirmationCallback); - } - - boolean isEmpty() { - return this.confirmationCallback.callbacks.isEmpty(); - } - - @Override - public long publishingId() { - return publishingId; - } - - @Override - public String filterValue() { - return null; - } - - @Override - public Object encodedEntity() { - return encodedMessageBatch; - } - - @Override - public long time() { - return time; - } - - @Override - public StreamProducer.ConfirmationCallback confirmationCallback() { - return confirmationCallback; - } - - @Override - public Object observationContext() { - throw new UnsupportedOperationException( - "batch entity does not contain only one observation context"); - } - } - - static class CompositeConfirmationCallback implements StreamProducer.ConfirmationCallback { - - private final List callbacks; - - CompositeConfirmationCallback(List callbacks) { - this.callbacks = callbacks; - } - - private void add(StreamProducer.ConfirmationCallback confirmationCallback) { - this.callbacks.add(confirmationCallback); - } - - @Override - public int handle(boolean confirmed, short code) { - for (StreamProducer.ConfirmationCallback callback : callbacks) { - callback.handle(confirmed, code); - } - return callbacks.size(); - } - - @Override - public Message message() { - throw new UnsupportedOperationException( - "composite confirmation callback does not contain just one message"); - } - } } diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java index 9c08a29332..19be5ea05b 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java @@ -413,15 +413,14 @@ void messagesShouldBeDeDuplicatedWhenUsingNameAndPublishingId(int subEntrySize) int firstWaveLineCount = lineCount / 5; int backwardCount = firstWaveLineCount / 10; SortedSet document = new TreeSet<>(); - IntStream.range(0, lineCount).forEach(i -> document.add(i)); + IntStream.range(0, lineCount).forEach(document::add); Producer producer = environment.producerBuilder().name("producer-1").stream(stream) .subEntrySize(subEntrySize) .build(); - AtomicReference latch = - new AtomicReference<>(new CountDownLatch(firstWaveLineCount)); - ConfirmationHandler confirmationHandler = confirmationStatus -> latch.get().countDown(); + Sync confirmSync = sync(firstWaveLineCount); + ConfirmationHandler confirmationHandler = confirmationStatus -> confirmSync.down(); Consumer publishMessage = i -> producer.send( @@ -431,15 +430,17 @@ void messagesShouldBeDeDuplicatedWhenUsingNameAndPublishingId(int subEntrySize) .addData(String.valueOf(i).getBytes()) .build(), confirmationHandler); + // publish the first wave document.headSet(firstWaveLineCount).forEach(publishMessage); - assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(confirmSync).completes(); - latch.set(new CountDownLatch(lineCount - firstWaveLineCount + backwardCount)); + confirmSync.reset(lineCount - firstWaveLineCount + backwardCount); + // publish the rest, but with some overlap from the first wave document.tailSet(firstWaveLineCount - backwardCount).forEach(publishMessage); - assertThat(latch.get().await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(confirmSync).completes(); CountDownLatch consumeLatch = new CountDownLatch(lineCount); AtomicInteger consumed = new AtomicInteger(); @@ -451,8 +452,7 @@ void messagesShouldBeDeDuplicatedWhenUsingNameAndPublishingId(int subEntrySize) consumeLatch.countDown(); }) .build(); - assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); - Thread.sleep(1000); + assertThat(consumeLatch.await(5, TimeUnit.SECONDS)).isTrue(); if (subEntrySize == 1) { assertThat(consumed.get()).isEqualTo(lineCount); } else { @@ -641,8 +641,7 @@ void subEntryBatchesSentCompressedShouldBeConsumedProperly() { void methodsShouldThrowExceptionWhenProducerIsClosed() { Producer producer = environment.producerBuilder().stream(stream).build(); producer.close(); - assertThatThrownBy(() -> producer.getLastPublishingId()) - .isInstanceOf(IllegalStateException.class); + assertThatThrownBy(producer::getLastPublishingId).isInstanceOf(IllegalStateException.class); } @Test From 817d57a2a2ec0e84f366ffc3b2053ec698d403e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 5 Nov 2024 18:27:05 +0100 Subject: [PATCH 03/13] Add dynamic-batch message accumulator --- .../rabbitmq/stream/impl/DynamicBatch.java | 53 +++--- .../impl/DynamicBatchMessageAccumulator.java | 162 ++++++++++++++++++ .../rabbitmq/stream/impl/ProducerUtils.java | 66 ++++++- .../stream/impl/SimpleMessageAccumulator.java | 14 +- .../rabbitmq/stream/impl/StreamProducer.java | 89 +++++----- .../impl/SubEntryMessageAccumulator.java | 2 +- .../stream/impl/DynamicBatchTest.java | 3 +- .../stream/impl/StreamProducerUnitTest.java | 3 + 8 files changed, 312 insertions(+), 80 deletions(-) create mode 100644 src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index 070f1702e1..a2dc00acd7 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -22,20 +22,21 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; +import java.util.function.Predicate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class DynamicBatch { + private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class); private static final int MIN_BATCH_SIZE = 32; private static final int MAX_BATCH_SIZE = 8192; - final BlockingQueue requests = new LinkedBlockingQueue<>(); - final Consumer> consumer; - final int configuredBatchSize; - private final AtomicLong count = new AtomicLong(0); + private final BlockingQueue requests = new LinkedBlockingQueue<>(); + private final Predicate> consumer; + private final int configuredBatchSize; - DynamicBatch(Consumer> consumer, int batchSize) { + DynamicBatch(Predicate> consumer, int batchSize) { this.consumer = consumer; this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE); new Thread(this::loop).start(); @@ -44,7 +45,6 @@ class DynamicBatch { void add(T item) { try { requests.put(item); - this.count.incrementAndGet(); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -65,33 +65,42 @@ private void loop() { if (item != null) { batch.add(item); if (batch.size() >= batchSize) { - this.completeBatch(batch); - batchSize = min(batchSize * 2, MAX_BATCH_SIZE); - batch = new ArrayList<>(batchSize); + if (this.completeBatch(batch)) { + batchSize = min(batchSize * 2, MAX_BATCH_SIZE); + batch = new ArrayList<>(batchSize); + } } else { item = this.requests.poll(); if (item == null) { - this.completeBatch(batch); - batchSize = max(batchSize / 2, MIN_BATCH_SIZE); - batch = new ArrayList<>(batchSize); + if (this.completeBatch(batch)) { + batchSize = max(batchSize / 2, MIN_BATCH_SIZE); + batch = new ArrayList<>(batchSize); + } } else { batch.add(item); if (batch.size() >= batchSize) { - this.completeBatch(batch); - batchSize = min(batchSize * 2, MAX_BATCH_SIZE); - batch = new ArrayList<>(batchSize); + if (this.completeBatch(batch)) { + batchSize = min(batchSize * 2, MAX_BATCH_SIZE); + batch = new ArrayList<>(batchSize); + } } } } } else { - this.completeBatch(batch); - batchSize = min(batchSize * 2, MAX_BATCH_SIZE); - batch = new ArrayList<>(batchSize); + if (this.completeBatch(batch)) { + batchSize = min(batchSize * 2, MAX_BATCH_SIZE); + batch = new ArrayList<>(batchSize); + } } } } - private void completeBatch(List items) { - this.consumer.accept(items); + private boolean completeBatch(List items) { + try { + return this.consumer.test(items); + } catch (Exception e) { + LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage()); + return false; + } } } diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java new file mode 100644 index 0000000000..11b1a768b0 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java @@ -0,0 +1,162 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import com.rabbitmq.stream.Codec; +import com.rabbitmq.stream.ConfirmationHandler; +import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.ObservationCollector; +import com.rabbitmq.stream.compression.Compression; +import com.rabbitmq.stream.compression.CompressionCodec; +import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity; +import io.netty.buffer.ByteBufAllocator; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.function.ToLongFunction; + +final class DynamicBatchMessageAccumulator implements MessageAccumulator { + + private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null; + + private final DynamicBatch dynamicBatch; + private final ObservationCollector observationCollector; + private final ToLongFunction publishSequenceFunction; + private final String stream; + private final StreamProducer producer; + private final Codec codec; + private final int maxFrameSize; + private final Clock clock; + private final Function filterValueExtractor; + + @SuppressWarnings("unchecked") + DynamicBatchMessageAccumulator( + int subEntrySize, + int batchSize, + Codec codec, + int maxFrameSize, + ToLongFunction publishSequenceFunction, + Function filterValueExtractor, + Clock clock, + String stream, + CompressionCodec compressionCodec, + ByteBufAllocator byteBufAllocator, + ObservationCollector observationCollector, + StreamProducer producer) { + this.producer = producer; + this.stream = stream; + this.publishSequenceFunction = publishSequenceFunction; + this.observationCollector = (ObservationCollector) observationCollector; + this.codec = codec; + this.clock = clock; + this.maxFrameSize = maxFrameSize; + this.filterValueExtractor = + filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor; + if (subEntrySize <= 1) { + this.dynamicBatch = new DynamicBatch<>(this::publish, batchSize); + } else { + byte compressionCode = + compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); + this.dynamicBatch = + new DynamicBatch<>( + items -> { + if (this.producer.canSend()) { + List subBatches = new ArrayList<>(); + int count = 0; + ProducerUtils.Batch batch = + new ProducerUtils.Batch( + Client.EncodedMessageBatch.create( + byteBufAllocator, compressionCode, compressionCodec, subEntrySize), + new ProducerUtils.CompositeConfirmationCallback( + new ArrayList<>(subEntrySize))); + AccumulatedEntity lastMessageInBatch = null; + for (Object msg : items) { + AccumulatedEntity message = (AccumulatedEntity) msg; + this.observationCollector.published( + message.observationContext(), message.confirmationCallback().message()); + lastMessageInBatch = message; + batch.add( + (Codec.EncodedMessage) message.encodedEntity(), + message.confirmationCallback()); + count++; + if (count == subEntrySize) { + batch.time = lastMessageInBatch.time(); + batch.publishingId = lastMessageInBatch.publishingId(); + batch.encodedMessageBatch.close(); + subBatches.add(batch); + lastMessageInBatch = null; + batch = + new ProducerUtils.Batch( + Client.EncodedMessageBatch.create( + byteBufAllocator, + compressionCode, + compressionCodec, + subEntrySize), + new ProducerUtils.CompositeConfirmationCallback( + new ArrayList<>(subEntrySize))); + count = 0; + } + } + + if (!batch.isEmpty() && count < subEntrySize) { + batch.time = lastMessageInBatch.time(); + batch.publishingId = lastMessageInBatch.publishingId(); + batch.encodedMessageBatch.close(); + subBatches.add(batch); + } + + return this.publish(subBatches); + } else { + return false; + } + }, + batchSize * subEntrySize); + } + } + + @Override + public void add(Message message, ConfirmationHandler confirmationHandler) { + Object observationContext = this.observationCollector.prePublish(this.stream, message); + Codec.EncodedMessage encodedMessage = this.codec.encode(message); + Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage); + long publishingId = this.publishSequenceFunction.applyAsLong(message); + this.dynamicBatch.add( + new ProducerUtils.SimpleAccumulatedEntity( + this.clock.time(), + publishingId, + this.filterValueExtractor.apply(message), + this.codec.encode(message), + new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler), + observationContext)); + } + + @Override + public int size() { + // TODO compute dynamic batch message accumulator pending message count + return 0; + } + + @Override + public void flush(boolean force) {} + + private boolean publish(List entities) { + if (this.producer.canSend()) { + this.producer.publishInternal(entities); + return true; + } else { + return false; + } + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java index 4933932755..f4facf78da 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java @@ -14,16 +14,74 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import com.rabbitmq.stream.Codec; -import com.rabbitmq.stream.ConfirmationHandler; -import com.rabbitmq.stream.ConfirmationStatus; -import com.rabbitmq.stream.Message; +import com.rabbitmq.stream.*; +import com.rabbitmq.stream.compression.CompressionCodec; +import io.netty.buffer.ByteBufAllocator; import java.util.List; +import java.util.function.Function; +import java.util.function.ToLongFunction; final class ProducerUtils { private ProducerUtils() {} + static MessageAccumulator createMessageAccumulator( + boolean dynamicBatch, + int subEntrySize, + int batchSize, + CompressionCodec compressionCodec, + Codec codec, + ByteBufAllocator byteBufAllocator, + int maxFrameSize, + ToLongFunction publishSequenceFunction, + Function filterValueExtractor, + Clock clock, + String stream, + ObservationCollector observationCollector, + StreamProducer producer) { + if (dynamicBatch) { + return new DynamicBatchMessageAccumulator( + subEntrySize, + batchSize, + codec, + maxFrameSize, + publishSequenceFunction, + filterValueExtractor, + clock, + stream, + compressionCodec, + byteBufAllocator, + observationCollector, + producer); + } else { + if (subEntrySize <= 1) { + return new SimpleMessageAccumulator( + batchSize, + codec, + maxFrameSize, + publishSequenceFunction, + filterValueExtractor, + clock, + stream, + observationCollector, + producer); + } else { + return new SubEntryMessageAccumulator( + subEntrySize, + batchSize, + compressionCodec, + codec, + byteBufAllocator, + maxFrameSize, + publishSequenceFunction, + clock, + stream, + observationCollector, + producer); + } + } + } + interface ConfirmationCallback { int handle(boolean confirmed, short code); diff --git a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java index 90a70e43a3..3a14ec21d8 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java @@ -15,12 +15,12 @@ package com.rabbitmq.stream.impl; import com.rabbitmq.stream.*; +import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.ToLongFunction; @@ -28,7 +28,7 @@ class SimpleMessageAccumulator implements MessageAccumulator { private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null; - protected final BlockingQueue messages; + protected final BlockingQueue messages; protected final Clock clock; private final int capacity; protected final Codec codec; @@ -93,8 +93,8 @@ public void add(Message message, ConfirmationHandler confirmationHandler) { } } - ProducerUtils.AccumulatedEntity get() { - ProducerUtils.AccumulatedEntity entity = this.messages.poll(); + AccumulatedEntity get() { + AccumulatedEntity entity = this.messages.poll(); if (entity != null) { this.observationCollector.published( entity.observationContext(), entity.confirmationCallback().message()); @@ -113,24 +113,20 @@ public void flush(boolean force) { synchronized (this.producer) { publishBatch(stateCheck); } - // System.out.println(sent.get()); } - AtomicInteger sent = new AtomicInteger(); - private void publishBatch(boolean stateCheck) { if ((!stateCheck || this.producer.canSend()) && !this.messages.isEmpty()) { List entities = new ArrayList<>(this.capacity); int batchCount = 0; while (batchCount != this.capacity) { - ProducerUtils.AccumulatedEntity entity = this.get(); + AccumulatedEntity entity = this.get(); if (entity == null) { break; } entities.add(entity); batchCount++; } - this.sent.addAndGet(entities.size()); producer.publishInternal(entities); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 8c1de81345..9cecc8518b 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -20,6 +20,7 @@ import com.rabbitmq.stream.*; import com.rabbitmq.stream.compression.Compression; +import com.rabbitmq.stream.compression.CompressionCodec; import com.rabbitmq.stream.impl.Client.Response; import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -72,8 +73,6 @@ class StreamProducer implements Producer { private final Codec codec; private final ToLongFunction publishSequenceFunction = entity -> ((AccumulatedEntity) entity).publishingId(); - private final Function filterValueExtractor; - private final ObservationCollector observationCollector; private final long enqueueTimeoutMs; private final boolean blockOnMaxUnconfirmed; private final boolean retryOnRecovery; @@ -120,44 +119,41 @@ class StreamProducer implements Producer { return publishingSequence.getAndIncrement(); } }; - this.filterValueExtractor = filterValueExtractor == null ? m -> null : filterValueExtractor; this.clock = environment.clock(); - this.observationCollector = - (ObservationCollector) this.environment.observationCollector(); if (subEntrySize <= 1) { - this.accumulator = - new SimpleMessageAccumulator( - batchSize, - environment.codec(), - client.maxFrameSize(), - accumulatorPublishSequenceFunction, - filterValueExtractor, - this.environment.clock(), - stream, - this.environment.observationCollector(), - this); + // this.accumulator = + // new SimpleMessageAccumulator( + // batchSize, + // environment.codec(), + // client.maxFrameSize(), + // accumulatorPublishSequenceFunction, + // filterValueExtractor, + // this.environment.clock(), + // stream, + // this.environment.observationCollector(), + // this); if (filterValueExtractor == null) { delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK; } else { delegateWriteCallback = OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK; } } else { - this.accumulator = - new SubEntryMessageAccumulator( - subEntrySize, - batchSize, - compression == Compression.NONE - ? null - : environment.compressionCodecFactory().get(compression), - environment.codec(), - this.environment.byteBufAllocator(), - client.maxFrameSize(), - accumulatorPublishSequenceFunction, - this.environment.clock(), - stream, - environment.observationCollector(), - this); + // this.accumulator = + // new SubEntryMessageAccumulator( + // subEntrySize, + // batchSize, + // compression == Compression.NONE + // ? null + // : environment.compressionCodecFactory().get(compression), + // environment.codec(), + // this.environment.byteBufAllocator(), + // client.maxFrameSize(), + // accumulatorPublishSequenceFunction, + // this.environment.clock(), + // stream, + // environment.observationCollector(), + // this); delegateWriteCallback = Client.OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK; } @@ -201,6 +197,26 @@ public int fragmentLength(Object entity) { }; } + CompressionCodec compressionCodec = null; + if (compression != null) { + compressionCodec = environment.compressionCodecFactory().get(compression); + } + this.accumulator = + ProducerUtils.createMessageAccumulator( + false, + subEntrySize, + batchSize, + compressionCodec, + environment.codec(), + environment.byteBufAllocator(), + client.maxFrameSize(), + accumulatorPublishSequenceFunction, + filterValueExtractor, + environment.clock(), + stream, + environment.observationCollector(), + this); + /* if (subEntrySize <= 1) { this.dynamicBatch = @@ -480,17 +496,6 @@ public void send(Message message, ConfirmationHandler confirmationHandler) { private void doSend(Message message, ConfirmationHandler confirmationHandler) { if (canSend()) { - // long publishingId = this.accumulatorPublishSequenceFunction.applyAsLong(message); - // Object observationContext = this.observationCollector.prePublish(this.stream, - // message); - // this.dynamicBatch.add( - // new ProducerUtils.SimpleAccumulatedEntity( - // this.clock.time(), - // publishingId, - // this.filterValueExtractor.apply(message), - // this.codec.encode(message), - // new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler), - // observationContext)); this.accumulator.add(message, confirmationHandler); } else { failPublishing(message, confirmationHandler); diff --git a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java index dd15601e10..3cd2fbdb78 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java @@ -42,7 +42,7 @@ public SubEntryMessageAccumulator( ToLongFunction publishSequenceFunction, Clock clock, String stream, - ObservationCollector observationCollector, + ObservationCollector observationCollector, StreamProducer producer) { super( subEntrySize * batchSize, diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java index 9680845842..1dc42b7bba 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java @@ -42,8 +42,6 @@ void test() { DynamicBatch batch = new DynamicBatch<>( items -> { - // System.out.println(System.currentTimeMillis()); - // System.out.println(items.size()); batchSizeMetrics.update(items.size()); sync.down(items.size()); try { @@ -51,6 +49,7 @@ void test() { } catch (InterruptedException e) { throw new RuntimeException(e); } + return true; }, 100); RateLimiter rateLimiter = RateLimiter.create(3000); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 28c87bfbc2..44240c2a5c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -27,6 +27,7 @@ import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.codec.SimpleCodec; import com.rabbitmq.stream.compression.Compression; +import com.rabbitmq.stream.compression.DefaultCompressionCodecFactory; import com.rabbitmq.stream.impl.Client.OutboundEntityWriteCallback; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -115,6 +116,8 @@ void init() { when(env.clock()).thenReturn(clock); when(env.codec()).thenReturn(new SimpleCodec()); when(env.observationCollector()).thenAnswer(invocation -> ObservationCollector.NO_OP); + DefaultCompressionCodecFactory ccf = new DefaultCompressionCodecFactory(); + when(env.compressionCodecFactory()).thenReturn(ccf); doAnswer( (Answer) invocationOnMock -> { From 127e3ca395f3b53f2dd8f116ebe8525afc047246 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 5 Nov 2024 18:33:55 +0100 Subject: [PATCH 04/13] Clean up producer code --- .../rabbitmq/stream/impl/StreamProducer.java | 137 +----------------- 1 file changed, 1 insertion(+), 136 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 9cecc8518b..2618d7f2dc 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -56,8 +56,6 @@ class StreamProducer implements Producer { private static final ConfirmationHandler NO_OP_CONFIRMATION_HANDLER = confirmationStatus -> {}; private final long id; private final MessageAccumulator accumulator; - // private final DynamicBatch dynamicBatch; - private final Clock clock; private final ToLongFunction accumulatorPublishSequenceFunction; // FIXME investigate a more optimized data structure to handle pending messages private final ConcurrentMap unconfirmedMessages; @@ -119,41 +117,14 @@ class StreamProducer implements Producer { return publishingSequence.getAndIncrement(); } }; - this.clock = environment.clock(); if (subEntrySize <= 1) { - // this.accumulator = - // new SimpleMessageAccumulator( - // batchSize, - // environment.codec(), - // client.maxFrameSize(), - // accumulatorPublishSequenceFunction, - // filterValueExtractor, - // this.environment.clock(), - // stream, - // this.environment.observationCollector(), - // this); if (filterValueExtractor == null) { delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK; } else { delegateWriteCallback = OUTBOUND_MSG_FILTER_VALUE_WRITE_CALLBACK; } } else { - // this.accumulator = - // new SubEntryMessageAccumulator( - // subEntrySize, - // batchSize, - // compression == Compression.NONE - // ? null - // : environment.compressionCodecFactory().get(compression), - // environment.codec(), - // this.environment.byteBufAllocator(), - // client.maxFrameSize(), - // accumulatorPublishSequenceFunction, - // this.environment.clock(), - // stream, - // environment.observationCollector(), - // this); delegateWriteCallback = Client.OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK; } @@ -217,88 +188,6 @@ public int fragmentLength(Object entity) { environment.observationCollector(), this); - /* - if (subEntrySize <= 1) { - this.dynamicBatch = - new DynamicBatch<>( - items -> { - client.publishInternal( - this.publishVersion, - this.publisherId, - items, - this.writeCallback, - this.publishSequenceFunction); - }, - batchSize); - } else { - CompressionCodec compressionCodec = - compression == Compression.NONE - ? null - : environment.compressionCodecFactory().get(compression); - byte compressionCode = - compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); - this.dynamicBatch = - new DynamicBatch<>( - items -> { - List subBatches = new ArrayList<>(); - int count = 0; - ProducerUtils.Batch batch = - new ProducerUtils.Batch( - Client.EncodedMessageBatch.create( - this.environment.byteBufAllocator(), - compressionCode, - compressionCodec, - subEntrySize), - new ProducerUtils.CompositeConfirmationCallback( - new ArrayList<>(subEntrySize))); - AccumulatedEntity lastMessageInBatch = null; - for (Object msg : items) { - AccumulatedEntity message = (AccumulatedEntity) msg; - this.observationCollector.published( - message.observationContext(), message.confirmationCallback().message()); - lastMessageInBatch = message; - batch.add( - (Codec.EncodedMessage) message.encodedEntity(), - message.confirmationCallback()); - count++; - if (count == subEntrySize) { - batch.time = lastMessageInBatch.time(); - batch.publishingId = lastMessageInBatch.publishingId(); - batch.encodedMessageBatch.close(); - subBatches.add(batch); - lastMessageInBatch = null; - batch = - new ProducerUtils.Batch( - Client.EncodedMessageBatch.create( - this.environment.byteBufAllocator(), - compressionCode, - compressionCodec, - subEntrySize), - new ProducerUtils.CompositeConfirmationCallback( - new ArrayList<>(subEntrySize))); - count = 0; - } - } - - if (!batch.isEmpty() && count < subEntrySize) { - batch.time = lastMessageInBatch.time(); - batch.publishingId = lastMessageInBatch.publishingId(); - batch.encodedMessageBatch.close(); - subBatches.add(batch); - } - - client.publishInternal( - this.publishVersion, - this.publisherId, - subBatches, - this.writeCallback, - this.publishSequenceFunction); - - }, - batchSize * subEntrySize); - } - */ - if (!batchPublishingDelay.isNegative() && !batchPublishingDelay.isZero()) { AtomicReference taskReference = new AtomicReference<>(); Runnable task = @@ -388,7 +277,7 @@ private Runnable confirmTimeoutTask(Duration confirmTimeout) { error(unconfirmedEntry.getKey(), Constants.CODE_PUBLISH_CONFIRM_TIMEOUT); count++; } else { - // everything else is after, so we can stop + // everything else is after, we can stop break; } } @@ -571,30 +460,6 @@ private void cancelConfirmTimeoutTask() { } } - /* - private void publishBatch(boolean stateCheck) { - if ((!stateCheck || canSend()) && !accumulator.isEmpty()) { - List messages = new ArrayList<>(this.batchSize); - int batchCount = 0; - while (batchCount != this.batchSize) { - AccumulatedEntity accMessage = accumulator.get(); - if (accMessage == null) { - break; - } - messages.add(accMessage); - batchCount++; - } - client.publishInternal( - this.publishVersion, - this.publisherId, - messages, - this.writeCallback, - this.publishSequenceFunction); - } - } - - */ - void publishInternal(List messages) { client.publishInternal( this.publishVersion, From f2ce3048d3f7bcf8f3404a4635377ce8c114437d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 6 Nov 2024 10:01:59 +0100 Subject: [PATCH 05/13] Test against dynamic-batch message accumulator --- .../impl/DynamicBatchMessageAccumulator.java | 19 +++++++++++++++++- .../rabbitmq/stream/impl/StreamProducer.java | 9 ++++++++- .../stream/impl/DynamicBatchTest.java | 5 ++--- .../stream/impl/StreamProducerUnitTest.java | 20 +++++++++++++------ 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java index 11b1a768b0..73583f6e04 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java @@ -65,7 +65,24 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { this.filterValueExtractor = filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor; if (subEntrySize <= 1) { - this.dynamicBatch = new DynamicBatch<>(this::publish, batchSize); + this.dynamicBatch = + new DynamicBatch<>( + items -> { + // TODO add a "replay" flag to DynamicBatch to avoid checking the producer status + // the status check helps to avoid collecting the observation another time + if (producer.canSend()) { + items.forEach( + i -> { + AccumulatedEntity entity = (AccumulatedEntity) i; + this.observationCollector.published( + entity.observationContext(), entity.confirmationCallback().message()); + }); + return this.publish(items); + } else { + return false; + } + }, + batchSize); } else { byte compressionCode = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 2618d7f2dc..5b4876af0b 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -174,7 +174,7 @@ public int fragmentLength(Object entity) { } this.accumulator = ProducerUtils.createMessageAccumulator( - false, + true, subEntrySize, batchSize, compressionCodec, @@ -306,8 +306,10 @@ private long computeFirstValueOfPublishingSequence() { } } + // visible for testing void confirm(long publishingId) { AccumulatedEntity accumulatedEntity = this.unconfirmedMessages.remove(publishingId); + if (accumulatedEntity != null) { int confirmedCount = accumulatedEntity.confirmationCallback().handle(true, Constants.RESPONSE_CODE_OK); @@ -317,6 +319,11 @@ void confirm(long publishingId) { } } + // for testing + int unconfirmedCount() { + return this.unconfirmedMessages.size(); + } + void error(long publishingId, short errorCode) { AccumulatedEntity accumulatedEntity = unconfirmedMessages.remove(publishingId); if (accumulatedEntity != null) { diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java index 1dc42b7bba..764631b615 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java @@ -18,7 +18,6 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.google.common.util.concurrent.RateLimiter; -import java.time.Duration; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -62,7 +61,7 @@ void test() { }); Assertions.assertThat(sync).completes(); long end = System.nanoTime(); - System.out.println("Done in " + Duration.ofNanos(end - start)); - reporter.report(); + // System.out.println("Done in " + Duration.ofNanos(end - start)); + // reporter.report(); } } diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 44240c2a5c..451df47de5 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -15,6 +15,7 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.*; @@ -43,8 +44,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.ToLongFunction; -import java.util.stream.IntStream; import org.assertj.core.api.ThrowableAssert.ThrowingCallable; +import org.assertj.core.data.Offset; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; @@ -184,26 +185,33 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout( null, env); - IntStream.range(0, messageCount) + range(0, messageCount) .forEach( i -> producer.send( producer.messageBuilder().addData("".getBytes()).build(), confirmationHandler)); - IntStream.range(0, confirmedPart).forEach(publishingId -> producer.confirm(publishingId)); - assertThat(confirmedCount.get()).isEqualTo(expectedConfirmed); + waitAtMost(() -> producer.unconfirmedCount() >= messageCount / subEntrySize); + range(0, confirmedPart).forEach(producer::confirm); + if (subEntrySize == 1) { + assertThat(confirmedCount.get()).isEqualTo(expectedConfirmed); + } else { + assertThat(confirmedCount.get()).isCloseTo(confirmedCount.get(), Offset.offset(subEntrySize)); + } assertThat(erroredCount.get()).isZero(); + int confirmedPreviously = confirmedCount.get(); executorService.scheduleAtFixedRate(() -> clock.refresh(), 100, 100, TimeUnit.MILLISECONDS); Thread.sleep(waitTime.toMillis()); - assertThat(confirmedCount.get()).isEqualTo(expectedConfirmed); + assertThat(confirmedCount.get()).isEqualTo(confirmedPreviously); if (confirmTimeout.isZero()) { assertThat(erroredCount.get()).isZero(); assertThat(responseCodes).isEmpty(); } else { waitAtMost( - waitTime.multipliedBy(2), () -> erroredCount.get() == (messageCount - expectedConfirmed)); + waitTime.multipliedBy(2), + () -> erroredCount.get() == (messageCount - confirmedPreviously)); assertThat(responseCodes).hasSize(1).contains(Constants.CODE_PUBLISH_CONFIRM_TIMEOUT); } } From 8c54607985645649583f32c9196dec5b051dc58c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 6 Nov 2024 11:48:47 +0100 Subject: [PATCH 06/13] Add helper for message accumulator --- .../impl/DynamicBatchMessageAccumulator.java | 55 +++------- .../rabbitmq/stream/impl/ProducerUtils.java | 57 ++++++++++ .../stream/impl/SimpleMessageAccumulator.java | 44 +++----- .../rabbitmq/stream/impl/StreamProducer.java | 9 +- .../impl/SubEntryMessageAccumulator.java | 15 ++- .../stream/impl/StreamProducerTest.java | 101 +++++++++--------- src/test/resources/logback-test.xml | 1 - 7 files changed, 146 insertions(+), 136 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java index 73583f6e04..3c850440ba 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java @@ -29,17 +29,10 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { - private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null; - private final DynamicBatch dynamicBatch; private final ObservationCollector observationCollector; - private final ToLongFunction publishSequenceFunction; - private final String stream; private final StreamProducer producer; - private final Codec codec; - private final int maxFrameSize; - private final Clock clock; - private final Function filterValueExtractor; + private final ProducerUtils.MessageAccumulatorHelper helper; @SuppressWarnings("unchecked") DynamicBatchMessageAccumulator( @@ -55,15 +48,17 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { ByteBufAllocator byteBufAllocator, ObservationCollector observationCollector, StreamProducer producer) { + this.helper = + new ProducerUtils.MessageAccumulatorHelper( + codec, + maxFrameSize, + publishSequenceFunction, + filterValueExtractor, + clock, + stream, + observationCollector); this.producer = producer; - this.stream = stream; - this.publishSequenceFunction = publishSequenceFunction; this.observationCollector = (ObservationCollector) observationCollector; - this.codec = codec; - this.clock = clock; - this.maxFrameSize = maxFrameSize; - this.filterValueExtractor = - filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor; if (subEntrySize <= 1) { this.dynamicBatch = new DynamicBatch<>( @@ -93,11 +88,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { List subBatches = new ArrayList<>(); int count = 0; ProducerUtils.Batch batch = - new ProducerUtils.Batch( - Client.EncodedMessageBatch.create( - byteBufAllocator, compressionCode, compressionCodec, subEntrySize), - new ProducerUtils.CompositeConfirmationCallback( - new ArrayList<>(subEntrySize))); + this.helper.batch( + byteBufAllocator, compressionCode, compressionCodec, subEntrySize); AccumulatedEntity lastMessageInBatch = null; for (Object msg : items) { AccumulatedEntity message = (AccumulatedEntity) msg; @@ -115,14 +107,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { subBatches.add(batch); lastMessageInBatch = null; batch = - new ProducerUtils.Batch( - Client.EncodedMessageBatch.create( - byteBufAllocator, - compressionCode, - compressionCodec, - subEntrySize), - new ProducerUtils.CompositeConfirmationCallback( - new ArrayList<>(subEntrySize))); + this.helper.batch( + byteBufAllocator, compressionCode, compressionCodec, subEntrySize); count = 0; } } @@ -145,18 +131,7 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { @Override public void add(Message message, ConfirmationHandler confirmationHandler) { - Object observationContext = this.observationCollector.prePublish(this.stream, message); - Codec.EncodedMessage encodedMessage = this.codec.encode(message); - Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage); - long publishingId = this.publishSequenceFunction.applyAsLong(message); - this.dynamicBatch.add( - new ProducerUtils.SimpleAccumulatedEntity( - this.clock.time(), - publishingId, - this.filterValueExtractor.apply(message), - this.codec.encode(message), - new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler), - observationContext)); + this.dynamicBatch.add(helper.entity(message, confirmationHandler)); } @Override diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java index f4facf78da..c74fa0102b 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java @@ -17,6 +17,7 @@ import com.rabbitmq.stream.*; import com.rabbitmq.stream.compression.CompressionCodec; import io.netty.buffer.ByteBufAllocator; +import java.util.ArrayList; import java.util.List; import java.util.function.Function; import java.util.function.ToLongFunction; @@ -262,4 +263,60 @@ public Object observationContext() { "batch entity does not contain only one observation context"); } } + + static final class MessageAccumulatorHelper { + + private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null; + + private final ObservationCollector observationCollector; + private final ToLongFunction publishSequenceFunction; + private final String stream; + private final Codec codec; + private final int maxFrameSize; + private final Clock clock; + private final Function filterValueExtractor; + + @SuppressWarnings("unchecked") + MessageAccumulatorHelper( + Codec codec, + int maxFrameSize, + ToLongFunction publishSequenceFunction, + Function filterValueExtractor, + Clock clock, + String stream, + ObservationCollector observationCollector) { + this.publishSequenceFunction = publishSequenceFunction; + this.codec = codec; + this.clock = clock; + this.maxFrameSize = maxFrameSize; + this.filterValueExtractor = + filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor; + this.observationCollector = (ObservationCollector) observationCollector; + this.stream = stream; + } + + AccumulatedEntity entity(Message message, ConfirmationHandler confirmationHandler) { + Object observationContext = this.observationCollector.prePublish(this.stream, message); + Codec.EncodedMessage encodedMessage = this.codec.encode(message); + Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage); + long publishingId = this.publishSequenceFunction.applyAsLong(message); + return new ProducerUtils.SimpleAccumulatedEntity( + this.clock.time(), + publishingId, + this.filterValueExtractor.apply(message), + this.codec.encode(message), + new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler), + observationContext); + } + + Batch batch( + ByteBufAllocator bba, + byte compressionCode, + CompressionCodec compressionCodec, + int subEntrySize) { + return new ProducerUtils.Batch( + Client.EncodedMessageBatch.create(bba, compressionCode, compressionCodec, subEntrySize), + new ProducerUtils.CompositeConfirmationCallback(new ArrayList<>(subEntrySize))); + } + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java index 3a14ec21d8..b88fe115cd 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java @@ -26,18 +26,11 @@ class SimpleMessageAccumulator implements MessageAccumulator { - private static final Function NULL_FILTER_VALUE_EXTRACTOR = m -> null; - protected final BlockingQueue messages; - protected final Clock clock; private final int capacity; - protected final Codec codec; - private final int maxFrameSize; - private final ToLongFunction publishSequenceFunction; - private final Function filterValueExtractor; - final String stream; final ObservationCollector observationCollector; private final StreamProducer producer; + final ProducerUtils.MessageAccumulatorHelper helper; @SuppressWarnings("unchecked") SimpleMessageAccumulator( @@ -50,36 +43,25 @@ class SimpleMessageAccumulator implements MessageAccumulator { String stream, ObservationCollector observationCollector, StreamProducer producer) { + this.helper = + new ProducerUtils.MessageAccumulatorHelper( + codec, + maxFrameSize, + publishSequenceFunction, + filterValueExtractor, + clock, + stream, + observationCollector); this.capacity = capacity; - this.messages = new LinkedBlockingQueue<>(capacity); - this.codec = codec; - this.maxFrameSize = maxFrameSize; - this.publishSequenceFunction = publishSequenceFunction; - this.filterValueExtractor = - filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor; - this.clock = clock; - this.stream = stream; + this.messages = new LinkedBlockingQueue<>(this.capacity); this.observationCollector = (ObservationCollector) observationCollector; this.producer = producer; } public void add(Message message, ConfirmationHandler confirmationHandler) { - Object observationContext = this.observationCollector.prePublish(this.stream, message); - Codec.EncodedMessage encodedMessage = this.codec.encode(message); - Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage); - long publishingId = this.publishSequenceFunction.applyAsLong(message); + AccumulatedEntity entity = this.helper.entity(message, confirmationHandler); try { - boolean offered = - messages.offer( - new ProducerUtils.SimpleAccumulatedEntity( - clock.time(), - publishingId, - this.filterValueExtractor.apply(message), - encodedMessage, - new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler), - observationContext), - 60, - TimeUnit.SECONDS); + boolean offered = messages.offer(entity, 60, TimeUnit.SECONDS); if (!offered) { throw new StreamException("Could not accumulate outbound message"); } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 5b4876af0b..7ae4582da9 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -172,9 +172,10 @@ public int fragmentLength(Object entity) { if (compression != null) { compressionCodec = environment.compressionCodecFactory().get(compression); } + boolean dynamicBatch = true; this.accumulator = ProducerUtils.createMessageAccumulator( - true, + dynamicBatch, subEntrySize, batchSize, compressionCodec, @@ -188,7 +189,11 @@ public int fragmentLength(Object entity) { environment.observationCollector(), this); - if (!batchPublishingDelay.isNegative() && !batchPublishingDelay.isZero()) { + boolean backgroundBatchPublishingTaskRequired = + !dynamicBatch && batchPublishingDelay.toMillis() > 0; + LOGGER.debug( + "Background batch publishing task required? {}", backgroundBatchPublishingTaskRequired); + if (backgroundBatchPublishingTaskRequired) { AtomicReference taskReference = new AtomicReference<>(); Runnable task = () -> { diff --git a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java index 3cd2fbdb78..27e7fcce0d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java @@ -20,9 +20,7 @@ import com.rabbitmq.stream.ObservationCollector; import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.compression.CompressionCodec; -import com.rabbitmq.stream.impl.Client.EncodedMessageBatch; import io.netty.buffer.ByteBufAllocator; -import java.util.ArrayList; import java.util.function.ToLongFunction; final class SubEntryMessageAccumulator extends SimpleMessageAccumulator { @@ -30,7 +28,7 @@ final class SubEntryMessageAccumulator extends SimpleMessageAccumulator { private final int subEntrySize; private final CompressionCodec compressionCodec; private final ByteBufAllocator byteBufAllocator; - private final byte compression; + private final byte compressionCode; public SubEntryMessageAccumulator( int subEntrySize, @@ -56,15 +54,14 @@ public SubEntryMessageAccumulator( producer); this.subEntrySize = subEntrySize; this.compressionCodec = compressionCodec; - this.compression = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); + this.compressionCode = + compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); this.byteBufAllocator = byteBufAllocator; } private ProducerUtils.Batch createBatch() { - return new ProducerUtils.Batch( - EncodedMessageBatch.create( - byteBufAllocator, compression, compressionCodec, this.subEntrySize), - new ProducerUtils.CompositeConfirmationCallback(new ArrayList<>(this.subEntrySize))); + return this.helper.batch( + this.byteBufAllocator, this.compressionCode, this.compressionCodec, this.subEntrySize); } @Override @@ -73,7 +70,7 @@ protected ProducerUtils.AccumulatedEntity get() { return null; } int count = 0; - ProducerUtils.Batch batch = createBatch(); + ProducerUtils.Batch batch = this.createBatch(); ProducerUtils.AccumulatedEntity lastMessageInBatch = null; while (count != this.subEntrySize) { ProducerUtils.AccumulatedEntity message = messages.poll(); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java index 19be5ea05b..ef68af3bf0 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java @@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import ch.qos.logback.classic.Level; import com.rabbitmq.stream.*; import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.impl.MonitoringTestUtils.ProducerInfo; @@ -347,63 +346,59 @@ void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception { @ParameterizedTest @ValueSource(ints = {1, 7}) void producerShouldBeClosedWhenStreamIsDeleted(int subEntrySize, TestInfo info) throws Exception { - Level initialLogLevel = TestUtils.newLoggerLevel(ProducersCoordinator.class, Level.DEBUG); - try { - String s = streamName(info); - environment.streamCreator().stream(s).create(); - - StreamProducer producer = - (StreamProducer) - environment.producerBuilder().subEntrySize(subEntrySize).stream(s).build(); - - AtomicInteger published = new AtomicInteger(0); - AtomicInteger confirmed = new AtomicInteger(0); - AtomicInteger errored = new AtomicInteger(0); - Set errorCodes = ConcurrentHashMap.newKeySet(); - - AtomicBoolean continuePublishing = new AtomicBoolean(true); - Thread publishThread = - new Thread( - () -> { - ConfirmationHandler confirmationHandler = - confirmationStatus -> { - if (confirmationStatus.isConfirmed()) { - confirmed.incrementAndGet(); - } else { - errored.incrementAndGet(); - errorCodes.add(confirmationStatus.getCode()); - } - }; - while (continuePublishing.get()) { - try { - producer.send( - producer - .messageBuilder() - .addData("".getBytes(StandardCharsets.UTF_8)) - .build(), - confirmationHandler); - published.incrementAndGet(); - } catch (StreamException e) { - // OK - } + String s = streamName(info); + environment.streamCreator().stream(s).create(); + + StreamProducer producer = + (StreamProducer) environment.producerBuilder().subEntrySize(subEntrySize).stream(s).build(); + + AtomicInteger published = new AtomicInteger(0); + AtomicInteger confirmed = new AtomicInteger(0); + AtomicInteger errored = new AtomicInteger(0); + Set errorCodes = ConcurrentHashMap.newKeySet(); + + AtomicBoolean continuePublishing = new AtomicBoolean(true); + Thread publishThread = + new Thread( + () -> { + ConfirmationHandler confirmationHandler = + confirmationStatus -> { + if (confirmationStatus.isConfirmed()) { + confirmed.incrementAndGet(); + } else { + errored.incrementAndGet(); + errorCodes.add(confirmationStatus.getCode()); + } + }; + while (continuePublishing.get()) { + try { + producer.send( + producer + .messageBuilder() + .addData("".getBytes(StandardCharsets.UTF_8)) + .build(), + confirmationHandler); + published.incrementAndGet(); + } catch (StreamException e) { + // OK } - }); - publishThread.start(); + } + }); + publishThread.start(); - Thread.sleep(1000L); + waitAtMost(() -> confirmed.get() > 100); + int confirmedNow = confirmed.get(); + waitAtMost(() -> confirmed.get() > confirmedNow + 1000); - assertThat(producer.isOpen()).isTrue(); + assertThat(producer.isOpen()).isTrue(); - environment.deleteStream(s); + environment.deleteStream(s); - waitAtMost(() -> !producer.isOpen()); - continuePublishing.set(false); - waitAtMost( - () -> !errorCodes.isEmpty(), - () -> "The producer should have received negative publish confirms"); - } finally { - TestUtils.newLoggerLevel(ProducersCoordinator.class, initialLogLevel); - } + waitAtMost(() -> !producer.isOpen()); + continuePublishing.set(false); + waitAtMost( + () -> !errorCodes.isEmpty(), + () -> "The producer should have received negative publish confirms"); } @ParameterizedTest diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 6931d5f21a..4bec720537 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -6,7 +6,6 @@ - From 517926e2d0af3108455a4c4069c7a0efe06c0bd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 6 Nov 2024 15:44:22 +0100 Subject: [PATCH 07/13] Run test suite against both versions of message accumulator Regular and dynamic batch. --- .github/workflows/test-rabbitmq-alphas.yml | 10 +++++++++- .github/workflows/test-supported-java-versions.yml | 11 ++++++++++- .github/workflows/test.yml | 10 +++++++++- .../java/com/rabbitmq/stream/impl/StreamProducer.java | 2 +- .../rabbitmq/stream/impl/StreamProducerBuilder.java | 6 ++++++ .../rabbitmq/stream/impl/StreamProducerUnitTest.java | 3 +++ 6 files changed, 38 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test-rabbitmq-alphas.yml b/.github/workflows/test-rabbitmq-alphas.yml index 4409947e27..2a89ab39ff 100644 --- a/.github/workflows/test-rabbitmq-alphas.yml +++ b/.github/workflows/test-rabbitmq-alphas.yml @@ -32,11 +32,19 @@ jobs: run: ci/start-broker.sh env: RABBITMQ_IMAGE: ${{ matrix.rabbitmq-image }} - - name: Test + - name: Test (no dynamic-batch publishing) run: | ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=false \ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem + - name: Test (dynamic-batch publishing) + run: | + ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=true \ + -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ + -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ + -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem - name: Stop broker run: docker stop rabbitmq && docker rm rabbitmq diff --git a/.github/workflows/test-supported-java-versions.yml b/.github/workflows/test-supported-java-versions.yml index 05983059c4..8e66bcb89d 100644 --- a/.github/workflows/test-supported-java-versions.yml +++ b/.github/workflows/test-supported-java-versions.yml @@ -33,12 +33,21 @@ jobs: run: ci/start-broker.sh - name: Display Java version run: ./mvnw --version - - name: Test + - name: Test (no dynamic-batch publishing) run: | ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=false \ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \ -Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true + - name: Test (dynamic-batch publishing) + run: | + ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=true \ + -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ + -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ + -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \ + -Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true - name: Stop broker run: docker stop rabbitmq && docker rm rabbitmq diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7081a78ee5..91a432ef39 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -33,12 +33,20 @@ jobs: gpg-passphrase: MAVEN_GPG_PASSPHRASE - name: Start broker run: ci/start-broker.sh - - name: Test + - name: Test (no dynamic-batch publishing) run: | ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=false \ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem + - name: Test (dynamic-batch publishing) + run: | + ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=true \ + -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ + -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ + -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem - name: Stop broker run: docker stop rabbitmq && docker rm rabbitmq - name: Upload Codecov report diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 7ae4582da9..24ffdc8ffb 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -86,6 +86,7 @@ class StreamProducer implements Producer { String stream, int subEntrySize, int batchSize, + boolean dynamicBatch, Compression compression, Duration batchPublishingDelay, int maxUnconfirmedMessages, @@ -172,7 +173,6 @@ public int fragmentLength(Object entity) { if (compression != null) { compressionCodec = environment.compressionCodecFactory().get(compression); } - boolean dynamicBatch = true; this.accumulator = ProducerUtils.createMessageAccumulator( dynamicBatch, diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 493426c67f..3ed441c9eb 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -27,6 +27,9 @@ class StreamProducerBuilder implements ProducerBuilder { + static final boolean DEFAULT_DYNAMIC_BATCH = + Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "false")); + private final StreamEnvironment environment; private String name; @@ -53,6 +56,8 @@ class StreamProducerBuilder implements ProducerBuilder { private Function filterValueExtractor; + private boolean dynamicBatch = DEFAULT_DYNAMIC_BATCH; + StreamProducerBuilder(StreamEnvironment environment) { this.environment = environment; } @@ -198,6 +203,7 @@ public Producer build() { stream, subEntrySize, batchSize, + dynamicBatch, compression, batchPublishingDelay, maxUnconfirmedMessages, diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 451df47de5..c91590a4ec 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -176,6 +176,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout( "stream", subEntrySize, 10, + StreamProducerBuilder.DEFAULT_DYNAMIC_BATCH, Compression.NONE, Duration.ofMillis(100), messageCount * 10, @@ -226,6 +227,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry "stream", subEntrySize, 10, + StreamProducerBuilder.DEFAULT_DYNAMIC_BATCH, Compression.NONE, Duration.ZERO, 2, @@ -266,6 +268,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize) "stream", subEntrySize, 10, + StreamProducerBuilder.DEFAULT_DYNAMIC_BATCH, Compression.NONE, Duration.ZERO, 2, From fa5e0a3fbea9226f33ea2be4116cd3f63c1c9961 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 6 Nov 2024 15:56:44 +0100 Subject: [PATCH 08/13] Add dynamic batch option in producer builder --- src/docs/asciidoc/api.adoc | 4 ++++ .../com/rabbitmq/stream/ProducerBuilder.java | 22 +++++++++++++++++++ .../stream/impl/StreamProducerBuilder.java | 7 ++++++ 3 files changed, 33 insertions(+) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 28eb063bc6..f7edd8fffd 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -455,6 +455,10 @@ blocking when the limit is reached. |Period to send a batch of messages. |100 ms +|`dynamicBatch` +|Adapt batch size depending on ingress rate. +|false + |`confirmTimeout` |[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal outstanding unconfirmed messages timed out. diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index 5686a370cf..8d9c8cc7b4 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -97,6 +97,28 @@ public interface ProducerBuilder { */ ProducerBuilder batchPublishingDelay(Duration batchPublishingDelay); + /** + * Adapt batch size depending on ingress rate. + * + *

A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive + * for sustained high ingress rates. + * + *

Set this flag to true if you want as little delay as possible before calling + * {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker. + * + *

Set this flag to false if latency is not critical for your use case and you + * want the highest throughput possible for both publishing and consuming. + * + *

Dynamic batch is not activated by default (dynamicBatch = false). + * + *

Dynamic batch is experimental. + * + * @param dynamicBatch + * @return this builder instance + * @since 0.20.0 + */ + ProducerBuilder dynamicBatch(boolean dynamicBatch); + /** * The maximum number of unconfirmed outbound messages. * diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 3ed441c9eb..814266f7e5 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -102,11 +102,18 @@ public ProducerBuilder compression(Compression compression) { return this; } + @Override public StreamProducerBuilder batchPublishingDelay(Duration batchPublishingDelay) { this.batchPublishingDelay = batchPublishingDelay; return this; } + @Override + public ProducerBuilder dynamicBatch(boolean dynamicBatch) { + this.dynamicBatch = dynamicBatch; + return this; + } + @Override public ProducerBuilder maxUnconfirmedMessages(int maxUnconfirmedMessages) { if (maxUnconfirmedMessages <= 0) { From 6d5165343793da2d98e899fe48bd3f24c1e0214a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 6 Nov 2024 16:05:48 +0100 Subject: [PATCH 09/13] Fix producer builder duplication --- .../rabbitmq/stream/impl/StreamProducerBuilder.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 814266f7e5..14a7526a43 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -21,6 +21,7 @@ import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.compression.Compression; import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.time.Duration; import java.util.function.Function; import java.util.function.ToIntFunction; @@ -242,11 +243,13 @@ public Producer build() { StreamProducerBuilder duplicate() { StreamProducerBuilder duplicate = new StreamProducerBuilder(this.environment); for (Field field : StreamProducerBuilder.class.getDeclaredFields()) { - field.setAccessible(true); - try { - field.set(duplicate, field.get(this)); - } catch (IllegalAccessException e) { - throw new StreamException("Error while duplicating stream producer builder", e); + if (!Modifier.isStatic(field.getModifiers())) { + field.setAccessible(true); + try { + field.set(duplicate, field.get(this)); + } catch (IllegalAccessException e) { + throw new StreamException("Error while duplicating stream producer builder", e); + } } } return duplicate; From 40d58c895862687a1c620c5471f9c0b40eaf7be4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 6 Nov 2024 16:43:23 +0100 Subject: [PATCH 10/13] Stop thread in DynamicBatch --- .../java/com/rabbitmq/stream/impl/DynamicBatch.java | 11 +++++++++-- .../stream/impl/DynamicBatchMessageAccumulator.java | 5 +++++ .../com/rabbitmq/stream/impl/MessageAccumulator.java | 5 ++++- .../stream/impl/SimpleMessageAccumulator.java | 3 +++ .../java/com/rabbitmq/stream/impl/StreamProducer.java | 1 + 5 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index a2dc00acd7..9984f3125f 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -26,7 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class DynamicBatch { +final class DynamicBatch implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class); private static final int MIN_BATCH_SIZE = 32; @@ -35,11 +35,13 @@ class DynamicBatch { private final BlockingQueue requests = new LinkedBlockingQueue<>(); private final Predicate> consumer; private final int configuredBatchSize; + private final Thread thread; DynamicBatch(Predicate> consumer, int batchSize) { this.consumer = consumer; this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE); - new Thread(this::loop).start(); + this.thread = new Thread(this::loop); + this.thread.start(); } void add(T item) { @@ -103,4 +105,9 @@ private boolean completeBatch(List items) { return false; } } + + @Override + public void close() { + this.thread.interrupt(); + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java index 3c850440ba..0420216712 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java @@ -151,4 +151,9 @@ private boolean publish(List entities) { return false; } } + + @Override + public void close() { + this.dynamicBatch.close(); + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java index ff0c592317..56222dc355 100644 --- a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java @@ -17,11 +17,14 @@ import com.rabbitmq.stream.ConfirmationHandler; import com.rabbitmq.stream.Message; -interface MessageAccumulator { +interface MessageAccumulator extends AutoCloseable { void add(Message message, ConfirmationHandler confirmationHandler); int size(); void flush(boolean force); + + @Override + void close(); } diff --git a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java index b88fe115cd..53253bd77d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java @@ -112,4 +112,7 @@ private void publishBatch(boolean stateCheck) { producer.publishInternal(entities); } } + + @Override + public void close() {} } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 24ffdc8ffb..546ea1549a 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -441,6 +441,7 @@ public void close() { } void closeFromEnvironment() { + this.accumulator.close(); this.closingCallback.run(); cancelConfirmTimeoutTask(); this.closed.set(true); From b56df00204c10652bafa6fda6b8686bf86b8e70c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 6 Nov 2024 17:37:48 +0100 Subject: [PATCH 11/13] Use virtual threads if available in dynamic batch support class --- pom.xml | 2 +- .../com/rabbitmq/stream/ProducerBuilder.java | 2 +- .../stream/impl/ConcurrencyUtils.java | 60 +++++++ .../rabbitmq/stream/impl/DynamicBatch.java | 2 +- .../impl/DynamicBatchMessageAccumulator.java | 1 - .../stream/impl/MessageAccumulator.java | 2 +- .../stream/impl/SimpleMessageAccumulator.java | 35 ++-- .../rabbitmq/stream/impl/StreamProducer.java | 151 ++++++++++-------- .../stream/impl/StreamProducerBuilder.java | 2 +- .../impl/SubEntryMessageAccumulator.java | 2 +- .../stream/impl/StreamProducerTest.java | 2 +- .../stream/impl/StreamProducerUnitTest.java | 2 +- .../com/rabbitmq/stream/impl/TestUtils.java | 2 +- 13 files changed, 174 insertions(+), 91 deletions(-) create mode 100644 src/main/java/com/rabbitmq/stream/impl/ConcurrencyUtils.java diff --git a/pom.xml b/pom.xml index b4e60445f3..e8f3751d30 100644 --- a/pom.xml +++ b/pom.xml @@ -533,7 +533,7 @@ - + origin/main // Copyright (c) $YEAR Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index 8d9c8cc7b4..79bc9905d6 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the diff --git a/src/main/java/com/rabbitmq/stream/impl/ConcurrencyUtils.java b/src/main/java/com/rabbitmq/stream/impl/ConcurrencyUtils.java new file mode 100644 index 0000000000..daf877f20b --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/ConcurrencyUtils.java @@ -0,0 +1,60 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ConcurrencyUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrencyUtils.class); + + private static final ThreadFactory THREAD_FACTORY; + + static { + if (isJava21OrMore()) { + LOGGER.debug("Running Java 21 or more, using virtual threads"); + Class builderClass = + Arrays.stream(Thread.class.getDeclaredClasses()) + .filter(c -> "Builder".equals(c.getSimpleName())) + .findFirst() + .get(); + // Reflection code is the same as: + // Thread.ofVirtual().factory(); + try { + Object builder = Thread.class.getDeclaredMethod("ofVirtual").invoke(null); + THREAD_FACTORY = (ThreadFactory) builderClass.getDeclaredMethod("factory").invoke(builder); + } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { + throw new RuntimeException(e); + } + } else { + THREAD_FACTORY = Executors.defaultThreadFactory(); + } + } + + private ConcurrencyUtils() {} + + static ThreadFactory defaultThreadFactory() { + return THREAD_FACTORY; + } + + private static boolean isJava21OrMore() { + return Utils.versionCompare(System.getProperty("java.version"), "21.0") >= 0; + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index 9984f3125f..f06cbc6591 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -40,7 +40,7 @@ final class DynamicBatch implements AutoCloseable { DynamicBatch(Predicate> consumer, int batchSize) { this.consumer = consumer; this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE); - this.thread = new Thread(this::loop); + this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop); this.thread.start(); } diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java index 0420216712..0031ebd6c2 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java @@ -119,7 +119,6 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { batch.encodedMessageBatch.close(); subBatches.add(batch); } - return this.publish(subBatches); } else { return false; diff --git a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java index 56222dc355..80e3bb516e 100644 --- a/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the diff --git a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java index 53253bd77d..97e99e4b9f 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -69,9 +69,7 @@ public void add(Message message, ConfirmationHandler confirmationHandler) { throw new StreamException("Error while accumulating outbound message", e); } if (this.messages.size() == this.capacity) { - synchronized (this.producer) { - publishBatch(true); - } + publishBatch(true); } } @@ -92,24 +90,27 @@ public int size() { @Override public void flush(boolean force) { boolean stateCheck = !force; - synchronized (this.producer) { - publishBatch(stateCheck); - } + publishBatch(stateCheck); } private void publishBatch(boolean stateCheck) { - if ((!stateCheck || this.producer.canSend()) && !this.messages.isEmpty()) { - List entities = new ArrayList<>(this.capacity); - int batchCount = 0; - while (batchCount != this.capacity) { - AccumulatedEntity entity = this.get(); - if (entity == null) { - break; + this.producer.lock(); + try { + if ((!stateCheck || this.producer.canSend()) && !this.messages.isEmpty()) { + List entities = new ArrayList<>(this.capacity); + int batchCount = 0; + while (batchCount != this.capacity) { + AccumulatedEntity entity = this.get(); + if (entity == null) { + break; + } + entities.add(entity); + batchCount++; } - entities.add(entity); - batchCount++; + producer.publishInternal(entities); } - producer.publishInternal(entities); + } finally { + this.producer.unlock(); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 546ea1549a..e7e67be0be 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -43,6 +43,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.function.ToLongFunction; import org.slf4j.Logger; @@ -56,7 +58,6 @@ class StreamProducer implements Producer { private static final ConfirmationHandler NO_OP_CONFIRMATION_HANDLER = confirmationStatus -> {}; private final long id; private final MessageAccumulator accumulator; - private final ToLongFunction accumulatorPublishSequenceFunction; // FIXME investigate a more optimized data structure to handle pending messages private final ConcurrentMap unconfirmedMessages; private final int batchSize; @@ -79,6 +80,7 @@ class StreamProducer implements Producer { private volatile Status status; private volatile ScheduledFuture confirmTimeoutFuture; private final short publishVersion; + private final Lock lock = new ReentrantLock(); @SuppressFBWarnings("CT_CONSTRUCTOR_THROW") StreamProducer( @@ -110,7 +112,7 @@ class StreamProducer implements Producer { this.closingCallback = environment.registerProducer(this, name, this.stream); final Client.OutboundEntityWriteCallback delegateWriteCallback; AtomicLong publishingSequence = new AtomicLong(computeFirstValueOfPublishingSequence()); - this.accumulatorPublishSequenceFunction = + ToLongFunction accumulatorPublishSequenceFunction = msg -> { if (msg.hasPublishingId()) { return msg.getPublishingId(); @@ -491,76 +493,80 @@ void unavailable() { } void running() { - synchronized (this) { - LOGGER.debug( - "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)", - this.unconfirmedMessages.size(), - this.accumulator.size()); - if (this.retryOnRecovery) { - LOGGER.debug("Re-publishing {} unconfirmed message(s)", this.unconfirmedMessages.size()); - if (!this.unconfirmedMessages.isEmpty()) { - Map messagesToResend = new TreeMap<>(this.unconfirmedMessages); - this.unconfirmedMessages.clear(); - Iterator> resendIterator = - messagesToResend.entrySet().iterator(); - while (resendIterator.hasNext()) { - List messages = new ArrayList<>(this.batchSize); - int batchCount = 0; - while (batchCount != this.batchSize) { - Object accMessage = - resendIterator.hasNext() ? resendIterator.next().getValue() : null; - if (accMessage == null) { - break; + this.executeInLock( + () -> { + LOGGER.debug( + "Recovering producer with {} unconfirmed message(s) and {} accumulated message(s)", + this.unconfirmedMessages.size(), + this.accumulator.size()); + if (this.retryOnRecovery) { + LOGGER.debug( + "Re-publishing {} unconfirmed message(s)", this.unconfirmedMessages.size()); + if (!this.unconfirmedMessages.isEmpty()) { + Map messagesToResend = + new TreeMap<>(this.unconfirmedMessages); + this.unconfirmedMessages.clear(); + Iterator> resendIterator = + messagesToResend.entrySet().iterator(); + while (resendIterator.hasNext()) { + List messages = new ArrayList<>(this.batchSize); + int batchCount = 0; + while (batchCount != this.batchSize) { + Object accMessage = + resendIterator.hasNext() ? resendIterator.next().getValue() : null; + if (accMessage == null) { + break; + } + messages.add(accMessage); + batchCount++; + } + client.publishInternal( + this.publishVersion, + this.publisherId, + messages, + this.writeCallback, + this.publishSequenceFunction); + } + } + } else { + LOGGER.debug( + "Skipping republishing of {} unconfirmed messages", + this.unconfirmedMessages.size()); + Map messagesToFail = new TreeMap<>(this.unconfirmedMessages); + this.unconfirmedMessages.clear(); + for (AccumulatedEntity accumulatedEntity : messagesToFail.values()) { + try { + int permits = + accumulatedEntity + .confirmationCallback() + .handle(false, CODE_PUBLISH_CONFIRM_TIMEOUT); + this.unconfirmedMessagesSemaphore.release(permits); + } catch (Exception e) { + LOGGER.debug("Error while nack-ing outbound message: {}", e.getMessage()); + this.unconfirmedMessagesSemaphore.release(1); } - messages.add(accMessage); - batchCount++; } - client.publishInternal( - this.publishVersion, - this.publisherId, - messages, - this.writeCallback, - this.publishSequenceFunction); } - } - } else { - LOGGER.debug( - "Skipping republishing of {} unconfirmed messages", this.unconfirmedMessages.size()); - Map messagesToFail = new TreeMap<>(this.unconfirmedMessages); - this.unconfirmedMessages.clear(); - for (AccumulatedEntity accumulatedEntity : messagesToFail.values()) { - try { - int permits = - accumulatedEntity - .confirmationCallback() - .handle(false, CODE_PUBLISH_CONFIRM_TIMEOUT); - this.unconfirmedMessagesSemaphore.release(permits); - } catch (Exception e) { - LOGGER.debug("Error while nack-ing outbound message: {}", e.getMessage()); - this.unconfirmedMessagesSemaphore.release(1); + this.accumulator.flush(true); + int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits(); + if (toRelease > 0) { + unconfirmedMessagesSemaphore.release(toRelease); + if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) { + LOGGER.debug( + "Could not acquire {} permit(s) for message republishing", + this.unconfirmedMessages.size()); + } } - } - } - this.accumulator.flush(true); - int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits(); - if (toRelease > 0) { - unconfirmedMessagesSemaphore.release(toRelease); - if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) { - LOGGER.debug( - "Could not acquire {} permit(s) for message republishing", - this.unconfirmedMessages.size()); - } - } - } + }); this.status = Status.RUNNING; } - synchronized void setClient(Client client) { - this.client = client; + void setClient(Client client) { + this.executeInLock(() -> this.client = client); } - synchronized void setPublisherId(byte publisherId) { - this.publisherId = publisherId; + void setPublisherId(byte publisherId) { + this.executeInLock(() -> this.publisherId = publisherId); } Status status() { @@ -646,4 +652,21 @@ public int fragmentLength(Object entity) { } } } + + void lock() { + this.lock.lock(); + } + + void unlock() { + this.lock.unlock(); + } + + private void executeInLock(Runnable action) { + this.lock(); + try { + action.run(); + } finally { + this.unlock(); + } + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 14a7526a43..54807489e2 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the diff --git a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java index 27e7fcce0d..14645074e6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java index ef68af3bf0..e900fb4990 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index c91590a4ec..19390e6877 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2021-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 8700987f0f..76c1418d36 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the From f582f7d2f5f9fbdf7de878eb418fbd084c37ee06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 6 Nov 2024 18:11:05 +0100 Subject: [PATCH 12/13] Introduce replay argument in dynamic batch support class --- .../rabbitmq/stream/impl/DynamicBatch.java | 64 +++++++++-------- .../impl/DynamicBatchMessageAccumulator.java | 70 +++++++++---------- .../stream/impl/DynamicBatchTest.java | 32 +++++---- 3 files changed, 86 insertions(+), 80 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index f06cbc6591..ba52d23950 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -22,7 +22,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; +import java.util.function.BiPredicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,11 +33,11 @@ final class DynamicBatch implements AutoCloseable { private static final int MAX_BATCH_SIZE = 8192; private final BlockingQueue requests = new LinkedBlockingQueue<>(); - private final Predicate> consumer; + private final BiPredicate, Boolean> consumer; private final int configuredBatchSize; private final Thread thread; - DynamicBatch(Predicate> consumer, int batchSize) { + DynamicBatch(BiPredicate, Boolean> consumer, int batchSize) { this.consumer = consumer; this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE); this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop); @@ -53,8 +53,10 @@ void add(T item) { } private void loop() { - int batchSize = this.configuredBatchSize; - List batch = new ArrayList<>(batchSize); + State state = new State<>(); + state.batchSize = this.configuredBatchSize; + state.items = new ArrayList<>(state.batchSize); + state.retry = false; Thread currentThread = Thread.currentThread(); T item; while (!currentThread.isInterrupted()) { @@ -65,44 +67,50 @@ private void loop() { return; } if (item != null) { - batch.add(item); - if (batch.size() >= batchSize) { - if (this.completeBatch(batch)) { - batchSize = min(batchSize * 2, MAX_BATCH_SIZE); - batch = new ArrayList<>(batchSize); - } + state.items.add(item); + if (state.items.size() >= state.batchSize) { + this.maybeCompleteBatch(state, true); } else { item = this.requests.poll(); if (item == null) { - if (this.completeBatch(batch)) { - batchSize = max(batchSize / 2, MIN_BATCH_SIZE); - batch = new ArrayList<>(batchSize); - } + this.maybeCompleteBatch(state, false); } else { - batch.add(item); - if (batch.size() >= batchSize) { - if (this.completeBatch(batch)) { - batchSize = min(batchSize * 2, MAX_BATCH_SIZE); - batch = new ArrayList<>(batchSize); - } + state.items.add(item); + if (state.items.size() >= state.batchSize) { + this.maybeCompleteBatch(state, true); } } } } else { - if (this.completeBatch(batch)) { - batchSize = min(batchSize * 2, MAX_BATCH_SIZE); - batch = new ArrayList<>(batchSize); - } + this.maybeCompleteBatch(state, false); } } } - private boolean completeBatch(List items) { + private static final class State { + + int batchSize; + List items; + boolean retry; + } + + private void maybeCompleteBatch(State state, boolean increaseIfCompleted) { try { - return this.consumer.test(items); + boolean completed = this.consumer.test(state.items, state.retry); + if (completed) { + if (increaseIfCompleted) { + state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE); + } else { + state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE); + } + state.items = new ArrayList<>(state.batchSize); + state.retry = false; + } else { + state.retry = true; + } } catch (Exception e) { LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage()); - return false; + state.retry = true; } } diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java index 0031ebd6c2..2b8d6241e9 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java @@ -62,20 +62,16 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { if (subEntrySize <= 1) { this.dynamicBatch = new DynamicBatch<>( - items -> { - // TODO add a "replay" flag to DynamicBatch to avoid checking the producer status - // the status check helps to avoid collecting the observation another time - if (producer.canSend()) { + (items, replay) -> { + if (!replay) { items.forEach( i -> { AccumulatedEntity entity = (AccumulatedEntity) i; this.observationCollector.published( entity.observationContext(), entity.confirmationCallback().message()); }); - return this.publish(items); - } else { - return false; } + return this.publish(items); }, batchSize); } else { @@ -83,46 +79,44 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); this.dynamicBatch = new DynamicBatch<>( - items -> { - if (this.producer.canSend()) { - List subBatches = new ArrayList<>(); - int count = 0; - ProducerUtils.Batch batch = - this.helper.batch( - byteBufAllocator, compressionCode, compressionCodec, subEntrySize); - AccumulatedEntity lastMessageInBatch = null; - for (Object msg : items) { - AccumulatedEntity message = (AccumulatedEntity) msg; + (items, replay) -> { + List subBatches = new ArrayList<>(); + int count = 0; + ProducerUtils.Batch batch = + this.helper.batch( + byteBufAllocator, compressionCode, compressionCodec, subEntrySize); + AccumulatedEntity lastMessageInBatch = null; + for (Object msg : items) { + AccumulatedEntity message = (AccumulatedEntity) msg; + if (!replay) { this.observationCollector.published( message.observationContext(), message.confirmationCallback().message()); - lastMessageInBatch = message; - batch.add( - (Codec.EncodedMessage) message.encodedEntity(), - message.confirmationCallback()); - count++; - if (count == subEntrySize) { - batch.time = lastMessageInBatch.time(); - batch.publishingId = lastMessageInBatch.publishingId(); - batch.encodedMessageBatch.close(); - subBatches.add(batch); - lastMessageInBatch = null; - batch = - this.helper.batch( - byteBufAllocator, compressionCode, compressionCodec, subEntrySize); - count = 0; - } } - - if (!batch.isEmpty() && count < subEntrySize) { + lastMessageInBatch = message; + batch.add( + (Codec.EncodedMessage) message.encodedEntity(), + message.confirmationCallback()); + count++; + if (count == subEntrySize) { batch.time = lastMessageInBatch.time(); batch.publishingId = lastMessageInBatch.publishingId(); batch.encodedMessageBatch.close(); subBatches.add(batch); + lastMessageInBatch = null; + batch = + this.helper.batch( + byteBufAllocator, compressionCode, compressionCodec, subEntrySize); + count = 0; } - return this.publish(subBatches); - } else { - return false; } + + if (!batch.isEmpty() && count < subEntrySize) { + batch.time = lastMessageInBatch.time(); + batch.publishingId = lastMessageInBatch.publishingId(); + batch.encodedMessageBatch.close(); + subBatches.add(batch); + } + return this.publish(subBatches); }, batchSize * subEntrySize); } diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java index 764631b615..2049915457 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java @@ -40,28 +40,32 @@ void test() { Random random = new Random(); DynamicBatch batch = new DynamicBatch<>( - items -> { + (items, replay) -> { batchSizeMetrics.update(items.size()); - sync.down(items.size()); try { Thread.sleep(random.nextInt(10) + 1); } catch (InterruptedException e) { throw new RuntimeException(e); } + sync.down(items.size()); return true; }, 100); - RateLimiter rateLimiter = RateLimiter.create(3000); - long start = System.nanoTime(); - IntStream.range(0, itemCount) - .forEach( - i -> { - rateLimiter.acquire(); - batch.add(String.valueOf(i)); - }); - Assertions.assertThat(sync).completes(); - long end = System.nanoTime(); - // System.out.println("Done in " + Duration.ofNanos(end - start)); - // reporter.report(); + try { + RateLimiter rateLimiter = RateLimiter.create(3000); + long start = System.nanoTime(); + IntStream.range(0, itemCount) + .forEach( + i -> { + rateLimiter.acquire(); + batch.add(String.valueOf(i)); + }); + Assertions.assertThat(sync).completes(); + long end = System.nanoTime(); + // System.out.println("Done in " + Duration.ofNanos(end - start)); + // reporter.report(); + } finally { + batch.close(); + } } } From 30524543b9a73c8ff7d3ecccd2ef70811869b997 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 6 Nov 2024 18:57:40 +0100 Subject: [PATCH 13/13] Test dynamic batch publishing on CI --- .github/workflows/test-pr.yml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index 0af64477fa..7543f2f175 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -24,9 +24,17 @@ jobs: cache: 'maven' - name: Start broker run: ci/start-broker.sh - - name: Test + - name: Test (no dynamic-batch publishing) run: | ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=false \ + -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ + -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ + -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem + - name: Test (dynamic-batch publishing) + run: | + ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=true \ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem