Skip to content

Commit 16396df

Browse files
committed
Experiment with dynamic-batch approach for publishing
1 parent 3188d61 commit 16396df

File tree

7 files changed

+370
-73
lines changed

7 files changed

+370
-73
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import static java.lang.Math.max;
18+
import static java.lang.Math.min;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.concurrent.BlockingQueue;
23+
import java.util.concurrent.LinkedBlockingQueue;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
import java.util.function.Consumer;
27+
28+
class DynamicBatch<T> {
29+
30+
private static final int MIN_BATCH_SIZE = 32;
31+
private static final int MAX_BATCH_SIZE = 8192;
32+
33+
final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
34+
final Consumer<List<T>> consumer;
35+
final int configuredBatchSize;
36+
private final AtomicLong count = new AtomicLong(0);
37+
38+
DynamicBatch(Consumer<List<T>> consumer, int batchSize) {
39+
this.consumer = consumer;
40+
this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
41+
new Thread(this::loop).start();
42+
}
43+
44+
void add(T item) {
45+
try {
46+
requests.put(item);
47+
this.count.incrementAndGet();
48+
} catch (InterruptedException e) {
49+
throw new RuntimeException(e);
50+
}
51+
}
52+
53+
private void loop() {
54+
int batchSize = this.configuredBatchSize;
55+
List<T> batch = new ArrayList<>(batchSize);
56+
Thread currentThread = Thread.currentThread();
57+
T item;
58+
while (!currentThread.isInterrupted()) {
59+
try {
60+
item = this.requests.poll(100, TimeUnit.MILLISECONDS);
61+
} catch (InterruptedException e) {
62+
currentThread.interrupt();
63+
return;
64+
}
65+
if (item != null) {
66+
batch.add(item);
67+
if (batch.size() >= batchSize) {
68+
this.completeBatch(batch);
69+
batchSize = min(batchSize * 2, MAX_BATCH_SIZE);
70+
batch = new ArrayList<>(batchSize);
71+
} else {
72+
item = this.requests.poll();
73+
if (item == null) {
74+
this.completeBatch(batch);
75+
batchSize = max(batchSize / 2, MIN_BATCH_SIZE);
76+
batch = new ArrayList<>(batchSize);
77+
} else {
78+
batch.add(item);
79+
if (batch.size() >= batchSize) {
80+
this.completeBatch(batch);
81+
batchSize = min(batchSize * 2, MAX_BATCH_SIZE);
82+
batch = new ArrayList<>(batchSize);
83+
}
84+
}
85+
}
86+
} else {
87+
this.completeBatch(batch);
88+
batchSize = min(batchSize * 2, MAX_BATCH_SIZE);
89+
batch = new ArrayList<>(batchSize);
90+
}
91+
}
92+
}
93+
94+
private void completeBatch(List<T> items) {
95+
this.consumer.accept(items);
96+
}
97+
}

src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public int size() {
103103
return messages.size();
104104
}
105105

106-
private static final class SimpleAccumulatedEntity implements AccumulatedEntity {
106+
static final class SimpleAccumulatedEntity implements AccumulatedEntity {
107107

108108
private final long time;
109109
private final long publishingId;
@@ -112,7 +112,7 @@ private static final class SimpleAccumulatedEntity implements AccumulatedEntity
112112
private final StreamProducer.ConfirmationCallback confirmationCallback;
113113
private final Object observationContext;
114114

115-
private SimpleAccumulatedEntity(
115+
SimpleAccumulatedEntity(
116116
long time,
117117
long publishingId,
118118
String filterValue,
@@ -158,13 +158,12 @@ public Object observationContext() {
158158
}
159159
}
160160

161-
private static final class SimpleConfirmationCallback
162-
implements StreamProducer.ConfirmationCallback {
161+
static final class SimpleConfirmationCallback implements StreamProducer.ConfirmationCallback {
163162

164163
private final Message message;
165164
private final ConfirmationHandler confirmationHandler;
166165

167-
private SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) {
166+
SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) {
168167
this.message = message;
169168
this.confirmationHandler = confirmationHandler;
170169
}

0 commit comments

Comments
 (0)