Skip to content

Commit fc183ac

Browse files
authored
Merge pull request #159 from rabbitmq/rabbitmq-server-5307-committed-offset-to-deliver
Include committed offset to message delivery callback
2 parents d22c2ca + c6168ae commit fc183ac

23 files changed

+250
-76
lines changed

src/main/java/com/rabbitmq/stream/MessageHandler.java

+14
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,20 @@ interface Context {
5353
*/
5454
long timestamp();
5555

56+
/**
57+
* The committed offset on this stream.
58+
*
59+
* <p>It is the offset of the last message confirmed by a quorum of the stream cluster members
60+
* (leader and replicas).
61+
*
62+
* <p>The committed offset is a good indication of what the last offset of a stream is at a
63+
* given time. The value can be stale as soon as the application reads it though, as the
64+
* committed offset for a stream that is published to changes all the time.
65+
*
66+
* @return committed offset on this stream
67+
*/
68+
long committedOffset();
69+
5670
/**
5771
* The consumer that receives the message.
5872
*

src/main/java/com/rabbitmq/stream/impl/Client.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -1443,7 +1443,12 @@ public interface ChunkListener {
14431443

14441444
public interface MessageListener {
14451445

1446-
void handle(byte subscriptionId, long offset, long chunkTimestamp, Message message);
1446+
void handle(
1447+
byte subscriptionId,
1448+
long offset,
1449+
long chunkTimestamp,
1450+
long committedOffset,
1451+
Message message);
14471452
}
14481453

14491454
public interface CreditNotification {
@@ -1964,7 +1969,7 @@ public static class ClientParameters {
19641969
private ChunkListener chunkListener =
19651970
(client, correlationId, offset, messageCount, dataSize) -> {};
19661971
private MessageListener messageListener =
1967-
(correlationId, offset, chunkTimestamp, message) -> {};
1972+
(correlationId, offset, chunkTimestamp, committedOffset, message) -> {};
19681973
private MetadataListener metadataListener = (stream, code) -> {};
19691974
private CreditNotification creditNotification =
19701975
(subscriptionId, responseCode) ->

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+23-3
Original file line numberDiff line numberDiff line change
@@ -283,11 +283,14 @@ private static final class MessageHandlerContext implements Context {
283283

284284
private final long offset;
285285
private final long timestamp;
286+
private final long committedOffset;
286287
private final Consumer consumer;
287288

288-
private MessageHandlerContext(long offset, long timestamp, Consumer consumer) {
289+
private MessageHandlerContext(
290+
long offset, long timestamp, long committedOffset, Consumer consumer) {
289291
this.offset = offset;
290292
this.timestamp = timestamp;
293+
this.committedOffset = committedOffset;
291294
this.consumer = consumer;
292295
}
293296

@@ -306,6 +309,11 @@ public long timestamp() {
306309
return this.timestamp;
307310
}
308311

312+
@Override
313+
public long committedOffset() {
314+
return committedOffset;
315+
}
316+
309317
@Override
310318
public Consumer consumer() {
311319
return this.consumer;
@@ -430,14 +438,15 @@ private ClientSubscriptionsManager(
430438
subscriptionId & 0xFF,
431439
Utils.formatConstant(responseCode));
432440
MessageListener messageListener =
433-
(subscriptionId, offset, chunkTimestamp, message) -> {
441+
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
434442
SubscriptionTracker subscriptionTracker =
435443
subscriptionTrackers.get(subscriptionId & 0xFF);
436444
if (subscriptionTracker != null) {
437445
subscriptionTracker.offset = offset;
438446
subscriptionTracker.hasReceivedSomething = true;
439447
subscriptionTracker.messageHandler.handle(
440-
new MessageHandlerContext(offset, chunkTimestamp, subscriptionTracker.consumer),
448+
new MessageHandlerContext(
449+
offset, chunkTimestamp, committedOffset, subscriptionTracker.consumer),
441450
message);
442451
// FIXME set offset here as well, best effort to avoid duplicates
443452
} else {
@@ -550,6 +559,7 @@ private ClientSubscriptionsManager(
550559
.metadataListener(metadataListener))
551560
.key(owner.name);
552561
this.client = clientFactory.client(clientFactoryContext);
562+
maybeExchangeCommandVersions(client);
553563
clientInitializedInManager.set(true);
554564
}
555565

@@ -839,4 +849,14 @@ public String toString() {
839849
return "SubscriptionContext{" + "offsetSpecification=" + offsetSpecification + '}';
840850
}
841851
}
852+
853+
private static void maybeExchangeCommandVersions(Client client) {
854+
try {
855+
if (Utils.is3_11_OrMore(client.brokerVersion())) {
856+
client.exchangeCommandVersions();
857+
}
858+
} catch (Exception e) {
859+
LOGGER.info("Error while exchanging command versions: {}", e.getMessage());
860+
}
861+
}
842862
}

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ static int handleMessage(
325325
long offset,
326326
long offsetLimit,
327327
long chunkTimestamp,
328+
long committedOffset,
328329
Codec codec,
329330
MessageListener messageListener,
330331
byte subscriptionId) {
@@ -338,7 +339,7 @@ static int handleMessage(
338339
messageFiltered.set(true);
339340
} else {
340341
Message message = codec.decode(data);
341-
messageListener.handle(subscriptionId, offset, chunkTimestamp, message);
342+
messageListener.handle(subscriptionId, offset, chunkTimestamp, committedOffset, message);
342343
}
343344
return read;
344345
}
@@ -377,7 +378,7 @@ static int handleDeliver(
377378
ChunkChecksum chunkChecksum,
378379
MetricsCollector metricsCollector,
379380
byte subscriptionId,
380-
long lastCommittedOffset,
381+
long committedOffset,
381382
int read) {
382383
/*
383384
%% <<
@@ -473,6 +474,7 @@ static int handleDeliver(
473474
offset,
474475
offsetLimit,
475476
chunkTimestamp,
477+
committedOffset,
476478
codec,
477479
messageListener,
478480
subscriptionId);
@@ -538,6 +540,7 @@ static int handleDeliver(
538540
offset,
539541
offsetLimit,
540542
chunkTimestamp,
543+
committedOffset,
541544
codec,
542545
messageListener,
543546
subscriptionId);

src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java

+5
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ public long timestamp() {
115115
return context.timestamp();
116116
}
117117

118+
@Override
119+
public long committedOffset() {
120+
return context.committedOffset();
121+
}
122+
118123
@Override
119124
public void storeOffset() {
120125
for (ConsumerState state : consumerStates) {

src/main/java/com/rabbitmq/stream/impl/Utils.java

+41
Original file line numberDiff line numberDiff line change
@@ -240,4 +240,45 @@ static Function<ClientConnectionType, String> defaultConnectionNamingStrategy(St
240240
static boolean offsetBefore(long x, long y) {
241241
return Long.compareUnsigned(x, y) < 0;
242242
}
243+
244+
private static String currentVersion(String currentVersion) {
245+
// versions built from source: 3.7.0+rc.1.4.gedc5d96
246+
if (currentVersion.contains("+")) {
247+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("+"));
248+
}
249+
// alpha (snapshot) versions: 3.7.0~alpha.449-1
250+
if (currentVersion.contains("~")) {
251+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("~"));
252+
}
253+
// alpha (snapshot) versions: 3.7.1-alpha.40
254+
if (currentVersion.contains("-")) {
255+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("-"));
256+
}
257+
return currentVersion;
258+
}
259+
260+
/**
261+
* https://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java
262+
*/
263+
static int versionCompare(String str1, String str2) {
264+
String[] vals1 = str1.split("\\.");
265+
String[] vals2 = str2.split("\\.");
266+
int i = 0;
267+
// set index to first non-equal ordinal or length of shortest version string
268+
while (i < vals1.length && i < vals2.length && vals1[i].equals(vals2[i])) {
269+
i++;
270+
}
271+
// compare first non-equal ordinal number
272+
if (i < vals1.length && i < vals2.length) {
273+
int diff = Integer.valueOf(vals1[i]).compareTo(Integer.valueOf(vals2[i]));
274+
return Integer.signum(diff);
275+
}
276+
// the strings are equal or one string is a substring of the other
277+
// e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4"
278+
return Integer.signum(vals1.length - vals2.length);
279+
}
280+
281+
static boolean is3_11_OrMore(String brokerVersion) {
282+
return versionCompare(currentVersion(brokerVersion), "3.11.0") >= 0;
283+
}
243284
}

