Skip to content

Commit 817d57a

Browse files
committed
Add dynamic-batch message accumulator
1 parent 36c264d commit 817d57a

8 files changed

+312
-80
lines changed

src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,21 @@
2222
import java.util.concurrent.BlockingQueue;
2323
import java.util.concurrent.LinkedBlockingQueue;
2424
import java.util.concurrent.TimeUnit;
25-
import java.util.concurrent.atomic.AtomicLong;
26-
import java.util.function.Consumer;
25+
import java.util.function.Predicate;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2728

2829
class DynamicBatch<T> {
2930

31+
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class);
3032
private static final int MIN_BATCH_SIZE = 32;
3133
private static final int MAX_BATCH_SIZE = 8192;
3234

33-
final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
34-
final Consumer<List<T>> consumer;
35-
final int configuredBatchSize;
36-
private final AtomicLong count = new AtomicLong(0);
35+
private final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
36+
private final Predicate<List<T>> consumer;
37+
private final int configuredBatchSize;
3738

38-
DynamicBatch(Consumer<List<T>> consumer, int batchSize) {
39+
DynamicBatch(Predicate<List<T>> consumer, int batchSize) {
3940
this.consumer = consumer;
4041
this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
4142
new Thread(this::loop).start();
@@ -44,7 +45,6 @@ class DynamicBatch<T> {
4445
void add(T item) {
4546
try {
4647
requests.put(item);
47-
this.count.incrementAndGet();
4848
} catch (InterruptedException e) {
4949
throw new RuntimeException(e);
5050
}
@@ -65,33 +65,42 @@ private void loop() {
6565
if (item != null) {
6666
batch.add(item);
6767
if (batch.size() >= batchSize) {
68-
this.completeBatch(batch);
69-
batchSize = min(batchSize * 2, MAX_BATCH_SIZE);
70-
batch = new ArrayList<>(batchSize);
68+
if (this.completeBatch(batch)) {
69+
batchSize = min(batchSize * 2, MAX_BATCH_SIZE);
70+
batch = new ArrayList<>(batchSize);
71+
}
7172
} else {
7273
item = this.requests.poll();
7374
if (item == null) {
74-
this.completeBatch(batch);
75-
batchSize = max(batchSize / 2, MIN_BATCH_SIZE);
76-
batch = new ArrayList<>(batchSize);
75+
if (this.completeBatch(batch)) {
76+
batchSize = max(batchSize / 2, MIN_BATCH_SIZE);
77+
batch = new ArrayList<>(batchSize);
78+
}
7779
} else {
7880
batch.add(item);
7981
if (batch.size() >= batchSize) {
80-
this.completeBatch(batch);
81-
batchSize = min(batchSize * 2, MAX_BATCH_SIZE);
82-
batch = new ArrayList<>(batchSize);
82+
if (this.completeBatch(batch)) {
83+
batchSize = min(batchSize * 2, MAX_BATCH_SIZE);
84+
batch = new ArrayList<>(batchSize);
85+
}
8386
}
8487
}
8588
}
8689
} else {
87-
this.completeBatch(batch);
88-
batchSize = min(batchSize * 2, MAX_BATCH_SIZE);
89-
batch = new ArrayList<>(batchSize);
90+
if (this.completeBatch(batch)) {
91+
batchSize = min(batchSize * 2, MAX_BATCH_SIZE);
92+
batch = new ArrayList<>(batchSize);
93+
}
9094
}
9195
}
9296
}
9397

94-
private void completeBatch(List<T> items) {
95-
this.consumer.accept(items);
98+
private boolean completeBatch(List<T> items) {
99+
try {
100+
return this.consumer.test(items);
101+
} catch (Exception e) {
102+
LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage());
103+
return false;
104+
}
96105
}
97106
}
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import com.rabbitmq.stream.Codec;
18+
import com.rabbitmq.stream.ConfirmationHandler;
19+
import com.rabbitmq.stream.Message;
20+
import com.rabbitmq.stream.ObservationCollector;
21+
import com.rabbitmq.stream.compression.Compression;
22+
import com.rabbitmq.stream.compression.CompressionCodec;
23+
import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity;
24+
import io.netty.buffer.ByteBufAllocator;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.function.Function;
28+
import java.util.function.ToLongFunction;
29+
30+
final class DynamicBatchMessageAccumulator implements MessageAccumulator {
31+
32+
private static final Function<Message, String> NULL_FILTER_VALUE_EXTRACTOR = m -> null;
33+
34+
private final DynamicBatch<Object> dynamicBatch;
35+
private final ObservationCollector<Object> observationCollector;
36+
private final ToLongFunction<Message> publishSequenceFunction;
37+
private final String stream;
38+
private final StreamProducer producer;
39+
private final Codec codec;
40+
private final int maxFrameSize;
41+
private final Clock clock;
42+
private final Function<Message, String> filterValueExtractor;
43+
44+
@SuppressWarnings("unchecked")
45+
DynamicBatchMessageAccumulator(
46+
int subEntrySize,
47+
int batchSize,
48+
Codec codec,
49+
int maxFrameSize,
50+
ToLongFunction<Message> publishSequenceFunction,
51+
Function<Message, String> filterValueExtractor,
52+
Clock clock,
53+
String stream,
54+
CompressionCodec compressionCodec,
55+
ByteBufAllocator byteBufAllocator,
56+
ObservationCollector<?> observationCollector,
57+
StreamProducer producer) {
58+
this.producer = producer;
59+
this.stream = stream;
60+
this.publishSequenceFunction = publishSequenceFunction;
61+
this.observationCollector = (ObservationCollector<Object>) observationCollector;
62+
this.codec = codec;
63+
this.clock = clock;
64+
this.maxFrameSize = maxFrameSize;
65+
this.filterValueExtractor =
66+
filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor;
67+
if (subEntrySize <= 1) {
68+
this.dynamicBatch = new DynamicBatch<>(this::publish, batchSize);
69+
} else {
70+
byte compressionCode =
71+
compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
72+
this.dynamicBatch =
73+
new DynamicBatch<>(
74+
items -> {
75+
if (this.producer.canSend()) {
76+
List<Object> subBatches = new ArrayList<>();
77+
int count = 0;
78+
ProducerUtils.Batch batch =
79+
new ProducerUtils.Batch(
80+
Client.EncodedMessageBatch.create(
81+
byteBufAllocator, compressionCode, compressionCodec, subEntrySize),
82+
new ProducerUtils.CompositeConfirmationCallback(
83+
new ArrayList<>(subEntrySize)));
84+
AccumulatedEntity lastMessageInBatch = null;
85+
for (Object msg : items) {
86+
AccumulatedEntity message = (AccumulatedEntity) msg;
87+
this.observationCollector.published(
88+
message.observationContext(), message.confirmationCallback().message());
89+
lastMessageInBatch = message;
90+
batch.add(
91+
(Codec.EncodedMessage) message.encodedEntity(),
92+
message.confirmationCallback());
93+
count++;
94+
if (count == subEntrySize) {
95+
batch.time = lastMessageInBatch.time();
96+
batch.publishingId = lastMessageInBatch.publishingId();
97+
batch.encodedMessageBatch.close();
98+
subBatches.add(batch);
99+
lastMessageInBatch = null;
100+
batch =
101+
new ProducerUtils.Batch(
102+
Client.EncodedMessageBatch.create(
103+
byteBufAllocator,
104+
compressionCode,
105+
compressionCodec,
106+
subEntrySize),
107+
new ProducerUtils.CompositeConfirmationCallback(
108+
new ArrayList<>(subEntrySize)));
109+
count = 0;
110+
}
111+
}
112+
113+
if (!batch.isEmpty() && count < subEntrySize) {
114+
batch.time = lastMessageInBatch.time();
115+
batch.publishingId = lastMessageInBatch.publishingId();
116+
batch.encodedMessageBatch.close();
117+
subBatches.add(batch);
118+
}
119+
120+
return this.publish(subBatches);
121+
} else {
122+
return false;
123+
}
124+
},
125+
batchSize * subEntrySize);
126+
}
127+
}
128+
129+
@Override
130+
public void add(Message message, ConfirmationHandler confirmationHandler) {
131+
Object observationContext = this.observationCollector.prePublish(this.stream, message);
132+
Codec.EncodedMessage encodedMessage = this.codec.encode(message);
133+
Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage);
134+
long publishingId = this.publishSequenceFunction.applyAsLong(message);
135+
this.dynamicBatch.add(
136+
new ProducerUtils.SimpleAccumulatedEntity(
137+
this.clock.time(),
138+
publishingId,
139+
this.filterValueExtractor.apply(message),
140+
this.codec.encode(message),
141+
new ProducerUtils.SimpleConfirmationCallback(message, confirmationHandler),
142+
observationContext));
143+
}
144+
145+
@Override
146+
public int size() {
147+
// TODO compute dynamic batch message accumulator pending message count
148+
return 0;
149+
}
150+
151+
@Override
152+
public void flush(boolean force) {}
153+
154+
private boolean publish(List<Object> entities) {
155+
if (this.producer.canSend()) {
156+
this.producer.publishInternal(entities);
157+
return true;
158+
} else {
159+
return false;
160+
}
161+
}
162+
}

src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,74 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17-
import com.rabbitmq.stream.Codec;
18-
import com.rabbitmq.stream.ConfirmationHandler;
19-
import com.rabbitmq.stream.ConfirmationStatus;
20-
import com.rabbitmq.stream.Message;
17+
import com.rabbitmq.stream.*;
18+
import com.rabbitmq.stream.compression.CompressionCodec;
19+
import io.netty.buffer.ByteBufAllocator;
2120
import java.util.List;
21+
import java.util.function.Function;
22+
import java.util.function.ToLongFunction;
2223

2324
final class ProducerUtils {
2425

2526
private ProducerUtils() {}
2627

28+
static MessageAccumulator createMessageAccumulator(
29+
boolean dynamicBatch,
30+
int subEntrySize,
31+
int batchSize,
32+
CompressionCodec compressionCodec,
33+
Codec codec,
34+
ByteBufAllocator byteBufAllocator,
35+
int maxFrameSize,
36+
ToLongFunction<Message> publishSequenceFunction,
37+
Function<Message, String> filterValueExtractor,
38+
Clock clock,
39+
String stream,
40+
ObservationCollector<?> observationCollector,
41+
StreamProducer producer) {
42+
if (dynamicBatch) {
43+
return new DynamicBatchMessageAccumulator(
44+
subEntrySize,
45+
batchSize,
46+
codec,
47+
maxFrameSize,
48+
publishSequenceFunction,
49+
filterValueExtractor,
50+
clock,
51+
stream,
52+
compressionCodec,
53+
byteBufAllocator,
54+
observationCollector,
55+
producer);
56+
} else {
57+
if (subEntrySize <= 1) {
58+
return new SimpleMessageAccumulator(
59+
batchSize,
60+
codec,
61+
maxFrameSize,
62+
publishSequenceFunction,
63+
filterValueExtractor,
64+
clock,
65+
stream,
66+
observationCollector,
67+
producer);
68+
} else {
69+
return new SubEntryMessageAccumulator(
70+
subEntrySize,
71+
batchSize,
72+
compressionCodec,
73+
codec,
74+
byteBufAllocator,
75+
maxFrameSize,
76+
publishSequenceFunction,
77+
clock,
78+
stream,
79+
observationCollector,
80+
producer);
81+
}
82+
}
83+
}
84+
2785
interface ConfirmationCallback {
2886

2987
int handle(boolean confirmed, short code);

src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,20 @@
1515
package com.rabbitmq.stream.impl;
1616

1717
import com.rabbitmq.stream.*;
18+
import com.rabbitmq.stream.impl.ProducerUtils.AccumulatedEntity;
1819
import java.util.ArrayList;
1920
import java.util.List;
2021
import java.util.concurrent.BlockingQueue;
2122
import java.util.concurrent.LinkedBlockingQueue;
2223
import java.util.concurrent.TimeUnit;
23-
import java.util.concurrent.atomic.AtomicInteger;
2424
import java.util.function.Function;
2525
import java.util.function.ToLongFunction;
2626

2727
class SimpleMessageAccumulator implements MessageAccumulator {
2828

2929
private static final Function<Message, String> NULL_FILTER_VALUE_EXTRACTOR = m -> null;
3030

31-
protected final BlockingQueue<ProducerUtils.AccumulatedEntity> messages;
31+
protected final BlockingQueue<AccumulatedEntity> messages;
3232
protected final Clock clock;
3333
private final int capacity;
3434
protected final Codec codec;
@@ -93,8 +93,8 @@ public void add(Message message, ConfirmationHandler confirmationHandler) {
9393
}
9494
}
9595

96-
ProducerUtils.AccumulatedEntity get() {
97-
ProducerUtils.AccumulatedEntity entity = this.messages.poll();
96+
AccumulatedEntity get() {
97+
AccumulatedEntity entity = this.messages.poll();
9898
if (entity != null) {
9999
this.observationCollector.published(
100100
entity.observationContext(), entity.confirmationCallback().message());
@@ -113,24 +113,20 @@ public void flush(boolean force) {
113113
synchronized (this.producer) {
114114
publishBatch(stateCheck);
115115
}
116-
// System.out.println(sent.get());
117116
}
118117

119-
AtomicInteger sent = new AtomicInteger();
120-
121118
private void publishBatch(boolean stateCheck) {
122119
if ((!stateCheck || this.producer.canSend()) && !this.messages.isEmpty()) {
123120
List<Object> entities = new ArrayList<>(this.capacity);
124121
int batchCount = 0;
125122
while (batchCount != this.capacity) {
126-
ProducerUtils.AccumulatedEntity entity = this.get();
123+
AccumulatedEntity entity = this.get();
127124
if (entity == null) {
128125
break;
129126
}
130127
entities.add(entity);
131128
batchCount++;
132129
}
133-
this.sent.addAndGet(entities.size());
134130
producer.publishInternal(entities);
135131
}
136132
}

0 commit comments

Comments
 (0)