Skip to content

Commit 7649045

Browse files
committed
Add single active consumer API to Consumer
WIP. References rabbitmq/rabbitmq-server#3753
1 parent ac13ef0 commit 7649045

14 files changed

+515
-80
lines changed

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public interface ConsumerBuilder {
6464
*/
6565
ConsumerBuilder name(String name);
6666

67+
ConsumerBuilder singleActiveConsumer();
68+
69+
ConsumerBuilder consumerUpdateListener(ConsumerUpdateListener consumerUpdateListener);
70+
6771
/**
6872
* Callback on subscription.
6973
*
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream;
15+
16+
public interface ConsumerUpdateListener {
17+
18+
OffsetSpecification update(Context context);
19+
20+
interface Context {
21+
22+
Consumer consumer();
23+
24+
Status status();
25+
26+
Status previousStatus();
27+
}
28+
29+
enum Status {
30+
STARTING,
31+
ACTIVE,
32+
PASSIVE
33+
}
34+
}

src/main/java/com/rabbitmq/stream/SubscriptionListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ public interface SubscriptionListener {
2424
/**
2525
* Callback called before the subscription is created.
2626
*
27-
* <p>The method is called when a {@link Consumer} is created and it registers to broker, and also
28-
* when the subscription must be re-created (after a disconnection or when the subscription must
29-
* moved because the stream member it was connection becomes unavailable).
27+
* <p>The method is called when a {@link Consumer} is created and it registers to the broker, and
28+
* also when the subscription must be re-created (after a disconnection or when the subscription
29+
* must moved because the stream member it was connected to becomes unavailable).
3030
*
3131
* <p>Application code can set the {@link OffsetSpecification} that will be used with the {@link
3232
* SubscriptionContext#offsetSpecification(OffsetSpecification)} method.

src/main/java/com/rabbitmq/stream/impl/AsyncRetry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ AsyncRetryBuilder<V> retry(Predicate<Exception> predicate) {
121121
return this;
122122
}
123123

124-
AsyncRetryBuilder<V> description(String description) {
125-
this.description = description;
124+
AsyncRetryBuilder<V> description(String description, Object... args) {
125+
this.description = String.format(description, args);
126126
return this;
127127
}
128128

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static com.rabbitmq.stream.impl.Utils.formatConstant;
17+
import static com.rabbitmq.stream.impl.Utils.isSac;
1718

1819
import com.rabbitmq.stream.BackOffDelayPolicy;
1920
import com.rabbitmq.stream.Constants;
@@ -26,6 +27,7 @@
2627
import com.rabbitmq.stream.SubscriptionListener;
2728
import com.rabbitmq.stream.SubscriptionListener.SubscriptionContext;
2829
import com.rabbitmq.stream.impl.Client.ChunkListener;
30+
import com.rabbitmq.stream.impl.Client.ConsumerUpdateListener;
2931
import com.rabbitmq.stream.impl.Client.CreditNotification;
3032
import com.rabbitmq.stream.impl.Client.MessageListener;
3133
import com.rabbitmq.stream.impl.Client.MetadataListener;
@@ -35,7 +37,6 @@
3537
import java.util.ArrayList;
3638
import java.util.Collection;
3739
import java.util.Collections;
38-
import java.util.HashMap;
3940
import java.util.List;
4041
import java.util.Map;
4142
import java.util.Map.Entry;
@@ -86,7 +87,8 @@ Runnable subscribe(
8687
OffsetSpecification offsetSpecification,
8788
String trackingReference,
8889
SubscriptionListener subscriptionListener,
89-
MessageHandler messageHandler) {
90+
MessageHandler messageHandler,
91+
Map<String, String> subscriptionProperties) {
9092
// FIXME fail immediately if there's no locator (can provide a supplier that does not retry)
9193
List<Client.Broker> candidates = findBrokersForStream(stream);
9294
Client.Broker newNode = pickBroker(candidates);
@@ -103,7 +105,8 @@ Runnable subscribe(
103105
offsetSpecification,
104106
trackingReference,
105107
subscriptionListener,
106-
messageHandler);
108+
messageHandler,
109+
subscriptionProperties);
107110

108111
String key = keyForClientSubscription(newNode);
109112

@@ -221,6 +224,7 @@ private static class SubscriptionTracker {
221224
private final MessageHandler messageHandler;
222225
private final StreamConsumer consumer;
223226
private final SubscriptionListener subscriptionListener;
227+
private final Map<String, String> subscriptionProperties;
224228
private volatile long offset;
225229
private volatile boolean hasReceivedSomething = false;
226230
private volatile byte subscriptionIdInClient;
@@ -233,13 +237,22 @@ private SubscriptionTracker(
233237
OffsetSpecification initialOffsetSpecification,
234238
String offsetTrackingReference,
235239
SubscriptionListener subscriptionListener,
236-
MessageHandler messageHandler) {
240+
MessageHandler messageHandler,
241+
Map<String, String> subscriptionProperties) {
237242
this.consumer = consumer;
238243
this.stream = stream;
239244
this.initialOffsetSpecification = initialOffsetSpecification;
240245
this.offsetTrackingReference = offsetTrackingReference;
241246
this.subscriptionListener = subscriptionListener;
242247
this.messageHandler = messageHandler;
248+
if (this.offsetTrackingReference == null) {
249+
this.subscriptionProperties = subscriptionProperties;
250+
} else {
251+
Map<String, String> properties = new ConcurrentHashMap<>(subscriptionProperties.size() + 1);
252+
properties.putAll(subscriptionProperties);
253+
properties.put("name", this.offsetTrackingReference);
254+
this.subscriptionProperties = Collections.unmodifiableMap(properties);
255+
}
243256
}
244257

245258
synchronized void cancel() {
@@ -518,6 +531,25 @@ private ClientSubscriptionsManager(
518531
});
519532
}
520533
};
534+
ConsumerUpdateListener consumerUpdateListener =
535+
(client, subscriptionId, active) -> {
536+
OffsetSpecification result = null;
537+
SubscriptionTracker subscriptionTracker =
538+
subscriptionTrackers.get(subscriptionId & 0xFF);
539+
if (subscriptionTracker != null) {
540+
if (isSac(subscriptionTracker.subscriptionProperties)) {
541+
result = subscriptionTracker.consumer.consumerUpdate(active);
542+
} else {
543+
LOGGER.debug(
544+
"Subscription {} is not a single active consumer, nothing to do.",
545+
subscriptionId);
546+
}
547+
} else {
548+
LOGGER.debug(
549+
"Could not find stream subscription {} for consumer update", subscriptionId);
550+
}
551+
return result;
552+
};
521553
ClientFactoryContext clientFactoryContext =
522554
ClientFactoryContext.fromParameters(
523555
clientParameters
@@ -526,7 +558,8 @@ private ClientSubscriptionsManager(
526558
.creditNotification(creditNotification)
527559
.messageListener(messageListener)
528560
.shutdownListener(shutdownListener)
529-
.metadataListener(metadataListener))
561+
.metadataListener(metadataListener)
562+
.consumerUpdateListener(consumerUpdateListener))
530563
.key(owner.name);
531564
this.client = clientFactory.client(clientFactoryContext);
532565
clientInitializedInManager.set(true);
@@ -631,10 +664,11 @@ synchronized void add(
631664
List<SubscriptionTracker> previousSubscriptions = this.subscriptionTrackers;
632665

633666
LOGGER.debug(
634-
"Subscribing to {}, requested offset specification is {}, offset tracking reference is {}",
667+
"Subscribing to {}, requested offset specification is {}, offset tracking reference is {}, properties are {}",
635668
subscriptionTracker.stream,
636669
offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification,
637-
subscriptionTracker.offsetTrackingReference);
670+
subscriptionTracker.offsetTrackingReference,
671+
subscriptionTracker.subscriptionProperties);
638672
try {
639673
// updating data structures before subscribing
640674
// (to make sure they are up-to-date in case message would arrive super fast)
@@ -671,12 +705,6 @@ synchronized void add(
671705
offsetSpecification =
672706
offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification;
673707

674-
Map<String, String> subscriptionProperties = Collections.emptyMap();
675-
if (subscriptionTracker.offsetTrackingReference != null) {
676-
subscriptionProperties = new HashMap<>(1);
677-
subscriptionProperties.put("name", subscriptionTracker.offsetTrackingReference);
678-
}
679-
680708
SubscriptionContext subscriptionContext =
681709
new DefaultSubscriptionContext(offsetSpecification);
682710
subscriptionTracker.subscriptionListener.preSubscribe(subscriptionContext);
@@ -692,7 +720,7 @@ synchronized void add(
692720
subscriptionTracker.stream,
693721
subscriptionContext.offsetSpecification(),
694722
10,
695-
subscriptionProperties);
723+
subscriptionTracker.subscriptionProperties);
696724
if (!subscribeResponse.isOk()) {
697725
String message =
698726
"Subscription to stream "

src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ private interface Tracker {
135135

136136
void flushIfNecessary();
137137

138+
long flush();
139+
138140
StreamConsumer consumer();
139141

140142
LongConsumer trackingCallback();
@@ -161,6 +163,10 @@ LongConsumer trackingCallback() {
161163
Runnable closingCallback() {
162164
return this.tracker.closingCallback();
163165
}
166+
167+
long flush() {
168+
return this.tracker.flush();
169+
}
164170
}
165171

166172
private static final class AutoTrackingTracker implements Tracker {
@@ -181,6 +187,7 @@ private AutoTrackingTracker(
181187
this.clock = clock;
182188
}
183189

190+
@Override
184191
public Consumer<Context> postProcessingCallback() {
185192
return context -> {
186193
if (++count % messageCountBeforeStorage == 0) {
@@ -191,18 +198,25 @@ public Consumer<Context> postProcessingCallback() {
191198
};
192199
}
193200

201+
@Override
194202
public void flushIfNecessary() {
195203
if (this.count > 0) {
196204
if (this.clock.time() - this.lastTrackingActivity > this.flushIntervalInNs) {
197-
long lastStoredOffset = consumer.lastStoredOffset();
198-
if (lastStoredOffset < lastProcessedOffset) {
199-
this.consumer.store(this.lastProcessedOffset);
200-
this.lastTrackingActivity = clock.time();
201-
}
205+
this.flush();
202206
}
203207
}
204208
}
205209

210+
@Override
211+
public long flush() {
212+
long lastStoredOffset = consumer.lastStoredOffset();
213+
if (lastStoredOffset < lastProcessedOffset) {
214+
this.consumer.store(this.lastProcessedOffset);
215+
this.lastTrackingActivity = clock.time();
216+
}
217+
return lastStoredOffset;
218+
}
219+
206220
@Override
207221
public StreamConsumer consumer() {
208222
return this.consumer;
@@ -261,6 +275,11 @@ public void flushIfNecessary() {
261275
}
262276
}
263277

278+
@Override
279+
public long flush() {
280+
throw new UnsupportedOperationException();
281+
}
282+
264283
@Override
265284
public StreamConsumer consumer() {
266285
return this.consumer;

0 commit comments

Comments
 (0)