Skip to content

Commit b1861f6

Browse files
committed
Add SAC test with manual offset tracking
Requires to add Consumer#storedOffset() to let the user make sure the stored offset request made it to the broker before closing the consumer. References #46, rabbitmq/rabbitmq-server#3753 Conflicts: src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java
1 parent a0bafba commit b1861f6

File tree

6 files changed

+89
-15
lines changed

6 files changed

+89
-15
lines changed

src/main/java/com/rabbitmq/stream/Consumer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,6 @@ public interface Consumer extends AutoCloseable {
3333
/** Close the consumer. */
3434
@Override
3535
void close();
36+
37+
long storedOffset();
3638
}

src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ public long flush() {
228228
} else {
229229
long result;
230230
try {
231-
long lastStoredOffset = consumer.lastStoredOffset();
231+
long lastStoredOffset = consumer.storedOffset();
232232
if (offsetBefore(lastStoredOffset, lastProcessedOffset.get())) {
233233
this.consumer.store(this.lastProcessedOffset.get());
234234
}
@@ -275,7 +275,7 @@ public Runnable closingCallback() {
275275
}
276276
};
277277
try {
278-
long lastStoredOffset = consumer.lastStoredOffset();
278+
long lastStoredOffset = consumer.storedOffset();
279279
if (offsetBefore(lastStoredOffset, lastProcessedOffset.get())) {
280280
LOGGER.debug("Storing {} offset before closing", this.lastProcessedOffset);
281281
storageOperation.run();
@@ -318,7 +318,7 @@ public Consumer<Context> postProcessingCallback() {
318318
public void flushIfNecessary() {
319319
if (this.clock.time() - this.lastTrackingActivity > this.checkIntervalInNs) {
320320
try {
321-
long lastStoredOffset = consumer.lastStoredOffset();
321+
long lastStoredOffset = consumer.storedOffset();
322322
if (offsetBefore(lastStoredOffset, lastRequestedOffset)) {
323323
this.consumer.store(this.lastRequestedOffset);
324324
this.lastTrackingActivity = clock.time();

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.rabbitmq.stream.Constants;
2020
import com.rabbitmq.stream.Consumer;
2121
import com.rabbitmq.stream.ConsumerUpdateListener;
22-
import com.rabbitmq.stream.ConsumerUpdateListener.Status;
2322
import com.rabbitmq.stream.MessageHandler;
2423
import com.rabbitmq.stream.MessageHandler.Context;
2524
import com.rabbitmq.stream.OffsetSpecification;
@@ -138,7 +137,7 @@ class StreamConsumer implements Consumer {
138137
&& context.status() == ConsumerUpdateListener.Status.ACTIVE) {
139138
LOGGER.debug("Looking up offset (stream {})", this.stream);
140139
StreamConsumer consumer = (StreamConsumer) context.consumer();
141-
long offset = consumer.lastStoredOffset();
140+
long offset = consumer.storedOffset();
142141
LOGGER.debug(
143142
"Stored offset is {}, returning the value + 1 to the server", offset);
144143
return OffsetSpecification.offset(offset + 1);
@@ -177,7 +176,7 @@ class StreamConsumer implements Consumer {
177176
&& context.status() == ConsumerUpdateListener.Status.ACTIVE) {
178177
LOGGER.debug("Going from passive to active, looking up offset");
179178
StreamConsumer consumer = (StreamConsumer) context.consumer();
180-
long offset = consumer.lastStoredOffset();
179+
long offset = consumer.storedOffset();
181180
LOGGER.debug(
182181
"Stored offset is {}, returning the value + 1 to the server", offset);
183182
result = OffsetSpecification.offset(offset + 1);
@@ -238,7 +237,7 @@ void waitForOffsetToBeStored(long expectedStoredOffset) {
238237
AsyncRetry.asyncRetry(
239238
() -> {
240239
try {
241-
long lastStoredOffset = lastStoredOffset();
240+
long lastStoredOffset = storedOffset();
242241
boolean stored = lastStoredOffset == expectedStoredOffset;
243242
LOGGER.debug(
244243
"Last stored offset from consumer {} on {} is {}, expecting {}",
@@ -448,7 +447,8 @@ void running() {
448447
this.status = Status.RUNNING;
449448
}
450449

451-
long lastStoredOffset() {
450+
@Override
451+
public long storedOffset() {
452452
if (canTrack()) {
453453
// the client can be null by now, so we catch any exception
454454
QueryOffsetResponse response;

src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,12 @@ public void store(long offset) {
157157
"Consumer#store(long) does not work for super streams, use MessageHandler.Context#storeOffset() instead");
158158
}
159159

160+
@Override
161+
public long storedOffset() {
162+
throw new UnsupportedOperationException(
163+
"Consumer#storedOffset() does not work for super streams");
164+
}
165+
160166
@Override
161167
public void close() {
162168
for (Entry<String, Consumer> entry : consumers.entrySet()) {

src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ void autoShouldStoreAfterSomeInactivity() {
130130
void autoShouldStoreLastProcessedOffsetOnClosing() {
131131
Duration checkInterval = Duration.ofMillis(100);
132132
OffsetTrackingCoordinator coordinator = new OffsetTrackingCoordinator(env, checkInterval);
133-
when(consumer.lastStoredOffset()).thenReturn(5L);
133+
when(consumer.storedOffset()).thenReturn(5L);
134134

135135
Registration registration =
136136
coordinator.registerTrackingConsumer(
@@ -148,7 +148,7 @@ void autoShouldStoreLastProcessedOffsetOnClosing() {
148148
void autoShouldNotStoreLastProcessedOffsetOnClosingIfBehindStoredOffset() {
149149
Duration checkInterval = Duration.ofMillis(100);
150150
OffsetTrackingCoordinator coordinator = new OffsetTrackingCoordinator(env, checkInterval);
151-
when(consumer.lastStoredOffset()).thenReturn(15L);
151+
when(consumer.storedOffset()).thenReturn(15L);
152152

153153
Registration registration =
154154
coordinator.registerTrackingConsumer(
@@ -200,7 +200,7 @@ void autoShouldNotStoreIfOffsetAlreadyStored() throws Exception {
200200

201201
long storedOffset = 10;
202202

203-
when(consumer.lastStoredOffset()).thenReturn(storedOffset);
203+
when(consumer.storedOffset()).thenReturn(storedOffset);
204204

205205
Duration autoFlushInterval = Duration.ofMillis(checkInterval.toMillis() * 2);
206206
Consumer<Context> postProcessedMessageCallback =
@@ -223,7 +223,7 @@ void autoShouldNotFlushAfterInactivityIfLastStoreIsOnModulo() throws Exception {
223223

224224
int storeEvery = 10;
225225

226-
when(consumer.lastStoredOffset()).thenReturn((long) storeEvery - 1);
226+
when(consumer.storedOffset()).thenReturn((long) storeEvery - 1);
227227

228228
Duration autoFlushInterval = Duration.ofMillis(checkInterval.toMillis() * 2);
229229
Consumer<Context> postProcessedMessageCallback =
@@ -252,7 +252,7 @@ void autoShouldStoreLastProcessedAfterInactivity() {
252252
int extraMessages = 3;
253253

254254
long expectedLastStoredOffset = storeEvery + extraMessages - 1;
255-
when(consumer.lastStoredOffset()).thenReturn((long) (storeEvery - 1));
255+
when(consumer.storedOffset()).thenReturn((long) (storeEvery - 1));
256256

257257
ArgumentCaptor<Long> lastStoredOffsetCaptor = ArgumentCaptor.forClass(Long.class);
258258
CountDownLatch flushLatch = new CountDownLatch(1);
@@ -285,7 +285,7 @@ void manualShouldNotStoreIfAlreadyUpToDate() throws Exception {
285285
OffsetTrackingCoordinator coordinator = new OffsetTrackingCoordinator(env, checkInterval);
286286

287287
long lastStoredOffset = 50;
288-
when(consumer.lastStoredOffset()).thenReturn(lastStoredOffset);
288+
when(consumer.storedOffset()).thenReturn(lastStoredOffset);
289289

290290
LongConsumer storeCallback =
291291
coordinator
@@ -309,7 +309,7 @@ void manualShouldStoreIfRequestedStoredOffsetIsBehind() {
309309

310310
long lastRequestedOffset = 50;
311311
long lastStoredOffset = 40;
312-
when(consumer.lastStoredOffset()).thenReturn(lastStoredOffset);
312+
when(consumer.storedOffset()).thenReturn(lastStoredOffset);
313313

314314
ArgumentCaptor<Long> lastStoredOffsetCaptor = ArgumentCaptor.forClass(Long.class);
315315
CountDownLatch storeLatch = new CountDownLatch(1);

src/test/java/com/rabbitmq/stream/impl/StreamConsumerSacTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,72 @@ void autoTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws Ex
108108
consumer2.close();
109109
}
110110

111+
@Test
112+
void manualTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws Exception {
113+
int messageCount = 10000;
114+
int storeEvery = 1000;
115+
Map<Integer, AtomicInteger> receivedMessages = new ConcurrentHashMap<>();
116+
receivedMessages.put(0, new AtomicInteger(0));
117+
receivedMessages.put(1, new AtomicInteger(0));
118+
AtomicLong lastReceivedOffset = new AtomicLong(0);
119+
String consumerName = "foo";
120+
121+
Consumer consumer1 =
122+
environment.consumerBuilder().stream(stream)
123+
.name(consumerName)
124+
.singleActiveConsumer()
125+
.messageHandler(
126+
(context, message) -> {
127+
lastReceivedOffset.set(context.offset());
128+
int count = receivedMessages.get(0).incrementAndGet();
129+
if (count % storeEvery == 0) {
130+
context.storeOffset();
131+
}
132+
})
133+
.offset(OffsetSpecification.first())
134+
.manualTrackingStrategy()
135+
.builder()
136+
.build();
137+
138+
Consumer consumer2 =
139+
environment.consumerBuilder().stream(stream)
140+
.name(consumerName)
141+
.singleActiveConsumer()
142+
.messageHandler(
143+
(context, message) -> {
144+
lastReceivedOffset.set(context.offset());
145+
int count = receivedMessages.get(1).incrementAndGet();
146+
if (count % storeEvery == 0) {
147+
context.storeOffset();
148+
}
149+
})
150+
.offset(OffsetSpecification.first())
151+
.manualTrackingStrategy()
152+
.builder()
153+
.build();
154+
155+
publishAndWaitForConfirms(cf, messageCount, stream);
156+
waitAtMost(() -> receivedMessages.getOrDefault(0, new AtomicInteger(0)).get() == messageCount);
157+
158+
assertThat(lastReceivedOffset).hasPositiveValue();
159+
assertThat(receivedMessages.get(1)).hasValue(0);
160+
161+
long firstWaveLimit = lastReceivedOffset.get();
162+
163+
consumer1.store(firstWaveLimit);
164+
waitAtMost(() -> consumer1.storedOffset() == firstWaveLimit);
165+
166+
consumer1.close();
167+
168+
publishAndWaitForConfirms(cf, messageCount, stream);
169+
170+
waitAtMost(() -> receivedMessages.getOrDefault(0, new AtomicInteger(1)).get() == messageCount);
171+
assertThat(lastReceivedOffset).hasValueGreaterThan(firstWaveLimit);
172+
assertThat(receivedMessages.get(0)).hasValue(messageCount);
173+
174+
consumer2.close();
175+
}
176+
111177
@Test
112178
void customTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws Exception {
113179
int messageCount = 10000;

0 commit comments

Comments
 (0)