Skip to content

Commit 741de6a

Browse files
authored
Merge pull request #671 from rabbitmq/dynamic-batch-is-the-new-default
Use dynamic batch publishing by default
2 parents ff4e2cb + dfd94df commit 741de6a

9 files changed

+32
-23
lines changed

Diff for: src/docs/asciidoc/api.adoc

+3-3
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ blocking when the limit is reached.
470470

471471
|`dynamicBatch`
472472
|Adapt batch size depending on ingress rate.
473-
|false
473+
|true
474474

475475
|`confirmTimeout`
476476
|[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal
@@ -897,11 +897,11 @@ Useful when using an external store for offset tracking.
897897
|`flow#initialCredits`
898898
|Number of credits when the subscription is created.
899899
Increase for higher throughput at the expense of memory usage.
900-
|1
900+
|10
901901

902902
|`flow#strategy`
903903
|The `ConsumerFlowStrategy` to use.
904-
|`ConsumerFlowStrategy#creditOnChunkArrival(1)`
904+
|`ConsumerFlowStrategy#creditOnChunkArrival(10)`
905905
|===
906906

907907
[NOTE]

Diff for: src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -245,10 +245,14 @@ interface FlowConfiguration {
245245
/**
246246
* The number of initial credits for the subscription.
247247
*
248-
* <p>Default is 1.
248+
* <p>Default is 10.
249249
*
250250
* <p>This calls uses {@link ConsumerFlowStrategy#creditOnChunkArrival(int)}.
251251
*
252+
* <p>Use a small value like 1 for streams with large chunks (several hundreds of messages per
253+
* chunk) and higher values (5 or more) for streams with small chunks (1 or a few messages per
254+
* chunk).
255+
*
252256
* @param initialCredits the number of initial credits
253257
* @return this configuration instance
254258
* @see ConsumerFlowStrategy#creditOnChunkArrival(int)

Diff for: src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -120,13 +120,14 @@ static ConsumerFlowStrategy creditOnChunkArrival() {
120120
*
121121
* @param initialCredits number of initial credits
122122
* @return flow strategy
123+
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
123124
*/
124125
static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) {
125126
return new CreditOnChunkArrivalConsumerFlowStrategy(initialCredits);
126127
}
127128

128129
/**
129-
* Strategy that provides 1 initial credit and a credit when half of the chunk messages are
130+
* Strategy that provides 10 initial credits and a credit when half of the chunk messages are
130131
* processed.
131132
*
132133
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
@@ -135,7 +136,7 @@ static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) {
135136
* @return flow strategy
136137
*/
137138
static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() {
138-
return creditOnProcessedMessageCount(1, 0.5);
139+
return creditOnProcessedMessageCount(10, 0.5);
139140
}
140141

141142
/**
@@ -147,6 +148,7 @@ static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() {
147148
*
148149
* @param initialCredits number of initial credits
149150
* @return flow strategy
151+
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
150152
*/
151153
static ConsumerFlowStrategy creditWhenHalfMessagesProcessed(int initialCredits) {
152154
return creditOnProcessedMessageCount(initialCredits, 0.5);

Diff for: src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -106,22 +106,26 @@ public interface ProducerBuilder {
106106
/**
107107
* Adapt batch size depending on ingress rate.
108108
*
109-
* <p>A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive
110-
* for sustained high ingress rates.
109+
* <p>A dynamic-batch approach improves latency for low ingress rates.
111110
*
112111
* <p>Set this flag to <code>true</code> if you want as little delay as possible between calling
113112
* {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker.
113+
* Consumers should provide enough initial credits (between 5 and 10, depending on the workload),
114+
* see {@link ConsumerBuilder#flow()} and {@link
115+
* ConsumerBuilder.FlowConfiguration#initialCredits(int)}.
114116
*
115117
* <p>Set this flag to <code>false</code> if latency is not critical for your use case and you
116-
* want the highest throughput possible for both publishing and consuming.
118+
* want the highest throughput possible for both publishing and consuming. Consumers can provide 1
119+
* initial credit (depending on the workload), see {@link ConsumerBuilder#flow()} and {@link
120+
* ConsumerBuilder.FlowConfiguration#initialCredits(int)}.
117121
*
118-
* <p>Dynamic batch is not activated by default (<code>dynamicBatch = false</code>).
119-
*
120-
* <p>Dynamic batch is experimental.
122+
* <p>Dynamic batch is activated by default (<code>dynamicBatch = true</code>).
121123
*
122124
* @param dynamicBatch
123125
* @return this builder instance
124126
* @since 0.20.0
127+
* @see ConsumerBuilder#flow()
128+
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
125129
*/
126130
ProducerBuilder dynamicBatch(boolean dynamicBatch);
127131

Diff for: src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -428,7 +428,7 @@ private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) {
428428
this.consumerBuilder = consumerBuilder;
429429
}
430430

431-
private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival();
431+
private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival(10);
432432

433433
@Override
434434
public FlowConfiguration initialCredits(int initialCredits) {

Diff for: src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
class StreamProducerBuilder implements ProducerBuilder {
3030

3131
static final boolean DEFAULT_DYNAMIC_BATCH =
32-
Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "false"));
32+
Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true"));
3333

3434
private final StreamEnvironment environment;
3535

Diff for: src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ void ignoredMessageShouldTriggerMessageProcessing() {
665665
new ConsumerFlowStrategy() {
666666
@Override
667667
public int initialCredits() {
668-
return 1;
668+
return 10;
669669
}
670670

671671
@Override

Diff for: src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -55,7 +55,7 @@ void smallChunksAndSmallRatiosShouldCredit() {
5555
}
5656

5757
ConsumerFlowStrategy build(double ratio) {
58-
return creditOnProcessedMessageCount(1, ratio);
58+
return creditOnProcessedMessageCount(10, ratio);
5959
}
6060

6161
ConsumerFlowStrategy.Context context(long messageCount) {

Diff for: src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
203203
environment.consumerBuilder().stream(stream)
204204
.offset(OffsetSpecification.first())
205205
.flow()
206-
.strategy(creditWhenHalfMessagesProcessed())
206+
.strategy(creditWhenHalfMessagesProcessed(1))
207207
.builder();
208208

209209
List<MessageHandler.Context> messageContexts = synchronizedList(new ArrayList<>());
@@ -243,15 +243,14 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
243243
void asynchronousProcessingWithFlowControl() {
244244
int messageCount = 100_000;
245245
publishAndWaitForConfirms(cf, messageCount, stream);
246-
247246
ExecutorService executorService =
248247
Executors.newFixedThreadPool(getRuntime().availableProcessors());
249248
try {
250249
CountDownLatch latch = new CountDownLatch(messageCount);
251250
environment.consumerBuilder().stream(stream)
252251
.offset(OffsetSpecification.first())
253252
.flow()
254-
.strategy(creditWhenHalfMessagesProcessed())
253+
.strategy(creditWhenHalfMessagesProcessed(1))
255254
.builder()
256255
.messageHandler(
257256
(ctx, message) ->

0 commit comments

Comments
 (0)