Skip to content

Commit 57247d5

Browse files
committed
Handle "no offset" (19) response code (query offset)
References rabbitmq/rabbitmq-server#3783 Fixes #48
1 parent 0722b00 commit 57247d5

File tree

3 files changed

+60
-18
lines changed

3 files changed

+60
-18
lines changed

src/main/java/com/rabbitmq/stream/Constants.java

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public final class Constants {
3434
public static final short RESPONSE_CODE_ACCESS_REFUSED = 16;
3535
public static final short RESPONSE_CODE_PRECONDITION_FAILED = 17;
3636
public static final short RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST = 18;
37+
public static final short RESPONSE_CODE_NO_OFFSET = 19;
3738

3839
public static final short CODE_MESSAGE_ENQUEUEING_FAILED = 10_001;
3940
public static final short CODE_PRODUCER_NOT_AVAILABLE = 10_002;

src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java

+48-18
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import com.rabbitmq.stream.Constants;
1617
import com.rabbitmq.stream.MessageHandler.Context;
18+
import com.rabbitmq.stream.StreamException;
1719
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
1820
import java.time.Duration;
1921
import java.util.Collection;
@@ -91,7 +93,11 @@ Registration registerTrackingConsumer(
9193
}
9294
Tracker t = iterator.next();
9395
if (t.consumer().isOpen()) {
94-
t.flushIfNecessary();
96+
try {
97+
t.flushIfNecessary();
98+
} catch (Exception e) {
99+
LOGGER.info("Error while flushing tracker: {}", e.getMessage());
100+
}
95101
} else {
96102
iterator.remove();
97103
}
@@ -194,10 +200,18 @@ public Consumer<Context> postProcessingCallback() {
194200
public void flushIfNecessary() {
195201
if (this.count > 0) {
196202
if (this.clock.time() - this.lastTrackingActivity > this.flushIntervalInNs) {
197-
long lastStoredOffset = consumer.lastStoredOffset();
198-
if (Long.compareUnsigned(lastStoredOffset, lastProcessedOffset) < 0) {
199-
this.consumer.store(this.lastProcessedOffset);
200-
this.lastTrackingActivity = clock.time();
203+
try {
204+
long lastStoredOffset = consumer.lastStoredOffset();
205+
if (Long.compareUnsigned(lastStoredOffset, lastProcessedOffset) < 0) {
206+
this.consumer.store(this.lastProcessedOffset);
207+
this.lastTrackingActivity = clock.time();
208+
}
209+
} catch (StreamException e) {
210+
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
211+
// probably nothing stored yet, let it go
212+
} else {
213+
throw e;
214+
}
201215
}
202216
}
203217
}
@@ -216,15 +230,23 @@ public LongConsumer trackingCallback() {
216230
@Override
217231
public Runnable closingCallback() {
218232
return () -> {
219-
long lastStoredOffset = consumer.lastStoredOffset();
220-
if (Long.compareUnsigned(lastStoredOffset, lastProcessedOffset) < 0) {
221-
LOGGER.debug("Storing offset before closing");
222-
this.consumer.store(this.lastProcessedOffset);
223-
} else {
224-
LOGGER.debug(
225-
"Not storing offset before closing because last stored offset after last processed offset: {} > {}",
226-
lastStoredOffset,
227-
lastProcessedOffset);
233+
try {
234+
long lastStoredOffset = consumer.lastStoredOffset();
235+
if (Long.compareUnsigned(lastStoredOffset, lastProcessedOffset) < 0) {
236+
LOGGER.debug("Storing offset before closing");
237+
this.consumer.store(this.lastProcessedOffset);
238+
} else {
239+
LOGGER.debug(
240+
"Not storing offset before closing because last stored offset after last processed offset: {} > {}",
241+
lastStoredOffset,
242+
lastProcessedOffset);
243+
}
244+
} catch (StreamException e) {
245+
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
246+
// probably nothing stored yet, let it go
247+
} else {
248+
throw e;
249+
}
228250
}
229251
};
230252
}
@@ -253,10 +275,18 @@ public Consumer<Context> postProcessingCallback() {
253275
@Override
254276
public void flushIfNecessary() {
255277
if (this.clock.time() - this.lastTrackingActivity > this.checkIntervalInNs) {
256-
long lastStoredOffset = consumer.lastStoredOffset();
257-
if (Long.compareUnsigned(lastStoredOffset, lastRequestedOffset) < 0) {
258-
this.consumer.store(this.lastRequestedOffset);
259-
this.lastTrackingActivity = clock.time();
278+
try {
279+
long lastStoredOffset = consumer.lastStoredOffset();
280+
if (Long.compareUnsigned(lastStoredOffset, lastRequestedOffset) < 0) {
281+
this.consumer.store(this.lastRequestedOffset);
282+
this.lastTrackingActivity = clock.time();
283+
}
284+
} catch (StreamException e) {
285+
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
286+
// probably nothing stored yet, let it go
287+
} else {
288+
throw e;
289+
}
260290
}
261291
}
262292
}

src/test/java/com/rabbitmq/stream/impl/OffsetTrackingTest.java

+11
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@
1616
import static com.rabbitmq.stream.impl.TestUtils.b;
1717
import static com.rabbitmq.stream.impl.TestUtils.forEach;
1818
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
19+
import static com.rabbitmq.stream.impl.TestUtils.responseCode;
1920
import static com.rabbitmq.stream.impl.TestUtils.streamName;
2021
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2122
import static org.assertj.core.api.Assertions.assertThat;
2223
import static org.assertj.core.api.Assertions.assertThatNoException;
2324

25+
import com.rabbitmq.stream.Constants;
2426
import com.rabbitmq.stream.OffsetSpecification;
2527
import com.rabbitmq.stream.impl.Client.ClientParameters;
2628
import com.rabbitmq.stream.impl.Client.MessageListener;
29+
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
2730
import com.rabbitmq.stream.impl.Client.Response;
2831
import com.rabbitmq.stream.impl.Client.StreamParametersBuilder;
2932
import io.vavr.Tuple;
@@ -101,6 +104,14 @@ void trackAndQuery(
101104
assertThat(offset).as(message).isEqualTo(expectedOffset);
102105
}
103106

107+
@Test
108+
void shouldReturnNoOffsetIfNothingStoredForReference() {
109+
QueryOffsetResponse response = cf.get().queryOffset(UUID.randomUUID().toString(), stream);
110+
assertThat(response.isOk()).isFalse();
111+
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_NO_OFFSET);
112+
assertThat(response.getOffset()).isEqualTo(0);
113+
}
114+
104115
@ParameterizedTest
105116
@MethodSource
106117
void consumeAndStore(BiConsumer<String, Client> streamCreator, TestInfo info) throws Exception {

0 commit comments

Comments
 (0)