From 32fa40b342a88b5faac9bab4c840d0f604bb5b46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 26 Jul 2022 11:09:51 +0200 Subject: [PATCH 1/3] Include committed offset to message delivery callback Fixes #158 --- .../com/rabbitmq/stream/MessageHandler.java | 14 ++++++ .../java/com/rabbitmq/stream/impl/Client.java | 9 +++- .../stream/impl/ConsumersCoordinator.java | 15 ++++-- .../stream/impl/ServerFrameHandler.java | 7 ++- .../stream/impl/SuperStreamConsumer.java | 5 ++ .../stream/impl/AmqpInteroperabilityTest.java | 2 +- .../com/rabbitmq/stream/impl/ClientTest.java | 50 ++++++++++++++++--- .../stream/impl/ConsumersCoordinatorTest.java | 38 ++++++++------ .../rabbitmq/stream/impl/DeliveryTest.java | 2 +- .../stream/impl/MetricsCollectionTest.java | 7 +-- .../stream/impl/NotificationTest.java | 3 +- .../com/rabbitmq/stream/impl/OffsetTest.java | 16 +++--- .../impl/OffsetTrackingCoordinatorTest.java | 5 ++ .../stream/impl/OffsetTrackingTest.java | 6 +-- .../rabbitmq/stream/impl/PublisherTest.java | 4 +- .../stream/impl/RetentionClientTest.java | 2 +- .../stream/impl/SubEntryBatchingTest.java | 4 +- .../stream/impl/SubscriptionTest.java | 9 ++-- .../com/rabbitmq/stream/impl/TlsTest.java | 2 +- 19 files changed, 146 insertions(+), 54 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/MessageHandler.java b/src/main/java/com/rabbitmq/stream/MessageHandler.java index 5557fe0be6..31b5b080ef 100644 --- a/src/main/java/com/rabbitmq/stream/MessageHandler.java +++ b/src/main/java/com/rabbitmq/stream/MessageHandler.java @@ -53,6 +53,20 @@ interface Context { */ long timestamp(); + /** + * The committed offset on this stream. + * + *

It is the offset of the last message confirmed by a quorum of the stream cluster members + * (leader and replicas). + * + *

