-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathConsumerBuilder.java
309 lines (277 loc) · 7.93 KB
/
ConsumerBuilder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
package com.rabbitmq.stream;
import java.time.Duration;
import java.util.function.Predicate;
/** API to configure and create a {@link Consumer}. */
public interface ConsumerBuilder {
/**
* The stream to consume from.
*
* @param stream
* @return this builder instance
*/
ConsumerBuilder stream(String stream);
/**
* Set the consumer to consume from a super stream (partitioned stream).
*
* <p>This is meant to be used with {@link #singleActiveConsumer()}.
*
* <p>This is an experimental API, subject to change.
*
* <p>RabbitMQ 3.11 or more is required.
*
* @param superStream
* @return this builder instance
* @see #singleActiveConsumer()
*/
ConsumerBuilder superStream(String superStream);
/**
* The offset to start consuming from.
*
* <p>The default is {@link OffsetSpecification#next()} (the end of the stream).
*
* @param offsetSpecification
* @return this builder instance
*/
ConsumerBuilder offset(OffsetSpecification offsetSpecification);
/**
* The callback for inbound messages.
*
* @param messageHandler
* @return this builder instance
*/
ConsumerBuilder messageHandler(MessageHandler messageHandler);
/**
* The logical name of the {@link Consumer}.
*
* <p>Set a logical name to enable offset tracking.
*
* @param name
* @return this builder instance
*/
ConsumerBuilder name(String name);
/**
* Declare the consumer as a single active consumer.
*
* <p>A single active consumer must set up a name with {@link #name(String)}.
*
* <p>Instances of the same application can declare several single active consumer instances with
* the same name and only one will be active at a time, meaning it will be the only one to get
* messages from the broker.
*
* <p>If the active consumer instance stops or crashes, the broker will choose a new active
* instance among the remaining ones.
*
* <p>This is an experimental API, subject to change.
*
* <p>RabbitMQ 3.11 or more is required.
*
* @return this builder instance
* @since 0.6.0
* @see #name(String)
*/
ConsumerBuilder singleActiveConsumer();
/**
* Set the listener for single active consumer updates.
*
* <p>This listener is usually set when manual offset tracking is used, either server-side or with
* an external datastore.
*
* <p>This is an experimental API, subject to change.
*
* <p>RabbitMQ 3.11 or more is required.
*
* @param consumerUpdateListener
* @return this builder instance
* @since 0.6.0
* @see #singleActiveConsumer()
* @see ConsumerUpdateListener
* @see #manualTrackingStrategy()
*/
ConsumerBuilder consumerUpdateListener(ConsumerUpdateListener consumerUpdateListener);
/**
* Callback on subscription.
*
* <p>Can be used to set the offset specification before subscribing to the stream.
*
* <p>This is an experimental API, subject to change.
*
* @see SubscriptionListener
* @param subscriptionListener the listener
* @return this builder instance
* @since 0.5.0
*/
ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener);
/**
* Enable {@link ManualTrackingStrategy}.
*
* @return the manual tracking strategy
*/
ManualTrackingStrategy manualTrackingStrategy();
/**
* Enable {@link AutoTrackingStrategy}.
*
* <p>This is the default tracking strategy.
*
* @return the auto-tracking strategy
*/
AutoTrackingStrategy autoTrackingStrategy();
/**
* Disable server-side offset tracking.
*
* <p>Useful when {@link #singleActiveConsumer()} is enabled and an external store is used for
* offset tracking. This avoids automatic server-side offset tracking to kick in.
*
* @return this builder instance
* @since 0.6.0
*/
ConsumerBuilder noTrackingStrategy();
/**
* Configure the filtering.
*
* <p>RabbitMQ 3.13 or more is required.
*
* @return the filtering configuration
*/
FilterConfiguration filter();
/**
* Configure flow of messages.
*
* @return the flow configuration
* @since 0.11.0
*/
FlowConfiguration flow();
/**
* Create the configured {@link Consumer}
*
* @return the configured consumer
*/
Consumer build();
/** Manual tracking strategy. */
interface ManualTrackingStrategy {
/**
* Interval to check if the last requested stored offset has been actually stored.
*
* <p>Default is 5 seconds.
*
* @param checkInterval
* @return the manual tracking strategy
*/
ManualTrackingStrategy checkInterval(Duration checkInterval);
/**
* Go back to the builder.
*
* @return the consumer builder
*/
ConsumerBuilder builder();
}
/** Auto-tracking strategy. */
interface AutoTrackingStrategy {
/**
* Number of messages before storing.
*
* <p>Default is 10,000.
*
* @param messageCountBeforeStorage
* @return the auto-tracking strategy
*/
AutoTrackingStrategy messageCountBeforeStorage(int messageCountBeforeStorage);
/**
* Interval to check and stored the last received offset in case of inactivity.
*
* <p>Default is 5 seconds.
*
* @param flushInterval
* @return the auto-tracking strategy
*/
AutoTrackingStrategy flushInterval(Duration flushInterval);
/**
* Go back to the builder.
*
* @return the consumer builder
*/
ConsumerBuilder builder();
}
/**
* Message flow configuration.
*
* @since 0.11.0
*/
interface FlowConfiguration {
/**
* The number of initial credits for the subscription.
*
* <p>Default is 1.
*
* @param initialCredits the number of initial credits
* @return this configuration instance
*/
FlowConfiguration initialCredits(int initialCredits);
/**
* Go back to the builder.
*
* @return the consumer builder
*/
ConsumerBuilder builder();
}
/**
* Filter configuration.
*
* <p>RabbitMQ 3.13 or more is required.
*/
interface FilterConfiguration {
/**
* Set the filter values.
*
* @param filterValues
* @return this filter configuration instance
*/
FilterConfiguration values(String... filterValues);
/**
* Client-side filtering logic, occurring after the server-side filtering.
*
* <p>It must be consistent with the requested filter {@link #values( String...)} and the {@link
* #matchUnfiltered()} flag.
*
* @param filter a predicate that returns <code>true</code> if a message should go to the {@link
* MessageHandler}
* @return this filter configuration instance
*/
FilterConfiguration postFilter(Predicate<Message> filter);
/**
* Whether messages without a filter value should be sent as well.
*
* <p>Default is false.
*
* @return this filter configuration instance
*/
FilterConfiguration matchUnfiltered();
/**
* Whether messages without a filter value should be sent as well.
*
* <p>Default is false.
*
* @param matchUnfiltered
* @return this filter configuration instance
*/
FilterConfiguration matchUnfiltered(boolean matchUnfiltered);
/**
* Go back to the builder.
*
* @return the consumer builder
*/
ConsumerBuilder builder();
}
}