Skip to content

Commit 76048e0

Browse files
committed
Add test for dynamic batch support class
1 parent 78d1a78 commit 76048e0

File tree

5 files changed

+111
-59
lines changed

5 files changed

+111
-59
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1+
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
22
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
33
//
44
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -25,7 +25,6 @@
2525
import static java.lang.String.join;
2626
import static java.util.Arrays.asList;
2727
import static java.util.concurrent.TimeUnit.SECONDS;
28-
import static java.util.stream.StreamSupport.stream;
2928

3029
import com.rabbitmq.stream.AuthenticationFailureException;
3130
import com.rabbitmq.stream.ByteCapacity;

src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.concurrent.BlockingQueue;
2323
import java.util.concurrent.LinkedBlockingQueue;
2424
import java.util.concurrent.TimeUnit;
25-
import java.util.function.BiPredicate;
2625
import org.slf4j.Logger;
2726
import org.slf4j.LoggerFactory;
2827

@@ -33,11 +32,11 @@ final class DynamicBatch<T> implements AutoCloseable {
3332
private static final int MAX_BATCH_SIZE = 8192;
3433

3534
private final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
36-
private final BiPredicate<List<T>, Boolean> consumer;
35+
private final BatchConsumer<T> consumer;
3736
private final int configuredBatchSize;
3837
private final Thread thread;
3938

40-
DynamicBatch(BiPredicate<List<T>, Boolean> consumer, int batchSize) {
39+
DynamicBatch(BatchConsumer<T> consumer, int batchSize) {
4140
this.consumer = consumer;
4241
this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
4342
this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop);
@@ -56,7 +55,6 @@ private void loop() {
5655
State<T> state = new State<>();
5756
state.batchSize = this.configuredBatchSize;
5857
state.items = new ArrayList<>(state.batchSize);
59-
state.retry = false;
6058
Thread currentThread = Thread.currentThread();
6159
T item;
6260
while (!currentThread.isInterrupted()) {
@@ -91,31 +89,32 @@ private static final class State<T> {
9189

9290
int batchSize;
9391
List<T> items;
94-
boolean retry;
9592
}
9693

9794
private void maybeCompleteBatch(State<T> state, boolean increaseIfCompleted) {
9895
try {
99-
boolean completed = this.consumer.test(state.items, state.retry);
96+
boolean completed = this.consumer.process(state.items);
10097
if (completed) {
10198
if (increaseIfCompleted) {
10299
state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE);
103100
} else {
104101
state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE);
105102
}
106103
state.items = new ArrayList<>(state.batchSize);
107-
state.retry = false;
108-
} else {
109-
state.retry = true;
110104
}
111105
} catch (Exception e) {
112106
LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage());
113-
state.retry = true;
114107
}
115108
}
116109

117110
@Override
118111
public void close() {
119112
this.thread.interrupt();
120113
}
114+
115+
@FunctionalInterface
116+
interface BatchConsumer<T> {
117+
118+
boolean process(List<T> items);
119+
}
121120
}

src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java

