Skip to content

Commit 8c54607

Browse files
committed
Add helper for message accumulator
1 parent f2ce304 commit 8c54607

File tree

7 files changed

+146
-136
lines changed

7 files changed

+146
-136
lines changed

src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java

Lines changed: 15 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,10 @@
2929

3030
final class DynamicBatchMessageAccumulator implements MessageAccumulator {
3131

32-
private static final Function<Message, String> NULL_FILTER_VALUE_EXTRACTOR = m -> null;
33-
3432
private final DynamicBatch<Object> dynamicBatch;
3533
private final ObservationCollector<Object> observationCollector;
36-
private final ToLongFunction<Message> publishSequenceFunction;
37-
private final String stream;
3834
private final StreamProducer producer;
39-
private final Codec codec;
40-
private final int maxFrameSize;
41-
private final Clock clock;
42-
private final Function<Message, String> filterValueExtractor;
35+
private final ProducerUtils.MessageAccumulatorHelper helper;
4336

4437
@SuppressWarnings("unchecked")
4538
DynamicBatchMessageAccumulator(
@@ -55,15 +48,17 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
5548
ByteBufAllocator byteBufAllocator,
5649
ObservationCollector<?> observationCollector,
5750
StreamProducer producer) {
51+
this.helper =
52+
new ProducerUtils.MessageAccumulatorHelper(
53+
codec,
54+
maxFrameSize,
55+
publishSequenceFunction,
56+
filterValueExtractor,
57+
clock,
58+
stream,
59+
observationCollector);
5860
this.producer = producer;
59-
this.stream = stream;
60-
this.publishSequenceFunction = publishSequenceFunction;
6161
this.observationCollector = (ObservationCollector<Object>) observationCollector;
62-
this.codec = codec;
63-
this.clock = clock;
64-
this.maxFrameSize = maxFrameSize;
65-
this.filterValueExtractor =
66-
filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor;
6762
if (subEntrySize <= 1) {
6863
this.dynamicBatch =
6964
new DynamicBatch<>(
@@ -93,11 +88,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
9388
List<Object> subBatches = new ArrayList<>();
9489
int count = 0;
9590
ProducerUtils.Batch batch =
96-
new ProducerUtils.Batch(
97-
Client.EncodedMessageBatch.create(
98-
byteBufAllocator, compressionCode, compressionCodec, subEntrySize),
99-
new ProducerUtils.CompositeConfirmationCallback(
100-
new ArrayList<>(subEntrySize)));
91+
this.helper.batch(
92+
byteBufAllocator, compressionCode, compressionCodec, subEntrySize);
10193
AccumulatedEntity lastMessageInBatch = null;
10294
for (Object msg : items) {
10395
AccumulatedEntity message = (AccumulatedEntity) msg;
@@ -115,14 +107,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
115107
subBatches.add(batch);
116108
lastMessageInBatch = null;
117109
batch =
118-
new ProducerUtils.Batch(
119-
Client.EncodedMessageBatch.create(
120-
byteBufAllocator,
121-
compressionCode,
122-
compressionCodec,
123-
subEntrySize),
124-
new ProducerUtils.CompositeConfirmationCallback(
125-
new ArrayList<>(subEntrySize)));
110+
this.helper.batch(
111+
byteBufAllocator, compressionCode, compressionCodec, subEntrySize);
126112
count = 0;
127113
}
128114
}
@@ -145,18 +131,7 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
145131

146132
@Override
147133
public void add(Message message, ConfirmationHandler confirmationHandler) {
148-
Object observationContext = this.observationCollector.prePublish(this.stream, message);
149-
Codec.EncodedMessage encodedMessage = this.codec.encode(message);
150-
Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage);
151-
long publishingId = this.publishSequenceFunction.applyAsLong(message);
152-
this.dynamicBatch.add(
153-
new ProducerUtils.SimpleAccumulatedEntity(
154-
this.clock.time(),
155-
publishingId,
156-
this.filterValueExtractor.apply(message),
157-
this.codec.encode(message),
158-
new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler),
159-
observationContext));
134+
this.dynamicBatch.add(helper.entity(message, confirmationHandler));
160135
}
161136

162137
@Override

src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.rabbitmq.stream.*;
1818
import com.rabbitmq.stream.compression.CompressionCodec;
1919
import io.netty.buffer.ByteBufAllocator;
20+
import java.util.ArrayList;
2021
import java.util.List;
2122
import java.util.function.Function;
2223
import java.util.function.ToLongFunction;
@@ -262,4 +263,60 @@ public Object observationContext() {
262263
"batch entity does not contain only one observation context");
263264
}
264265
}
266+
267+
static final class MessageAccumulatorHelper {
268+
269+
private static final Function<Message, String> NULL_FILTER_VALUE_EXTRACTOR = m -> null;
270+
271+
private final ObservationCollector<Object> observationCollector;
272+
private final ToLongFunction<Message> publishSequenceFunction;
273+
private final String stream;
274+
private final Codec codec;
275+
private final int maxFrameSize;
276+
private final Clock clock;
277+
private final Function<Message, String> filterValueExtractor;
278+
279+
@SuppressWarnings("unchecked")
280+
MessageAccumulatorHelper(
281+
Codec codec,
282+
int maxFrameSize,
283+
ToLongFunction<Message> publishSequenceFunction,
284+
Function<Message, String> filterValueExtractor,
285+
Clock clock,
286+
String stream,
287+
ObservationCollector<?> observationCollector) {
288+
this.publishSequenceFunction = publishSequenceFunction;
289+
this.codec = codec;
290+
this.clock = clock;
291+
this.maxFrameSize = maxFrameSize;
292+
this.filterValueExtractor =
293+
filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor;
294+
this.observationCollector = (ObservationCollector<Object>) observationCollector;
295+
this.stream = stream;
296+
}
297+
298+
AccumulatedEntity entity(Message message, ConfirmationHandler confirmationHandler) {
299+
Object observationContext = this.observationCollector.prePublish(this.stream, message);
300+
Codec.EncodedMessage encodedMessage = this.codec.encode(message);
301+
Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage);
302+
long publishingId = this.publishSequenceFunction.applyAsLong(message);
303+
return new ProducerUtils.SimpleAccumulatedEntity(
304+
this.clock.time(),
305+
publishingId,
306+
this.filterValueExtractor.apply(message),
307+
this.codec.encode(message),
308+
new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler),
309+
observationContext);
310+
}
311+
312+
Batch batch(
313+
ByteBufAllocator bba,
314+
byte compressionCode,
315+
CompressionCodec compressionCodec,
316+
int subEntrySize) {
317+
return new ProducerUtils.Batch(
318+
Client.EncodedMessageBatch.create(bba, compressionCode, compressionCodec, subEntrySize),
319+
new ProducerUtils.CompositeConfirmationCallback(new ArrayList<>(subEntrySize)));
320+
}
321+
}
265322
}

