Skip to content

Commit 258e8e0

Browse files
authored
Merge pull request #649 from rabbitmq/dynamic-batch-publishing
Add dynamic-batch publishing option
2 parents 3188d61 + 3052454 commit 258e8e0

21 files changed

+1126
-476
lines changed

.github/workflows/test-pr.yml

+9-1
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,17 @@ jobs:
2424
cache: 'maven'
2525
- name: Start broker
2626
run: ci/start-broker.sh
27-
- name: Test
27+
- name: Test (no dynamic-batch publishing)
2828
run: |
2929
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
30+
-Drabbitmq.stream.producer.dynamic.batch=false \
31+
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
32+
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
33+
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
34+
- name: Test (dynamic-batch publishing)
35+
run: |
36+
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
37+
-Drabbitmq.stream.producer.dynamic.batch=true \
3038
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
3139
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
3240
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem

.github/workflows/test-rabbitmq-alphas.yml

+9-1
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,19 @@ jobs:
3232
run: ci/start-broker.sh
3333
env:
3434
RABBITMQ_IMAGE: ${{ matrix.rabbitmq-image }}
35-
- name: Test
35+
- name: Test (no dynamic-batch publishing)
3636
run: |
3737
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
38+
-Drabbitmq.stream.producer.dynamic.batch=false \
3839
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
3940
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
4041
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
42+
- name: Test (dynamic-batch publishing)
43+
run: |
44+
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
45+
-Drabbitmq.stream.producer.dynamic.batch=true \
46+
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
47+
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
48+
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
4149
- name: Stop broker
4250
run: docker stop rabbitmq && docker rm rabbitmq

.github/workflows/test-supported-java-versions.yml

+10-1
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,21 @@ jobs:
3333
run: ci/start-broker.sh
3434
- name: Display Java version
3535
run: ./mvnw --version
36-
- name: Test
36+
- name: Test (no dynamic-batch publishing)
3737
run: |
3838
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
39+
-Drabbitmq.stream.producer.dynamic.batch=false \
3940
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
4041
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
4142
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \
4243
-Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true
44+
- name: Test (dynamic-batch publishing)
45+
run: |
46+
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
47+
-Drabbitmq.stream.producer.dynamic.batch=true \
48+
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
49+
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
50+
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \
51+
-Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true
4352
- name: Stop broker
4453
run: docker stop rabbitmq && docker rm rabbitmq

.github/workflows/test.yml

+9-1
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,20 @@ jobs:
3333
gpg-passphrase: MAVEN_GPG_PASSPHRASE
3434
- name: Start broker
3535
run: ci/start-broker.sh
36-
- name: Test
36+
- name: Test (no dynamic-batch publishing)
3737
run: |
3838
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
39+
-Drabbitmq.stream.producer.dynamic.batch=false \
3940
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
4041
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
4142
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
43+
- name: Test (dynamic-batch publishing)
44+
run: |
45+
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
46+
-Drabbitmq.stream.producer.dynamic.batch=true \
47+
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
48+
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
49+
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
4250
- name: Stop broker
4351
run: docker stop rabbitmq && docker rm rabbitmq
4452
- name: Upload Codecov report

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@
533533
<style>GOOGLE</style>
534534
</googleJavaFormat>
535535
</java>
536-
<!-- <ratchetFrom>origin/main</ratchetFrom>-->
536+
<ratchetFrom>origin/main</ratchetFrom>
537537
<licenseHeader> <!-- specify either content or file, but not both -->
538538
<content>// Copyright (c) $YEAR Broadcom. All Rights Reserved.
539539
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

src/docs/asciidoc/api.adoc

+4
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,10 @@ blocking when the limit is reached.
455455
|Period to send a batch of messages.
456456
|100 ms
457457

