Skip to content

Use dynamic batch publishing by default #671

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ blocking when the limit is reached.

|`dynamicBatch`
|Adapt batch size depending on ingress rate.
|false
|true

|`confirmTimeout`
|[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal
Expand Down Expand Up @@ -897,11 +897,11 @@ Useful when using an external store for offset tracking.
|`flow#initialCredits`
|Number of credits when the subscription is created.
Increase for higher throughput at the expense of memory usage.
|1
|10

|`flow#strategy`
|The `ConsumerFlowStrategy` to use.
|`ConsumerFlowStrategy#creditOnChunkArrival(1)`
|`ConsumerFlowStrategy#creditOnChunkArrival(10)`
|===

[NOTE]
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand Down Expand Up @@ -245,10 +245,14 @@ interface FlowConfiguration {
/**
* The number of initial credits for the subscription.
*
* <p>Default is 1.
* <p>Default is 10.
*
* <p>This calls uses {@link ConsumerFlowStrategy#creditOnChunkArrival(int)}.
*
* <p>Use a small value like 1 for streams with large chunks (several hundreds of messages per
* chunk) and higher values (5 or more) for streams with small chunks (1 or a few messages per
* chunk).
*
* @param initialCredits the number of initial credits
* @return this configuration instance
* @see ConsumerFlowStrategy#creditOnChunkArrival(int)
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Broadcom. All Rights Reserved.
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand Down Expand Up @@ -120,13 +120,14 @@ static ConsumerFlowStrategy creditOnChunkArrival() {
*
* @param initialCredits number of initial credits
* @return flow strategy
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
*/
static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) {
return new CreditOnChunkArrivalConsumerFlowStrategy(initialCredits);
}

/**
* Strategy that provides 1 initial credit and a credit when half of the chunk messages are
* Strategy that provides 10 initial credits and a credit when half of the chunk messages are
* processed.
*
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
Expand All @@ -135,7 +136,7 @@ static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) {
* @return flow strategy
*/
static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() {
return creditOnProcessedMessageCount(1, 0.5);
return creditOnProcessedMessageCount(10, 0.5);
}

/**
Expand All @@ -147,6 +148,7 @@ static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() {
*
* @param initialCredits number of initial credits
* @return flow strategy
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
*/
static ConsumerFlowStrategy creditWhenHalfMessagesProcessed(int initialCredits) {
return creditOnProcessedMessageCount(initialCredits, 0.5);
Expand Down
16 changes: 10 additions & 6 deletions src/main/java/com/rabbitmq/stream/ProducerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,26 @@ public interface ProducerBuilder {
/**
* Adapt batch size depending on ingress rate.
*
* <p>A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive
* for sustained high ingress rates.
* <p>A dynamic-batch approach improves latency for low ingress rates.
*
* <p>Set this flag to <code>true</code> if you want as little delay as possible between calling
* {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker.
* Consumers should provide enough initial credits (between 5 and 10, depending on the workload),
* see {@link ConsumerBuilder#flow()} and {@link
* ConsumerBuilder.FlowConfiguration#initialCredits(int)}.
*
* <p>Set this flag to <code>false</code> if latency is not critical for your use case and you
* want the highest throughput possible for both publishing and consuming.
* want the highest throughput possible for both publishing and consuming. Consumers can provide 1
* initial credit (depending on the workload), see {@link ConsumerBuilder#flow()} and {@link
* ConsumerBuilder.FlowConfiguration#initialCredits(int)}.
*
* <p>Dynamic batch is not activated by default (<code>dynamicBatch = false</code>).
*
* <p>Dynamic batch is experimental.
* <p>Dynamic batch is activated by default (<code>dynamicBatch = true</code>).
*
* @param dynamicBatch
* @return this builder instance
* @since 0.20.0
* @see ConsumerBuilder#flow()
* @see com.rabbitmq.stream.ConsumerBuilder.FlowConfiguration#initialCredits(int)
*/
ProducerBuilder dynamicBatch(boolean dynamicBatch);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand Down Expand Up @@ -428,7 +428,7 @@ private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) {
this.consumerBuilder = consumerBuilder;
}

private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival();
private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival(10);

@Override
public FlowConfiguration initialCredits(int initialCredits) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
class StreamProducerBuilder implements ProducerBuilder {

static final boolean DEFAULT_DYNAMIC_BATCH =
Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "false"));
Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true"));

private final StreamEnvironment environment;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ void ignoredMessageShouldTriggerMessageProcessing() {
new ConsumerFlowStrategy() {
@Override
public int initialCredits() {
return 1;
return 10;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2023 Broadcom. All Rights Reserved.
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand Down Expand Up @@ -55,7 +55,7 @@ void smallChunksAndSmallRatiosShouldCredit() {
}

ConsumerFlowStrategy build(double ratio) {
return creditOnProcessedMessageCount(1, ratio);
return creditOnProcessedMessageCount(10, ratio);
}

ConsumerFlowStrategy.Context context(long messageCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
environment.consumerBuilder().stream(stream)
.offset(OffsetSpecification.first())
.flow()
.strategy(creditWhenHalfMessagesProcessed())
.strategy(creditWhenHalfMessagesProcessed(1))
.builder();

List<MessageHandler.Context> messageContexts = synchronizedList(new ArrayList<>());
Expand Down Expand Up @@ -243,15 +243,14 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
void asynchronousProcessingWithFlowControl() {
int messageCount = 100_000;
publishAndWaitForConfirms(cf, messageCount, stream);

ExecutorService executorService =
Executors.newFixedThreadPool(getRuntime().availableProcessors());
try {
CountDownLatch latch = new CountDownLatch(messageCount);
environment.consumerBuilder().stream(stream)
.offset(OffsetSpecification.first())
.flow()
.strategy(creditWhenHalfMessagesProcessed())
.strategy(creditWhenHalfMessagesProcessed(1))
.builder()
.messageHandler(
(ctx, message) ->
Expand Down
Loading