Skip to content

Commit 0722b00

Browse files
committed
Take QueryOffsetResponse in Client, not only offset
References rabbitmq/rabbitmq-server#3783
1 parent 22aaad3 commit 0722b00

10 files changed

+68
-47
lines changed

src/main/java/com/rabbitmq/stream/impl/Client.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,7 @@ public void storeOffset(String reference, String stream, long offset) {
10771077
channel.writeAndFlush(bb);
10781078
}
10791079

1080-
public long queryOffset(String reference, String stream) {
1080+
public QueryOffsetResponse queryOffset(String reference, String stream) {
10811081
if (reference == null || reference.isEmpty() || reference.length() > 256) {
10821082
throw new IllegalArgumentException(
10831083
"Reference must a non-empty string of less than 256 characters");
@@ -1103,10 +1103,7 @@ public long queryOffset(String reference, String stream) {
11031103
channel.writeAndFlush(bb);
11041104
request.block();
11051105
QueryOffsetResponse response = request.response.get();
1106-
if (!response.isOk()) {
1107-
LOGGER.info("Query offset failed with code {}", formatConstant(response.getResponseCode()));
1108-
}
1109-
return response.getOffset();
1106+
return response;
11101107
} catch (RuntimeException e) {
11111108
outstandingRequests.remove(correlationId);
11121109
throw new StreamException(e);
@@ -1775,7 +1772,7 @@ static class OpenResponse extends Response {
17751772
}
17761773
}
17771774

1778-
static class QueryOffsetResponse extends Response {
1775+
public static class QueryOffsetResponse extends Response {
17791776

17801777
private final long offset;
17811778

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.rabbitmq.stream.impl.Client.CreditNotification;
3030
import com.rabbitmq.stream.impl.Client.MessageListener;
3131
import com.rabbitmq.stream.impl.Client.MetadataListener;
32+
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
3233
import com.rabbitmq.stream.impl.Client.ShutdownListener;
3334
import com.rabbitmq.stream.impl.Utils.ClientFactory;
3435
import com.rabbitmq.stream.impl.Utils.ClientFactoryContext;
@@ -647,9 +648,9 @@ synchronized void add(
647648

648649
String offsetTrackingReference = subscriptionTracker.offsetTrackingReference;
649650
if (offsetTrackingReference != null) {
650-
long trackedOffset =
651+
QueryOffsetResponse queryOffsetResponse =
651652
client.queryOffset(offsetTrackingReference, subscriptionTracker.stream);
652-
if (trackedOffset != 0) {
653+
if (queryOffsetResponse.isOk() && queryOffsetResponse.getOffset() != 0) {
653654
if (offsetSpecification != null && isInitialSubscription) {
654655
// subscription call (not recovery), so telling the user their offset specification is
655656
// ignored
@@ -660,11 +661,11 @@ synchronized void add(
660661
}
661662
LOGGER.debug(
662663
"Using offset {} to start consuming from {} with consumer {} " + "(instead of {})",
663-
trackedOffset,
664+
queryOffsetResponse.getOffset(),
664665
subscriptionTracker.stream,
665666
offsetTrackingReference,
666667
offsetSpecification);
667-
offsetSpecification = OffsetSpecification.offset(trackedOffset + 1);
668+
offsetSpecification = OffsetSpecification.offset(queryOffsetResponse.getOffset() + 1);
668669
}
669670
}
670671

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

+30-19
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import com.rabbitmq.stream.MessageHandler;
1818
import com.rabbitmq.stream.MessageHandler.Context;
1919
import com.rabbitmq.stream.OffsetSpecification;
20+
import com.rabbitmq.stream.StreamException;
2021
import com.rabbitmq.stream.SubscriptionListener;
22+
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
2123
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
2224
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
2325
import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,29 +30,18 @@
2830
class StreamConsumer implements Consumer {
2931

3032
private static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumer.class);
31-
32-
private volatile Runnable closingCallback;
33-
3433
private final Runnable closingTrackingCallback;
35-
3634
private final AtomicBoolean closed = new AtomicBoolean(false);
37-
3835
private final String name;
39-
4036
private final String stream;
41-
4237
private final StreamEnvironment environment;
43-
38+
private final LongConsumer trackingCallback;
39+
private final Runnable initCallback;
40+
private volatile Runnable closingCallback;
4441
private volatile Client trackingClient;
45-
4642
private volatile Status status;
47-
4843
private volatile long lastRequestedStoredOffset = 0;
4944

50-
private final LongConsumer trackingCallback;
51-
52-
private final Runnable initCallback;
53-
5445
StreamConsumer(
5546
String stream,
5647
OffsetSpecification offsetSpecification,
@@ -195,15 +186,35 @@ void running() {
195186

196187
long lastStoredOffset() {
197188
if (canTrack()) {
189+
// the client can be null by now, so we catch any exception
190+
QueryOffsetResponse response;
198191
try {
199-
// the client can be null by now, but we catch the exception and return 0
200-
// callers should know how to deal with a stored offset of 0
201-
return this.trackingClient.queryOffset(this.name, this.stream);
192+
response = this.trackingClient.queryOffset(this.name, this.stream);
202193
} catch (Exception e) {
203-
return 0;
194+
throw new IllegalStateException(
195+
String.format(
196+
"Not possible to query offset for consumer %s on stream %s for now",
197+
this.name, this.stream),
198+
e);
204199
}
200+
if (response.isOk()) {
201+
return response.getOffset();
202+
} else {
203+
throw new StreamException(
204+
String.format(
205+
"QueryOffset for consumer %s on stream %s returned an error",
206+
this.name, this.stream),
207+
response.getResponseCode());
208+
}
209+
210+
} else if (this.name == null) {
211+
throw new UnsupportedOperationException(
212+
"Not possible to query stored offset for a consumer without a name");
205213
} else {
206-
return 0;
214+
throw new IllegalStateException(
215+
String.format(
216+
"Not possible to query offset for consumer %s on stream %s for now",
217+
this.name, this.stream));
207218
}
208219
}
209220

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,9 @@ TrackingConsumerRegistration registerTrackingConsumer(
570570
offsetTrackingRegistration.closingCallback().run();
571571
LOGGER.debug("Offset tracking registration closing sequence executed");
572572
} catch (Exception e) {
573-
LOGGER.warn("Error while executing offset tracking registration closing sequence");
573+
LOGGER.warn(
574+
"Error while executing offset tracking registration closing sequence: {}",
575+
e.getMessage());
574576
}
575577
closingCallable.run();
576578
};

src/test/java/com/rabbitmq/stream/impl/AuthorisationTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,8 @@ void storeQueryOffsetShouldSucceedOnAuthorisedStreamShouldFailOnUnauthorisedStre
289289
configurationClient.storeOffset("configuration", s, 10);
290290

291291
Duration timeToCheckOffsetTracking =
292-
waitAtMost(5, () -> configurationClient.queryOffset("configuration", s) == 10);
292+
waitAtMost(
293+
5, () -> configurationClient.queryOffset("configuration", s).getOffset() == 10);
293294

294295
Client client = client();
295296

@@ -300,7 +301,7 @@ void storeQueryOffsetShouldSucceedOnAuthorisedStreamShouldFailOnUnauthorisedStre
300301
assertThat(configurationClient.queryOffset("default-client", s)).isNotEqualTo(10);
301302

302303
// querying is not even authorised for the default client, it should return 0
303-
assertThat(client.queryOffset("configuration", s)).isZero();
304+
assertThat(client.queryOffset("configuration", s).getOffset()).isZero();
304305

305306
} finally {
306307
assertThat(configurationClient.delete(s).isOk()).isTrue();

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.rabbitmq.stream.SubscriptionListener;
3737
import com.rabbitmq.stream.codec.WrapperMessageBuilder;
3838
import com.rabbitmq.stream.impl.Client.MessageListener;
39+
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
3940
import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumersPoolInfo;
4041
import com.rabbitmq.stream.impl.Utils.ClientFactory;
4142
import java.time.Duration;
@@ -1095,8 +1096,8 @@ void shouldUseStoredOffsetOnRecovery(Consumer<ConsumersCoordinatorTest> configur
10951096
long lastStoredOffset = 5;
10961097
long lastReceivedOffset = 10;
10971098
when(client.queryOffset(consumerName, "stream"))
1098-
.thenReturn((long) 0)
1099-
.thenReturn(lastStoredOffset);
1099+
.thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, 0L))
1100+
.thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, lastStoredOffset));
11001101

11011102
ArgumentCaptor<OffsetSpecification> offsetSpecificationArgumentCaptor =
11021103
ArgumentCaptor.forClass(OffsetSpecification.class);

src/test/java/com/rabbitmq/stream/impl/OffsetTrackingTest.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ void trackAndQuery(
9797
client.storeOffset(trackingReference, s, storedOffset);
9898
}
9999
Thread.sleep(100L); // store offset is fire-and-forget
100-
long offset = client.queryOffset(queryReference, s);
100+
long offset = client.queryOffset(queryReference, s).getOffset();
101101
assertThat(offset).as(message).isEqualTo(expectedOffset);
102102
}
103103

@@ -213,7 +213,8 @@ void consumeAndStore(BiConsumer<String, Client> streamCreator, TestInfo info) th
213213
waitAtMost(
214214
5,
215215
() ->
216-
lastStoredOffset.get() == consumerReference.get().queryOffset(reference, s),
216+
lastStoredOffset.get()
217+
== consumerReference.get().queryOffset(reference, s).getOffset(),
217218
() ->
218219
"expecting last stored offset to be "
219220
+ lastStoredOffset
@@ -249,7 +250,7 @@ void consumeAndStore(BiConsumer<String, Client> streamCreator, TestInfo info) th
249250
client.credit(subscriptionId, 1))
250251
.messageListener(messageListener));
251252

252-
long offsetToStartFrom = consumer.queryOffset(reference, s) + 1;
253+
long offsetToStartFrom = consumer.queryOffset(reference, s).getOffset() + 1;
253254
consumer.subscribe(b(0), s, OffsetSpecification.offset(offsetToStartFrom), 1);
254255

255256
assertThat(consumeLatchSecondWave.await(10, TimeUnit.SECONDS)).isTrue();
@@ -330,7 +331,7 @@ void storeOffsetAndThenAttachByTimestampShouldWork() throws Exception {
330331

331332
IntStream.range(0, messageCount).forEach(i -> client.storeOffset("some reference", stream, i));
332333

333-
waitAtMost(() -> client.queryOffset("some reference", stream) == messageCount - 1);
334+
waitAtMost(() -> client.queryOffset("some reference", stream).getOffset() == messageCount - 1);
334335

335336
confirmLatch.set(new CountDownLatch(messageCount));
336337

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,8 @@ void autoTrackingShouldStorePeriodicallyAndAfterInactivity() throws Exception {
502502
waitAtMost(5, () -> messageCount.get() == storeEvery * 2);
503503

504504
Client client = cf.get();
505-
waitAtMost(5, () -> client.queryOffset(reference, stream) == lastReceivedOffset.get());
505+
waitAtMost(
506+
5, () -> client.queryOffset(reference, stream).getOffset() == lastReceivedOffset.get());
506507

507508
int extraMessages = storeEvery / 10;
508509
IntStream.range(0, extraMessages)
@@ -514,7 +515,8 @@ void autoTrackingShouldStorePeriodicallyAndAfterInactivity() throws Exception {
514515

515516
waitAtMost(5, () -> messageCount.get() == storeEvery * 2 + extraMessages);
516517

517-
waitAtMost(5, () -> client.queryOffset(reference, stream) == lastReceivedOffset.get());
518+
waitAtMost(
519+
5, () -> client.queryOffset(reference, stream).getOffset() == lastReceivedOffset.get());
518520
}
519521

520522
@Test
@@ -550,7 +552,8 @@ void autoTrackingShouldStoreAfterClosing() throws Exception {
550552
consumer.close();
551553

552554
Client client = cf.get();
553-
waitAtMost(5, () -> client.queryOffset(reference, stream) == lastReceivedOffset.get());
555+
waitAtMost(
556+
5, () -> client.queryOffset(reference, stream).getOffset() == lastReceivedOffset.get());
554557
}
555558

556559
@Test

src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,9 @@ void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
174174
// offset near the end (the message count per partition minus a few messages)
175175
long almostLastOffset = messageCount / partitionCount - messageCount / (partitionCount * 10);
176176
partitions.forEach(
177-
p -> assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset));
177+
p ->
178+
assertThat(client.queryOffset(consumerName, p).getOffset())
179+
.isGreaterThan(almostLastOffset));
178180
consumer.close();
179181
}
180182

@@ -226,7 +228,9 @@ void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
226228
// offset near the end (the message count per partition minus a few messages)
227229
long almostLastOffset = messageCount / partitionCount - messageCount / (partitionCount * 10);
228230
partitions.forEach(
229-
p -> assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset));
231+
p ->
232+
assertThat(client.queryOffset(consumerName, p).getOffset())
233+
.isGreaterThan(almostLastOffset));
230234
consumer.close();
231235
}
232236
}

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,9 @@ void offsetShouldBeStoredWhenOptionIsEnabled() throws Exception {
184184
Future<?> run = run(builder().storeEvery(10).consumerNames("consumer-%2$d-on-stream-%1$s"));
185185
waitUntilStreamExists(s);
186186
String consumerName = "consumer-1-on-stream-" + s;
187-
long offset = client.queryOffset(consumerName, s);
187+
long offset = client.queryOffset(consumerName, s).getOffset();
188188
waitOneSecond();
189-
waitAtMost(() -> client.queryOffset(consumerName, s) > offset);
189+
waitAtMost(() -> client.queryOffset(consumerName, s).getOffset() > offset);
190190
run.cancel(true);
191191
waitRunEnds();
192192
}
@@ -196,9 +196,9 @@ void offsetShouldNotBeStoredWhenOptionIsNotEnabled() throws Exception {
196196
Future<?> run = run(builder());
197197
waitUntilStreamExists(s);
198198
String consumerName = s + "-0"; // default value when offset tracking is enabled
199-
assertThat(client.queryOffset(consumerName, s)).isZero();
199+
assertThat(client.queryOffset(consumerName, s).getOffset()).isZero();
200200
waitOneSecond();
201-
assertThat(client.queryOffset(consumerName, s)).isZero();
201+
assertThat(client.queryOffset(consumerName, s).getOffset()).isZero();
202202
run.cancel(true);
203203
waitRunEnds();
204204
}

0 commit comments

Comments
 (0)