Skip to content

Commit 32fa40b

Browse files
committed
Include committed offset to message delivery callback
Fixes #158
1 parent e9204a4 commit 32fa40b

19 files changed

+146
-54
lines changed

src/main/java/com/rabbitmq/stream/MessageHandler.java

+14
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,20 @@ interface Context {
5353
*/
5454
long timestamp();
5555

56+
/**
57+
* The committed offset on this stream.
58+
*
59+
* <p>It is the offset of the last message confirmed by a quorum of the stream cluster members
60+
* (leader and replicas).
61+
*
62+
* <p>The committed offset is a good indication of what the last offset of a stream is at a
63+
* given time. The value can be stale as soon as the application reads it though, as the
64+
* committed offset for a stream that is published to changes all the time.
65+
*
66+
* @return committed offset on this stream
67+
*/
68+
long committedOffset();
69+
5670
/**
5771
* The consumer that receives the message.
5872
*

src/main/java/com/rabbitmq/stream/impl/Client.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -1443,7 +1443,12 @@ public interface ChunkListener {
14431443

14441444
public interface MessageListener {
14451445

1446-
void handle(byte subscriptionId, long offset, long chunkTimestamp, Message message);
1446+
void handle(
1447+
byte subscriptionId,
1448+
long offset,
1449+
long chunkTimestamp,
1450+
long committedOffset,
1451+
Message message);
14471452
}
14481453

14491454
public interface CreditNotification {
@@ -1964,7 +1969,7 @@ public static class ClientParameters {
19641969
private ChunkListener chunkListener =
19651970
(client, correlationId, offset, messageCount, dataSize) -> {};
19661971
private MessageListener messageListener =
1967-
(correlationId, offset, chunkTimestamp, message) -> {};
1972+
(correlationId, offset, chunkTimestamp, committedOffset, message) -> {};
19681973
private MetadataListener metadataListener = (stream, code) -> {};
19691974
private CreditNotification creditNotification =
19701975
(subscriptionId, responseCode) ->

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -283,12 +283,15 @@ private static final class MessageHandlerContext implements Context {
283283

284284
private final long offset;
285285
private final long timestamp;
286+
private final long committedOffset;
286287
private final Consumer consumer;
287288

288-
private MessageHandlerContext(long offset, long timestamp, Consumer consumer) {
289+
private MessageHandlerContext(
290+
long offset, long timestamp, long committedOffset, Consumer consumer) {
289291
this.offset = offset;
290292
this.timestamp = timestamp;
291293
this.consumer = consumer;
294+
this.committedOffset = committedOffset;
292295
}
293296

294297
@Override
@@ -306,6 +309,11 @@ public long timestamp() {
306309
return this.timestamp;
307310
}
308311

312+
@Override
313+
public long committedOffset() {
314+
return committedOffset;
315+
}
316+
309317
@Override
310318
public Consumer consumer() {
311319
return this.consumer;
@@ -430,14 +438,15 @@ private ClientSubscriptionsManager(
430438
subscriptionId & 0xFF,
431439
Utils.formatConstant(responseCode));
432440
MessageListener messageListener =
433-
(subscriptionId, offset, chunkTimestamp, message) -> {
441+
(subscriptionId, offset, committedOffset, chunkTimestamp, message) -> {
434442
SubscriptionTracker subscriptionTracker =
435443
subscriptionTrackers.get(subscriptionId & 0xFF);
436444
if (subscriptionTracker != null) {
437445
subscriptionTracker.offset = offset;
438446
subscriptionTracker.hasReceivedSomething = true;
439447
subscriptionTracker.messageHandler.handle(
440-
new MessageHandlerContext(offset, chunkTimestamp, subscriptionTracker.consumer),
448+
new MessageHandlerContext(
449+
offset, chunkTimestamp, committedOffset, subscriptionTracker.consumer),
441450
message);
442451
// FIXME set offset here as well, best effort to avoid duplicates
443452
} else {

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ static int handleMessage(
325325
long offset,
326326
long offsetLimit,
327327
long chunkTimestamp,
328+
long committedOffset,
328329
Codec codec,
329330
MessageListener messageListener,
330331
byte subscriptionId) {
@@ -338,7 +339,7 @@ static int handleMessage(
338339
messageFiltered.set(true);
339340
} else {
340341
Message message = codec.decode(data);
341-
messageListener.handle(subscriptionId, offset, chunkTimestamp, message);
342+
messageListener.handle(subscriptionId, offset, chunkTimestamp, committedOffset, message);
342343
}
343344
return read;
344345
}
@@ -377,7 +378,7 @@ static int handleDeliver(
377378
ChunkChecksum chunkChecksum,
378379
MetricsCollector metricsCollector,
379380
byte subscriptionId,
380-
long lastCommittedOffset,
381+
long committedOffset,
381382
int read) {
382383
/*
383384
%% <<
@@ -473,6 +474,7 @@ static int handleDeliver(
473474
offset,
474475
offsetLimit,
475476
chunkTimestamp,
477+
committedOffset,
476478
codec,
477479
messageListener,
478480
subscriptionId);
@@ -538,6 +540,7 @@ static int handleDeliver(
538540
offset,
539541
offsetLimit,
540542
chunkTimestamp,
543+
committedOffset,
541544
codec,
542545
messageListener,
543546
subscriptionId);

src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java

+5
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ public long timestamp() {
115115
return context.timestamp();
116116
}
117117

118+
@Override
119+
public long committedOffset() {
120+
return context.committedOffset();
121+
}
122+
118123
@Override
119124
public void storeOffset() {
120125
for (ConsumerState state : consumerStates) {

src/test/java/com/rabbitmq/stream/impl/AmqpInteroperabilityTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ void publishToStreamQueueConsumeFromStream(Codec codec) throws Exception {
210210
(client1, subscriptionId, offset, messageCount1, dataSize) ->
211211
client1.credit(subscriptionId, 1))
212212
.messageListener(
213-
(subscriptionId, offset, chunkTimestamp, message) -> {
213+
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
214214
messages.add(message);
215215
messageBodies.add(new String(message.getBodyAsBinary(), UTF8));
216216
consumedLatch.countDown();

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

+44-6
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ void publishConsumeComplexMessage(
259259
Client.ChunkListener chunkListener =
260260
(client, correlationId, offset, messageCount, dataSize) -> client.credit(correlationId, 1);
261261
Client.MessageListener messageListener =
262-
(correlationId, offset, chunkTimestamp, message) -> {
262+
(correlationId, offset, chunkTimestamp, committedOffset, message) -> {
263263
messages.add(message);
264264
latch.countDown();
265265
};
@@ -395,7 +395,7 @@ void publishConsumeWithSimpleCodec() throws Exception {
395395
(client, subscriptionId, offset, messageCount1, dataSize) ->
396396
client.credit(subscriptionId, 1))
397397
.messageListener(
398-
(subscriptionId, offset, chunkTimestamp, message) -> {
398+
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
399399
messageBodies.add(new String(message.getBodyAsBinary()));
400400
consumeLatch.countDown();
401401
}));
@@ -448,7 +448,7 @@ void batchPublishing() throws Exception {
448448
(client, subscriptionId, offset, messageCount1, dataSize) ->
449449
client.credit(subscriptionId, 1))
450450
.messageListener(
451-
(subscriptionId, offset, chunkTimestamp, message) -> {
451+
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
452452
ByteBuffer bb = ByteBuffer.wrap(message.getBodyAsBinary());
453453
sizes.add(message.getBodyAsBinary().length);
454454
sequences.add(bb.getInt());
@@ -479,7 +479,7 @@ void consume() throws Exception {
479479

480480
AtomicLong chunkTimestamp = new AtomicLong();
481481
Client.MessageListener messageListener =
482-
(corr, offset, chkTimestamp, message) -> {
482+
(corr, offset, chkTimestamp, committedOffset, message) -> {
483483
chunkTimestamp.set(chkTimestamp);
484484
latch.countDown();
485485
};
@@ -515,7 +515,7 @@ void publishAndConsume(boolean directBuffer) throws Exception {
515515
};
516516

517517
Client.MessageListener messageListener =
518-
(corr, offset, chunkTimestamp, data) -> consumedLatch.countDown();
518+
(corr, offset, chunkTimestamp, committedOffset, data) -> consumedLatch.countDown();
519519

520520
Client client =
521521
cf.get(
@@ -659,7 +659,7 @@ void declareAmqpStreamQueueAndUseItAsStream(TestInfo info) throws Exception {
659659
(client1, subscriptionId, offset, messageCount1, dataSize) ->
660660
client1.credit(subscriptionId, 1))
661661
.messageListener(
662-
(subscriptionId, offset, chunkTimestamp, message) ->
662+
(subscriptionId, offset, chunkTimestamp, committedOffset, message) ->
663663
consumedLatch.countDown()));
664664
ConnectionFactory connectionFactory = new ConnectionFactory();
665665
try (Connection amqpConnection = connectionFactory.newConnection()) {
@@ -837,4 +837,42 @@ void exchangeCommandVersions() {
837837
assertThat(infos.stream().filter(info -> info.getKey() == Constants.COMMAND_DECLARE_PUBLISHER))
838838
.isNotEmpty();
839839
}
840+
841+
@Test
842+
void deliverVersion2LastCommittedOffsetShouldBeSet() throws Exception {
843+
int publishCount = 20_000;
844+
byte correlationId = 42;
845+
TestUtils.publishAndWaitForConfirms(cf, publishCount, stream);
846+
847+
CountDownLatch latch = new CountDownLatch(publishCount);
848+
849+
Client.ChunkListener chunkListener =
850+
(client, corr, offset, messageCountInChunk, dataSize) -> {
851+
client.credit(correlationId, 1);
852+
};
853+
854+
AtomicLong committedOffset = new AtomicLong();
855+
Client.MessageListener messageListener =
856+
(corr, offset, chkTimestamp, committedOfft, message) -> {
857+
committedOffset.set(committedOfft);
858+
latch.countDown();
859+
};
860+
861+
Client client =
862+
cf.get(
863+
new Client.ClientParameters()
864+
.chunkListener(chunkListener)
865+
.messageListener(messageListener));
866+
867+
client.exchangeCommandVersions();
868+
869+
Response response =
870+
client.subscribe(correlationId, stream, OffsetSpecification.first(), credit);
871+
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
872+
assertThat(response.isOk()).isTrue();
873+
874+
assertThat(latch.await(10, SECONDS)).isTrue();
875+
assertThat(committedOffset.get()).isPositive();
876+
client.close();
877+
}
840878
}

0 commit comments

Comments
 (0)