forked from rabbitmq/rabbitmq-stream-java-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConsumerBuilder.java
261 lines (236 loc) · 8.41 KB
/
ConsumerBuilder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
package com.rabbitmq.stream;
import com.rabbitmq.stream.flow.ConsumerFlowControlStrategy;
import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilder;
import com.rabbitmq.stream.flow.ConsumerFlowControlStrategyBuilderFactory;
import java.time.Duration;
/** API to configure and create a {@link Consumer}. */
public interface ConsumerBuilder {
/**
* The stream to consume from.
*
* @param stream
* @return this builder instance
*/
ConsumerBuilder stream(String stream);
/**
* Set the consumer to consume from a super stream (partitioned stream).
*
* <p>This is meant to be used with {@link #singleActiveConsumer()}.
*
* <p>This is an experimental API, subject to change.
*
* <p>RabbitMQ 3.11 or more is required.
*
* @param superStream
* @return this builder instance
* @see #singleActiveConsumer()
*/
ConsumerBuilder superStream(String superStream);
/**
* The offset to start consuming from.
*
* <p>The default is {@link OffsetSpecification#next()} (the end of the stream).
*
* @param offsetSpecification
* @return this builder instance
*/
ConsumerBuilder offset(OffsetSpecification offsetSpecification);
/**
* The callback for inbound messages.
*
* @param messageHandler
* @return this builder instance
*/
ConsumerBuilder messageHandler(MessageHandler messageHandler);
/**
* Configure prefetching parameters for synchronous flow control.
*
* <p>
* Treat the parameters as an abstract scale at the {@link Consumer} level.
* </p>
*
* @param initialPrefetchLevel The initial level of message pre-fetching.
* This may me implemented as the credits to initially
* ask for with each new subscription,
* but do not depend strongly on this aspect.
* @param prefetchLevelAfterDelivery The level of message pre-fetching after the initial batch.
* This may be implemented as the credits to ask for after a chunk
* is delivered on each subscription,
* but do not depend strongly on this aspect.
* <b>
* The recommended value is <code>1</code>.
* Higher values may cause excessive over-fetching.
* </b>
* @return this {@link ConsumerBuilder}
*/
ConsumerBuilder synchronousControlFlow(int initialPrefetchLevel, int prefetchLevelAfterDelivery);
/**
* Configure prefetching parameters for asynchronous flow control.
*
* <p>
* Treat the parameters as an abstract scale at the {@link Consumer} level.
* </p>
*
* @param prefetchLevel The desired level of message pre-fetching.
* This may be implemented as the maximum chunks to have in processing or pending arrival
* per subscription at a given moment, but do not depend strongly on this aspect.
* @return this {@link ConsumerBuilder}
*/
ConsumerBuilder asynchronousControlFlow(int prefetchLevel);
/**
* Factory for the flow control strategy to be used when consuming messages.
* When there is no need to use a custom strategy, which is the majority of cases,
* prefer using {@link ConsumerBuilder#synchronousControlFlow} or {@link ConsumerBuilder#asynchronousControlFlow} instead.
*
* @param consumerFlowControlStrategyBuilderFactory the factory
* @return a fluent configurable builder for the flow control strategy
* @param <T> The type of the builder for the provided factory
*/
<T extends ConsumerFlowControlStrategyBuilder<S>, S extends ConsumerFlowControlStrategy>
T customFlowControlStrategy(ConsumerFlowControlStrategyBuilderFactory<S, T> consumerFlowControlStrategyBuilderFactory);
/**
* The logical name of the {@link Consumer}.
*
* <p>Set a logical name to enable offset tracking.
*
* @param name
* @return this builder instance
*/
ConsumerBuilder name(String name);
/**
* Declare the consumer as a single active consumer.
*
* <p>A single active consumer must set up a name with {@link #name(String)}.
*
* <p>Instances of the same application can declare several single active consumer instances with
* the same name and only one will be active at a time, meaning it will be the only one to get
* messages from the broker.
*
* <p>If the active consumer instance stops or crashes, the broker will choose a new active
* instance among the remaining ones.
*
* <p>This is an experimental API, subject to change.
*
* <p>RabbitMQ 3.11 or more is required.
*
* @return this builder instance
* @since 0.6.0
* @see #name(String)
*/
ConsumerBuilder singleActiveConsumer();
/**
* Set the listener for single active consumer updates.
*
* <p>This listener is usually set when manual offset tracking is used, either server-side or with
* an external datastore.
*
* <p>This is an experimental API, subject to change.
*
* <p>RabbitMQ 3.11 or more is required.
*
* @param consumerUpdateListener
* @return this builder instance
* @since 0.6.0
* @see #singleActiveConsumer()
* @see ConsumerUpdateListener
* @see #manualTrackingStrategy()
*/
ConsumerBuilder consumerUpdateListener(ConsumerUpdateListener consumerUpdateListener);
/**
* Callback on subscription.
*
* <p>Can be used to set the offset specification before subscribing to the stream.
*
* <p>This is an experimental API, subject to change.
*
* @see SubscriptionListener
* @param subscriptionListener the listener
* @return this builder instance
* @since 0.5.0
*/
ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener);
/**
* Enable {@link ManualTrackingStrategy}.
*
* @return the manual tracking strategy
*/
ManualTrackingStrategy manualTrackingStrategy();
/**
* Enable {@link AutoTrackingStrategy}.
*
* <p>This is the default tracking strategy.
*
* @return the auto-tracking strategy
*/
AutoTrackingStrategy autoTrackingStrategy();
/**
* Disable server-side offset tracking.
*
* <p>Useful when {@link #singleActiveConsumer()} is enabled and an external store is used for
* offset tracking. This avoids automatic server-side offset tracking to kick in.
*
* @return this builder instance
* @since 0.6.0
*/
ConsumerBuilder noTrackingStrategy();
/**
* Create the configured {@link Consumer}
*
* @return the configured consumer
*/
Consumer build();
/** Manual tracking strategy. */
interface ManualTrackingStrategy extends ConsumerBuilderAccessor {
/**
* Interval to check if the last requested stored offset has been actually stored.
*
* <p>Default is 5 seconds.
*
* @param checkInterval
* @return the manual tracking strategy
*/
ManualTrackingStrategy checkInterval(Duration checkInterval);
}
/** Auto-tracking strategy. */
interface AutoTrackingStrategy extends ConsumerBuilderAccessor {
/**
* Number of messages before storing.
*
* <p>Default is 10,000.
*
* @param messageCountBeforeStorage
* @return the auto-tracking strategy
*/
AutoTrackingStrategy messageCountBeforeStorage(int messageCountBeforeStorage);
/**
* Interval to check and stored the last received offset in case of inactivity.
*
* <p>Default is 5 seconds.
*
* @param flushInterval
* @return the auto-tracking strategy
*/
AutoTrackingStrategy flushInterval(Duration flushInterval);
}
interface ConsumerBuilderAccessor {
/**
* Go back to the builder.
*
* @return the consumer builder
*/
ConsumerBuilder builder();
}
}