Skip to content

Commit f582f7d

Browse files
committed
Introduce replay argument in dynamic batch support class
1 parent b56df00 commit f582f7d

File tree

3 files changed

+86
-80
lines changed

3 files changed

+86
-80
lines changed

src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java

+36-28
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.concurrent.BlockingQueue;
2323
import java.util.concurrent.LinkedBlockingQueue;
2424
import java.util.concurrent.TimeUnit;
25-
import java.util.function.Predicate;
25+
import java.util.function.BiPredicate;
2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;
2828

@@ -33,11 +33,11 @@ final class DynamicBatch<T> implements AutoCloseable {
3333
private static final int MAX_BATCH_SIZE = 8192;
3434

3535
private final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
36-
private final Predicate<List<T>> consumer;
36+
private final BiPredicate<List<T>, Boolean> consumer;
3737
private final int configuredBatchSize;
3838
private final Thread thread;
3939

40-
DynamicBatch(Predicate<List<T>> consumer, int batchSize) {
40+
DynamicBatch(BiPredicate<List<T>, Boolean> consumer, int batchSize) {
4141
this.consumer = consumer;
4242
this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
4343
this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop);
@@ -53,8 +53,10 @@ void add(T item) {
5353
}
5454

5555
private void loop() {
56-
int batchSize = this.configuredBatchSize;
57-
List<T> batch = new ArrayList<>(batchSize);
56+
State<T> state = new State<>();
57+
state.batchSize = this.configuredBatchSize;
58+
state.items = new ArrayList<>(state.batchSize);
59+
state.retry = false;
5860
Thread currentThread = Thread.currentThread();
5961
T item;
6062
while (!currentThread.isInterrupted()) {
@@ -65,44 +67,50 @@ private void loop() {
6567
return;
6668
}
6769
if (item != null) {
68-
batch.add(item);
69-
if (batch.size() >= batchSize) {
70-
if (this.completeBatch(batch)) {
71-
batchSize = min(batchSize * 2, MAX_BATCH_SIZE);
72-
batch = new ArrayList<>(batchSize);
73-
}
70+
state.items.add(item);
71+
if (state.items.size() >= state.batchSize) {
72+
this.maybeCompleteBatch(state, true);
7473
} else {
7574
item = this.requests.poll();
7675
if (item == null) {
77-
if (this.completeBatch(batch)) {
78-
batchSize = max(batchSize / 2, MIN_BATCH_SIZE);
79-
batch = new ArrayList<>(batchSize);
80-
}
76+
this.maybeCompleteBatch(state, false);
8177
} else {
82-
batch.add(item);
83-
if (batch.size() >= batchSize) {
84-
if (this.completeBatch(batch)) {
85-
batchSize = min(batchSize * 2, MAX_BATCH_SIZE);
86-
batch = new ArrayList<>(batchSize);
87-
}
78+
state.items.add(item);
79+
if (state.items.size() >= state.batchSize) {
80+
this.maybeCompleteBatch(state, true);
8881
}
8982
}
9083
}
9184
} else {
92-
if (this.completeBatch(batch)) {
93-
batchSize = min(batchSize * 2, MAX_BATCH_SIZE);
94-
batch = new ArrayList<>(batchSize);
95-
}
85+
this.maybeCompleteBatch(state, false);
9686
}
9787
}
9888
}
9989

100-
private boolean completeBatch(List<T> items) {
90+
private static final class State<T> {
91+
92+
int batchSize;
93+
List<T> items;
94+
boolean retry;
95+
}
96+
97+
private void maybeCompleteBatch(State<T> state, boolean increaseIfCompleted) {
10198
try {
102-
return this.consumer.test(items);
99+
boolean completed = this.consumer.test(state.items, state.retry);
100+
if (completed) {
101+
if (increaseIfCompleted) {
102+
state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE);
103+
} else {
104+
state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE);
105+
}
106+
state.items = new ArrayList<>(state.batchSize);
107+
state.retry = false;
108+
} else {
109+
state.retry = true;
110+
}
103111
} catch (Exception e) {
104112
LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage());
105-
return false;
113+
state.retry = true;
106114
}
107115
}
108116

src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java

+32-38
Original file line numberDiff line numberDiff line change
@@ -62,67 +62,61 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
6262
if (subEntrySize <= 1) {
6363
this.dynamicBatch =
6464
new DynamicBatch<>(
65-
items -> {
66-
// TODO add a "replay" flag to DynamicBatch to avoid checking the producer status
67-
// the status check helps to avoid collecting the observation another time
68-
if (producer.canSend()) {
65+
(items, replay) -> {
66+
if (!replay) {
6967
items.forEach(
7068
i -> {
7169
AccumulatedEntity entity = (AccumulatedEntity) i;
7270
this.observationCollector.published(
7371
entity.observationContext(), entity.confirmationCallback().message());
7472
});
75-
return this.publish(items);
76-
} else {
77-
return false;
7873
}
74+
return this.publish(items);
7975
},
8076
batchSize);
8177
} else {
8278
byte compressionCode =
8379
compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
8480
this.dynamicBatch =
8581
new DynamicBatch<>(
86-
items -> {
87-
if (this.producer.canSend()) {
88-
List<Object> subBatches = new ArrayList<>();
89-
int count = 0;
90-
ProducerUtils.Batch batch =
91-
this.helper.batch(
92-
byteBufAllocator, compressionCode, compressionCodec, subEntrySize);
93-
AccumulatedEntity lastMessageInBatch = null;
94-
for (Object msg : items) {
95-
AccumulatedEntity message = (AccumulatedEntity) msg;
82+
(items, replay) -> {
83+
List<Object> subBatches = new ArrayList<>();
84+
int count = 0;
85+
ProducerUtils.Batch batch =
86+
this.helper.batch(
87+
byteBufAllocator, compressionCode, compressionCodec, subEntrySize);
88+
AccumulatedEntity lastMessageInBatch = null;
89+
for (Object msg : items) {
90+
AccumulatedEntity message = (AccumulatedEntity) msg;
91+
if (!replay) {
9692
this.observationCollector.published(
9793
message.observationContext(), message.confirmationCallback().message());
98-
lastMessageInBatch = message;
99-
batch.add(
100-
(Codec.EncodedMessage) message.encodedEntity(),
101-
message.confirmationCallback());
102-
count++;
103-
if (count == subEntrySize) {
104-
batch.time = lastMessageInBatch.time();
105-
batch.publishingId = lastMessageInBatch.publishingId();
106-
batch.encodedMessageBatch.close();
107-
subBatches.add(batch);
108-
lastMessageInBatch = null;
109-
batch =
110-
this.helper.batch(
111-
byteBufAllocator, compressionCode, compressionCodec, subEntrySize);
112-
count = 0;
113-
}
11494
}
115-
116-
if (!batch.isEmpty() && count < subEntrySize) {
95+
lastMessageInBatch = message;
96+
batch.add(
97+
(Codec.EncodedMessage) message.encodedEntity(),
98+
message.confirmationCallback());
99+
count++;
100+
if (count == subEntrySize) {
117101
batch.time = lastMessageInBatch.time();
118102
batch.publishingId = lastMessageInBatch.publishingId();
119103
batch.encodedMessageBatch.close();
120104
subBatches.add(batch);
105+
lastMessageInBatch = null;
106+
batch =
107+
this.helper.batch(
108+
byteBufAllocator, compressionCode, compressionCodec, subEntrySize);
109+
count = 0;
121110
}
122-
return this.publish(subBatches);
123-
} else {
124-
return false;
125111
}
112+
113+
if (!batch.isEmpty() && count < subEntrySize) {
114+
batch.time = lastMessageInBatch.time();
115+
batch.publishingId = lastMessageInBatch.publishingId();
116+
batch.encodedMessageBatch.close();
117+
subBatches.add(batch);
118+
}
119+
return this.publish(subBatches);
126120
},
127121
batchSize * subEntrySize);
128122
}

src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java

+18-14
Original file line numberDiff line numberDiff line change
@@ -40,28 +40,32 @@ void test() {
4040
Random random = new Random();
4141
DynamicBatch<String> batch =
4242
new DynamicBatch<>(
43-
items -> {
43+
(items, replay) -> {
4444
batchSizeMetrics.update(items.size());
45-
sync.down(items.size());
4645
try {
4746
Thread.sleep(random.nextInt(10) + 1);
4847
} catch (InterruptedException e) {
4948
throw new RuntimeException(e);
5049
}
50+
sync.down(items.size());
5151
return true;
5252
},
5353
100);
54-
RateLimiter rateLimiter = RateLimiter.create(3000);
55-
long start = System.nanoTime();
56-
IntStream.range(0, itemCount)
57-
.forEach(
58-
i -> {
59-
rateLimiter.acquire();
60-
batch.add(String.valueOf(i));
61-
});
62-
Assertions.assertThat(sync).completes();
63-
long end = System.nanoTime();
64-
// System.out.println("Done in " + Duration.ofNanos(end - start));
65-
// reporter.report();
54+
try {
55+
RateLimiter rateLimiter = RateLimiter.create(3000);
56+
long start = System.nanoTime();
57+
IntStream.range(0, itemCount)
58+
.forEach(
59+
i -> {
60+
rateLimiter.acquire();
61+
batch.add(String.valueOf(i));
62+
});
63+
Assertions.assertThat(sync).completes();
64+
long end = System.nanoTime();
65+
// System.out.println("Done in " + Duration.ofNanos(end - start));
66+
// reporter.report();
67+
} finally {
68+
batch.close();
69+
}
6670
}
6771
}

0 commit comments

Comments
 (0)