Skip to content

Commit 36c264d

Browse files
committed
Refactor message accumulator abstraction for dynamic batching
1 parent 16396df commit 36c264d

File tree

6 files changed

+335
-306
lines changed

6 files changed

+335
-306
lines changed

src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,9 @@
1919

2020
interface MessageAccumulator {
2121

22-
boolean add(Message message, ConfirmationHandler confirmationHandler);
23-
24-
AccumulatedEntity get();
25-
26-
boolean isEmpty();
22+
void add(Message message, ConfirmationHandler confirmationHandler);
2723

2824
int size();
2925

30-
interface AccumulatedEntity {
31-
32-
long time();
33-
34-
long publishingId();
35-
36-
String filterValue();
37-
38-
Object encodedEntity();
39-
40-
StreamProducer.ConfirmationCallback confirmationCallback();
41-
42-
Object observationContext();
43-
}
26+
void flush(boolean force);
4427
}
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import com.rabbitmq.stream.Codec;
18+
import com.rabbitmq.stream.ConfirmationHandler;
19+
import com.rabbitmq.stream.ConfirmationStatus;
20+
import com.rabbitmq.stream.Message;
21+
import java.util.List;
22+
23+
final class ProducerUtils {
24+
25+
private ProducerUtils() {}
26+
27+
interface ConfirmationCallback {
28+
29+
int handle(boolean confirmed, short code);
30+
31+
Message message();
32+
}
33+
34+
interface AccumulatedEntity {
35+
36+
long time();
37+
38+
long publishingId();
39+
40+
String filterValue();
41+
42+
Object encodedEntity();
43+
44+
ConfirmationCallback confirmationCallback();
45+
46+
Object observationContext();
47+
}
48+
49+
static final class SimpleConfirmationCallback implements ConfirmationCallback {
50+
51+
private final Message message;
52+
private final ConfirmationHandler confirmationHandler;
53+
54+
SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) {
55+
this.message = message;
56+
this.confirmationHandler = confirmationHandler;
57+
}
58+
59+
@Override
60+
public int handle(boolean confirmed, short code) {
61+
confirmationHandler.handle(new ConfirmationStatus(message, confirmed, code));
62+
return 1;
63+
}
64+
65+
@Override
66+
public Message message() {
67+
return this.message;
68+
}
69+
}
70+
71+
static final class SimpleAccumulatedEntity implements AccumulatedEntity {
72+
73+
private final long time;
74+
private final long publishingId;
75+
private final String filterValue;
76+
private final Codec.EncodedMessage encodedMessage;
77+
private final ConfirmationCallback confirmationCallback;
78+
private final Object observationContext;
79+
80+
SimpleAccumulatedEntity(
81+
long time,
82+
long publishingId,
83+
String filterValue,
84+
Codec.EncodedMessage encodedMessage,
85+
ConfirmationCallback confirmationCallback,
86+
Object observationContext) {
87+
this.time = time;
88+
this.publishingId = publishingId;
89+
this.encodedMessage = encodedMessage;
90+
this.filterValue = filterValue;
91+
this.confirmationCallback = confirmationCallback;
92+
this.observationContext = observationContext;
93+
}
94+
95+
@Override
96+
public long publishingId() {
97+
return publishingId;
98+
}
99+
100+
@Override
101+
public String filterValue() {
102+
return filterValue;
103+
}
104+
105+
@Override
106+
public Object encodedEntity() {
107+
return encodedMessage;
108+
}
109+
110+
@Override
111+
public long time() {
112+
return time;
113+
}
114+
115+
@Override
116+
public ConfirmationCallback confirmationCallback() {
117+
return confirmationCallback;
118+
}
119+
120+
@Override
121+
public Object observationContext() {
122+
return this.observationContext;
123+
}
124+
}
125+
126+
static final class CompositeConfirmationCallback implements ConfirmationCallback {
127+
128+
private final List<ConfirmationCallback> callbacks;
129+
130+
CompositeConfirmationCallback(List<ConfirmationCallback> callbacks) {
131+
this.callbacks = callbacks;
132+
}
133+
134+
private void add(ConfirmationCallback confirmationCallback) {
135+
this.callbacks.add(confirmationCallback);
136+
}
137+
138+
@Override
139+
public int handle(boolean confirmed, short code) {
140+
for (ConfirmationCallback callback : callbacks) {
141+
callback.handle(confirmed, code);
142+
}
143+
return callbacks.size();
144+
}
145+
146+
@Override
147+
public Message message() {
148+
throw new UnsupportedOperationException(
149+
"composite confirmation callback does not contain just one message");
150+
}
151+
}
152+
153+
static final class Batch implements AccumulatedEntity {
154+
155+
final Client.EncodedMessageBatch encodedMessageBatch;
156+
private final CompositeConfirmationCallback confirmationCallback;
157+
volatile long publishingId;
158+
volatile long time;
159+
160+
Batch(
161+
Client.EncodedMessageBatch encodedMessageBatch,
162+
CompositeConfirmationCallback confirmationCallback) {
163+
this.encodedMessageBatch = encodedMessageBatch;
164+
this.confirmationCallback = confirmationCallback;
165+
}
166+
167+
void add(Codec.EncodedMessage encodedMessage, ConfirmationCallback confirmationCallback) {
168+
this.encodedMessageBatch.add(encodedMessage);
169+
this.confirmationCallback.add(confirmationCallback);
170+
}
171+
172+
boolean isEmpty() {
173+
return this.confirmationCallback.callbacks.isEmpty();
174+
}
175+
176+
@Override
177+
public long publishingId() {
178+
return publishingId;
179+
}
180+
181+
@Override
182+
public String filterValue() {
183+
return null;
184+
}
185+
186+
@Override
187+
public Object encodedEntity() {
188+
return encodedMessageBatch;
189+
}
190+
191+
@Override
192+
public long time() {
193+
return time;
194+
}
195+
196+
@Override
197+
public ConfirmationCallback confirmationCallback() {
198+
return confirmationCallback;
199+
}
200+
201+
@Override
202+
public Object observationContext() {
203+
throw new UnsupportedOperationException(
204+
"batch entity does not contain only one observation context");
205+
}
206+
}
207+
}

0 commit comments

Comments
 (0)