diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index c075f91453..d490a5d12b 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -470,7 +470,7 @@ blocking when the limit is reached. |`dynamicBatch` |Adapt batch size depending on ingress rate. -|false +|true |`confirmTimeout` |[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal @@ -897,11 +897,11 @@ Useful when using an external store for offset tracking. |`flow#initialCredits` |Number of credits when the subscription is created. Increase for higher throughput at the expense of memory usage. -|1 +|10 |`flow#strategy` |The `ConsumerFlowStrategy` to use. -|`ConsumerFlowStrategy#creditOnChunkArrival(1)` +|`ConsumerFlowStrategy#creditOnChunkArrival(10)` |=== [NOTE] diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index 9e391d525c..2579548c01 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -245,10 +245,14 @@ interface FlowConfiguration { /** * The number of initial credits for the subscription. * - *
Default is 1. + *
Default is 10. * *
This calls uses {@link ConsumerFlowStrategy#creditOnChunkArrival(int)}. * + *
Use a small value like 1 for streams with large chunks (several hundreds of messages per + * chunk) and higher values (5 or more) for streams with small chunks (1 or a few messages per + * chunk). + * * @param initialCredits the number of initial credits * @return this configuration instance * @see ConsumerFlowStrategy#creditOnChunkArrival(int) diff --git a/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java b/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java index 3d6b6e94b1..9f32a2f9fd 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java @@ -1,4 +1,4 @@ -// Copyright (c) 2023 Broadcom. All Rights Reserved. +// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -120,13 +120,14 @@ static ConsumerFlowStrategy creditOnChunkArrival() { * * @param initialCredits number of initial credits * @return flow strategy + * @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int) */ static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) { return new CreditOnChunkArrivalConsumerFlowStrategy(initialCredits); } /** - * Strategy that provides 1 initial credit and a credit when half of the chunk messages are + * Strategy that provides 10 initial credits and a credit when half of the chunk messages are * processed. * *
Make sure to call {@link MessageHandler.Context#processed()} on every message when using @@ -135,7 +136,7 @@ static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) { * @return flow strategy */ static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() { - return creditOnProcessedMessageCount(1, 0.5); + return creditOnProcessedMessageCount(10, 0.5); } /** @@ -147,6 +148,7 @@ static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() { * * @param initialCredits number of initial credits * @return flow strategy + * @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int) */ static ConsumerFlowStrategy creditWhenHalfMessagesProcessed(int initialCredits) { return creditOnProcessedMessageCount(initialCredits, 0.5); diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index dca01baf9f..a445dc2fb4 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -106,22 +106,26 @@ public interface ProducerBuilder { /** * Adapt batch size depending on ingress rate. * - *
A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive - * for sustained high ingress rates. + *
A dynamic-batch approach improves latency for low ingress rates. * *
Set this flag to true
if you want as little delay as possible between calling
* {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker.
+ * Consumers should provide enough initial credits (between 5 and 10, depending on the workload),
+ * see {@link ConsumerBuilder#flow()} and {@link
+ * ConsumerBuilder.FlowConfiguration#initialCredits(int)}.
*
*
Set this flag to false
if latency is not critical for your use case and you
- * want the highest throughput possible for both publishing and consuming.
+ * want the highest throughput possible for both publishing and consuming. Consumers can provide 1
+ * initial credit (depending on the workload), see {@link ConsumerBuilder#flow()} and {@link
+ * ConsumerBuilder.FlowConfiguration#initialCredits(int)}.
*
- *
Dynamic batch is not activated by default (dynamicBatch = false
).
- *
- *
Dynamic batch is experimental. + *
Dynamic batch is activated by default (dynamicBatch = true
).
*
* @param dynamicBatch
* @return this builder instance
* @since 0.20.0
+ * @see ConsumerBuilder#flow()
+ * @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
*/
ProducerBuilder dynamicBatch(boolean dynamicBatch);
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
index c6364d1aa3..ce8660c6a5 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -428,7 +428,7 @@ private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) {
this.consumerBuilder = consumerBuilder;
}
- private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival();
+ private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival(10);
@Override
public FlowConfiguration initialCredits(int initialCredits) {
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
index 54807489e2..244b2b1746 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
@@ -29,7 +29,7 @@
class StreamProducerBuilder implements ProducerBuilder {
static final boolean DEFAULT_DYNAMIC_BATCH =
- Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "false"));
+ Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true"));
private final StreamEnvironment environment;
diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java
index 3777d7e7d9..7dd5e8da48 100644
--- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java
@@ -665,7 +665,7 @@ void ignoredMessageShouldTriggerMessageProcessing() {
new ConsumerFlowStrategy() {
@Override
public int initialCredits() {
- return 1;
+ return 10;
}
@Override
diff --git a/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java b/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java
index 34166a8f2e..90ec80ae1c 100644
--- a/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2023-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -55,7 +55,7 @@ void smallChunksAndSmallRatiosShouldCredit() {
}
ConsumerFlowStrategy build(double ratio) {
- return creditOnProcessedMessageCount(1, ratio);
+ return creditOnProcessedMessageCount(10, ratio);
}
ConsumerFlowStrategy.Context context(long messageCount) {
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
index be2dc4a8c3..0f439df0b8 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
@@ -203,7 +203,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
environment.consumerBuilder().stream(stream)
.offset(OffsetSpecification.first())
.flow()
- .strategy(creditWhenHalfMessagesProcessed())
+ .strategy(creditWhenHalfMessagesProcessed(1))
.builder();
List