Skip to content

Commit 49056e6

Browse files
committed
Handle offset 0 in tracking
Fixes #146
1 parent 0a31b95 commit 49056e6

File tree

6 files changed

+173
-36
lines changed

6 files changed

+173
-36
lines changed

src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java

+40-30
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.Utils.offsetBefore;
17+
1618
import com.rabbitmq.stream.Constants;
1719
import com.rabbitmq.stream.MessageHandler.Context;
1820
import com.rabbitmq.stream.StreamException;
@@ -25,6 +27,7 @@
2527
import java.util.concurrent.ScheduledExecutorService;
2628
import java.util.concurrent.TimeUnit;
2729
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicLong;
2831
import java.util.function.Consumer;
2932
import java.util.function.LongConsumer;
3033
import org.slf4j.Logger;
@@ -176,7 +179,7 @@ private static final class AutoTrackingTracker implements Tracker {
176179
private final long flushIntervalInNs;
177180
private final LocalClock clock;
178181
private volatile long count = 0;
179-
private volatile long lastProcessedOffset = 0;
182+
private volatile AtomicLong lastProcessedOffset = null;
180183
private volatile long lastTrackingActivity = 0;
181184

182185
private AutoTrackingTracker(
@@ -193,24 +196,31 @@ public Consumer<Context> postProcessingCallback() {
193196
context.storeOffset();
194197
lastTrackingActivity = clock.time();
195198
}
196-
lastProcessedOffset = context.offset();
199+
if (lastProcessedOffset == null) {
200+
lastProcessedOffset = new AtomicLong(context.offset());
201+
} else {
202+
lastProcessedOffset.set(context.offset());
203+
}
197204
};
198205
}
199206