The committed offset is a good indication of what the last offset of a stream is at a + * given time. The value can be stale as soon as the application reads it though, as the + * committed offset for a stream that is published to changes all the time. + * + * @return committed offset on this stream + */ + long committedOffset(); + /** * The consumer that receives the message. * diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 9e99d3acb6..8aac0c7743 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -1443,7 +1443,12 @@ public interface ChunkListener { public interface MessageListener { - void handle(byte subscriptionId, long offset, long chunkTimestamp, Message message); + void handle( + byte subscriptionId, + long offset, + long chunkTimestamp, + long committedOffset, + Message message); } public interface CreditNotification { @@ -1964,7 +1969,7 @@ public static class ClientParameters { private ChunkListener chunkListener = (client, correlationId, offset, messageCount, dataSize) -> {}; private MessageListener messageListener = - (correlationId, offset, chunkTimestamp, message) -> {}; + (correlationId, offset, chunkTimestamp, committedOffset, message) -> {}; private MetadataListener metadataListener = (stream, code) -> {}; private CreditNotification creditNotification = (subscriptionId, responseCode) -> diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index f97df336a8..995b69b5ed 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -283,12 +283,15 @@ private static final class MessageHandlerContext implements Context { private final long offset; private final long timestamp; + private final long committedOffset; private final Consumer consumer; - private MessageHandlerContext(long offset, long timestamp, Consumer consumer) { + private MessageHandlerContext( + long offset, long timestamp, long committedOffset, Consumer consumer) { this.offset = offset; this.timestamp = timestamp; this.consumer = consumer; + this.committedOffset = committedOffset; } @Override @@ -306,6 +309,11 @@ public long timestamp() { return this.timestamp; } + @Override + public long committedOffset() { + return committedOffset; + } + @Override public Consumer consumer() { return this.consumer; @@ -430,14 +438,15 @@ private ClientSubscriptionsManager( subscriptionId & 0xFF, Utils.formatConstant(responseCode)); MessageListener messageListener = - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, committedOffset, chunkTimestamp, message) -> { SubscriptionTracker subscriptionTracker = subscriptionTrackers.get(subscriptionId & 0xFF); if (subscriptionTracker != null) { subscriptionTracker.offset = offset; subscriptionTracker.hasReceivedSomething = true; subscriptionTracker.messageHandler.handle( - new MessageHandlerContext(offset, chunkTimestamp, subscriptionTracker.consumer), + new MessageHandlerContext( + offset, chunkTimestamp, committedOffset, subscriptionTracker.consumer), message); // FIXME set offset here as well, best effort to avoid duplicates } else { diff --git a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java index 52af554d2e..3bc2d87bb6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java +++ b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java @@ -325,6 +325,7 @@ static int handleMessage( long offset, long offsetLimit, long chunkTimestamp, + long committedOffset, Codec codec, MessageListener messageListener, byte subscriptionId) { @@ -338,7 +339,7 @@ static int handleMessage( messageFiltered.set(true); } else { Message message = codec.decode(data); - messageListener.handle(subscriptionId, offset, chunkTimestamp, message); + messageListener.handle(subscriptionId, offset, chunkTimestamp, committedOffset, message); } return read; } @@ -377,7 +378,7 @@ static int handleDeliver( ChunkChecksum chunkChecksum, MetricsCollector metricsCollector, byte subscriptionId, - long lastCommittedOffset, + long committedOffset, int read) { /* %% << @@ -473,6 +474,7 @@ static int handleDeliver( offset, offsetLimit, chunkTimestamp, + committedOffset, codec, messageListener, subscriptionId); @@ -538,6 +540,7 @@ static int handleDeliver( offset, offsetLimit, chunkTimestamp, + committedOffset, codec, messageListener, subscriptionId); diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java index 723c4f7972..100eecb559 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java @@ -115,6 +115,11 @@ public long timestamp() { return context.timestamp(); } + @Override + public long committedOffset() { + return context.committedOffset(); + } + @Override public void storeOffset() { for (ConsumerState state : consumerStates) { diff --git a/src/test/java/com/rabbitmq/stream/impl/AmqpInteroperabilityTest.java b/src/test/java/com/rabbitmq/stream/impl/AmqpInteroperabilityTest.java index 1161938d40..46819bf858 100644 --- a/src/test/java/com/rabbitmq/stream/impl/AmqpInteroperabilityTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/AmqpInteroperabilityTest.java @@ -210,7 +210,7 @@ void publishToStreamQueueConsumeFromStream(Codec codec) throws Exception { (client1, subscriptionId, offset, messageCount1, dataSize) -> client1.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { messages.add(message); messageBodies.add(new String(message.getBodyAsBinary(), UTF8)); consumedLatch.countDown(); diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java index 44d55475a8..8b2bf2bcf3 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java @@ -259,7 +259,7 @@ void publishConsumeComplexMessage( Client.ChunkListener chunkListener = (client, correlationId, offset, messageCount, dataSize) -> client.credit(correlationId, 1); Client.MessageListener messageListener = - (correlationId, offset, chunkTimestamp, message) -> { + (correlationId, offset, chunkTimestamp, committedOffset, message) -> { messages.add(message); latch.countDown(); }; @@ -395,7 +395,7 @@ void publishConsumeWithSimpleCodec() throws Exception { (client, subscriptionId, offset, messageCount1, dataSize) -> client.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { messageBodies.add(new String(message.getBodyAsBinary())); consumeLatch.countDown(); })); @@ -448,7 +448,7 @@ void batchPublishing() throws Exception { (client, subscriptionId, offset, messageCount1, dataSize) -> client.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { ByteBuffer bb = ByteBuffer.wrap(message.getBodyAsBinary()); sizes.add(message.getBodyAsBinary().length); sequences.add(bb.getInt()); @@ -479,7 +479,7 @@ void consume() throws Exception { AtomicLong chunkTimestamp = new AtomicLong(); Client.MessageListener messageListener = - (corr, offset, chkTimestamp, message) -> { + (corr, offset, chkTimestamp, committedOffset, message) -> { chunkTimestamp.set(chkTimestamp); latch.countDown(); }; @@ -515,7 +515,7 @@ void publishAndConsume(boolean directBuffer) throws Exception { }; Client.MessageListener messageListener = - (corr, offset, chunkTimestamp, data) -> consumedLatch.countDown(); + (corr, offset, chunkTimestamp, committedOffset, data) -> consumedLatch.countDown(); Client client = cf.get( @@ -659,7 +659,7 @@ void declareAmqpStreamQueueAndUseItAsStream(TestInfo info) throws Exception { (client1, subscriptionId, offset, messageCount1, dataSize) -> client1.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> consumedLatch.countDown())); ConnectionFactory connectionFactory = new ConnectionFactory(); try (Connection amqpConnection = connectionFactory.newConnection()) { @@ -837,4 +837,42 @@ void exchangeCommandVersions() { assertThat(infos.stream().filter(info -> info.getKey() == Constants.COMMAND_DECLARE_PUBLISHER)) .isNotEmpty(); } + + @Test + void deliverVersion2LastCommittedOffsetShouldBeSet() throws Exception { + int publishCount = 20_000; + byte correlationId = 42; + TestUtils.publishAndWaitForConfirms(cf, publishCount, stream); + + CountDownLatch latch = new CountDownLatch(publishCount); + + Client.ChunkListener chunkListener = + (client, corr, offset, messageCountInChunk, dataSize) -> { + client.credit(correlationId, 1); + }; + + AtomicLong committedOffset = new AtomicLong(); + Client.MessageListener messageListener = + (corr, offset, chkTimestamp, committedOfft, message) -> { + committedOffset.set(committedOfft); + latch.countDown(); + }; + + Client client = + cf.get( + new Client.ClientParameters() + .chunkListener(chunkListener) + .messageListener(messageListener)); + + client.exchangeCommandVersions(); + + Response response = + client.subscribe(correlationId, stream, OffsetSpecification.first(), credit); + assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(response.isOk()).isTrue(); + + assertThat(latch.await(10, SECONDS)).isTrue(); + assertThat(committedOffset.get()).isPositive(); + client.close(); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java index 8b5ffebc2b..14d1d674ff 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java @@ -395,7 +395,7 @@ void subscribeShouldSubscribeToStreamAndDispatchMessage_UnsubscribeShouldUnsubsc assertThat(messageHandlerCalls.get()).isEqualTo(0); messageListener.handle( - subscriptionIdCaptor.getValue(), 0, 0, new WrapperMessageBuilder().build()); + subscriptionIdCaptor.getValue(), 0, 0, 0, new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(1); when(client.unsubscribe(subscriptionIdCaptor.getValue())) @@ -405,7 +405,7 @@ void subscribeShouldSubscribeToStreamAndDispatchMessage_UnsubscribeShouldUnsubsc verify(client, times(1)).unsubscribe(subscriptionIdCaptor.getValue()); messageListener.handle( - subscriptionIdCaptor.getValue(), 0, 0, new WrapperMessageBuilder().build()); + subscriptionIdCaptor.getValue(), 0, 0, 0, new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(1); } @@ -449,7 +449,7 @@ void subscribeShouldSubscribeToStreamAndDispatchMessageWithManySubscriptions() { .forEach( subscriptionId -> { messageListener.handle( - subscriptionId, 0, 0, new WrapperMessageBuilder().build()); + subscriptionId, 0, 0, 0, new WrapperMessageBuilder().build()); }); messageToEachSubscription.run(); assertThat(messageHandlerCalls).hasSize(ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT); @@ -508,7 +508,7 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception { assertThat(messageHandlerCalls.get()).isEqualTo(0); messageListener.handle( - subscriptionIdCaptor.getAllValues().get(0), 1, 0, new WrapperMessageBuilder().build()); + subscriptionIdCaptor.getAllValues().get(0), 1, 0, 0, new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(1); coordinator.subscribe( @@ -533,7 +533,7 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception { assertThat(messageHandlerCalls.get()).isEqualTo(1); messageListener.handle( - subscriptionIdCaptor.getAllValues().get(0), 0, 0, new WrapperMessageBuilder().build()); + subscriptionIdCaptor.getAllValues().get(0), 0, 0, 0, new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(2); when(client.unsubscribe(subscriptionIdCaptor.getValue())) @@ -543,7 +543,7 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception { verify(client, times(1)).unsubscribe(subscriptionIdCaptor.getValue()); messageListener.handle( - subscriptionIdCaptor.getValue(), 0, 0, new WrapperMessageBuilder().build()); + subscriptionIdCaptor.getValue(), 0, 0, 0, new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(2); } @@ -596,7 +596,11 @@ void shouldRedistributeConsumerOnMetadataUpdate() throws Exception { assertThat(messageHandlerCalls.get()).isEqualTo(0); firstMessageListener() .handle( - subscriptionIdCaptor.getAllValues().get(0), 1, 0, new WrapperMessageBuilder().build()); + subscriptionIdCaptor.getAllValues().get(0), + 1, + 0, + 0, + new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(1); this.metadataListeners.forEach( @@ -611,7 +615,11 @@ void shouldRedistributeConsumerOnMetadataUpdate() throws Exception { assertThat(messageHandlerCalls.get()).isEqualTo(1); lastMessageListener() .handle( - subscriptionIdCaptor.getAllValues().get(0), 0, 0, new WrapperMessageBuilder().build()); + subscriptionIdCaptor.getAllValues().get(0), + 0, + 0, + 0, + new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(2); when(client.unsubscribe(subscriptionIdCaptor.getValue())) @@ -621,7 +629,7 @@ void shouldRedistributeConsumerOnMetadataUpdate() throws Exception { verify(client, times(1)).unsubscribe(subscriptionIdCaptor.getValue()); lastMessageListener() - .handle(subscriptionIdCaptor.getValue(), 0, 0, new WrapperMessageBuilder().build()); + .handle(subscriptionIdCaptor.getValue(), 0, 0, 0, new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(2); assertThat(coordinator.poolSize()).isZero(); @@ -664,7 +672,7 @@ void shouldRetryRedistributionIfMetadataIsNotUpdatedImmediately() throws Excepti assertThat(messageHandlerCalls.get()).isEqualTo(0); messageListener.handle( - subscriptionIdCaptor.getValue(), 1, 0, new WrapperMessageBuilder().build()); + subscriptionIdCaptor.getValue(), 1, 0, 0, new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(1); metadataListener.handle("stream", Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE); @@ -676,7 +684,7 @@ void shouldRetryRedistributionIfMetadataIsNotUpdatedImmediately() throws Excepti assertThat(messageHandlerCalls.get()).isEqualTo(1); messageListener.handle( - subscriptionIdCaptor.getValue(), 0, 0, new WrapperMessageBuilder().build()); + subscriptionIdCaptor.getValue(), 0, 0, 0, new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(2); when(client.unsubscribe(subscriptionIdCaptor.getValue())) @@ -686,7 +694,7 @@ void shouldRetryRedistributionIfMetadataIsNotUpdatedImmediately() throws Excepti verify(client, times(1)).unsubscribe(subscriptionIdCaptor.getValue()); messageListener.handle( - subscriptionIdCaptor.getValue(), 0, 0, new WrapperMessageBuilder().build()); + subscriptionIdCaptor.getValue(), 0, 0, 0, new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(2); assertThat(coordinator.poolSize()).isZero(); @@ -726,7 +734,7 @@ void metadataUpdate_shouldCloseConsumerIfStreamIsDeleted() throws Exception { assertThat(messageHandlerCalls.get()).isEqualTo(0); messageListener.handle( - subscriptionIdCaptor.getValue(), 1, 0, new WrapperMessageBuilder().build()); + subscriptionIdCaptor.getValue(), 1, 0, 0, new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(1); metadataListener.handle("stream", Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE); @@ -776,7 +784,7 @@ void metadataUpdate_shouldCloseConsumerIfRetryTimeoutIsReached() throws Exceptio assertThat(messageHandlerCalls.get()).isEqualTo(0); messageListener.handle( - subscriptionIdCaptor.getValue(), 1, 0, new WrapperMessageBuilder().build()); + subscriptionIdCaptor.getValue(), 1, 0, 0, new WrapperMessageBuilder().build()); assertThat(messageHandlerCalls.get()).isEqualTo(1); metadataListener.handle("stream", Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE); @@ -1033,6 +1041,7 @@ void shouldRestartWhereItLeftOffAfterDisruption(Consumer configur subscriptionIdCaptor.getValue(), lastReceivedOffset, 0, + 0, new WrapperMessageBuilder().build()); configurator.accept(this); diff --git a/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java b/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java index 84d8311770..484ac2c737 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java @@ -137,7 +137,7 @@ public TestConfig(long chunkOffset, long subscriptionOffset) { assertThat(messageCount).isEqualTo(nbMessages); chunkCountInCallback.incrementAndGet(); }, - (subscriptionId, offset, chunkTimestamp, message) -> + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> messageCountInCallback.incrementAndGet(), NO_OP_CODEC, subscriptionOffsets, diff --git a/src/test/java/com/rabbitmq/stream/impl/MetricsCollectionTest.java b/src/test/java/com/rabbitmq/stream/impl/MetricsCollectionTest.java index 46d9c4c27a..64292ee1e1 100644 --- a/src/test/java/com/rabbitmq/stream/impl/MetricsCollectionTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/MetricsCollectionTest.java @@ -100,7 +100,8 @@ void publishConfirmChunkConsumeShouldBeCollected() throws Exception { (client, subscriptionId, offset, messageCount1, dataSize) -> client.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> consumeLatch.countDown())); + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> + consumeLatch.countDown())); Client.Response response = consumer.subscribe(b(1), stream, OffsetSpecification.first(), 10); assertThat(response.isOk()).isTrue(); @@ -176,7 +177,7 @@ void publishConfirmChunkConsumeShouldBeCollectedWithBatchEntryPublishing() throw (client, subscriptionId, offset, messageCount1, dataSize) -> client.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { consumeLatch.countDown(); })); @@ -219,7 +220,7 @@ void filteredSmallerOffsetsInChunksShouldNotBeCounted() throws Exception { new Client( new ClientParameters() .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { counts.get(subscriptionId).incrementAndGet(); if (message.getProperties().getMessageIdAsLong() == messageCount) { latches.get(subscriptionId).countDown(); diff --git a/src/test/java/com/rabbitmq/stream/impl/NotificationTest.java b/src/test/java/com/rabbitmq/stream/impl/NotificationTest.java index 87ad8bba07..c2a991d7d2 100644 --- a/src/test/java/com/rabbitmq/stream/impl/NotificationTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/NotificationTest.java @@ -134,7 +134,8 @@ void consumerIsNotifiedIfStreamIsDeleted() throws Exception { cf.get( new Client.ClientParameters() .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> consumeLatch.countDown()) + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> + consumeLatch.countDown()) .metadataListener( (stream, code) -> { receivedStream.set(stream); diff --git a/src/test/java/com/rabbitmq/stream/impl/OffsetTest.java b/src/test/java/com/rabbitmq/stream/impl/OffsetTest.java index d1fd825be9..6e149e100e 100644 --- a/src/test/java/com/rabbitmq/stream/impl/OffsetTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/OffsetTest.java @@ -99,7 +99,7 @@ void offsetTypeFirstShouldStartConsumingFromBeginning( (client1, subscriptionId, offset12, messageCount1, dataSize) -> client1.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset1, chunkTimestamp, message) -> { + (subscriptionId, offset1, chunkTimestamp, committedOffset, message) -> { first.compareAndSet(-1, offset1); last.set(offset1); latch.countDown(); @@ -162,7 +162,7 @@ void offsetTypeLastShouldReturnLastChunk( chunkCount.incrementAndGet(); }) .messageListener( - (subscriptionId, offset1, chunkTimestamp, message) -> { + (subscriptionId, offset1, chunkTimestamp, committedOffset, message) -> { first.compareAndSet(-1, offset1); last.set(offset1); if (offset1 == lastOffset) { @@ -240,7 +240,7 @@ void offsetTypeNextShouldReturnNewPublishedMessages( (client1, subscriptionId, offset, messageCount1, dataSize) -> client1.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset1, chunkTimestamp, message) -> { + (subscriptionId, offset1, chunkTimestamp, committedOffset, message) -> { first.compareAndSet(-1, offset1); last.set(offset1); if (offset1 == lastOffset) { @@ -308,7 +308,7 @@ void offsetTypeOffsetShouldStartConsumingFromOffset( (client1, subscriptionId, offset12, messageCount1, dataSize) -> client1.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset1, chunkTimestamp, message) -> { + (subscriptionId, offset1, chunkTimestamp, committedOffset, message) -> { first.compareAndSet(-1, offset1); last.set(offset1); latch.countDown(); @@ -373,7 +373,7 @@ void offsetTypeTimestampShouldStartConsumingFromTimestamp( (client1, subscriptionId, offset, messageCount1, dataSize) -> client1.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset1, chunkTimestamp, message) -> { + (subscriptionId, offset1, chunkTimestamp, committedOffset, message) -> { first.compareAndSet(-1, offset1); last.set(offset1); consumed.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8)); @@ -446,7 +446,7 @@ void filterSmallerOffsets() throws Exception { new Client( new Client.ClientParameters() .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { if (firstOffsets.get(subscriptionId) == null) { firstOffsets.put(subscriptionId, offset); } @@ -507,7 +507,7 @@ void consumeFromTail(SslContext sslContext) throws Exception { (client, subscriptionId, offset, messageCount1, dataSize) -> client.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { consumed.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8)); consumedLatch.countDown(); })); @@ -559,7 +559,7 @@ void shouldReachTailWhenPublisherStopWhileConsumerIsBehind(SslContext sslContext .chunkListener( (client, subscriptionId, offset, msgCount, dataSize) -> client.credit(b(0), 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { lastConsumedMessage.set(new String(message.getBodyAsBinary())); consumedMessagesLatch.countDown(); })); diff --git a/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java index 3d31425ddd..e2ffaac901 100644 --- a/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingCoordinatorTest.java @@ -349,6 +349,11 @@ public long timestamp() { return 0; } + @Override + public long committedOffset() { + return 0; + } + @Override public com.rabbitmq.stream.Consumer consumer() { return consumer; diff --git a/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingTest.java b/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingTest.java index 31769368e7..4caa56f12a 100644 --- a/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/OffsetTrackingTest.java @@ -182,7 +182,7 @@ void consumeAndStore(BiConsumer streamCreator, TestInfo info) th Collection messageIdsCollection = new ConcurrentLinkedQueue<>(); CountDownLatch consumeLatch = new CountDownLatch(1); MessageListener messageListener = - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { if (consumeCount.get() <= consumeCountFirst) { consumeCount.incrementAndGet(); long messageId = message.getProperties().getMessageIdAsLong(); @@ -238,7 +238,7 @@ void consumeAndStore(BiConsumer streamCreator, TestInfo info) th AtomicLong firstOffset = new AtomicLong(-1); messageListener = - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { firstOffset.compareAndSet(-1, offset); long messageId = message.getProperties().getMessageIdAsLong(); if (lastConsumedMessageId.get() < messageId) { @@ -318,7 +318,7 @@ void storeOffsetAndThenAttachByTimestampShouldWork() throws Exception { (client1, subscriptionId, offset, messageCount1, dataSize) -> client1.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> consumed.incrementAndGet())); assertThat(client.declarePublisher((byte) 0, null, stream).isOk()).isTrue(); diff --git a/src/test/java/com/rabbitmq/stream/impl/PublisherTest.java b/src/test/java/com/rabbitmq/stream/impl/PublisherTest.java index 278d946eec..f3c07604c6 100644 --- a/src/test/java/com/rabbitmq/stream/impl/PublisherTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/PublisherTest.java @@ -64,7 +64,7 @@ void declarePublisher(String publisherReference) throws Exception { (client, subscriptionId, offset, messageCount1, dataSize) -> client.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> consumerLatch.countDown())); Response response = c.declarePublisher(b(1), publisherReference, stream); @@ -170,7 +170,7 @@ void deduplication( (client, subscriptionId, offset, messageCount1, dataSize) -> client.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { consumeCount.incrementAndGet(); consumeLatch.countDown(); })); diff --git a/src/test/java/com/rabbitmq/stream/impl/RetentionClientTest.java b/src/test/java/com/rabbitmq/stream/impl/RetentionClientTest.java index 42ec3a7979..5fcda2a0b6 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RetentionClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RetentionClientTest.java @@ -219,7 +219,7 @@ void retention(RetentionTestConfig configuration, TestInfo info) throws Exceptio (client1, subscriptionId, offset, messageCount1, dataSize) -> client1.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { long messageId = message.getProperties().getMessageIdAsLong(); firstMessageId.compareAndSet(-1, messageId); if (messageId == publishSequence.get() - 1) { diff --git a/src/test/java/com/rabbitmq/stream/impl/SubEntryBatchingTest.java b/src/test/java/com/rabbitmq/stream/impl/SubEntryBatchingTest.java index 89b84d0e34..b31c04f97c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SubEntryBatchingTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SubEntryBatchingTest.java @@ -110,7 +110,7 @@ void publishConsumeCompressedMessages( (client, subscriptionId, offset, messageCount1, dataSize) -> client.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { consumedBodies.add(new String(message.getBodyAsBinary(), UTF8)); consumeLatch.countDown(); }) @@ -189,7 +189,7 @@ void subEntriesCompressedWithDifferentCompressionsShouldBeReadCorrectly() { (client, subscriptionId, offset, messageCount1, dataSize) -> client.credit(subscriptionId, 1)) .messageListener( - (subscriptionId, offset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { consumedBodies.add(new String(message.getBodyAsBinary(), UTF8)); consumeLatch.countDown(); })); diff --git a/src/test/java/com/rabbitmq/stream/impl/SubscriptionTest.java b/src/test/java/com/rabbitmq/stream/impl/SubscriptionTest.java index ced78ce6a4..b2652cbf4e 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SubscriptionTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SubscriptionTest.java @@ -53,7 +53,7 @@ void severalSubscriptionsInSameConnection() throws Exception { cf.get( new Client.ClientParameters() .messageListener( - (correlationId, offset, chunkTimestamp, message) -> { + (correlationId, offset, chunkTimestamp, committedOffset, message) -> { messageCounts .computeIfAbsent(correlationId, k -> new AtomicInteger(0)) .incrementAndGet(); @@ -124,7 +124,7 @@ void unsubscribeShouldNotReceiveMoreMessageAfterUnsubscribe() throws Exception { cf.get( new Client.ClientParameters() .messageListener( - (correlationId, offset, chunkTimestamp, message) -> { + (correlationId, offset, chunkTimestamp, committedOffset, message) -> { receivedMessageCount.incrementAndGet(); latch.countDown(); })); @@ -148,7 +148,8 @@ void unsubscribeShouldNotReceiveMoreMessageAfterUnsubscribe() throws Exception { cf.get( new Client.ClientParameters() .messageListener( - (correlationId, offset, chunkTimestamp, message) -> latch2.countDown())); + (correlationId, offset, chunkTimestamp, committedOffset, message) -> + latch2.countDown())); client2.subscribe(b(1), stream, OffsetSpecification.first(), messageCount * 100); client.declarePublisher(b(1), null, stream); IntStream.range(0, messageCount) @@ -174,7 +175,7 @@ void unsubscribeTwoSubscriptionsOneIsCancelled() throws Exception { cf.get( new Client.ClientParameters() .messageListener( - (correlationId, offset, chunkTimestamp, message) -> { + (correlationId, offset, chunkTimestamp, committedOffset, message) -> { messageCounts .computeIfAbsent(correlationId, k -> new AtomicInteger(0)) .incrementAndGet(); diff --git a/src/test/java/com/rabbitmq/stream/impl/TlsTest.java b/src/test/java/com/rabbitmq/stream/impl/TlsTest.java index 851f362723..70d0c2321d 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TlsTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/TlsTest.java @@ -161,7 +161,7 @@ void publishAndConsumeWithUnverifiedConnection() throws Exception { }; Client.MessageListener messageListener = - (corr, offset, chunkTimestamp, data) -> consumedLatch.countDown(); + (corr, offset, chunkTimestamp, committedOffset, data) -> consumedLatch.countDown(); Client client = cf.get( From 4604b0e33e888deb85ecfe3b93f5800da5ef5aa3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 26 Jul 2022 17:08:35 +0200 Subject: [PATCH 2/3] Exchange command versions if possible For consumer connections, on RabbitMQ 3.11+. This is to benefit from deliver v2 if possible. References #158 --- .../stream/impl/ConsumersCoordinator.java | 15 ++++++- .../java/com/rabbitmq/stream/impl/Utils.java | 41 +++++++++++++++++++ .../com/rabbitmq/stream/impl/ClientTest.java | 3 +- .../stream/impl/StreamConsumerTest.java | 25 +++++++++++ .../com/rabbitmq/stream/impl/TestUtils.java | 23 +---------- .../com/rabbitmq/stream/impl/UtilsTest.java | 22 ++++++++++ 6 files changed, 105 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 995b69b5ed..62edd061c8 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -290,8 +290,8 @@ private MessageHandlerContext( long offset, long timestamp, long committedOffset, Consumer consumer) { this.offset = offset; this.timestamp = timestamp; - this.consumer = consumer; this.committedOffset = committedOffset; + this.consumer = consumer; } @Override @@ -438,7 +438,7 @@ private ClientSubscriptionsManager( subscriptionId & 0xFF, Utils.formatConstant(responseCode)); MessageListener messageListener = - (subscriptionId, offset, committedOffset, chunkTimestamp, message) -> { + (subscriptionId, offset, chunkTimestamp, committedOffset, message) -> { SubscriptionTracker subscriptionTracker = subscriptionTrackers.get(subscriptionId & 0xFF); if (subscriptionTracker != null) { @@ -559,6 +559,7 @@ private ClientSubscriptionsManager( .metadataListener(metadataListener)) .key(owner.name); this.client = clientFactory.client(clientFactoryContext); + maybeExchangeCommandVersions(client); clientInitializedInManager.set(true); } @@ -848,4 +849,14 @@ public String toString() { return "SubscriptionContext{" + "offsetSpecification=" + offsetSpecification + '}'; } } + + private static void maybeExchangeCommandVersions(Client client) { + try { + if (Utils.is3_11_OrMore(client.brokerVersion())) { + client.exchangeCommandVersions(); + } + } catch (Exception e) { + LOGGER.info("Error while exchanging command versions: {}", e.getMessage()); + } + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index 869ba1b6e6..3fa2e1d2ae 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -240,4 +240,45 @@ static Function defaultConnectionNamingStrategy(St static boolean offsetBefore(long x, long y) { return Long.compareUnsigned(x, y) < 0; } + + private static String currentVersion(String currentVersion) { + // versions built from source: 3.7.0+rc.1.4.gedc5d96 + if (currentVersion.contains("+")) { + currentVersion = currentVersion.substring(0, currentVersion.indexOf("+")); + } + // alpha (snapshot) versions: 3.7.0~alpha.449-1 + if (currentVersion.contains("~")) { + currentVersion = currentVersion.substring(0, currentVersion.indexOf("~")); + } + // alpha (snapshot) versions: 3.7.1-alpha.40 + if (currentVersion.contains("-")) { + currentVersion = currentVersion.substring(0, currentVersion.indexOf("-")); + } + return currentVersion; + } + + /** + * https://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java + */ + static int versionCompare(String str1, String str2) { + String[] vals1 = str1.split("\\."); + String[] vals2 = str2.split("\\."); + int i = 0; + // set index to first non-equal ordinal or length of shortest version string + while (i < vals1.length && i < vals2.length && vals1[i].equals(vals2[i])) { + i++; + } + // compare first non-equal ordinal number + if (i < vals1.length && i < vals2.length) { + int diff = Integer.valueOf(vals1[i]).compareTo(Integer.valueOf(vals2[i])); + return Integer.signum(diff); + } + // the strings are equal or one string is a substring of the other + // e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4" + return Integer.signum(vals1.length - vals2.length); + } + + static boolean is3_11_OrMore(String brokerVersion) { + return versionCompare(currentVersion(brokerVersion), "3.11.0") >= 0; + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java index 8b2bf2bcf3..67528ee946 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java @@ -37,6 +37,7 @@ import com.rabbitmq.stream.impl.Client.Response; import com.rabbitmq.stream.impl.Client.StreamParametersBuilder; import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo; +import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; import java.io.ByteArrayOutputStream; @@ -830,7 +831,7 @@ void closingPublisherWhilePublishingShouldNotCloseConnection(String publisherRef } @Test - @Disabled + @BrokerVersionAtLeast("3.11.0") void exchangeCommandVersions() { Client client = cf.get(); List infos = client.exchangeCommandVersions(); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index ad02f9dad2..7e85f43b07 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -35,6 +35,7 @@ import com.rabbitmq.stream.StreamDoesNotExistException; import com.rabbitmq.stream.impl.Client.QueryOffsetResponse; import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerInfo; +import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import com.rabbitmq.stream.impl.TestUtils.DisabledIfRabbitMqCtlNotSet; import io.netty.channel.EventLoopGroup; import java.time.Duration; @@ -146,6 +147,30 @@ void nameShouldBeSetIfTrackingStrategyIsSet() { }); } + @Test + @BrokerVersionAtLeast("3.11.0") + void committedOffsetShouldBeSet() throws Exception { + int messageCount = 20_000; + TestUtils.publishAndWaitForConfirms(cf, messageCount, this.stream); + + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + AtomicLong committedOffset = new AtomicLong(); + Consumer consumer = + environment.consumerBuilder().stream(stream) + .offset(OffsetSpecification.first()) + .messageHandler( + (context, message) -> { + committedOffset.set(context.committedOffset()); + consumeLatch.countDown(); + }) + .build(); + + assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(committedOffset.get()).isNotZero(); + + consumer.close(); + } + @Test void consume() throws Exception { int messageCount = 100_000; diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index c9180c73ce..2775240f91 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -414,7 +414,7 @@ static boolean atLeastVersion(String expectedVersion, String currentVersion) { } try { currentVersion = currentVersion(currentVersion); - return "0.0.0".equals(currentVersion) || versionCompare(currentVersion, expectedVersion) >= 0; + return "0.0.0".equals(currentVersion) || Utils.versionCompare(currentVersion, expectedVersion) >= 0; } catch (RuntimeException e) { LoggerFactory.getLogger(TestUtils.class) .warn("Unable to parse broker version {}", currentVersion, e); @@ -422,26 +422,7 @@ static boolean atLeastVersion(String expectedVersion, String currentVersion) { } } - /** - * https://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java - */ - static int versionCompare(String str1, String str2) { - String[] vals1 = str1.split("\\."); - String[] vals2 = str2.split("\\."); - int i = 0; - // set index to first non-equal ordinal or length of shortest version string - while (i < vals1.length && i < vals2.length && vals1[i].equals(vals2[i])) { - i++; - } - // compare first non-equal ordinal number - if (i < vals1.length && i < vals2.length) { - int diff = Integer.valueOf(vals1[i]).compareTo(Integer.valueOf(vals2[i])); - return Integer.signum(diff); - } - // the strings are equal or one string is a substring of the other - // e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4" - return Integer.signum(vals1.length - vals2.length); - } + @Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) diff --git a/src/test/java/com/rabbitmq/stream/impl/UtilsTest.java b/src/test/java/com/rabbitmq/stream/impl/UtilsTest.java index bf40854cf7..107db651e2 100644 --- a/src/test/java/com/rabbitmq/stream/impl/UtilsTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/UtilsTest.java @@ -37,6 +37,8 @@ import java.util.function.Predicate; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; public class UtilsTest { @@ -102,4 +104,24 @@ void testOffsetBefore() { assertThat(offsetBefore(Long.MAX_VALUE + 10, Long.MAX_VALUE + 10)).isFalse(); assertThat(offsetBefore(Long.MAX_VALUE + 10, Long.MAX_VALUE + 20)).isTrue(); } + + @ParameterizedTest + @CsvSource({ + "3.8.0+rc.1.2186.g95f3fde,false", + "3.9.21,false", + "3.9.22-alpha.13,false", + "3.10.6,false", + "3.11.0-alpha.15,true", + "3.11.0,true", + "3.11.1,true", + "4.0.0-alpha.15,true", + "4.0.0,true", + "4.0.1,true", + "4.1.0-alpha.15,true", + "4.1.0,true", + "4.1.1,true", + }) + void is_3_11_OrMore(String input, boolean expected) { + assertThat(Utils.is3_11_OrMore(input)).isEqualTo(expected); + } } From c6168ae849e41730c90c91edc8be719b57e80909 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 26 Jul 2022 17:22:20 +0200 Subject: [PATCH 3/3] Enable test only on RabbitMQ 3.11+ References #158 --- src/test/java/com/rabbitmq/stream/impl/ClientTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java index 67528ee946..c9c560e5a8 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ClientTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ClientTest.java @@ -840,6 +840,7 @@ void exchangeCommandVersions() { } @Test + @BrokerVersionAtLeast("3.11.0") void deliverVersion2LastCommittedOffsetShouldBeSet() throws Exception { int publishCount = 20_000; byte correlationId = 42;