src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,11 @@
2626

2727
class SimpleMessageAccumulator implements MessageAccumulator {
2828

29-
private static final Function<Message, String> NULL_FILTER_VALUE_EXTRACTOR = m -> null;
30-
3129
protected final BlockingQueue<AccumulatedEntity> messages;
32-
protected final Clock clock;
3330
private final int capacity;
34-
protected final Codec codec;
35-
private final int maxFrameSize;
36-
private final ToLongFunction<Message> publishSequenceFunction;
37-
private final Function<Message, String> filterValueExtractor;
38-
final String stream;
3931
final ObservationCollector<Object> observationCollector;
4032
private final StreamProducer producer;
33+
final ProducerUtils.MessageAccumulatorHelper helper;
4134

4235
@SuppressWarnings("unchecked")
4336
SimpleMessageAccumulator(
@@ -50,36 +43,25 @@ class SimpleMessageAccumulator implements MessageAccumulator {
5043
String stream,
5144
ObservationCollector<?> observationCollector,
5245
StreamProducer producer) {
46+
this.helper =
47+
new ProducerUtils.MessageAccumulatorHelper(
48+
codec,
49+
maxFrameSize,
50+
publishSequenceFunction,
51+
filterValueExtractor,
52+
clock,
53+
stream,
54+
observationCollector);
5355
this.capacity = capacity;
54-
this.messages = new LinkedBlockingQueue<>(capacity);
55-
this.codec = codec;
56-
this.maxFrameSize = maxFrameSize;
57-
this.publishSequenceFunction = publishSequenceFunction;
58-
this.filterValueExtractor =
59-
filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor;
60-
this.clock = clock;
61-
this.stream = stream;
56+
this.messages = new LinkedBlockingQueue<>(this.capacity);
6257
this.observationCollector = (ObservationCollector<Object>) observationCollector;
6358
this.producer = producer;
6459
}
6560

6661
public void add(Message message, ConfirmationHandler confirmationHandler) {
67-
Object observationContext = this.observationCollector.prePublish(this.stream, message);
68-
Codec.EncodedMessage encodedMessage = this.codec.encode(message);
69-
Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage);
70-
long publishingId = this.publishSequenceFunction.applyAsLong(message);
62+
AccumulatedEntity entity = this.helper.entity(message, confirmationHandler);
7163
try {
72-
boolean offered =
73-
messages.offer(
74-
new ProducerUtils.SimpleAccumulatedEntity(
75-
clock.time(),
76-
publishingId,
77-
this.filterValueExtractor.apply(message),
78-
encodedMessage,
79-
new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler),
80-
observationContext),
81-
60,
82-
TimeUnit.SECONDS);
64+
boolean offered = messages.offer(entity, 60, TimeUnit.SECONDS);
8365
if (!offered) {
8466
throw new StreamException("Could not accumulate outbound message");
8567
}

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,10 @@ public int fragmentLength(Object entity) {
172172
if (compression != null) {
173173
compressionCodec = environment.compressionCodecFactory().get(compression);
174174
}
175+
boolean dynamicBatch = true;
175176
this.accumulator =
176177
ProducerUtils.createMessageAccumulator(
177-
true,
178+
dynamicBatch,
178179
subEntrySize,
179180
batchSize,
180181
compressionCodec,
@@ -188,7 +189,11 @@ public int fragmentLength(Object entity) {
188189
environment.observationCollector(),
189190
this);
190191

191-
if (!batchPublishingDelay.isNegative() && !batchPublishingDelay.isZero()) {
192+
boolean backgroundBatchPublishingTaskRequired =
193+
!dynamicBatch && batchPublishingDelay.toMillis() > 0;
194+
LOGGER.debug(
195+
"Background batch publishing task required? {}", backgroundBatchPublishingTaskRequired);
196+
if (backgroundBatchPublishingTaskRequired) {
192197
AtomicReference<Runnable> taskReference = new AtomicReference<>();
193198
Runnable task =
194199
() -> {

src/main/java/com/rabbitmq/stream/impl/SubEntryMessageAccumulator.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,15 @@
2020
import com.rabbitmq.stream.ObservationCollector;
2121
import com.rabbitmq.stream.compression.Compression;
2222
import com.rabbitmq.stream.compression.CompressionCodec;
23-
import com.rabbitmq.stream.impl.Client.EncodedMessageBatch;
2423
import io.netty.buffer.ByteBufAllocator;
25-
import java.util.ArrayList;
2624
import java.util.function.ToLongFunction;
2725

2826
final class SubEntryMessageAccumulator extends SimpleMessageAccumulator {
2927

3028
private final int subEntrySize;
3129
private final CompressionCodec compressionCodec;
3230
private final ByteBufAllocator byteBufAllocator;
33-
private final byte compression;
31+
private final byte compressionCode;
3432

3533
public SubEntryMessageAccumulator(
3634
int subEntrySize,
@@ -56,15 +54,14 @@ public SubEntryMessageAccumulator(
5654
producer);
5755
this.subEntrySize = subEntrySize;
5856
this.compressionCodec = compressionCodec;
59-
this.compression = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
57+
this.compressionCode =
58+
compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
6059
this.byteBufAllocator = byteBufAllocator;
6160
}
6261

6362
private ProducerUtils.Batch createBatch() {
64-
return new ProducerUtils.Batch(
65-
EncodedMessageBatch.create(
66-
byteBufAllocator, compression, compressionCodec, this.subEntrySize),
67-
new ProducerUtils.CompositeConfirmationCallback(new ArrayList<>(this.subEntrySize)));
63+
return this.helper.batch(
64+
this.byteBufAllocator, this.compressionCode, this.compressionCodec, this.subEntrySize);
6865
}
6966

7067
@Override
@@ -73,7 +70,7 @@ protected ProducerUtils.AccumulatedEntity get() {
7370
return null;
7471
}
7572
int count = 0;
76-
ProducerUtils.Batch batch = createBatch();
73+
ProducerUtils.Batch batch = this.createBatch();
7774
ProducerUtils.AccumulatedEntity lastMessageInBatch = null;
7875
while (count != this.subEntrySize) {
7976
ProducerUtils.AccumulatedEntity message = messages.poll();

0 commit comments

Comments
 (0)