Skip to content

Commit 09f3918

Browse files
committed
Support clean SAC change in super stream consumer
References rabbitmq/rabbitmq-server#3753 Conflicts: src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java src/main/java/com/rabbitmq/stream/impl/Utils.java Conflicts: src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java Conflicts: src/main/java/com/rabbitmq/stream/MessageHandler.java src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java src/main/java/com/rabbitmq/stream/impl/Utils.java src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java
1 parent 8543515 commit 09f3918

12 files changed

+447
-96
lines changed

src/main/java/com/rabbitmq/stream/MessageHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,13 @@ interface Context {
7070
*/
7171
long committedChunkId();
7272

73+
/**
74+
* The stream the message comes from.
75+
*
76+
* @return the stream the message comes from
77+
*/
78+
String stream();
79+
7380
/**
7481
* The consumer that receives the message.
7582
*

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ Runnable subscribe(
9595
OffsetSpecification offsetSpecification,
9696
String trackingReference,
9797
SubscriptionListener subscriptionListener,
98+
Runnable trackingClosingCallback,
9899
MessageHandler messageHandler,
99100
Map<String, String> subscriptionProperties) {
100101
// FIXME fail immediately if there's no locator (can provide a supplier that does not retry)
@@ -113,6 +114,7 @@ Runnable subscribe(
113114
offsetSpecification,
114115
trackingReference,
115116
subscriptionListener,
117+
trackingClosingCallback,
116118
messageHandler,
117119
subscriptionProperties);
118120

@@ -235,6 +237,7 @@ private static class SubscriptionTracker {
235237
private final MessageHandler messageHandler;
236238
private final StreamConsumer consumer;
237239
private final SubscriptionListener subscriptionListener;
240+
private final Runnable trackingClosingCallback;
238241
private final Map<String, String> subscriptionProperties;
239242
private volatile long offset;
240243
private volatile boolean hasReceivedSomething = false;
@@ -247,13 +250,15 @@ private SubscriptionTracker(
247250
OffsetSpecification initialOffsetSpecification,
248251
String offsetTrackingReference,
249252
SubscriptionListener subscriptionListener,
253+
Runnable trackingClosingCallback,
250254
MessageHandler messageHandler,
251255
Map<String, String> subscriptionProperties) {
252256
this.consumer = consumer;
253257
this.stream = stream;
254258
this.initialOffsetSpecification = initialOffsetSpecification;
255259
this.offsetTrackingReference = offsetTrackingReference;
256260
this.subscriptionListener = subscriptionListener;
261+
this.trackingClosingCallback = trackingClosingCallback;
257262
this.messageHandler = messageHandler;
258263
if (this.offsetTrackingReference == null) {
259264
this.subscriptionProperties = subscriptionProperties;
@@ -267,6 +272,11 @@ private SubscriptionTracker(
267272
}
268273

269274
synchronized void cancel() {
275+
// the flow of messages in the user message handler should stop, we can call the tracking
276+
// closing callback
277+
// with automatic offset tracking, it will store the last dispatched offset
278+
LOGGER.debug("Calling tracking consumer closing callback (may be no-op)");
279+
this.trackingClosingCallback.run();
270280
if (this.manager != null) {
271281
LOGGER.debug("Removing consumer from manager " + this.consumer);
272282
this.manager.remove(this);
@@ -298,10 +308,10 @@ private static final class MessageHandlerContext implements Context {
298308
private final long offset;
299309
private final long timestamp;
300310
private final long committedOffset;
301-
private final Consumer consumer;
311+
private final StreamConsumer consumer;
302312

303313
private MessageHandlerContext(
304-
long offset, long timestamp, long committedOffset, Consumer consumer) {
314+
long offset, long timestamp, long committedOffset, StreamConsumer consumer) {
305315
this.offset = offset;
306316
this.timestamp = timestamp;
307317
this.committedOffset = committedOffset;
@@ -328,6 +338,10 @@ public long committedChunkId() {
328338
return committedOffset;
329339
}
330340

341+
public String stream() {
342+
return this.consumer.stream();
343+
}
344+
331345
@Override
332346
public Consumer consumer() {
333347
return this.consumer;
@@ -455,14 +469,14 @@ private ClientSubscriptionsManager(
455469
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
456470
SubscriptionTracker subscriptionTracker =
457471
subscriptionTrackers.get(subscriptionId & 0xFF);
458-
if (subscriptionTracker != null) {
472+
if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) {
459473
subscriptionTracker.offset = offset;
460474
subscriptionTracker.hasReceivedSomething = true;
461475
subscriptionTracker.messageHandler.handle(
462476
new MessageHandlerContext(
463477
offset, chunkTimestamp, committedOffset, subscriptionTracker.consumer),
464478
message);
465-
// FIXME set offset here as well, best effort to avoid duplicates
479+
// FIXME set offset here as well, best effort to avoid duplicates?
466480
} else {
467481
LOGGER.debug("Could not find stream subscription {}", subscriptionId);
468482
}
@@ -741,6 +755,8 @@ void add(
741755
offsetSpecification =
742756
offsetSpecification == null ? DEFAULT_OFFSET_SPECIFICATION : offsetSpecification;
743757

758+
// TODO consider using/emulating ConsumerUpdateListener, to have only one API, not 2
759+
// even when the consumer is not a SAC.
744760
SubscriptionContext subscriptionContext =
745761
new DefaultSubscriptionContext(offsetSpecification);
746762
subscriptionTracker.subscriptionListener.preSubscribe(subscriptionContext);

src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static com.rabbitmq.stream.impl.Utils.offsetBefore;
1717

1818
import com.rabbitmq.stream.Constants;
19+
import com.rabbitmq.stream.ConsumerUpdateListener.Status;
1920
import com.rabbitmq.stream.MessageHandler.Context;
2021
import com.rabbitmq.stream.StreamException;
2122
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
@@ -258,20 +259,34 @@ public LongConsumer trackingCallback() {
258259
@Override
259260
public Runnable closingCallback() {
260261
return () -> {
261-
if (this.lastProcessedOffset == null) {
262-
LOGGER.debug("Not storing anything as nothing has been processed.");
262+
if (this.consumer.isSac() && this.consumer.sacStatus() != Status.ACTIVE) {
263+
LOGGER.debug("Not storing offset on closing because consumer is a non-active SAC");
263264
} else {
264-
try {
265-
long lastStoredOffset = consumer.lastStoredOffset();
266-
if (offsetBefore(lastStoredOffset, lastProcessedOffset.get())) {
267-
LOGGER.debug("Storing {} offset before closing", this.lastProcessedOffset);
268-
this.consumer.store(this.lastProcessedOffset.get());
269-
}
270-
} catch (StreamException e) {
271-
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
272-
LOGGER.debug(
273-
"Nothing stored yet, storing {} offset before closing", this.lastProcessedOffset);
274-
this.consumer.store(this.lastProcessedOffset.get());
265+
if (this.lastProcessedOffset == null) {
266+
LOGGER.debug("Not storing anything as nothing has been processed.");
267+
} else {
268+
Runnable storageOperation =
269+
() -> {
270+
this.consumer.store(this.lastProcessedOffset.get());
271+
if (this.consumer.isSac()) {
272+
LOGGER.debug(
273+
"Consumer is SAC, making sure offset has been stored, in case another SAC takes over");
274+
this.consumer.waitForOffsetToBeStored(lastProcessedOffset.get());
275+
}
276+
};
277+
try {
278+
long lastStoredOffset = consumer.lastStoredOffset();
279+
if (offsetBefore(lastStoredOffset, lastProcessedOffset.get())) {
280+
LOGGER.debug("Storing {} offset before closing", this.lastProcessedOffset);
281+
storageOperation.run();
282+
}
283+
} catch (StreamException e) {
284+
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
285+
LOGGER.debug(
286+
"Nothing stored yet, storing {} offset before closing",
287+
this.lastProcessedOffset);
288+
storageOperation.run();
289+
}
275290
}
276291
}
277292
}

0 commit comments

Comments
 (0)