Skip to content

Include committed offset to message delivery callback #159

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/main/java/com/rabbitmq/stream/MessageHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ interface Context {
*/
long timestamp();

/**
* The committed offset on this stream.
*
* <p>It is the offset of the last message confirmed by a quorum of the stream cluster members
* (leader and replicas).
*
* <p>The committed offset is a good indication of what the last offset of a stream is at a
* given time. The value can be stale as soon as the application reads it though, as the
* committed offset for a stream that is published to changes all the time.
*
* @return committed offset on this stream
*/
long committedOffset();

/**
* The consumer that receives the message.
*
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -1443,7 +1443,12 @@ public interface ChunkListener {

public interface MessageListener {

void handle(byte subscriptionId, long offset, long chunkTimestamp, Message message);
void handle(
byte subscriptionId,
long offset,
long chunkTimestamp,
long committedOffset,
Message message);
}

public interface CreditNotification {
Expand Down Expand Up @@ -1964,7 +1969,7 @@ public static class ClientParameters {
private ChunkListener chunkListener =
(client, correlationId, offset, messageCount, dataSize) -> {};
private MessageListener messageListener =
(correlationId, offset, chunkTimestamp, message) -> {};
(correlationId, offset, chunkTimestamp, committedOffset, message) -> {};
private MetadataListener metadataListener = (stream, code) -> {};
private CreditNotification creditNotification =
(subscriptionId, responseCode) ->
Expand Down
26 changes: 23 additions & 3 deletions src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,14 @@ private static final class MessageHandlerContext implements Context {

private final long offset;
private final long timestamp;
private final long committedOffset;
private final Consumer consumer;

private MessageHandlerContext(long offset, long timestamp, Consumer consumer) {
private MessageHandlerContext(
long offset, long timestamp, long committedOffset, Consumer consumer) {
this.offset = offset;
this.timestamp = timestamp;
this.committedOffset = committedOffset;
this.consumer = consumer;
}

Expand All @@ -306,6 +309,11 @@ public long timestamp() {
return this.timestamp;
}

@Override
public long committedOffset() {
return committedOffset;
}

@Override
public Consumer consumer() {
return this.consumer;
Expand Down Expand Up @@ -430,14 +438,15 @@ private ClientSubscriptionsManager(
subscriptionId & 0xFF,
Utils.formatConstant(responseCode));
MessageListener messageListener =
(subscriptionId, offset, chunkTimestamp, message) -> {
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
SubscriptionTracker subscriptionTracker =
subscriptionTrackers.get(subscriptionId & 0xFF);
if (subscriptionTracker != null) {
subscriptionTracker.offset = offset;
subscriptionTracker.hasReceivedSomething = true;
subscriptionTracker.messageHandler.handle(
new MessageHandlerContext(offset, chunkTimestamp, subscriptionTracker.consumer),
new MessageHandlerContext(
offset, chunkTimestamp, committedOffset, subscriptionTracker.consumer),
message);
// FIXME set offset here as well, best effort to avoid duplicates
} else {
Expand Down Expand Up @@ -550,6 +559,7 @@ private ClientSubscriptionsManager(
.metadataListener(metadataListener))
.key(owner.name);
this.client = clientFactory.client(clientFactoryContext);
maybeExchangeCommandVersions(client);
clientInitializedInManager.set(true);
}

Expand Down Expand Up @@ -839,4 +849,14 @@ public String toString() {
return "SubscriptionContext{" + "offsetSpecification=" + offsetSpecification + '}';
}
}

private static void maybeExchangeCommandVersions(Client client) {
try {
if (Utils.is3_11_OrMore(client.brokerVersion())) {
client.exchangeCommandVersions();
}
} catch (Exception e) {
LOGGER.info("Error while exchanging command versions: {}", e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ static int handleMessage(
long offset,
long offsetLimit,
long chunkTimestamp,
long committedOffset,
Codec codec,
MessageListener messageListener,
byte subscriptionId) {
Expand All @@ -338,7 +339,7 @@ static int handleMessage(
messageFiltered.set(true);
} else {
Message message = codec.decode(data);
messageListener.handle(subscriptionId, offset, chunkTimestamp, message);
messageListener.handle(subscriptionId, offset, chunkTimestamp, committedOffset, message);
}
return read;
}
Expand Down Expand Up @@ -377,7 +378,7 @@ static int handleDeliver(
ChunkChecksum chunkChecksum,
MetricsCollector metricsCollector,
byte subscriptionId,
long lastCommittedOffset,
long committedOffset,
int read) {
/*
%% <<
Expand Down Expand Up @@ -473,6 +474,7 @@ static int handleDeliver(
offset,
offsetLimit,
chunkTimestamp,
committedOffset,
codec,
messageListener,
subscriptionId);
Expand Down Expand Up @@ -538,6 +540,7 @@ static int handleDeliver(
offset,
offsetLimit,
chunkTimestamp,
committedOffset,
codec,
messageListener,
subscriptionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public long timestamp() {
return context.timestamp();
}

@Override
public long committedOffset() {
return context.committedOffset();
}

@Override
public void storeOffset() {
for (ConsumerState state : consumerStates) {
Expand Down
41 changes: 41 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,45 @@ static Function<ClientConnectionType, String> defaultConnectionNamingStrategy(St
static boolean offsetBefore(long x, long y) {
return Long.compareUnsigned(x, y) < 0;
}

private static String currentVersion(String currentVersion) {
// versions built from source: 3.7.0+rc.1.4.gedc5d96
if (currentVersion.contains("+")) {
currentVersion = currentVersion.substring(0, currentVersion.indexOf("+"));
}
// alpha (snapshot) versions: 3.7.0~alpha.449-1
if (currentVersion.contains("~")) {
currentVersion = currentVersion.substring(0, currentVersion.indexOf("~"));
}
// alpha (snapshot) versions: 3.7.1-alpha.40
if (currentVersion.contains("-")) {
currentVersion = currentVersion.substring(0, currentVersion.indexOf("-"));
}
return currentVersion;
}

/**
* https://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java
*/
static int versionCompare(String str1, String str2) {
String[] vals1 = str1.split("\\.");
String[] vals2 = str2.split("\\.");
int i = 0;
// set index to first non-equal ordinal or length of shortest version string
while (i < vals1.length && i < vals2.length && vals1[i].equals(vals2[i])) {
i++;
}
// compare first non-equal ordinal number
if (i < vals1.length && i < vals2.length) {
int diff = Integer.valueOf(vals1[i]).compareTo(Integer.valueOf(vals2[i]));
return Integer.signum(diff);
}
// the strings are equal or one string is a substring of the other
// e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4"
return Integer.signum(vals1.length - vals2.length);
}

static boolean is3_11_OrMore(String brokerVersion) {
return versionCompare(currentVersion(brokerVersion), "3.11.0") >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ void publishToStreamQueueConsumeFromStream(Codec codec) throws Exception {
(client1, subscriptionId, offset, messageCount1, dataSize) ->
client1.credit(subscriptionId, 1))
.messageListener(
(subscriptionId, offset, chunkTimestamp, message) -> {
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
messages.add(message);
messageBodies.add(new String(message.getBodyAsBinary(), UTF8));
consumedLatch.countDown();
Expand Down
54 changes: 47 additions & 7 deletions src/test/java/com/rabbitmq/stream/impl/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.rabbitmq.stream.impl.Client.Response;
import com.rabbitmq.stream.impl.Client.StreamParametersBuilder;
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -259,7 +260,7 @@ void publishConsumeComplexMessage(
Client.ChunkListener chunkListener =
(client, correlationId, offset, messageCount, dataSize) -> client.credit(correlationId, 1);
Client.MessageListener messageListener =
(correlationId, offset, chunkTimestamp, message) -> {
(correlationId, offset, chunkTimestamp, committedOffset, message) -> {
messages.add(message);
latch.countDown();
};
Expand Down Expand Up @@ -395,7 +396,7 @@ void publishConsumeWithSimpleCodec() throws Exception {
(client, subscriptionId, offset, messageCount1, dataSize) ->
client.credit(subscriptionId, 1))
.messageListener(
(subscriptionId, offset, chunkTimestamp, message) -> {
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
messageBodies.add(new String(message.getBodyAsBinary()));
consumeLatch.countDown();
}));
Expand Down Expand Up @@ -448,7 +449,7 @@ void batchPublishing() throws Exception {
(client, subscriptionId, offset, messageCount1, dataSize) ->
client.credit(subscriptionId, 1))
.messageListener(
(subscriptionId, offset, chunkTimestamp, message) -> {
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
ByteBuffer bb = ByteBuffer.wrap(message.getBodyAsBinary());
sizes.add(message.getBodyAsBinary().length);
sequences.add(bb.getInt());
Expand Down Expand Up @@ -479,7 +480,7 @@ void consume() throws Exception {

AtomicLong chunkTimestamp = new AtomicLong();
Client.MessageListener messageListener =
(corr, offset, chkTimestamp, message) -> {
(corr, offset, chkTimestamp, committedOffset, message) -> {
chunkTimestamp.set(chkTimestamp);
latch.countDown();
};
Expand Down Expand Up @@ -515,7 +516,7 @@ void publishAndConsume(boolean directBuffer) throws Exception {
};

Client.MessageListener messageListener =
(corr, offset, chunkTimestamp, data) -> consumedLatch.countDown();
(corr, offset, chunkTimestamp, committedOffset, data) -> consumedLatch.countDown();

Client client =
cf.get(
Expand Down Expand Up @@ -659,7 +660,7 @@ void declareAmqpStreamQueueAndUseItAsStream(TestInfo info) throws Exception {
(client1, subscriptionId, offset, messageCount1, dataSize) ->
client1.credit(subscriptionId, 1))
.messageListener(
(subscriptionId, offset, chunkTimestamp, message) ->
(subscriptionId, offset, chunkTimestamp, committedOffset, message) ->
consumedLatch.countDown()));
ConnectionFactory connectionFactory = new ConnectionFactory();
try (Connection amqpConnection = connectionFactory.newConnection()) {
Expand Down Expand Up @@ -830,11 +831,50 @@ void closingPublisherWhilePublishingShouldNotCloseConnection(String publisherRef
}

@Test
@Disabled
@BrokerVersionAtLeast("3.11.0")
void exchangeCommandVersions() {
Client client = cf.get();
List<FrameHandlerInfo> infos = client.exchangeCommandVersions();
assertThat(infos.stream().filter(info -> info.getKey() == Constants.COMMAND_DECLARE_PUBLISHER))
.isNotEmpty();
}

@Test
@BrokerVersionAtLeast("3.11.0")
void deliverVersion2LastCommittedOffsetShouldBeSet() throws Exception {
int publishCount = 20_000;
byte correlationId = 42;
TestUtils.publishAndWaitForConfirms(cf, publishCount, stream);

CountDownLatch latch = new CountDownLatch(publishCount);

Client.ChunkListener chunkListener =
(client, corr, offset, messageCountInChunk, dataSize) -> {
client.credit(correlationId, 1);
};

AtomicLong committedOffset = new AtomicLong();
Client.MessageListener messageListener =
(corr, offset, chkTimestamp, committedOfft, message) -> {
committedOffset.set(committedOfft);
latch.countDown();
};

Client client =
cf.get(
new Client.ClientParameters()
.chunkListener(chunkListener)
.messageListener(messageListener));

client.exchangeCommandVersions();

Response response =
client.subscribe(correlationId, stream, OffsetSpecification.first(), credit);
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(response.isOk()).isTrue();

assertThat(latch.await(10, SECONDS)).isTrue();
assertThat(committedOffset.get()).isPositive();
client.close();
}
}
Loading