Skip to content

Commit c63e872

Browse files
committed
Add single active consumer API to Consumer
WIP. References rabbitmq/rabbitmq-server#3753 Conflicts: src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java Conflicts: src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java Conflicts: src/test/java/com/rabbitmq/stream/impl/SingleActiveConsumerTest.java
1 parent 00c0112 commit c63e872

15 files changed

+531
-83
lines changed

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public interface ConsumerBuilder {
6464
*/
6565
ConsumerBuilder name(String name);
6666

67+
ConsumerBuilder singleActiveConsumer();
68+
69+
ConsumerBuilder consumerUpdateListener(ConsumerUpdateListener consumerUpdateListener);
70+
6771
/**
6872
* Callback on subscription.
6973
*
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream;
15+
16+
public interface ConsumerUpdateListener {
17+
18+
OffsetSpecification update(Context context);
19+
20+
interface Context {
21+
22+
Consumer consumer();
23+
24+
Status status();
25+
26+
Status previousStatus();
27+
}
28+
29+
enum Status {
30+
STARTING,
31+
ACTIVE,
32+
PASSIVE
33+
}
34+
}

src/main/java/com/rabbitmq/stream/SubscriptionListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ public interface SubscriptionListener {
2626
/**
2727
* Callback called before the subscription is created.
2828
*
29-
* <p>The method is called when a {@link Consumer} is created and it registers to broker, and also
30-
* when the subscription must be re-created (after a disconnection or when the subscription must
31-
* moved because the stream member it was connection becomes unavailable).
29+
* <p>The method is called when a {@link Consumer} is created and it registers to the broker, and
30+
* also when the subscription must be re-created (after a disconnection or when the subscription
31+
* must moved because the stream member it was connected to becomes unavailable).
3232
*
3333
* <p>Application code can set the {@link OffsetSpecification} that will be used with the {@link
3434
* SubscriptionContext#offsetSpecification(OffsetSpecification)} method.

src/main/java/com/rabbitmq/stream/impl/AsyncRetry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ AsyncRetryBuilder<V> retry(Predicate<Exception> predicate) {
121121
return this;
122122
}
123123

124-
AsyncRetryBuilder<V> description(String description) {
125-
this.description = description;
124+
AsyncRetryBuilder<V> description(String description, Object... args) {
125+
this.description = String.format(description, args);
126126
return this;
127127
}
128128

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static com.rabbitmq.stream.impl.Utils.formatConstant;
17+
import static com.rabbitmq.stream.impl.Utils.isSac;
1718

1819
import com.rabbitmq.stream.BackOffDelayPolicy;
1920
import com.rabbitmq.stream.Constants;
@@ -26,6 +27,7 @@
2627
import com.rabbitmq.stream.SubscriptionListener;
2728
import com.rabbitmq.stream.SubscriptionListener.SubscriptionContext;
2829
import com.rabbitmq.stream.impl.Client.ChunkListener;
30+
import com.rabbitmq.stream.impl.Client.ConsumerUpdateListener;
2931
import com.rabbitmq.stream.impl.Client.CreditNotification;
3032
import com.rabbitmq.stream.impl.Client.MessageListener;
3133
import com.rabbitmq.stream.impl.Client.MetadataListener;
@@ -37,7 +39,6 @@
3739
import java.util.ArrayList;
3840
import java.util.Collection;
3941
import java.util.Collections;
40-
import java.util.HashMap;
4142
import java.util.List;
4243
import java.util.Map;
4344
import java.util.Map.Entry;
@@ -94,7 +95,8 @@ Runnable subscribe(
9495
OffsetSpecification offsetSpecification,
9596
String trackingReference,
9697
SubscriptionListener subscriptionListener,
97-
MessageHandler messageHandler) {
98+
MessageHandler messageHandler,
99+
Map<String, String> subscriptionProperties) {
98100
// FIXME fail immediately if there's no locator (can provide a supplier that does not retry)
99101
List<Client.Broker> candidates = findBrokersForStream(stream);
100102
Client.Broker newNode = pickBroker(candidates);
@@ -111,7 +113,8 @@ Runnable subscribe(
111113
offsetSpecification,
112114
trackingReference,
113115
subscriptionListener,
114-
messageHandler);
116+
messageHandler,
117+
subscriptionProperties);
115118

116119
String key = keyForClientSubscription(newNode);
117120

@@ -232,6 +235,7 @@ private static class SubscriptionTracker {
232235
private final MessageHandler messageHandler;
233236
private final StreamConsumer consumer;
234237
private final SubscriptionListener subscriptionListener;
238+
private final Map<String, String> subscriptionProperties;
235239
private volatile long offset;
236240
private volatile boolean hasReceivedSomething = false;
237241
private volatile byte subscriptionIdInClient;
@@ -243,13 +247,22 @@ private SubscriptionTracker(
243247
OffsetSpecification initialOffsetSpecification,
244248
String offsetTrackingReference,
245249
SubscriptionListener subscriptionListener,
246-
MessageHandler messageHandler) {
250+
MessageHandler messageHandler,
251+
Map<String, String> subscriptionProperties) {
247252
this.consumer = consumer;
248253
this.stream = stream;
249254
this.initialOffsetSpecification = initialOffsetSpecification;
250255
this.offsetTrackingReference = offsetTrackingReference;
251256
this.subscriptionListener = subscriptionListener;
252257
this.messageHandler = messageHandler;
258+
if (this.offsetTrackingReference == null) {
259+
this.subscriptionProperties = subscriptionProperties;
260+
} else {
261+
Map<String, String> properties = new ConcurrentHashMap<>(subscriptionProperties.size() + 1);
262+
properties.putAll(subscriptionProperties);
263+
properties.put("name", this.offsetTrackingReference);
264+
this.subscriptionProperties = Collections.unmodifiableMap(properties);
265+
}
253266
}
254267

255268
synchronized void cancel() {
@@ -546,6 +559,25 @@ private ClientSubscriptionsManager(
546559
});
547560
}
548561
};
562+
ConsumerUpdateListener consumerUpdateListener =
563+
(client, subscriptionId, active) -> {
564+
OffsetSpecification result = null;
565+
SubscriptionTracker subscriptionTracker =
566+
subscriptionTrackers.get(subscriptionId & 0xFF);
567+
if (subscriptionTracker != null) {
568+
if (isSac(subscriptionTracker.subscriptionProperties)) {
569+
result = subscriptionTracker.consumer.consumerUpdate(active);
570+
} else {
571+
LOGGER.debug(
572+
"Subscription {} is not a single active consumer, nothing to do.",
573+
subscriptionId);
574+
}
575+
} else {
576+
LOGGER.debug(
577+
"Could not find stream subscription {} for consumer update", subscriptionId);
578+
}
579+
return result;
580+
};
549581
ClientFactoryContext clientFactoryContext =
550582
ClientFactoryContext.fromParameters(
551583
clientParameters
@@ -556,7 +588,8 @@ private ClientSubscriptionsManager(
556588
.creditNotification(creditNotification)
557589
.messageListener(messageListener)
558590
.shutdownListener(shutdownListener)
559-
.metadataListener(metadataListener))
591+
.metadataListener(metadataListener)
592+
.consumerUpdateListener(consumerUpdateListener))
560593
.key(owner.name);
561594
this.client = clientFactory.client(clientFactoryContext);
562595
maybeExchangeCommandVersions(client);
@@ -664,10 +697,11 @@ void add(
664697
List<SubscriptionTracker> previousSubscriptions = this.subscriptionTrackers;
665698

666699
LOGGER.debug(
667-
"Subscribing to {}, requested offset specification is {}, offset tracking reference is {}",
700+
"Subscribing to {}, requested offset specification is {}, offset tracking reference is {}, properties are {}",
668701
subscriptionTracker.stream,
669702
offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification,
670-
subscriptionTracker.offsetTrackingReference);
703+
subscriptionTracker.offsetTrackingReference,
704+
subscriptionTracker.subscriptionProperties);
671705
try {
672706
// updating data structures before subscribing
673707
// (to make sure they are up-to-date in case message would arrive super fast)
@@ -706,12 +740,6 @@ void add(
706740
offsetSpecification =
707741
offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification;
708742

709-
Map<String, String> subscriptionProperties = Collections.emptyMap();
710-
if (subscriptionTracker.offsetTrackingReference != null) {
711-
subscriptionProperties = new HashMap<>(1);
712-
subscriptionProperties.put("name", subscriptionTracker.offsetTrackingReference);
713-
}
714-
715743
SubscriptionContext subscriptionContext =
716744
new DefaultSubscriptionContext(offsetSpecification);
717745
subscriptionTracker.subscriptionListener.preSubscribe(subscriptionContext);
@@ -727,7 +755,7 @@ void add(
727755
subscriptionTracker.stream,
728756
subscriptionContext.offsetSpecification(),
729757
10,
730-
subscriptionProperties);
758+
subscriptionTracker.subscriptionProperties);
731759
if (!subscribeResponse.isOk()) {
732760
String message =
733761
"Subscription to stream "
@@ -745,7 +773,6 @@ void add(
745773
.remove(subscriptionTracker);
746774
throw e;
747775
}
748-
749776
LOGGER.debug("Subscribed to {}", subscriptionTracker.stream);
750777
}
751778
}

src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ private interface Tracker {
144144

145145
void flushIfNecessary();
146146

147+
long flush();
148+
147149
StreamConsumer consumer();
148150

149151
LongConsumer trackingCallback();
@@ -170,6 +172,10 @@ LongConsumer trackingCallback() {
170172
Runnable closingCallback() {
171173
return this.tracker.closingCallback();
172174
}
175+
176+
long flush() {
177+
return this.tracker.flush();
178+
}
173179
}
174180

175181
private static final class AutoTrackingTracker implements Tracker {
@@ -190,6 +196,7 @@ private AutoTrackingTracker(
190196
this.clock = clock;
191197
}
192198

199+
@Override
193200
public Consumer<Context> postProcessingCallback() {
194201
return context -> {
195202
if (++count % messageCountBeforeStorage == 0) {
@@ -204,26 +211,37 @@ public Consumer<Context> postProcessingCallback() {
204211
};
205212
}
206213

214+
@Override
207215
public void flushIfNecessary() {
208216
if (this.count > 0) {
209217
if (this.clock.time() - this.lastTrackingActivity > this.flushIntervalInNs) {
210-
if (lastProcessedOffset != null) {
211-
try {
212-
long lastStoredOffset = consumer.lastStoredOffset();
213-
if (offsetBefore(lastStoredOffset, lastProcessedOffset.get())) {
214-
this.consumer.store(this.lastProcessedOffset.get());
215-
}
216-
this.lastTrackingActivity = clock.time();
217-
} catch (StreamException e) {
218-
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
219-
this.consumer.store(this.lastProcessedOffset.get());
220-
this.lastTrackingActivity = clock.time();
221-
} else {
222-
throw e;
223-
}
224-
}
218+
this.flush();
219+
}
220+
}
221+
}
222+
223+
@Override
224+
public long flush() {
225+
if (lastProcessedOffset == null) {
226+
return 0;
227+
} else {
228+
long result;
229+
try {
230+
long lastStoredOffset = consumer.lastStoredOffset();
231+
if (offsetBefore(lastStoredOffset, lastProcessedOffset.get())) {
232+
this.consumer.store(this.lastProcessedOffset.get());
233+
}
234+
result = lastProcessedOffset.get();
235+
} catch (StreamException e) {
236+
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
237+
this.consumer.store(this.lastProcessedOffset.get());
238+
result = lastProcessedOffset.get();
239+
} else {
240+
throw e;
225241
}
226242
}
243+
this.lastTrackingActivity = clock.time();
244+
return result;
227245
}
228246
}
229247

@@ -301,6 +319,11 @@ public void flushIfNecessary() {
301319
}
302320
}
303321

322+
@Override
323+
public long flush() {
324+
throw new UnsupportedOperationException();
325+
}
326+
304327
@Override
305328
public StreamConsumer consumer() {
306329
return this.consumer;

0 commit comments

Comments
 (0)