Skip to content

Commit d23d661

Browse files
committed
Use dynamic batch publishing by default
And set initial credits to 10 by default, as dynamic batching creates smaller chunks for low ingress.
1 parent 02cd6e5 commit d23d661

File tree

8 files changed

+30
-20
lines changed

8 files changed

+30
-20
lines changed

Diff for: src/docs/asciidoc/api.adoc

+3-3
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ blocking when the limit is reached.
470470

471471
|`dynamicBatch`
472472
|Adapt batch size depending on ingress rate.
473-
|false
473+
|true
474474

475475
|`confirmTimeout`
476476
|[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal
@@ -897,11 +897,11 @@ Useful when using an external store for offset tracking.
897897
|`flow#initialCredits`
898898
|Number of credits when the subscription is created.
899899
Increase for higher throughput at the expense of memory usage.
900-
|1
900+
|10
901901

902902
|`flow#strategy`
903903
|The `ConsumerFlowStrategy` to use.
904-
|`ConsumerFlowStrategy#creditOnChunkArrival(1)`
904+
|`ConsumerFlowStrategy#creditOnChunkArrival(10)`
905905
|===
906906

907907
[NOTE]

Diff for: src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -245,10 +245,14 @@ interface FlowConfiguration {
245245
/**
246246
* The number of initial credits for the subscription.
247247
*
248-
* <p>Default is 1.
248+
* <p>Default is 10.
249249
*
250250
* <p>This calls uses {@link ConsumerFlowStrategy#creditOnChunkArrival(int)}.
251251
*
252+
* <p>Use a small value like 1 for streams with large chunks (several hundreds of messages per
253+
* chunk) and higher values (5 or more) for streams with small chunks (1 or a few messages per
254+
* chunk).
255+
*
252256
* @param initialCredits the number of initial credits
253257
* @return this configuration instance
254258
* @see ConsumerFlowStrategy#creditOnChunkArrival(int)

Diff for: src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -120,13 +120,14 @@ static ConsumerFlowStrategy creditOnChunkArrival() {
120120
*
121121
* @param initialCredits number of initial credits
122122
* @return flow strategy
123+
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
123124
*/
124125
static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) {
125126
return new CreditOnChunkArrivalConsumerFlowStrategy(initialCredits);
126127
}
127128

128129
/**
129-
* Strategy that provides 1 initial credit and a credit when half of the chunk messages are
130+
* Strategy that provides 10 initial credits and a credit when half of the chunk messages are
130131
* processed.
131132
*
132133
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
@@ -135,7 +136,7 @@ static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) {
135136
* @return flow strategy
136137
*/
137138
static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() {
138-
return creditOnProcessedMessageCount(1, 0.5);
139+
return creditOnProcessedMessageCount(10, 0.5);
139140
}
140141

141142
/**
@@ -147,6 +148,7 @@ static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() {
147148
*
148149
* @param initialCredits number of initial credits
149150
* @return flow strategy
151+
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
150152
*/
151153
static ConsumerFlowStrategy creditWhenHalfMessagesProcessed(int initialCredits) {
152154
return creditOnProcessedMessageCount(initialCredits, 0.5);

Diff for: src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -106,22 +106,26 @@ public interface ProducerBuilder {
106106
/**
107107
* Adapt batch size depending on ingress rate.
108108
*
109-
* <p>A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive
110-
* for sustained high ingress rates.
109+
* <p>A dynamic-batch approach improves latency for low ingress rates.
111110
*
112111
* <p>Set this flag to <code>true</code> if you want as little delay as possible between calling
113112
* {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker.
113+
* Consumers should provide enough initial credits (between 5 and 10, depending on the workload),
114+
* see {@link ConsumerBuilder#flow()} and {@link
115+
* ConsumerBuilder.FlowConfiguration#initialCredits(int)}.
114116
*
115117
* <p>Set this flag to <code>false</code> if latency is not critical for your use case and you
116-
* want the highest throughput possible for both publishing and consuming.
118+
* want the highest throughput possible for both publishing and consuming. Consumers can provide 1
119+
* initial credit (depending on the workload), see {@link ConsumerBuilder#flow()} and {@link
120+
* ConsumerBuilder.FlowConfiguration#initialCredits(int)}.
117121
*
118-
* <p>Dynamic batch is not activated by default (<code>dynamicBatch = false</code>).
119-
*
120-
* <p>Dynamic batch is experimental.
122+
* <p>Dynamic batch is activated by default (<code>dynamicBatch = true</code>).
121123
*
122124
* @param dynamicBatch
123125
* @return this builder instance
124126
* @since 0.20.0
127+
* @see ConsumerBuilder#flow()
128+
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
125129
*/
126130
ProducerBuilder dynamicBatch(boolean dynamicBatch);
127131

Diff for: src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -428,7 +428,7 @@ private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) {
428428
this.consumerBuilder = consumerBuilder;
429429
}
430430

431-
private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival();
431+
private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival(10);
432432

433433
@Override
434434
public FlowConfiguration initialCredits(int initialCredits) {

Diff for: src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
class StreamProducerBuilder implements ProducerBuilder {
3030

3131
static final boolean DEFAULT_DYNAMIC_BATCH =
32-
Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "false"));
32+
Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true"));
3333

3434
private final StreamEnvironment environment;
3535

Diff for: src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ void ignoredMessageShouldTriggerMessageProcessing() {
665665
new ConsumerFlowStrategy() {
666666
@Override
667667
public int initialCredits() {
668-
return 1;
668+
return 10;
669669
}
670670

671671
@Override

Diff for: src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -55,7 +55,7 @@ void smallChunksAndSmallRatiosShouldCredit() {
5555
}
5656

5757
ConsumerFlowStrategy build(double ratio) {
58-
return creditOnProcessedMessageCount(1, ratio);
58+
return creditOnProcessedMessageCount(10, ratio);
5959
}
6060

6161
ConsumerFlowStrategy.Context context(long messageCount) {

0 commit comments

Comments
 (0)