diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index c075f91453..d490a5d12b 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -470,7 +470,7 @@ blocking when the limit is reached. |`dynamicBatch` |Adapt batch size depending on ingress rate. -|false +|true |`confirmTimeout` |[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal @@ -897,11 +897,11 @@ Useful when using an external store for offset tracking. |`flow#initialCredits` |Number of credits when the subscription is created. Increase for higher throughput at the expense of memory usage. -|1 +|10 |`flow#strategy` |The `ConsumerFlowStrategy` to use. -|`ConsumerFlowStrategy#creditOnChunkArrival(1)` +|`ConsumerFlowStrategy#creditOnChunkArrival(10)` |=== [NOTE] diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index 9e391d525c..2579548c01 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -245,10 +245,14 @@ interface FlowConfiguration { /** * The number of initial credits for the subscription. * - *

Default is 1. + *

Default is 10. * *

This calls uses {@link ConsumerFlowStrategy#creditOnChunkArrival(int)}. * + *

Use a small value like 1 for streams with large chunks (several hundreds of messages per + * chunk) and higher values (5 or more) for streams with small chunks (1 or a few messages per + * chunk). + * * @param initialCredits the number of initial credits * @return this configuration instance * @see ConsumerFlowStrategy#creditOnChunkArrival(int) diff --git a/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java b/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java index 3d6b6e94b1..9f32a2f9fd 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java @@ -1,4 +1,4 @@ -// Copyright (c) 2023 Broadcom. All Rights Reserved. +// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -120,13 +120,14 @@ static ConsumerFlowStrategy creditOnChunkArrival() { * * @param initialCredits number of initial credits * @return flow strategy + * @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int) */ static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) { return new CreditOnChunkArrivalConsumerFlowStrategy(initialCredits); } /** - * Strategy that provides 1 initial credit and a credit when half of the chunk messages are + * Strategy that provides 10 initial credits and a credit when half of the chunk messages are * processed. * *

Make sure to call {@link MessageHandler.Context#processed()} on every message when using @@ -135,7 +136,7 @@ static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) { * @return flow strategy */ static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() { - return creditOnProcessedMessageCount(1, 0.5); + return creditOnProcessedMessageCount(10, 0.5); } /** @@ -147,6 +148,7 @@ static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() { * * @param initialCredits number of initial credits * @return flow strategy + * @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int) */ static ConsumerFlowStrategy creditWhenHalfMessagesProcessed(int initialCredits) { return creditOnProcessedMessageCount(initialCredits, 0.5); diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index dca01baf9f..a445dc2fb4 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -106,22 +106,26 @@ public interface ProducerBuilder { /** * Adapt batch size depending on ingress rate. * - *

A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive - * for sustained high ingress rates. + *

A dynamic-batch approach improves latency for low ingress rates. * *

Set this flag to true if you want as little delay as possible between calling * {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker. + * Consumers should provide enough initial credits (between 5 and 10, depending on the workload), + * see {@link ConsumerBuilder#flow()} and {@link + * ConsumerBuilder.FlowConfiguration#initialCredits(int)}. * *

Set this flag to false if latency is not critical for your use case and you - * want the highest throughput possible for both publishing and consuming. + * want the highest throughput possible for both publishing and consuming. Consumers can provide 1 + * initial credit (depending on the workload), see {@link ConsumerBuilder#flow()} and {@link + * ConsumerBuilder.FlowConfiguration#initialCredits(int)}. * - *

Dynamic batch is not activated by default (dynamicBatch = false). - * - *

Dynamic batch is experimental. + *

Dynamic batch is activated by default (dynamicBatch = true). * * @param dynamicBatch * @return this builder instance * @since 0.20.0 + * @see ConsumerBuilder#flow() + * @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int) */ ProducerBuilder dynamicBatch(boolean dynamicBatch); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index c6364d1aa3..ce8660c6a5 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -428,7 +428,7 @@ private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) { this.consumerBuilder = consumerBuilder; } - private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival(); + private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival(10); @Override public FlowConfiguration initialCredits(int initialCredits) { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 54807489e2..244b2b1746 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -29,7 +29,7 @@ class StreamProducerBuilder implements ProducerBuilder { static final boolean DEFAULT_DYNAMIC_BATCH = - Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "false")); + Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true")); private final StreamEnvironment environment; diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java index 3777d7e7d9..7dd5e8da48 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java @@ -665,7 +665,7 @@ void ignoredMessageShouldTriggerMessageProcessing() { new ConsumerFlowStrategy() { @Override public int initialCredits() { - return 1; + return 10; } @Override diff --git a/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java b/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java index 34166a8f2e..90ec80ae1c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2023 Broadcom. All Rights Reserved. +// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -55,7 +55,7 @@ void smallChunksAndSmallRatiosShouldCredit() { } ConsumerFlowStrategy build(double ratio) { - return creditOnProcessedMessageCount(1, ratio); + return creditOnProcessedMessageCount(10, ratio); } ConsumerFlowStrategy.Context context(long messageCount) { diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index be2dc4a8c3..0f439df0b8 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -203,7 +203,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception { environment.consumerBuilder().stream(stream) .offset(OffsetSpecification.first()) .flow() - .strategy(creditWhenHalfMessagesProcessed()) + .strategy(creditWhenHalfMessagesProcessed(1)) .builder(); List messageContexts = synchronizedList(new ArrayList<>()); @@ -243,7 +243,6 @@ void consumeWithAsyncConsumerFlowControl() throws Exception { void asynchronousProcessingWithFlowControl() { int messageCount = 100_000; publishAndWaitForConfirms(cf, messageCount, stream); - ExecutorService executorService = Executors.newFixedThreadPool(getRuntime().availableProcessors()); try { @@ -251,7 +250,7 @@ void asynchronousProcessingWithFlowControl() { environment.consumerBuilder().stream(stream) .offset(OffsetSpecification.first()) .flow() - .strategy(creditWhenHalfMessagesProcessed()) + .strategy(creditWhenHalfMessagesProcessed(1)) .builder() .messageHandler( (ctx, message) ->