458+
|`dynamicBatch`
459+
|Adapt batch size depending on ingress rate.
460+
|false
461+
458462
|`confirmTimeout`
459463
|[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal
460464
outstanding unconfirmed messages timed out.

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -97,6 +97,28 @@ public interface ProducerBuilder {
9797
*/
9898
ProducerBuilder batchPublishingDelay(Duration batchPublishingDelay);
9999

100+
/**
101+
* Adapt batch size depending on ingress rate.
102+
*
103+
* <p>A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive
104+
* for sustained high ingress rates.
105+
*
106+
* <p>Set this flag to <code>true</code> if you want as little delay as possible before calling
107+
* {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker.
108+
*
109+
* <p>Set this flag to <code>false</code> if latency is not critical for your use case and you
110+
* want the highest throughput possible for both publishing and consuming.
111+
*
112+
* <p>Dynamic batch is not activated by default (<code>dynamicBatch = false</code>).
113+
*
114+
* <p>Dynamic batch is experimental.
115+
*
116+
* @param dynamicBatch
117+
* @return this builder instance
118+
* @since 0.20.0
119+
*/
120+
ProducerBuilder dynamicBatch(boolean dynamicBatch);
121+
100122
/**
101123
* The maximum number of unconfirmed outbound messages.
102124
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import java.lang.reflect.InvocationTargetException;
18+
import java.util.Arrays;
19+
import java.util.concurrent.Executors;
20+
import java.util.concurrent.ThreadFactory;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
final class ConcurrencyUtils {
25+
26+
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrencyUtils.class);
27+
28+
private static final ThreadFactory THREAD_FACTORY;
29+
30+
static {
31+
if (isJava21OrMore()) {
32+
LOGGER.debug("Running Java 21 or more, using virtual threads");
33+
Class<?> builderClass =
34+
Arrays.stream(Thread.class.getDeclaredClasses())
35+
.filter(c -> "Builder".equals(c.getSimpleName()))
36+
.findFirst()
37+
.get();
38+
// Reflection code is the same as:
39+
// Thread.ofVirtual().factory();
40+
try {
41+
Object builder = Thread.class.getDeclaredMethod("ofVirtual").invoke(null);
42+
THREAD_FACTORY = (ThreadFactory) builderClass.getDeclaredMethod("factory").invoke(builder);
43+
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
44+
throw new RuntimeException(e);
45+
}
46+
} else {
47+
THREAD_FACTORY = Executors.defaultThreadFactory();
48+
}
49+
}
50+
51+
private ConcurrencyUtils() {}
52+
53+
static ThreadFactory defaultThreadFactory() {
54+
return THREAD_FACTORY;
55+
}
56+
57+
private static boolean isJava21OrMore() {
58+
return Utils.versionCompare(System.getProperty("java.version"), "21.0") >= 0;
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import static java.lang.Math.max;
18+
import static java.lang.Math.min;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.concurrent.BlockingQueue;
23+
import java.util.concurrent.LinkedBlockingQueue;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.function.BiPredicate;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
final class DynamicBatch<T> implements AutoCloseable {
30+
31+
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class);
32+
private static final int MIN_BATCH_SIZE = 32;
33+
private static final int MAX_BATCH_SIZE = 8192;
34+
35+
private final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
36+
private final BiPredicate<List<T>, Boolean> consumer;
37+
private final int configuredBatchSize;
38+
private final Thread thread;
39+
40+
DynamicBatch(BiPredicate<List<T>, Boolean> consumer, int batchSize) {
41+
this.consumer = consumer;
42+
this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
43+
this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop);
44+
this.thread.start();
45+
}
46+
47+
void add(T item) {
48+
try {
49+
requests.put(item);
50+
} catch (InterruptedException e) {
51+
throw new RuntimeException(e);
52+
}
53+
}
54+
55+
private void loop() {
56+
State<T> state = new State<>();
57+
state.batchSize = this.configuredBatchSize;
58+
state.items = new ArrayList<>(state.batchSize);
59+
state.retry = false;
60+
Thread currentThread = Thread.currentThread();
61+
T item;
62+
while (!currentThread.isInterrupted()) {
63+
try {
64+
item = this.requests.poll(100, TimeUnit.MILLISECONDS);
65+
} catch (InterruptedException e) {
66+
currentThread.interrupt();
67+
return;
68+
}
69+
if (item != null) {
70+
state.items.add(item);
71+
if (state.items.size() >= state.batchSize) {
72+
this.maybeCompleteBatch(state, true);
73+
} else {
74+
item = this.requests.poll();
75+
if (item == null) {
76+
this.maybeCompleteBatch(state, false);
77+
} else {
78+
state.items.add(item);
79+
if (state.items.size() >= state.batchSize) {
80+
this.maybeCompleteBatch(state, true);
81+
}
82+
}
83+
}
84+
} else {
85+
this.maybeCompleteBatch(state, false);
86+
}
87+
}
88+
}
89+
90+
private static final class State<T> {
91+
92+
int batchSize;
93+
List<T> items;
94+
boolean retry;
95+
}
96+
97+
private void maybeCompleteBatch(State<T> state, boolean increaseIfCompleted) {
98+
try {
99+
boolean completed = this.consumer.test(state.items, state.retry);
100+
if (completed) {
101+
if (increaseIfCompleted) {
102+
state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE);
103+
} else {
104+
state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE);
105+
}
106+
state.items = new ArrayList<>(state.batchSize);
107+
state.retry = false;
108+
} else {
109+
state.retry = true;
110+
}
111+
} catch (Exception e) {
112+
LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage());
113+
state.retry = true;
114+
}
115+
}
116+
117+
@Override
118+
public void close() {
119+
this.thread.interrupt();
120+
}
121+
}

0 commit comments

Comments
 (0)