Skip to content

Commit 1eee1d3

Browse files
committed
Handle offset 0 in tracking strategies
Conflicts: src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
1 parent 0ff3b6f commit 1eee1d3

File tree

6 files changed

+102
-28
lines changed

6 files changed

+102
-28
lines changed

src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import static com.rabbitmq.stream.impl.Utils.offsetBefore;
1717

18-
import com.rabbitmq.stream.Constants;
1918
import com.rabbitmq.stream.MessageHandler.Context;
2019
import com.rabbitmq.stream.NoOffsetException;
2120
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
@@ -233,8 +232,8 @@ public long flush() {
233232
}
234233
result = lastProcessedOffset.get();
235234
} catch (NoOffsetException e) {
236-
this.consumer.store(this.lastProcessedOffset.get());
237-
result = lastProcessedOffset.get();
235+
this.consumer.store(this.lastProcessedOffset.get());
236+
result = lastProcessedOffset.get();
238237
}
239238
this.lastTrackingActivity = clock.time();
240239
return result;

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static com.rabbitmq.stream.impl.Utils.offsetBefore;
17-
import static com.rabbitmq.stream.impl.Utils.isSac;
1817
import com.rabbitmq.stream.BackOffDelayPolicy;
1918
import com.rabbitmq.stream.Constants;
2019
import com.rabbitmq.stream.Consumer;
@@ -279,6 +278,11 @@ void waitForOffsetToBeStored(long expectedStoredOffset) {
279278
}
280279
} catch (StreamException e) {
281280
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
281+
LOGGER.debug(
282+
"No stored offset for consumer {} on {}, expecting {}",
283+
this.id,
284+
this.stream,
285+
expectedStoredOffset);
282286
throw new IllegalStateException();
283287
} else {
284288
throw e;

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ TrackingConsumerRegistration registerTrackingConsumer(
585585
closingSequence =
586586
() -> {
587587
try {
588-
LOGGER.debug("Executing offset tracking registration closing sequence");
588+
LOGGER.debug("Executing offset tracking registration closing sequence ");
589589
offsetTrackingRegistration.closingCallback().run();
590590
LOGGER.debug("Offset tracking registration closing sequence executed");
591591
} catch (Exception e) {

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.rabbitmq.stream.Environment;
3232
import com.rabbitmq.stream.EnvironmentBuilder;
3333
import com.rabbitmq.stream.Host;
34+
import com.rabbitmq.stream.NoOffsetException;
3435
import com.rabbitmq.stream.OffsetSpecification;
3536
import com.rabbitmq.stream.Producer;
3637
import com.rabbitmq.stream.StreamDoesNotExistException;
@@ -621,7 +622,6 @@ void autoTrackingShouldStoreOffsetZeroOnClosing() throws Exception {
621622
5,
622623
() -> {
623624
QueryOffsetResponse response = client.queryOffset(reference, stream);
624-
System.out.println(response.isOk());
625625
return response.isOk() && response.getOffset() == lastReceivedOffset.get();
626626
},
627627
() ->
@@ -806,19 +806,8 @@ void offsetZeroShouldBeStored() throws Exception {
806806
.checkInterval(Duration.ZERO)
807807
.builder()
808808
.build();
809-
Client client = cf.get();
810-
assertThat(client.queryOffset(ref, stream).getResponseCode())
811-
.isEqualTo(Constants.RESPONSE_CODE_NO_OFFSET);
809+
assertThatThrownBy(() -> consumer.storedOffset()).isInstanceOf(NoOffsetException.class);
812810
consumer.store(0);
813-
waitAtMost(
814-
5,
815-
() -> {
816-
QueryOffsetResponse response = client.queryOffset(ref, stream);
817-
return response.isOk() && response.getOffset() == 0;
818-
},
819-
() ->
820-
format(
821-
"Expecting stored offset %d to be equal to last received offset %d",
822-
client.queryOffset(ref, stream).getOffset(), 0));
811+
waitAtMost(() -> consumer.storedOffset() == 0);
823812
}
824813
}

src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java

Lines changed: 75 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
1919
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
2020
import static com.rabbitmq.stream.impl.TestUtils.localhost;
21+
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
22+
import static com.rabbitmq.stream.impl.TestUtils.wrap;
2123
import static org.assertj.core.api.Assertions.assertThat;
2224

2325
import com.rabbitmq.client.Connection;
@@ -27,8 +29,10 @@
2729
import com.rabbitmq.stream.EnvironmentBuilder;
2830
import com.rabbitmq.stream.OffsetSpecification;
2931
import com.rabbitmq.stream.impl.Client.ClientParameters;
32+
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
3033
import io.netty.channel.EventLoopGroup;
3134
import java.nio.charset.StandardCharsets;
35+
import java.time.Duration;
3236
import java.util.Collections;
3337
import java.util.List;
3438
import java.util.concurrent.ConcurrentHashMap;
@@ -174,9 +178,14 @@ void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
174178
// offset near the end (the message count per partition minus a few messages)
175179
long almostLastOffset = messageCount / partitionCount - messageCount / (partitionCount * 10);
176180
partitions.forEach(
177-
p ->
178-
assertThat(client.queryOffset(consumerName, p).getOffset())
179-
.isGreaterThan(almostLastOffset));
181+
wrap(
182+
p -> {
183+
waitAtMost(
184+
() -> {
185+
QueryOffsetResponse response = client.queryOffset(consumerName, p);
186+
return response.isOk() && response.getOffset() > almostLastOffset;
187+
});
188+
}));
180189
consumer.close();
181190
}
182191

@@ -211,9 +220,6 @@ void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
211220
messagesReceived.get(partition).incrementAndGet();
212221
lastOffsets.put(partition, context.offset());
213222
totalCount.incrementAndGet();
214-
if (totalCount.get() % 50 == 0) {
215-
context.storeOffset();
216-
}
217223
consumeLatch.countDown();
218224
})
219225
.build();
@@ -228,9 +234,69 @@ void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
228234
// offset near the end (the message count per partition minus a few messages)
229235
long almostLastOffset = messageCount / partitionCount - messageCount / (partitionCount * 10);
230236
partitions.forEach(
231-
p ->
232-
assertThat(client.queryOffset(consumerName, p).getOffset())
233-
.isGreaterThan(almostLastOffset));
237+
wrap(
238+
p -> {
239+
waitAtMost(
240+
() -> {
241+
QueryOffsetResponse response = client.queryOffset(consumerName, p);
242+
return response.isOk() && response.getOffset() > almostLastOffset;
243+
});
244+
}));
234245
consumer.close();
235246
}
247+
248+
@Test
249+
void autoOffsetTrackingShouldStoreOffsetZero() throws Exception {
250+
declareSuperStreamTopology(connection, superStream, partitionCount);
251+
Client client = cf.get();
252+
List<String> partitions = client.partitions(superStream);
253+
int messageCount = partitionCount;
254+
publishToPartitions(cf, partitions, messageCount);
255+
ConcurrentMap<String, AtomicInteger> messagesReceived = new ConcurrentHashMap<>(partitionCount);
256+
ConcurrentMap<String, Long> lastOffsets = new ConcurrentHashMap<>(partitionCount);
257+
partitions.forEach(
258+
p -> {
259+
messagesReceived.put(p, new AtomicInteger(0));
260+
});
261+
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
262+
String consumerName = "my-app";
263+
AtomicInteger totalCount = new AtomicInteger();
264+
Consumer consumer =
265+
environment
266+
.consumerBuilder()
267+
.superStream(superStream)
268+
.offset(OffsetSpecification.first())
269+
.name(consumerName)
270+
.autoTrackingStrategy()
271+
.flushInterval(Duration.ofHours(1)) // long enough
272+
.builder()
273+
.messageHandler(
274+
(context, message) -> {
275+
String partition = new String(message.getBodyAsBinary());
276+
messagesReceived.get(partition).incrementAndGet();
277+
lastOffsets.put(partition, context.offset());
278+
totalCount.incrementAndGet();
279+
consumeLatch.countDown();
280+
})
281+
.build();
282+
latchAssert(consumeLatch).completes();
283+
assertThat(messagesReceived).hasSize(partitionCount);
284+
partitions.forEach(
285+
p -> {
286+
assertThat(messagesReceived).containsKey(p);
287+
assertThat(messagesReceived.get(p).get()).isEqualTo(messageCount / partitionCount);
288+
});
289+
consumer.close();
290+
partitions.forEach(
291+
wrap(
292+
p -> {
293+
assertThat(lastOffsets.get(p)).isZero();
294+
waitAtMost(
295+
() -> {
296+
QueryOffsetResponse response = client.queryOffset(consumerName, p);
297+
return response.isOk()
298+
&& response.getOffset() == lastOffsets.get(p).longValue();
299+
});
300+
}));
301+
}
236302
}

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ public static Duration waitAtMost(int timeoutInSeconds, CallableBooleanSupplier
112112
return waitAtMost(timeoutInSeconds, condition, null);
113113
}
114114

115+
// public static Duration waitAtMost(
116+
// int timeoutInSeconds, CallableBooleanSupplier condition, Supplier<String> message)
117+
// throws Exception {
118+
// return waitAtMost(timeoutInSeconds, condition, message, true);
119+
// }
120+
115121
public static Duration waitAtMost(
116122
int timeoutInSeconds, CallableBooleanSupplier condition, Supplier<String> message)
117123
throws Exception {
@@ -504,6 +510,16 @@ public interface CallableConsumer<T> {
504510
void accept(T t) throws Exception;
505511
}
506512

513+
static <T> java.util.function.Consumer<T> wrap(CallableConsumer<T> action) {
514+
return t -> {
515+
try {
516+
action.accept(t);
517+
} catch (Exception e) {
518+
throw new RuntimeException(e);
519+
}
520+
};
521+
}
522+
507523
@FunctionalInterface
508524
public interface CallableBooleanSupplier {
509525
boolean getAsBoolean() throws Exception;

0 commit comments

Comments
 (0)