Skip to content

Add dynamic-batch publishing option #649

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .github/workflows/test-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ jobs:
cache: 'maven'
- name: Start broker
run: ci/start-broker.sh
- name: Test
- name: Test (no dynamic-batch publishing)
run: |
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
-Drabbitmq.stream.producer.dynamic.batch=false \
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
- name: Test (dynamic-batch publishing)
run: |
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
-Drabbitmq.stream.producer.dynamic.batch=true \
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
Expand Down
10 changes: 9 additions & 1 deletion .github/workflows/test-rabbitmq-alphas.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,19 @@ jobs:
run: ci/start-broker.sh
env:
RABBITMQ_IMAGE: ${{ matrix.rabbitmq-image }}
- name: Test
- name: Test (no dynamic-batch publishing)
run: |
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
-Drabbitmq.stream.producer.dynamic.batch=false \
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
- name: Test (dynamic-batch publishing)
run: |
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
-Drabbitmq.stream.producer.dynamic.batch=true \
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
- name: Stop broker
run: docker stop rabbitmq && docker rm rabbitmq
11 changes: 10 additions & 1 deletion .github/workflows/test-supported-java-versions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,21 @@ jobs:
run: ci/start-broker.sh
- name: Display Java version
run: ./mvnw --version
- name: Test
- name: Test (no dynamic-batch publishing)
run: |
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
-Drabbitmq.stream.producer.dynamic.batch=false \
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \
-Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true
- name: Test (dynamic-batch publishing)
run: |
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
-Drabbitmq.stream.producer.dynamic.batch=true \
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \
-Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true
- name: Stop broker
run: docker stop rabbitmq && docker rm rabbitmq
10 changes: 9 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,20 @@ jobs:
gpg-passphrase: MAVEN_GPG_PASSPHRASE
- name: Start broker
run: ci/start-broker.sh
- name: Test
- name: Test (no dynamic-batch publishing)
run: |
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
-Drabbitmq.stream.producer.dynamic.batch=false \
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
- name: Test (dynamic-batch publishing)
run: |
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
-Drabbitmq.stream.producer.dynamic.batch=true \
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
- name: Stop broker
run: docker stop rabbitmq && docker rm rabbitmq
- name: Upload Codecov report
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@
<style>GOOGLE</style>
</googleJavaFormat>
</java>
<!-- <ratchetFrom>origin/main</ratchetFrom>-->
<ratchetFrom>origin/main</ratchetFrom>
<licenseHeader> <!-- specify either content or file, but not both -->
<content>// Copyright (c) $YEAR Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
Expand Down
4 changes: 4 additions & 0 deletions src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ blocking when the limit is reached.
|Period to send a batch of messages.
|100 ms

|`dynamicBatch`
|Adapt batch size depending on ingress rate.
|false

|`confirmTimeout`
|[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal
outstanding unconfirmed messages timed out.
Expand Down
24 changes: 23 additions & 1 deletion src/main/java/com/rabbitmq/stream/ProducerBuilder.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand Down Expand Up @@ -97,6 +97,28 @@ public interface ProducerBuilder {
*/
ProducerBuilder batchPublishingDelay(Duration batchPublishingDelay);

/**
* Adapt batch size depending on ingress rate.
*
* <p>A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive
* for sustained high ingress rates.
*
* <p>Set this flag to <code>true</code> if you want as little delay as possible before calling
* {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker.
*
* <p>Set this flag to <code>false</code> if latency is not critical for your use case and you
* want the highest throughput possible for both publishing and consuming.
*
* <p>Dynamic batch is not activated by default (<code>dynamicBatch = false</code>).
*
* <p>Dynamic batch is experimental.
*
* @param dynamicBatch
* @return this builder instance
* @since 0.20.0
*/
ProducerBuilder dynamicBatch(boolean dynamicBatch);

/**
* The maximum number of unconfirmed outbound messages.
*
Expand Down
60 changes: 60 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/ConcurrencyUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.stream.impl;

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ConcurrencyUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrencyUtils.class);

private static final ThreadFactory THREAD_FACTORY;

static {
if (isJava21OrMore()) {
LOGGER.debug("Running Java 21 or more, using virtual threads");
Class<?> builderClass =
Arrays.stream(Thread.class.getDeclaredClasses())
.filter(c -> "Builder".equals(c.getSimpleName()))
.findFirst()
.get();
// Reflection code is the same as:
// Thread.ofVirtual().factory();
try {
Object builder = Thread.class.getDeclaredMethod("ofVirtual").invoke(null);
THREAD_FACTORY = (ThreadFactory) builderClass.getDeclaredMethod("factory").invoke(builder);
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new RuntimeException(e);
}
} else {
THREAD_FACTORY = Executors.defaultThreadFactory();
}
}

private ConcurrencyUtils() {}

static ThreadFactory defaultThreadFactory() {
return THREAD_FACTORY;
}

private static boolean isJava21OrMore() {
return Utils.versionCompare(System.getProperty("java.version"), "21.0") >= 0;
}
}
121 changes: 121 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) 2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.stream.impl;

import static java.lang.Math.max;
import static java.lang.Math.min;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class DynamicBatch<T> implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class);
private static final int MIN_BATCH_SIZE = 32;
private static final int MAX_BATCH_SIZE = 8192;

private final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
private final BiPredicate<List<T>, Boolean> consumer;
private final int configuredBatchSize;
private final Thread thread;

DynamicBatch(BiPredicate<List<T>, Boolean> consumer, int batchSize) {
this.consumer = consumer;
this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop);
this.thread.start();
}

void add(T item) {
try {
requests.put(item);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private void loop() {
State<T> state = new State<>();
state.batchSize = this.configuredBatchSize;
state.items = new ArrayList<>(state.batchSize);
state.retry = false;
Thread currentThread = Thread.currentThread();
T item;
while (!currentThread.isInterrupted()) {
try {
item = this.requests.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
currentThread.interrupt();
return;
}
if (item != null) {
state.items.add(item);
if (state.items.size() >= state.batchSize) {
this.maybeCompleteBatch(state, true);
} else {
item = this.requests.poll();
if (item == null) {
this.maybeCompleteBatch(state, false);
} else {
state.items.add(item);
if (state.items.size() >= state.batchSize) {
this.maybeCompleteBatch(state, true);
}
}
}
} else {
this.maybeCompleteBatch(state, false);
}
}
}

private static final class State<T> {

int batchSize;
List<T> items;
boolean retry;
}

private void maybeCompleteBatch(State<T> state, boolean increaseIfCompleted) {
try {
boolean completed = this.consumer.test(state.items, state.retry);
if (completed) {
if (increaseIfCompleted) {
state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE);
} else {
state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE);
}
state.items = new ArrayList<>(state.batchSize);
state.retry = false;
} else {
state.retry = true;
}
} catch (Exception e) {
LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage());
state.retry = true;
}
}

@Override
public void close() {
this.thread.interrupt();
}
}
Loading