Skip to content

Commit 181f412

Browse files
committed
Store offset on closing with automatic offset tracking
Fixes #40
1 parent 20e2bfc commit 181f412

File tree

4 files changed

+135
-13
lines changed

4 files changed

+135
-13
lines changed

src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java

+39-10
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,13 @@
2525
import java.util.concurrent.atomic.AtomicBoolean;
2626
import java.util.function.Consumer;
2727
import java.util.function.LongConsumer;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2830

2931
class OffsetTrackingCoordinator {
3032

33+
private static final Logger LOGGER = LoggerFactory.getLogger(OffsetTrackingCoordinator.class);
34+
3135
private final StreamEnvironment streamEnvironment;
3236

3337
private final AtomicBoolean started = new AtomicBoolean(false);
@@ -106,7 +110,7 @@ Registration registerTrackingConsumer(
106110
TimeUnit.MILLISECONDS);
107111
}
108112

109-
return new Registration(tracker.postProcessingCallback(), tracker.trackingCallback());
113+
return new Registration(tracker);
110114
}
111115

112116
private ScheduledExecutorService executor() {
@@ -134,24 +138,28 @@ private interface Tracker {
134138
StreamConsumer consumer();
135139

136140
LongConsumer trackingCallback();
141+
142+
Runnable closingCallback();
137143
}
138144

139145
static class Registration {
140146

141-
private final java.util.function.Consumer<Context> postMessageProcessingCallback;
142-
private final LongConsumer trackingCallback;
147+
private final Tracker tracker;
143148

144-
Registration(Consumer<Context> postMessageProcessingCallback, LongConsumer trackingCallback) {
145-
this.postMessageProcessingCallback = postMessageProcessingCallback;
146-
this.trackingCallback = trackingCallback;
149+
Registration(Tracker tracker) {
150+
this.tracker = tracker;
147151
}
148152

149-
public Consumer<Context> postMessageProcessingCallback() {
150-
return postMessageProcessingCallback;
153+
Consumer<Context> postMessageProcessingCallback() {
154+
return this.tracker.postProcessingCallback();
151155
}
152156

153-
public LongConsumer trackingCallback() {
154-
return trackingCallback;
157+
LongConsumer trackingCallback() {
158+
return this.tracker.trackingCallback();
159+
}
160+
161+
Runnable closingCallback() {
162+
return this.tracker.closingCallback();
155163
}
156164
}
157165

@@ -204,6 +212,22 @@ public StreamConsumer consumer() {
204212
public LongConsumer trackingCallback() {
205213
return Utils.NO_OP_LONG_CONSUMER;
206214
}
215+
216+
@Override
217+
public Runnable closingCallback() {
218+
return () -> {
219+
long lastStoredOffset = consumer.lastStoredOffset();
220+
if (lastStoredOffset < lastProcessedOffset) {
221+
LOGGER.debug("Storing offset before closing");
222+
this.consumer.store(this.lastProcessedOffset);
223+
} else {
224+
LOGGER.debug(
225+
"Not storing offset before closing because last stored offset after last processed offset: {} > {}",
226+
lastStoredOffset,
227+
lastProcessedOffset);
228+
}
229+
};
230+
}
207231
}
208232

209233
private static final class ManualTrackingTracker implements Tracker {
@@ -249,6 +273,11 @@ public LongConsumer trackingCallback() {
249273
lastTrackingActivity = clock.time();
250274
};
251275
}
276+
277+
@Override
278+
public Runnable closingCallback() {
279+
return () -> {};
280+
}
252281
}
253282

254283
private static class LocalClock {

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -551,14 +551,33 @@ Client.ClientParameters clientParametersCopy() {
551551
TrackingConsumerRegistration registerTrackingConsumer(
552552
StreamConsumer streamConsumer, TrackingConfiguration configuration) {
553553
Runnable closingCallable = this.producersCoordinator.registerTrackingConsumer(streamConsumer);
554-
Registration offsetTrackingRegistration = null;
554+
Registration offsetTrackingRegistration;
555555
if (this.offsetTrackingCoordinator.needTrackingRegistration(configuration)) {
556556
offsetTrackingRegistration =
557557
this.offsetTrackingCoordinator.registerTrackingConsumer(streamConsumer, configuration);
558+
} else {
559+
offsetTrackingRegistration = null;
560+
}
561+
562+
Runnable closingSequence;
563+
if (offsetTrackingRegistration == null) {
564+
closingSequence = closingCallable;
565+
} else {
566+
closingSequence =
567+
() -> {
568+
try {
569+
LOGGER.debug("Executing offset tracking registration closing sequence");
570+
offsetTrackingRegistration.closingCallback().run();
571+
LOGGER.debug("Offset tracking registration closing sequence executed");
572+
} catch (Exception e) {
573+
LOGGER.warn("Error while executing offset tracking registration closing sequence");
574+
}
575+
closingCallable.run();
576+
};
558577
}
559578

560579
return new TrackingConsumerRegistration(
561-
closingCallable,
580+
closingSequence,
562581
offsetTrackingRegistration == null
563582
? null
564583
: offsetTrackingRegistration.postMessageProcessingCallback(),

src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java

+37
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import com.rabbitmq.stream.MessageHandler.Context;
22+
import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration;
2223
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
2324
import java.time.Duration;
2425
import java.util.concurrent.CountDownLatch;
@@ -123,6 +124,42 @@ void autoShouldStoreAfterSomeInactivity() {
123124
assertThat(latchAssert(flushLatch)).completes(5);
124125
}
125126

127+
@Test
128+
void autoShouldStoreLastProcessedOffsetOnClosing() {
129+
Duration checkInterval = Duration.ofMillis(100);
130+
OffsetTrackingCoordinator coordinator = new OffsetTrackingCoordinator(env, checkInterval);
131+
when(consumer.lastStoredOffset()).thenReturn(5L);
132+
133+
Registration registration =
134+
coordinator.registerTrackingConsumer(
135+
consumer, new TrackingConfiguration(true, true, 1, Duration.ofHours(1), Duration.ZERO));
136+
137+
long lastProcessedOffset = 10;
138+
registration.postMessageProcessingCallback().accept(context(lastProcessedOffset, () -> {}));
139+
140+
registration.closingCallback().run();
141+
142+
verify(consumer, times(1)).store(lastProcessedOffset);
143+
}
144+
145+
@Test
146+
void autoShouldNotStoreLastProcessedOffsetOnClosingIfBehindStoredOffset() {
147+
Duration checkInterval = Duration.ofMillis(100);
148+
OffsetTrackingCoordinator coordinator = new OffsetTrackingCoordinator(env, checkInterval);
149+
when(consumer.lastStoredOffset()).thenReturn(15L);
150+
151+
Registration registration =
152+
coordinator.registerTrackingConsumer(
153+
consumer, new TrackingConfiguration(true, true, 1, Duration.ofHours(1), Duration.ZERO));
154+
155+
long lastProcessedOffset = 10;
156+
registration.postMessageProcessingCallback().accept(context(lastProcessedOffset, () -> {}));
157+
158+
registration.closingCallback().run();
159+
160+
verify(consumer, never()).store(anyLong());
161+
}
162+
126163
@Test
127164
void autoShouldStoreFixedMessageCountAndAutoTrackingAfterInactivity() {
128165
Duration checkInterval = Duration.ofMillis(100);

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

+38-1
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,42 @@ void autoTrackingShouldStorePeriodicallyAndAfterInactivity() throws Exception {
517517
waitAtMost(5, () -> client.queryOffset(reference, stream) == lastReceivedOffset.get());
518518
}
519519

520+
@Test
521+
void autoTrackingShouldStoreAfterClosing() throws Exception {
522+
int storeEvery = 10_000;
523+
int messageCount = storeEvery * 5 - 100;
524+
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
525+
String reference = "ref-1";
526+
AtomicLong lastReceivedOffset = new AtomicLong(0);
527+
Consumer consumer =
528+
environment.consumerBuilder().name(reference).stream(stream)
529+
.offset(OffsetSpecification.first())
530+
.messageHandler(
531+
(context, message) -> {
532+
lastReceivedOffset.set(context.offset());
533+
consumeLatch.countDown();
534+
})
535+
.autoTrackingStrategy()
536+
.flushInterval(Duration.ofHours(1)) // long flush interval
537+
.messageCountBeforeStorage(storeEvery)
538+
.builder()
539+
.build();
540+
541+
Producer producer = environment.producerBuilder().stream(stream).build();
542+
IntStream.range(0, messageCount)
543+
.forEach(
544+
i ->
545+
producer.send(
546+
producer.messageBuilder().addData("".getBytes()).build(),
547+
confirmationStatus -> {}));
548+
549+
latchAssert(consumeLatch).completes();
550+
consumer.close();
551+
552+
Client client = cf.get();
553+
waitAtMost(5, () -> client.queryOffset(reference, stream) == lastReceivedOffset.get());
554+
}
555+
520556
@Test
521557
@DisabledIfRabbitMqCtlNotSet
522558
void externalOffsetTrackingWithSubscriptionListener() throws Exception {
@@ -560,7 +596,8 @@ void externalOffsetTrackingWithSubscriptionListener() throws Exception {
560596
assertThat(offsetTracking.get()).isGreaterThanOrEqualTo(messageCount - 1);
561597

562598
Host.killConnection("rabbitmq-stream-consumer");
563-
waitAtMost(recoveryInitialDelay.multipliedBy(2), () -> subscriptionListenerCallCount.get() == 2);
599+
waitAtMost(
600+
recoveryInitialDelay.multipliedBy(2), () -> subscriptionListenerCallCount.get() == 2);
564601

565602
publish.run();
566603
waitAtMost(5, () -> receivedMessages.get() == messageCount * 2);

0 commit comments

Comments
 (0)