src/test/java/com/rabbitmq/stream/impl/AmqpInteroperabilityTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ void publishToStreamQueueConsumeFromStream(Codec codec) throws Exception {
210210
(client1, subscriptionId, offset, messageCount1, dataSize) ->
211211
client1.credit(subscriptionId, 1))
212212
.messageListener(
213-
(subscriptionId, offset, chunkTimestamp, message) -> {
213+
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
214214
messages.add(message);
215215
messageBodies.add(new String(message.getBodyAsBinary(), UTF8));
216216
consumedLatch.countDown();

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

+47-7
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.rabbitmq.stream.impl.Client.Response;
3838
import com.rabbitmq.stream.impl.Client.StreamParametersBuilder;
3939
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
40+
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
4041
import io.netty.buffer.ByteBufAllocator;
4142
import io.netty.buffer.UnpooledByteBufAllocator;
4243
import java.io.ByteArrayOutputStream;
@@ -259,7 +260,7 @@ void publishConsumeComplexMessage(
259260
Client.ChunkListener chunkListener =
260261
(client, correlationId, offset, messageCount, dataSize) -> client.credit(correlationId, 1);
261262
Client.MessageListener messageListener =
262-
(correlationId, offset, chunkTimestamp, message) -> {
263+
(correlationId, offset, chunkTimestamp, committedOffset, message) -> {
263264
messages.add(message);
264265
latch.countDown();
265266
};
@@ -395,7 +396,7 @@ void publishConsumeWithSimpleCodec() throws Exception {
395396
(client, subscriptionId, offset, messageCount1, dataSize) ->
396397
client.credit(subscriptionId, 1))
397398
.messageListener(
398-
(subscriptionId, offset, chunkTimestamp, message) -> {
399+
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
399400
messageBodies.add(new String(message.getBodyAsBinary()));
400401
consumeLatch.countDown();
401402
}));
@@ -448,7 +449,7 @@ void batchPublishing() throws Exception {
448449
(client, subscriptionId, offset, messageCount1, dataSize) ->
449450
client.credit(subscriptionId, 1))
450451
.messageListener(
451-
(subscriptionId, offset, chunkTimestamp, message) -> {
452+
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
452453
ByteBuffer bb = ByteBuffer.wrap(message.getBodyAsBinary());
453454
sizes.add(message.getBodyAsBinary().length);
454455
sequences.add(bb.getInt());
@@ -479,7 +480,7 @@ void consume() throws Exception {
479480

480481
AtomicLong chunkTimestamp = new AtomicLong();
481482
Client.MessageListener messageListener =
482-
(corr, offset, chkTimestamp, message) -> {
483+
(corr, offset, chkTimestamp, committedOffset, message) -> {
483484
chunkTimestamp.set(chkTimestamp);
484485
latch.countDown();
485486
};
@@ -515,7 +516,7 @@ void publishAndConsume(boolean directBuffer) throws Exception {
515516
};
516517

517518
Client.MessageListener messageListener =
518-
(corr, offset, chunkTimestamp, data) -> consumedLatch.countDown();
519+
(corr, offset, chunkTimestamp, committedOffset, data) -> consumedLatch.countDown();
519520

520521
Client client =
521522
cf.get(
@@ -659,7 +660,7 @@ void declareAmqpStreamQueueAndUseItAsStream(TestInfo info) throws Exception {
659660
(client1, subscriptionId, offset, messageCount1, dataSize) ->
660661
client1.credit(subscriptionId, 1))
661662
.messageListener(
662-
(subscriptionId, offset, chunkTimestamp, message) ->
663+
(subscriptionId, offset, chunkTimestamp, committedOffset, message) ->
663664
consumedLatch.countDown()));
664665
ConnectionFactory connectionFactory = new ConnectionFactory();
665666
try (Connection amqpConnection = connectionFactory.newConnection()) {
@@ -830,11 +831,50 @@ void closingPublisherWhilePublishingShouldNotCloseConnection(String publisherRef
830831
}
831832

832833
@Test
833-
@Disabled
834+
@BrokerVersionAtLeast("3.11.0")
834835
void exchangeCommandVersions() {
835836
Client client = cf.get();
836837
List<FrameHandlerInfo> infos = client.exchangeCommandVersions();
837838
assertThat(infos.stream().filter(info -> info.getKey() == Constants.COMMAND_DECLARE_PUBLISHER))
838839
.isNotEmpty();
839840
}
841+
842+
@Test
843+
@BrokerVersionAtLeast("3.11.0")
844+
void deliverVersion2LastCommittedOffsetShouldBeSet() throws Exception {
845+
int publishCount = 20_000;
846+
byte correlationId = 42;
847+
TestUtils.publishAndWaitForConfirms(cf, publishCount, stream);
848+
849+
CountDownLatch latch = new CountDownLatch(publishCount);
850+
851+
Client.ChunkListener chunkListener =
852+
(client, corr, offset, messageCountInChunk, dataSize) -> {
853+
client.credit(correlationId, 1);
854+
};
855+
856+
AtomicLong committedOffset = new AtomicLong();
857+
Client.MessageListener messageListener =
858+
(corr, offset, chkTimestamp, committedOfft, message) -> {
859+
committedOffset.set(committedOfft);
860+
latch.countDown();
861+
};
862+
863+
Client client =
864+
cf.get(
865+
new Client.ClientParameters()
866+
.chunkListener(chunkListener)
867+
.messageListener(messageListener));
868+
869+
client.exchangeCommandVersions();
870+
871+
Response response =
872+
client.subscribe(correlationId, stream, OffsetSpecification.first(), credit);
873+
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
874+
assertThat(response.isOk()).isTrue();
875+
876+
assertThat(latch.await(10, SECONDS)).isTrue();
877+
assertThat(committedOffset.get()).isPositive();
878+
client.close();
879+
}
840880
}

0 commit comments

Comments
 (0)