Skip to content

Commit f2ce304

Browse files
committed
Test against dynamic-batch message accumulator
1 parent 127e3ca commit f2ce304

File tree

4 files changed

+42
-11
lines changed

4 files changed

+42
-11
lines changed

src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,24 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
6565
this.filterValueExtractor =
6666
filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor;
6767
if (subEntrySize <= 1) {
68-
this.dynamicBatch = new DynamicBatch<>(this::publish, batchSize);
68+
this.dynamicBatch =
69+
new DynamicBatch<>(
70+
items -> {
71+
// TODO add a "replay" flag to DynamicBatch to avoid checking the producer status
72+
// the status check helps to avoid collecting the observation another time
73+
if (producer.canSend()) {
74+
items.forEach(
75+
i -> {
76+
AccumulatedEntity entity = (AccumulatedEntity) i;
77+
this.observationCollector.published(
78+
entity.observationContext(), entity.confirmationCallback().message());
79+
});
80+
return this.publish(items);
81+
} else {
82+
return false;
83+
}
84+
},
85+
batchSize);
6986
} else {
7087
byte compressionCode =
7188
compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public int fragmentLength(Object entity) {
174174
}
175175
this.accumulator =
176176
ProducerUtils.createMessageAccumulator(
177-
false,
177+
true,
178178
subEntrySize,
179179
batchSize,
180180
compressionCodec,
@@ -306,8 +306,10 @@ private long computeFirstValueOfPublishingSequence() {
306306
}
307307
}
308308

309+
// visible for testing
309310
void confirm(long publishingId) {
310311
AccumulatedEntity accumulatedEntity = this.unconfirmedMessages.remove(publishingId);
312+
311313
if (accumulatedEntity != null) {
312314
int confirmedCount =
313315
accumulatedEntity.confirmationCallback().handle(true, Constants.RESPONSE_CODE_OK);
@@ -317,6 +319,11 @@ void confirm(long publishingId) {
317319
}
318320
}
319321

322+
// for testing
323+
int unconfirmedCount() {
324+
return this.unconfirmedMessages.size();
325+
}
326+
320327
void error(long publishingId, short errorCode) {
321328
AccumulatedEntity accumulatedEntity = unconfirmedMessages.remove(publishingId);
322329
if (accumulatedEntity != null) {

src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.codahale.metrics.Histogram;
1919
import com.codahale.metrics.MetricRegistry;
2020
import com.google.common.util.concurrent.RateLimiter;
21-
import java.time.Duration;
2221
import java.util.Random;
2322
import java.util.concurrent.TimeUnit;
2423
import java.util.stream.IntStream;
@@ -62,7 +61,7 @@ void test() {
6261
});
6362
Assertions.assertThat(sync).completes();
6463
long end = System.nanoTime();
65-
System.out.println("Done in " + Duration.ofNanos(end - start));
66-
reporter.report();
64+
// System.out.println("Done in " + Duration.ofNanos(end - start));
65+
// reporter.report();
6766
}
6867
}

src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java

+14-6
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package com.rabbitmq.stream.impl;
1616

1717
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
18+
import static java.util.stream.IntStream.range;
1819
import static org.assertj.core.api.Assertions.assertThat;
1920
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2021
import static org.mockito.ArgumentMatchers.*;
@@ -43,8 +44,8 @@
4344
import java.util.concurrent.atomic.AtomicBoolean;
4445
import java.util.concurrent.atomic.AtomicInteger;
4546
import java.util.function.ToLongFunction;
46-
import java.util.stream.IntStream;
4747
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
48+
import org.assertj.core.data.Offset;
4849
import org.junit.jupiter.api.AfterEach;
4950
import org.junit.jupiter.api.BeforeEach;
5051
import org.junit.jupiter.params.ParameterizedTest;
@@ -184,26 +185,33 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout(
184185
null,
185186
env);
186187

187-
IntStream.range(0, messageCount)
188+
range(0, messageCount)
188189
.forEach(
189190
i ->
190191
producer.send(
191192
producer.messageBuilder().addData("".getBytes()).build(), confirmationHandler));
192193

193-
IntStream.range(0, confirmedPart).forEach(publishingId -> producer.confirm(publishingId));
194-
assertThat(confirmedCount.get()).isEqualTo(expectedConfirmed);
194+
waitAtMost(() -> producer.unconfirmedCount() >= messageCount / subEntrySize);
195+
range(0, confirmedPart).forEach(producer::confirm);
196+
if (subEntrySize == 1) {
197+
assertThat(confirmedCount.get()).isEqualTo(expectedConfirmed);
198+
} else {
199+
assertThat(confirmedCount.get()).isCloseTo(confirmedCount.get(), Offset.offset(subEntrySize));
200+
}
195201
assertThat(erroredCount.get()).isZero();
202+
int confirmedPreviously = confirmedCount.get();
196203

197204
executorService.scheduleAtFixedRate(() -> clock.refresh(), 100, 100, TimeUnit.MILLISECONDS);
198205

199206
Thread.sleep(waitTime.toMillis());
200-
assertThat(confirmedCount.get()).isEqualTo(expectedConfirmed);
207+
assertThat(confirmedCount.get()).isEqualTo(confirmedPreviously);
201208
if (confirmTimeout.isZero()) {
202209
assertThat(erroredCount.get()).isZero();
203210
assertThat(responseCodes).isEmpty();
204211
} else {
205212
waitAtMost(
206-
waitTime.multipliedBy(2), () -> erroredCount.get() == (messageCount - expectedConfirmed));
213+
waitTime.multipliedBy(2),
214+
() -> erroredCount.get() == (messageCount - confirmedPreviously));
207215
assertThat(responseCodes).hasSize(1).contains(Constants.CODE_PUBLISH_CONFIRM_TIMEOUT);
208216
}
209217
}

0 commit comments

Comments
 (0)