Skip to content

Commit fa5e0a3

Browse files
committed
Add dynamic batch option in producer builder
1 parent 517926e commit fa5e0a3

File tree

3 files changed

+33
-0
lines changed

3 files changed

+33
-0
lines changed

src/docs/asciidoc/api.adoc

+4
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,10 @@ blocking when the limit is reached.
455455
|Period to send a batch of messages.
456456
|100 ms
457457

458+
|`dynamicBatch`
459+
|Adapt batch size depending on ingress rate.
460+
|false
461+
458462
|`confirmTimeout`
459463
|[[producer-confirm-timeout-configuration-entry]]Time before the client calls the confirm callback to signal
460464
outstanding unconfirmed messages timed out.

src/main/java/com/rabbitmq/stream/ProducerBuilder.java

+22
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,28 @@ public interface ProducerBuilder {
9797
*/
9898
ProducerBuilder batchPublishingDelay(Duration batchPublishingDelay);
9999

100+
/**
101+
* Adapt batch size depending on ingress rate.
102+
*
103+
* <p>A dynamic-batch approach improves latency for low ingress rates. It can be counterproductive
104+
* for sustained high ingress rates.
105+
*
106+
* <p>Set this flag to <code>true</code> if you want as little delay as possible before calling
107+
* {@link Producer#send(Message, ConfirmationHandler)} and the message being sent to the broker.
108+
*
109+
* <p>Set this flag to <code>false</code> if latency is not critical for your use case and you
110+
* want the highest throughput possible for both publishing and consuming.
111+
*
112+
* <p>Dynamic batch is not activated by default (<code>dynamicBatch = false</code>).
113+
*
114+
* <p>Dynamic batch is experimental.
115+
*
116+
* @param dynamicBatch
117+
* @return this builder instance
118+
* @since 0.20.0
119+
*/
120+
ProducerBuilder dynamicBatch(boolean dynamicBatch);
121+
100122
/**
101123
* The maximum number of unconfirmed outbound messages.
102124
*

src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java

+7
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,18 @@ public ProducerBuilder compression(Compression compression) {
102102
return this;
103103
}
104104

105+
@Override
105106
public StreamProducerBuilder batchPublishingDelay(Duration batchPublishingDelay) {
106107
this.batchPublishingDelay = batchPublishingDelay;
107108
return this;
108109
}
109110

111+
@Override
112+
public ProducerBuilder dynamicBatch(boolean dynamicBatch) {
113+
this.dynamicBatch = dynamicBatch;
114+
return this;
115+
}
116+
110117
@Override
111118
public ProducerBuilder maxUnconfirmedMessages(int maxUnconfirmedMessages) {
112119
if (maxUnconfirmedMessages <= 0) {

0 commit comments

Comments
 (0)