Skip to content

Commit 6705c24

Browse files
committed
Add SAC test with manual offset tracking
Requires to add Consumer#storedOffset() to let the user make sure the stored offset request made it to the broker before closing the consumer. References #46, rabbitmq/rabbitmq-server#3753
1 parent c326c40 commit 6705c24

15 files changed

+216
-114
lines changed

src/main/java/com/rabbitmq/stream/Consumer.java

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public interface Consumer extends AutoCloseable {
3030
*/
3131
void store(long offset);
3232

33+
long storedOffset();
34+
3335
/** Close the consumer. */
3436
@Override
3537
void close();

src/main/java/com/rabbitmq/stream/impl/Client.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1081,7 +1081,7 @@ public void storeOffset(String reference, String stream, long offset) {
10811081
channel.writeAndFlush(bb);
10821082
}
10831083

1084-
public long queryOffset(String reference, String stream) {
1084+
public QueryOffsetResponse queryOffset(String reference, String stream) {
10851085
if (reference == null || reference.isEmpty() || reference.length() > 256) {
10861086
throw new IllegalArgumentException(
10871087
"Reference must a non-empty string of less than 256 characters");
@@ -1110,7 +1110,7 @@ public long queryOffset(String reference, String stream) {
11101110
if (!response.isOk()) {
11111111
LOGGER.info("Query offset failed with code {}", formatConstant(response.getResponseCode()));
11121112
}
1113-
return response.getOffset();
1113+
return response;
11141114
} catch (RuntimeException e) {
11151115
outstandingRequests.remove(correlationId);
11161116
throw new StreamException(e);
@@ -1807,7 +1807,7 @@ static class OpenResponse extends Response {
18071807
}
18081808
}
18091809

1810-
static class QueryOffsetResponse extends Response {
1810+
public static class QueryOffsetResponse extends Response {
18111811

18121812
private final long offset;
18131813

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.rabbitmq.stream.impl.Client.CreditNotification;
3232
import com.rabbitmq.stream.impl.Client.MessageListener;
3333
import com.rabbitmq.stream.impl.Client.MetadataListener;
34+
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
3435
import com.rabbitmq.stream.impl.Client.ShutdownListener;
3536
import com.rabbitmq.stream.impl.Utils.ClientFactory;
3637
import com.rabbitmq.stream.impl.Utils.ClientFactoryContext;
@@ -697,9 +698,9 @@ synchronized void add(
697698

698699
String offsetTrackingReference = subscriptionTracker.offsetTrackingReference;
699700
if (offsetTrackingReference != null) {
700-
long trackedOffset =
701+
QueryOffsetResponse queryOffsetResponse =
701702
client.queryOffset(offsetTrackingReference, subscriptionTracker.stream);
702-
if (trackedOffset != 0) {
703+
if (queryOffsetResponse.isOk() && queryOffsetResponse.getOffset() != 0) {
703704
if (offsetSpecification != null && isInitialSubscription) {
704705
// subscription call (not recovery), so telling the user their offset specification is
705706
// ignored
@@ -710,11 +711,11 @@ synchronized void add(
710711
}
711712
LOGGER.debug(
712713
"Using offset {} to start consuming from {} with consumer {} " + "(instead of {})",
713-
trackedOffset,
714+
queryOffsetResponse.getOffset(),
714715
subscriptionTracker.stream,
715716
offsetTrackingReference,
716717
offsetSpecification);
717-
offsetSpecification = OffsetSpecification.offset(trackedOffset + 1);
718+
offsetSpecification = OffsetSpecification.offset(queryOffsetResponse.getOffset() + 1);
718719
}
719720
}
720721

src/main/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinator.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,8 @@ public void flushIfNecessary() {
210210

211211
@Override
212212
public long flush() {
213-
long lastStoredOffset = consumer.lastStoredOffset();
214-
if (lastStoredOffset < lastProcessedOffset) {
213+
long lastStoredOffset = consumer.storedOffset();
214+
if (Long.compareUnsigned(lastStoredOffset, lastProcessedOffset) < 0) {
215215
this.consumer.store(this.lastProcessedOffset);
216216
this.lastTrackingActivity = clock.time();
217217
}
@@ -234,8 +234,8 @@ public Runnable closingCallback() {
234234
if (this.consumer.isSac() && this.consumer.sacStatus() != Status.ACTIVE) {
235235
LOGGER.debug("Not storing offset on closing because consumer a is non-active SAC");
236236
} else {
237-
long lastStoredOffset = consumer.lastStoredOffset();
238-
if (lastStoredOffset < lastProcessedOffset) {
237+
long lastStoredOffset = consumer.storedOffset();
238+
if (Long.compareUnsigned(lastStoredOffset, lastProcessedOffset) < 0) {
239239
LOGGER.debug("Storing {} offset before closing", this.lastProcessedOffset);
240240
this.consumer.store(this.lastProcessedOffset);
241241
if (this.consumer.isSac()) {
@@ -277,8 +277,8 @@ public Consumer<Context> postProcessingCallback() {
277277
@Override
278278
public void flushIfNecessary() {
279279
if (this.clock.time() - this.lastTrackingActivity > this.checkIntervalInNs) {
280-
long lastStoredOffset = consumer.lastStoredOffset();
281-
if (lastStoredOffset < lastRequestedOffset) {
280+
long lastStoredOffset = consumer.storedOffset();
281+
if (Long.compareUnsigned(lastStoredOffset, lastRequestedOffset) < 0) {
282282
this.consumer.store(this.lastRequestedOffset);
283283
this.lastTrackingActivity = clock.time();
284284
}

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

+82-70
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616
import com.rabbitmq.stream.BackOffDelayPolicy;
1717
import com.rabbitmq.stream.Consumer;
1818
import com.rabbitmq.stream.ConsumerUpdateListener;
19-
import com.rabbitmq.stream.ConsumerUpdateListener.Status;
2019
import com.rabbitmq.stream.MessageHandler;
2120
import com.rabbitmq.stream.MessageHandler.Context;
2221
import com.rabbitmq.stream.OffsetSpecification;
22+
import com.rabbitmq.stream.StreamException;
2323
import com.rabbitmq.stream.SubscriptionListener;
24+
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
2425
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
2526
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
2627
import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener;
@@ -131,7 +132,7 @@ class StreamConsumer implements Consumer {
131132
&& context.status() == ConsumerUpdateListener.Status.ACTIVE) {
132133
LOGGER.debug("Looking up offset (stream {})", this.stream);
133134
StreamConsumer consumer = (StreamConsumer) context.consumer();
134-
long offset = consumer.lastStoredOffset();
135+
long offset = consumer.storedOffset();
135136
LOGGER.debug(
136137
"Stored offset is {}, returning the value + 1 to the server", offset);
137138
return OffsetSpecification.offset(offset + 1);
@@ -169,8 +170,7 @@ class StreamConsumer implements Consumer {
169170
if (context.previousStatus() == ConsumerUpdateListener.Status.PASSIVE
170171
&& context.status() == ConsumerUpdateListener.Status.ACTIVE) {
171172
LOGGER.debug("Going from passive to active, looking up offset");
172-
StreamConsumer consumer = (StreamConsumer) context.consumer();
173-
long offset = consumer.lastStoredOffset();
173+
long offset = context.consumer().storedOffset();
174174
LOGGER.debug(
175175
"Stored offset is {}, returning the value + 1 to the server", offset);
176176
result = OffsetSpecification.offset(offset + 1);
@@ -221,8 +221,9 @@ void waitForOffsetToBeStored(long expectedStoredOffset) {
221221
CompletableFuture<Boolean> storedTask =
222222
AsyncRetry.asyncRetry(
223223
() -> {
224-
long lastStoredOffset = lastStoredOffset();
225-
boolean stored = lastStoredOffset == expectedStoredOffset;
224+
long lastStoredOffset = storedOffset();
225+
boolean stored =
226+
Long.compareUnsigned(lastStoredOffset, expectedStoredOffset) == 0;
226227
LOGGER.debug(
227228
"Last stored offset from consumer {} on {} is {}, expecting {}",
228229
this.id,
@@ -316,56 +317,6 @@ OffsetSpecification consumerUpdate(boolean active) {
316317
return result;
317318
}
318319

319-
private static class DefaultConsumerUpdateContext implements ConsumerUpdateListener.Context {
320-
321-
private final StreamConsumer consumer;
322-
private final ConsumerUpdateListener.Status status;
323-
private final ConsumerUpdateListener.Status previousStatus;
324-
325-
private DefaultConsumerUpdateContext(
326-
StreamConsumer consumer,
327-
ConsumerUpdateListener.Status status,
328-
ConsumerUpdateListener.Status previousStatus) {
329-
this.consumer = consumer;
330-
this.status = status;
331-
this.previousStatus = previousStatus;
332-
}
333-
334-
@Override
335-
public Consumer consumer() {
336-
return this.consumer;
337-
}
338-
339-
@Override
340-
public String stream() {
341-
return this.consumer.stream;
342-
}
343-
344-
@Override
345-
public ConsumerUpdateListener.Status status() {
346-
return this.status;
347-
}
348-
349-
@Override
350-
public ConsumerUpdateListener.Status previousStatus() {
351-
return this.previousStatus;
352-
}
353-
354-
@Override
355-
public String toString() {
356-
return "DefaultConsumerUpdateContext{"
357-
+ "consumer="
358-
+ consumer
359-
+ ", stream="
360-
+ stream()
361-
+ ", status="
362-
+ status
363-
+ ", previousStatus="
364-
+ previousStatus
365-
+ '}';
366-
}
367-
}
368-
369320
boolean isSac() {
370321
return this.sacStatus != null;
371322
}
@@ -418,30 +369,35 @@ void running() {
418369
this.status = Status.RUNNING;
419370
}
420371

421-
long lastStoredOffset() {
372+
@Override
373+
public long storedOffset() {
422374
if (canTrack()) {
423-
try {
424-
// the client can be null by now, but we catch the exception and return 0
425-
// callers should know how to deal with a stored offset of 0
426-
return this.trackingClient.queryOffset(this.name, this.stream);
427-
} catch (Exception e) {
428-
return 0;
375+
// the client can be null by now, but we catch the exception and return 0
376+
// callers should know how to deal with a stored offset of 0
377+
QueryOffsetResponse response = this.trackingClient.queryOffset(this.name, this.stream);
378+
if (!response.isOk()) {
379+
throw new StreamException(
380+
String.format(
381+
"QueryOffset for consumer %s on stream %s returned an error",
382+
this.name, this.stream),
383+
response.getResponseCode());
429384
}
385+
return response.getOffset();
386+
} else if (this.name == null) {
387+
throw new UnsupportedOperationException(
388+
"Not possible to query stored offset for a consumer without a name");
430389
} else {
431-
return 0;
390+
throw new IllegalStateException(
391+
String.format(
392+
"Not possible to query offset for consumer %s on stream %s for now",
393+
this.name, this.stream));
432394
}
433395
}
434396

435397
String stream() {
436398
return this.stream;
437399
}
438400

439-
enum Status {
440-
RUNNING,
441-
NOT_AVAILABLE,
442-
CLOSED
443-
}
444-
445401
@Override
446402
public boolean equals(Object o) {
447403
if (this == o) {
@@ -463,4 +419,60 @@ public int hashCode() {
463419
public String toString() {
464420
return "StreamConsumer{" + "id=" + id + ", stream='" + stream + '\'' + '}';
465421
}
422+
423+
enum Status {
424+
RUNNING,
425+
NOT_AVAILABLE,
426+
CLOSED
427+
}
428+
429+
private static class DefaultConsumerUpdateContext implements ConsumerUpdateListener.Context {
430+
431+
private final StreamConsumer consumer;
432+
private final ConsumerUpdateListener.Status status;
433+
private final ConsumerUpdateListener.Status previousStatus;
434+
435+
private DefaultConsumerUpdateContext(
436+
StreamConsumer consumer,
437+
ConsumerUpdateListener.Status status,
438+
ConsumerUpdateListener.Status previousStatus) {
439+
this.consumer = consumer;
440+
this.status = status;
441+
this.previousStatus = previousStatus;
442+
}
443+
444+
@Override
445+
public Consumer consumer() {
446+
return this.consumer;
447+
}
448+
449+
@Override
450+
public String stream() {
451+
return this.consumer.stream;
452+
}
453+
454+
@Override
455+
public ConsumerUpdateListener.Status status() {
456+
return this.status;
457+
}
458+
459+
@Override
460+
public ConsumerUpdateListener.Status previousStatus() {
461+
return this.previousStatus;
462+
}
463+
464+
@Override
465+
public String toString() {
466+
return "DefaultConsumerUpdateContext{"
467+
+ "consumer="
468+
+ consumer
469+
+ ", stream="
470+
+ stream()
471+
+ ", status="
472+
+ status
473+
+ ", previousStatus="
474+
+ previousStatus
475+
+ '}';
476+
}
477+
}
466478
}

src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java

+6
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,12 @@ public void store(long offset) {
157157
"Consumer#store(long) does not work for super streams, use MessageHandler.Context#storeOffset() instead");
158158
}
159159

160+
@Override
161+
public long storedOffset() {
162+
throw new UnsupportedOperationException(
163+
"Consumer#storedOffset() does not work for super streams");
164+
}
165+
160166
@Override
161167
public void close() {
162168
for (Entry<String, Consumer> entry : consumers.entrySet()) {

src/test/java/com/rabbitmq/stream/impl/AuthorisationTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,8 @@ void storeQueryOffsetShouldSucceedOnAuthorisedStreamShouldFailOnUnauthorisedStre
289289
configurationClient.storeOffset("configuration", s, 10);
290290

291291
Duration timeToCheckOffsetTracking =
292-
waitAtMost(5, () -> configurationClient.queryOffset("configuration", s) == 10);
292+
waitAtMost(
293+
5, () -> configurationClient.queryOffset("configuration", s).getOffset() == 10);
293294

294295
Client client = client();
295296

@@ -300,7 +301,7 @@ void storeQueryOffsetShouldSucceedOnAuthorisedStreamShouldFailOnUnauthorisedStre
300301
assertThat(configurationClient.queryOffset("default-client", s)).isNotEqualTo(10);
301302

302303
// querying is not even authorised for the default client, it should return 0
303-
assertThat(client.queryOffset("configuration", s)).isZero();
304+
assertThat(client.queryOffset("configuration", s).getOffset()).isZero();
304305

305306
} finally {
306307
assertThat(configurationClient.delete(s).isOk()).isTrue();

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.rabbitmq.stream.SubscriptionListener;
3737
import com.rabbitmq.stream.codec.WrapperMessageBuilder;
3838
import com.rabbitmq.stream.impl.Client.MessageListener;
39+
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
3940
import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumersPoolInfo;
4041
import com.rabbitmq.stream.impl.Utils.ClientFactory;
4142
import java.time.Duration;
@@ -1144,8 +1145,8 @@ void shouldUseStoredOffsetOnRecovery(Consumer<ConsumersCoordinatorTest> configur
11441145
long lastStoredOffset = 5;
11451146
long lastReceivedOffset = 10;
11461147
when(client.queryOffset(consumerName, "stream"))
1147-
.thenReturn((long) 0)
1148-
.thenReturn(lastStoredOffset);
1148+
.thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, 0L))
1149+
.thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, lastStoredOffset));
11491150

11501151
ArgumentCaptor<OffsetSpecification> offsetSpecificationArgumentCaptor =
11511152
ArgumentCaptor.forClass(OffsetSpecification.class);

0 commit comments

Comments
 (0)