+15-9
Original file line numberDiff line numberDiff line change
@@ -59,27 +59,29 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
5959
observationCollector);
6060
this.producer = producer;
6161
this.observationCollector = (ObservationCollector<Object>) observationCollector;
62+
boolean shouldObserve = !this.observationCollector.isNoop();
6263
if (subEntrySize <= 1) {
6364
this.dynamicBatch =
6465
new DynamicBatch<>(
65-
(items, replay) -> {
66-
if (!replay) {
66+
items -> {
67+
boolean result = this.publish(items);
68+
if (result && shouldObserve) {
6769
items.forEach(
6870
i -> {
6971
AccumulatedEntity entity = (AccumulatedEntity) i;
7072
this.observationCollector.published(
7173
entity.observationContext(), entity.confirmationCallback().message());
7274
});
7375
}
74-
return this.publish(items);
76+
return result;
7577
},
7678
batchSize);
7779
} else {
7880
byte compressionCode =
7981
compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
8082
this.dynamicBatch =
8183
new DynamicBatch<>(
82-
(items, replay) -> {
84+
items -> {
8385
List<Object> subBatches = new ArrayList<>();
8486
int count = 0;
8587
ProducerUtils.Batch batch =
@@ -88,10 +90,6 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
8890
AccumulatedEntity lastMessageInBatch = null;
8991
for (Object msg : items) {
9092
AccumulatedEntity message = (AccumulatedEntity) msg;
91-
if (!replay) {
92-
this.observationCollector.published(
93-
message.observationContext(), message.confirmationCallback().message());
94-
}
9593
lastMessageInBatch = message;
9694
batch.add(
9795
(Codec.EncodedMessage) message.encodedEntity(),
@@ -116,7 +114,15 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
116114
batch.encodedMessageBatch.close();
117115
subBatches.add(batch);
118116
}
119-
return this.publish(subBatches);
117+
boolean result = this.publish(subBatches);
118+
if (result && shouldObserve) {
119+
for (Object msg : items) {
120+
AccumulatedEntity message = (AccumulatedEntity) msg;
121+
this.observationCollector.published(
122+
message.observationContext(), message.confirmationCallback().message());
123+
}
124+
}
125+
return result;
120126
},
121127
batchSize * subEntrySize);
122128
}

src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,8 @@ public void add(Message message, ConfirmationHandler confirmationHandler) {
7575

7676
AccumulatedEntity get() {
7777
AccumulatedEntity entity = this.messages.poll();
78-
if (entity != null) {
79-
this.observationCollector.published(
80-
entity.observationContext(), entity.confirmationCallback().message());
81-
}
78+
this.observationCollector.published(
79+
entity.observationContext(), entity.confirmationCallback().message());
8280
return entity;
8381
}
8482

src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java

+84-34
Original file line numberDiff line numberDiff line change
@@ -14,58 +14,108 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17-
import com.codahale.metrics.ConsoleReporter;
18-
import com.codahale.metrics.Histogram;
19-
import com.codahale.metrics.MetricRegistry;
17+
import static com.rabbitmq.stream.impl.Assertions.assertThat;
18+
import static com.rabbitmq.stream.impl.TestUtils.sync;
19+
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
20+
21+
import com.codahale.metrics.*;
2022
import com.google.common.util.concurrent.RateLimiter;
23+
import com.rabbitmq.stream.impl.TestUtils.Sync;
24+
import java.util.Locale;
2125
import java.util.Random;
22-
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicInteger;
2328
import java.util.stream.IntStream;
2429
import org.junit.jupiter.api.Test;
2530

2631
public class DynamicBatchTest {
2732

33+
private static void simulateActivity(long duration) {
34+
try {
35+
Thread.sleep(duration);
36+
} catch (InterruptedException e) {
37+
throw new RuntimeException(e);
38+
}
39+
}
40+
41+
private static void printHistogram(Histogram histogram) {
42+
Locale locale = Locale.getDefault();
43+
System.out.printf(locale, " count = %d%n", histogram.getCount());
44+
Snapshot snapshot = histogram.getSnapshot();
45+
System.out.printf(locale, " min = %d%n", snapshot.getMin());
46+
System.out.printf(locale, " max = %d%n", snapshot.getMax());
47+
System.out.printf(locale, " mean = %2.2f%n", snapshot.getMean());
48+
System.out.printf(locale, " stddev = %2.2f%n", snapshot.getStdDev());
49+
System.out.printf(locale, " median = %2.2f%n", snapshot.getMedian());
50+
System.out.printf(locale, " 75%% <= %2.2f%n", snapshot.get75thPercentile());
51+
System.out.printf(locale, " 95%% <= %2.2f%n", snapshot.get95thPercentile());
52+
System.out.printf(locale, " 98%% <= %2.2f%n", snapshot.get98thPercentile());
53+
System.out.printf(locale, " 99%% <= %2.2f%n", snapshot.get99thPercentile());
54+
System.out.printf(locale, " 99.9%% <= %2.2f%n", snapshot.get999thPercentile());
55+
}
56+
2857
@Test
29-
void test() {
58+
void itemAreProcessed() {
3059
MetricRegistry metrics = new MetricRegistry();
3160
Histogram batchSizeMetrics = metrics.histogram("batch-size");
32-
ConsoleReporter reporter =
33-
ConsoleReporter.forRegistry(metrics)
34-
.convertRatesTo(TimeUnit.SECONDS)
35-
.convertDurationsTo(TimeUnit.MILLISECONDS)
36-
.build();
37-
3861
int itemCount = 3000;
39-
TestUtils.Sync sync = TestUtils.sync(itemCount);
62+
Sync sync = sync(itemCount);
4063
Random random = new Random();
41-
DynamicBatch<String> batch =
42-
new DynamicBatch<>(
43-
(items, replay) -> {
44-
batchSizeMetrics.update(items.size());
45-
try {
46-
Thread.sleep(random.nextInt(10) + 1);
47-
} catch (InterruptedException e) {
48-
throw new RuntimeException(e);
49-
}
50-
sync.down(items.size());
51-
return true;
52-
},
53-
100);
54-
try {
55-
RateLimiter rateLimiter = RateLimiter.create(3000);
56-
long start = System.nanoTime();
64+
DynamicBatch.BatchConsumer<String> action =
65+
items -> {
66+
batchSizeMetrics.update(items.size());
67+
simulateActivity(random.nextInt(10) + 1);
68+
sync.down(items.size());
69+
return true;
70+
};
71+
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100)) {
72+
RateLimiter rateLimiter = RateLimiter.create(10000);
5773
IntStream.range(0, itemCount)
5874
.forEach(
5975
i -> {
6076
rateLimiter.acquire();
6177
batch.add(String.valueOf(i));
6278
});
63-
Assertions.assertThat(sync).completes();
64-
long end = System.nanoTime();
65-
// System.out.println("Done in " + Duration.ofNanos(end - start));
66-
// reporter.report();
67-
} finally {
68-
batch.close();
79+
assertThat(sync).completes();
80+
// printHistogram(batchSizeMetrics);
81+
}
82+
}
83+
84+
@Test
85+
void failedProcessingIsReplayed() throws Exception {
86+
int itemCount = 10000;
87+
AtomicInteger collected = new AtomicInteger(0);
88+
AtomicInteger processed = new AtomicInteger(0);
89+
AtomicBoolean canProcess = new AtomicBoolean(true);
90+
DynamicBatch.BatchConsumer<String> action =
91+
items -> {
92+
boolean result;
93+
if (canProcess.get()) {
94+
collected.addAndGet(items.size());
95+
processed.addAndGet(items.size());
96+
result = true;
97+
} else {
98+
result = false;
99+
}
100+
return result;
101+
};
102+
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100)) {
103+
int firstRoundCount = itemCount / 5;
104+
IntStream.range(0, firstRoundCount)
105+
.forEach(
106+
i -> {
107+
batch.add(String.valueOf(i));
108+
});
109+
waitAtMost(() -> processed.get() == firstRoundCount);
110+
canProcess.set(false);
111+
IntStream.range(firstRoundCount, itemCount)
112+
.forEach(
113+
i -> {
114+
batch.add(String.valueOf(i));
115+
});
116+
canProcess.set(true);
117+
waitAtMost(() -> processed.get() == itemCount);
118+
waitAtMost(() -> collected.get() == itemCount);
69119
}
70120
}
71121
}

0 commit comments

Comments
 (0)