200207
public void flushIfNecessary() {
201208
if (this.count > 0) {
202209
if (this.clock.time() - this.lastTrackingActivity > this.flushIntervalInNs) {
203-
try {
204-
long lastStoredOffset = consumer.lastStoredOffset();
205-
if (Long.compareUnsigned(lastStoredOffset, lastProcessedOffset) < 0) {
206-
this.consumer.store(this.lastProcessedOffset);
210+
if (lastProcessedOffset != null) {
211+
try {
212+
long lastStoredOffset = consumer.lastStoredOffset();
213+
if (offsetBefore(lastStoredOffset, lastProcessedOffset.get())) {
214+
this.consumer.store(this.lastProcessedOffset.get());
215+
}
207216
this.lastTrackingActivity = clock.time();
208-
}
209-
} catch (StreamException e) {
210-
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
211-
// probably nothing stored yet, let it go
212-
} else {
213-
throw e;
217+
} catch (StreamException e) {
218+
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
219+
this.consumer.store(this.lastProcessedOffset.get());
220+
this.lastTrackingActivity = clock.time();
221+
} else {
222+
throw e;
223+
}
214224
}
215225
}
216226
}
@@ -230,22 +240,21 @@ public LongConsumer trackingCallback() {
230240
@Override
231241
public Runnable closingCallback() {
232242
return () -> {
233-
try {
234-
long lastStoredOffset = consumer.lastStoredOffset();
235-
if (Long.compareUnsigned(lastStoredOffset, lastProcessedOffset) < 0) {
236-
LOGGER.debug("Storing offset before closing");
237-
this.consumer.store(this.lastProcessedOffset);
238-
} else {
239-
LOGGER.debug(
240-
"Not storing offset before closing because last stored offset after last processed offset: {} > {}",
241-
lastStoredOffset,
242-
lastProcessedOffset);
243-
}
244-
} catch (StreamException e) {
245-
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
246-
// probably nothing stored yet, let it go
247-
} else {
248-
throw e;
243+
if (this.lastProcessedOffset == null) {
244+
LOGGER.debug("Not storing anything as nothing has been processed.");
245+
} else {
246+
try {
247+
long lastStoredOffset = consumer.lastStoredOffset();
248+
if (offsetBefore(lastStoredOffset, lastProcessedOffset.get())) {
249+
LOGGER.debug("Storing {} offset before closing", this.lastProcessedOffset);
250+
this.consumer.store(this.lastProcessedOffset.get());
251+
}
252+
} catch (StreamException e) {
253+
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
254+
LOGGER.debug(
255+
"Nothing stored yet, storing {} offset before closing", this.lastProcessedOffset);
256+
this.consumer.store(this.lastProcessedOffset.get());
257+
}
249258
}
250259
}
251260
};
@@ -277,13 +286,14 @@ public void flushIfNecessary() {
277286
if (this.clock.time() - this.lastTrackingActivity > this.checkIntervalInNs) {
278287
try {
279288
long lastStoredOffset = consumer.lastStoredOffset();
280-
if (Long.compareUnsigned(lastStoredOffset, lastRequestedOffset) < 0) {
289+
if (offsetBefore(lastStoredOffset, lastRequestedOffset)) {
281290
this.consumer.store(this.lastRequestedOffset);
282291
this.lastTrackingActivity = clock.time();
283292
}
284293
} catch (StreamException e) {
285294
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
286-
// probably nothing stored yet, let it go
295+
this.consumer.store(this.lastRequestedOffset);
296+
this.lastTrackingActivity = clock.time();
287297
} else {
288298
throw e;
289299
}

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.Utils.offsetBefore;
17+
1618
import com.rabbitmq.stream.Consumer;
1719
import com.rabbitmq.stream.MessageHandler;
1820
import com.rabbitmq.stream.MessageHandler.Context;
@@ -47,6 +49,7 @@ class StreamConsumer implements Consumer {
4749
private volatile Client subscriptionClient;
4850
private volatile Status status;
4951
private volatile long lastRequestedStoredOffset = 0;
52+
private final AtomicBoolean nothingStoredYet = new AtomicBoolean(true);
5053

5154
StreamConsumer(
5255
String stream,
@@ -130,7 +133,8 @@ void start() {
130133
public void store(long offset) {
131134
trackingCallback.accept(offset);
132135
if (canTrack()) {
133-
if (Long.compareUnsigned(this.lastRequestedStoredOffset, offset) < 0) {
136+
if (offsetBefore(this.lastRequestedStoredOffset, offset)
137+
|| nothingStoredYet.compareAndSet(true, false)) {
134138
try {
135139
this.trackingClient.storeOffset(this.name, this.stream, offset);
136140
this.lastRequestedStoredOffset = offset;

src/main/java/com/rabbitmq/stream/impl/Utils.java

+4
Original file line numberDiff line numberDiff line change
@@ -236,4 +236,8 @@ static Function<ClientConnectionType, String> defaultConnectionNamingStrategy(St
236236
return clientConnectionType ->
237237
prefixes.get(clientConnectionType) + sequences.get(clientConnectionType).getAndIncrement();
238238
}
239+
240+
static boolean offsetBefore(long x, long y) {
241+
return Long.compareUnsigned(x, y) < 0;
242+
}
239243
}

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

+96-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import com.rabbitmq.stream.BackOffDelayPolicy;
2626
import com.rabbitmq.stream.ConfirmationHandler;
27+
import com.rabbitmq.stream.Constants;
2728
import com.rabbitmq.stream.Consumer;
2829
import com.rabbitmq.stream.ConsumerBuilder;
2930
import com.rabbitmq.stream.Environment;
@@ -32,6 +33,7 @@
3233
import com.rabbitmq.stream.OffsetSpecification;
3334
import com.rabbitmq.stream.Producer;
3435
import com.rabbitmq.stream.StreamDoesNotExistException;
36+
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
3537
import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerInfo;
3638
import com.rabbitmq.stream.impl.TestUtils.DisabledIfRabbitMqCtlNotSet;
3739
import io.netty.channel.EventLoopGroup;
@@ -528,6 +530,33 @@ void autoTrackingShouldStorePeriodicallyAndAfterInactivity() throws Exception {
528530
5, () -> client.queryOffset(reference, stream).getOffset() == lastReceivedOffset.get());
529531
}
530532

533+
@Test
534+
void autoTrackingShouldStoreOffsetZeroAfterInactivity() throws Exception {
535+
String reference = "ref-1";
536+
AtomicLong lastReceivedOffset = new AtomicLong(-1);
537+
environment.consumerBuilder().name(reference).stream(stream)
538+
.offset(OffsetSpecification.first())
539+
.messageHandler((context, message) -> lastReceivedOffset.set(context.offset()))
540+
.autoTrackingStrategy()
541+
.flushInterval(Duration.ofSeconds(1).plusMillis(100))
542+
.builder()
543+
.build();
544+
545+
Producer producer = environment.producerBuilder().stream(stream).build();
546+
producer.send(
547+
producer.messageBuilder().addData("".getBytes()).build(), confirmationStatus -> {});
548+
549+
waitAtMost(() -> lastReceivedOffset.get() == 0);
550+
551+
Client client = cf.get();
552+
waitAtMost(
553+
5,
554+
() -> {
555+
QueryOffsetResponse response = client.queryOffset(reference, stream);
556+
return response.isOk() && response.getOffset() == lastReceivedOffset.get();
557+
});
558+
}
559+
531560
@Test
532561
void autoTrackingShouldStoreAfterClosing() throws Exception {
533562
int storeEvery = 10_000;
@@ -563,7 +592,45 @@ void autoTrackingShouldStoreAfterClosing() throws Exception {
563592
Client client = cf.get();
564593
waitAtMost(
565594
5,
566-
() -> client.queryOffset(reference, stream).getOffset() == lastReceivedOffset.get(),
595+
() -> {
596+
QueryOffsetResponse response = client.queryOffset(reference, stream);
597+
return response.isOk() && response.getOffset() == lastReceivedOffset.get();
598+
},
599+
() ->
600+
format(
601+
"Expecting stored offset %d to be equal to last received offset %d",
602+
client.queryOffset(reference, stream).getOffset(), lastReceivedOffset.get()));
603+
}
604+
605+
@Test
606+
void autoTrackingShouldStoreOffsetZeroOnClosing() throws Exception {
607+
String reference = "ref-1";
608+
AtomicLong lastReceivedOffset = new AtomicLong(-1);
609+
Consumer consumer =
610+
environment.consumerBuilder().name(reference).stream(stream)
611+
.offset(OffsetSpecification.first())
612+
.messageHandler(
613+
(context, message) -> {
614+
lastReceivedOffset.set(context.offset());
615+
})
616+
.autoTrackingStrategy()
617+
.flushInterval(Duration.ofHours(1)) // long flush interval
618+
.builder()
619+
.build();
620+
621+
Producer producer = environment.producerBuilder().stream(stream).build();
622+
producer.send(
623+
producer.messageBuilder().addData("".getBytes()).build(), confirmationStatus -> {});
624+
waitAtMost(() -> lastReceivedOffset.get() == 0);
625+
consumer.close();
626+
Client client = cf.get();
627+
waitAtMost(
628+
5,
629+
() -> {
630+
QueryOffsetResponse response = client.queryOffset(reference, stream);
631+
System.out.println(response.isOk());
632+
return response.isOk() && response.getOffset() == lastReceivedOffset.get();
633+
},
567634
() ->
568635
format(
569636
"Expecting stored offset %d to be equal to last received offset %d",
@@ -733,4 +800,32 @@ void useSubscriptionListenerToRestartExactlyWhereDesired() throws Exception {
733800
// subscription listener
734801
assertThat(receivedMessages).hasValue(publishedMessages.get() + 1);
735802
}
803+
804+
@Test
805+
void offsetZeroShouldBeStored() throws Exception {
806+
String ref = "ref-1";
807+
Consumer consumer =
808+
environment.consumerBuilder().stream(stream)
809+
.name(ref)
810+
.offset(OffsetSpecification.first())
811+
.messageHandler((context, message) -> {})
812+
.manualTrackingStrategy()
813+
.checkInterval(Duration.ZERO)
814+
.builder()
815+
.build();
816+
Client client = cf.get();
817+
assertThat(client.queryOffset(ref, stream).getResponseCode())
818+
.isEqualTo(Constants.RESPONSE_CODE_NO_OFFSET);
819+
consumer.store(0);
820+
waitAtMost(
821+
5,
822+
() -> {
823+
QueryOffsetResponse response = client.queryOffset(ref, stream);
824+
return response.isOk() && response.getOffset() == 0;
825+
},
826+
() ->
827+
format(
828+
"Expecting stored offset %d to be equal to last received offset %d",
829+
client.queryOffset(ref, stream).getOffset(), 0));
830+
}
736831
}

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -119,17 +119,29 @@ public static Duration waitAtMost(
119119
int waitTime = 100;
120120
int waitedTime = 0;
121121
int timeoutInMs = timeoutInSeconds * 1000;
122+
Exception exception = null;
122123
while (waitedTime <= timeoutInMs) {
123124
Thread.sleep(waitTime);
124125
waitedTime += waitTime;
125-
if (condition.getAsBoolean()) {
126-
return Duration.ofMillis(waitedTime);
126+
try {
127+
if (condition.getAsBoolean()) {
128+
return Duration.ofMillis(waitedTime);
129+
}
130+
exception = null;
131+
} catch (Exception e) {
132+
exception = e;
127133
}
128134
}
135+
String msg;
129136
if (message == null) {
130-
fail("Waited " + timeoutInSeconds + " second(s), condition never got true");
137+
msg = "Waited " + timeoutInSeconds + " second(s), condition never got true";
138+
} else {
139+
msg = "Waited " + timeoutInSeconds + " second(s), " + message.get();
140+
}
141+
if (exception == null) {
142+
fail(msg);
131143
} else {
132-
fail("Waited " + timeoutInSeconds + " second(s), " + message.get());
144+
fail(msg, exception);
133145
}
134146
return Duration.ofMillis(waitedTime);
135147
}

src/test/java/com/rabbitmq/stream/impl/UtilsTest.java

+12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST;
1919
import static com.rabbitmq.stream.impl.Utils.defaultConnectionNamingStrategy;
2020
import static com.rabbitmq.stream.impl.Utils.formatConstant;
21+
import static com.rabbitmq.stream.impl.Utils.offsetBefore;
2122
import static org.assertj.core.api.Assertions.assertThat;
2223
import static org.mockito.ArgumentMatchers.any;
2324
import static org.mockito.Mockito.mock;
@@ -90,4 +91,15 @@ void defaultConnectionNamingStrategyShouldIncrement() {
9091
});
9192
}
9293
}
94+
95+
@Test
96+
void testOffsetBefore() {
97+
assertThat(offsetBefore(1, 2)).isTrue();
98+
assertThat(offsetBefore(2, 2)).isFalse();
99+
assertThat(offsetBefore(Long.MAX_VALUE - 1, Long.MAX_VALUE)).isTrue();
100+
assertThat(offsetBefore(Long.MAX_VALUE, Long.MAX_VALUE)).isFalse();
101+
assertThat(offsetBefore(Long.MAX_VALUE, Long.MAX_VALUE + 1)).isTrue();
102+
assertThat(offsetBefore(Long.MAX_VALUE + 10, Long.MAX_VALUE + 10)).isFalse();
103+
assertThat(offsetBefore(Long.MAX_VALUE + 10, Long.MAX_VALUE + 20)).isTrue();
104+
}
93105
}

0 commit comments

Comments
 (0)