From 03825a9887ee930b576daa581b3d75f7931a5ceb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Oct 2021 14:53:33 +0200 Subject: [PATCH 1/5] Add SubscriptionListener References #38 --- .../com/rabbitmq/stream/ConsumerBuilder.java | 2 + .../rabbitmq/stream/SubscriptionListener.java | 26 +++ .../stream/impl/ConsumersCoordinator.java | 49 +++++- .../rabbitmq/stream/impl/StreamConsumer.java | 5 +- .../stream/impl/StreamConsumerBuilder.java | 14 +- .../stream/impl/StreamEnvironment.java | 9 +- .../stream/impl/ConsumersCoordinatorTest.java | 114 +++++++++++-- .../stream/impl/StreamConsumerTest.java | 158 ++++++++++++++++++ .../com/rabbitmq/stream/impl/TestUtils.java | 5 + 9 files changed, 362 insertions(+), 20 deletions(-) create mode 100644 src/main/java/com/rabbitmq/stream/SubscriptionListener.java diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index 61c3b2197a..e9f2977e30 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -64,6 +64,8 @@ public interface ConsumerBuilder { */ ConsumerBuilder name(String name); + ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener); + /** * Enable {@link ManualTrackingStrategy}. * diff --git a/src/main/java/com/rabbitmq/stream/SubscriptionListener.java b/src/main/java/com/rabbitmq/stream/SubscriptionListener.java new file mode 100644 index 0000000000..e1be84c966 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/SubscriptionListener.java @@ -0,0 +1,26 @@ +// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream; + +public interface SubscriptionListener { + + void preSubscribe(SubscriptionContext subscriptionContext); + + interface SubscriptionContext { + + OffsetSpecification offsetSpecification(); + + void offsetSpecification(OffsetSpecification offsetSpecification); + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 158812e647..4f3f8a7a4e 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -23,6 +23,8 @@ import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.StreamDoesNotExistException; import com.rabbitmq.stream.StreamException; +import com.rabbitmq.stream.SubscriptionListener; +import com.rabbitmq.stream.SubscriptionListener.SubscriptionContext; import com.rabbitmq.stream.impl.Client.ChunkListener; import com.rabbitmq.stream.impl.Client.CreditNotification; import com.rabbitmq.stream.impl.Client.MessageListener; @@ -83,6 +85,7 @@ Runnable subscribe( String stream, OffsetSpecification offsetSpecification, String trackingReference, + SubscriptionListener subscriptionListener, MessageHandler messageHandler) { // FIXME fail immediately if there's no locator (can provide a supplier that does not retry) List candidates = findBrokersForStream(stream); @@ -95,7 +98,12 @@ Runnable subscribe( // we keep this instance when we move the subscription from a client to another one SubscriptionTracker subscriptionTracker = new SubscriptionTracker( - consumer, stream, offsetSpecification, trackingReference, messageHandler); + consumer, + stream, + offsetSpecification, + trackingReference, + subscriptionListener, + messageHandler); String key = keyForClientSubscription(newNode); @@ -212,6 +220,7 @@ private static class SubscriptionTracker { private final String offsetTrackingReference; private final MessageHandler messageHandler; private final StreamConsumer consumer; + private final SubscriptionListener subscriptionListener; private volatile long offset; private volatile boolean hasReceivedSomething = false; private volatile byte subscriptionIdInClient; @@ -223,11 +232,13 @@ private SubscriptionTracker( String stream, OffsetSpecification initialOffsetSpecification, String offsetTrackingReference, + SubscriptionListener subscriptionListener, MessageHandler messageHandler) { this.consumer = consumer; this.stream = stream; this.initialOffsetSpecification = initialOffsetSpecification; this.offsetTrackingReference = offsetTrackingReference; + this.subscriptionListener = subscriptionListener; this.messageHandler = messageHandler; } @@ -635,7 +646,7 @@ synchronized void add( update(previousSubscriptions, subscriptionId, subscriptionTracker); String offsetTrackingReference = subscriptionTracker.offsetTrackingReference; - if (subscriptionTracker.offsetTrackingReference != null) { + if (offsetTrackingReference != null) { long trackedOffset = client.queryOffset(offsetTrackingReference, subscriptionTracker.stream); if (trackedOffset != 0) { @@ -666,12 +677,20 @@ synchronized void add( subscriptionProperties.put("name", subscriptionTracker.offsetTrackingReference); } + SubscriptionContext subscriptionContext = + new DefaultSubscriptionContext(offsetSpecification); + subscriptionTracker.subscriptionListener.preSubscribe(subscriptionContext); + LOGGER.info( + "Computed offset specification {}, offset specification used after subscription listener {}", + offsetSpecification, + subscriptionContext.offsetSpecification()); + // FIXME consider using fewer initial credits Client.Response subscribeResponse = client.subscribe( subscriptionId, subscriptionTracker.stream, - offsetSpecification, + subscriptionContext.offsetSpecification(), 10, subscriptionProperties); if (!subscribeResponse.isOk()) { @@ -767,4 +786,28 @@ synchronized void close() { } } } + + private static final class DefaultSubscriptionContext implements SubscriptionContext { + + private volatile OffsetSpecification offsetSpecification; + + private DefaultSubscriptionContext(OffsetSpecification computedOffsetSpecification) { + this.offsetSpecification = computedOffsetSpecification; + } + + @Override + public OffsetSpecification offsetSpecification() { + return this.offsetSpecification; + } + + @Override + public void offsetSpecification(OffsetSpecification offsetSpecification) { + this.offsetSpecification = offsetSpecification; + } + + @Override + public String toString() { + return "SubscriptionContext{" + "offsetSpecification=" + offsetSpecification + '}'; + } + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index 996b6ef639..b330b832ae 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -17,6 +17,7 @@ import com.rabbitmq.stream.MessageHandler; import com.rabbitmq.stream.MessageHandler.Context; import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.SubscriptionListener; import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration; import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration; import java.util.concurrent.atomic.AtomicBoolean; @@ -57,7 +58,8 @@ class StreamConsumer implements Consumer { String name, StreamEnvironment environment, TrackingConfiguration trackingConfiguration, - boolean lazyInit) { + boolean lazyInit, + SubscriptionListener subscriptionListener) { try { this.name = name; @@ -100,6 +102,7 @@ class StreamConsumer implements Consumer { stream, offsetSpecification, this.name, + subscriptionListener, messageHandlerWithOrWithoutTracking); this.status = Status.RUNNING; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java index 3e50878767..f80d3dfd57 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java @@ -18,6 +18,7 @@ import com.rabbitmq.stream.MessageHandler; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.StreamException; +import com.rabbitmq.stream.SubscriptionListener; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.time.Duration; @@ -34,6 +35,7 @@ class StreamConsumerBuilder implements ConsumerBuilder { private DefaultAutoTrackingStrategy autoTrackingStrategy; private DefaultManualTrackingStrategy manualTrackingStrategy; private boolean lazyInit = false; + private SubscriptionListener subscriptionListener = subscriptionContext -> {}; public StreamConsumerBuilder(StreamEnvironment environment) { this.environment = environment; @@ -77,6 +79,15 @@ public ConsumerBuilder name(String name) { return this; } + @Override + public ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener) { + if (subscriptionListener == null) { + throw new IllegalArgumentException("The subscription listener cannot be null"); + } + this.subscriptionListener = subscriptionListener; + return this; + } + @Override public ManualTrackingStrategy manualTrackingStrategy() { this.manualTrackingStrategy = new DefaultManualTrackingStrategy(this); @@ -142,7 +153,8 @@ public Consumer build() { this.name, this.environment, trackingConfiguration, - this.lazyInit); + this.lazyInit, + this.subscriptionListener); environment.addConsumer((StreamConsumer) consumer); } else { consumer = diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 50edc1f0c5..53000f80f3 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -28,6 +28,7 @@ import com.rabbitmq.stream.ProducerBuilder; import com.rabbitmq.stream.StreamCreator; import com.rabbitmq.stream.StreamException; +import com.rabbitmq.stream.SubscriptionListener; import com.rabbitmq.stream.compression.CompressionCodecFactory; import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.OffsetTrackingCoordinator.Registration; @@ -467,10 +468,16 @@ Runnable registerConsumer( String stream, OffsetSpecification offsetSpecification, String trackingReference, + SubscriptionListener subscriptionListener, MessageHandler messageHandler) { Runnable closingCallback = this.consumersCoordinator.subscribe( - consumer, stream, offsetSpecification, trackingReference, messageHandler); + consumer, + stream, + offsetSpecification, + trackingReference, + subscriptionListener, + messageHandler); return closingCallback; } diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java index dd2bb2161e..8468c6b6b8 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java @@ -33,6 +33,7 @@ import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.StreamDoesNotExistException; +import com.rabbitmq.stream.SubscriptionListener; import com.rabbitmq.stream.codec.WrapperMessageBuilder; import com.rabbitmq.stream.impl.Client.MessageListener; import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumersPoolInfo; @@ -64,6 +65,8 @@ public class ConsumersCoordinatorTest { + private static final SubscriptionListener NO_OP_SUBSCRIPTION_LISTENER = subscriptionContext -> {}; + @Mock StreamEnvironment environment; @Mock StreamConsumer consumer; @Mock Client locator; @@ -173,7 +176,13 @@ void tearDown() throws Exception { when(client.serverAdvertisedHost()).thenReturn("foo").thenReturn(replica().get(0).getHost()); when(client.serverAdvertisedPort()).thenReturn(42).thenReturn(replica().get(0).getPort()); - c.subscribe(consumer, "stream", OffsetSpecification.first(), null, (offset, message) -> {}); + c.subscribe( + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {}); verify(clientFactory, times(2)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -202,7 +211,13 @@ void shouldGetExactNodeImmediatelyWithAdvertisedHostNameClientFactoryAndExactNod when(client.serverAdvertisedHost()).thenReturn(replica().get(0).getHost()); when(client.serverAdvertisedPort()).thenReturn(replica().get(0).getPort()); - c.subscribe(consumer, "stream", OffsetSpecification.first(), null, (offset, message) -> {}); + c.subscribe( + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {}); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -224,7 +239,12 @@ void shouldSubscribeWithEmptyPropertiesWithUnamedConsumer() { .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK)); coordinator.subscribe( - consumer, "stream", OffsetSpecification.first(), null, (offset, message) -> {}); + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {}); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -237,7 +257,12 @@ void subscribeShouldThrowExceptionWhenNoMetadataForTheStream() { assertThatThrownBy( () -> coordinator.subscribe( - consumer, "stream", OffsetSpecification.first(), null, (offset, message) -> {})) + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {})) .isInstanceOf(StreamDoesNotExistException.class); } @@ -248,7 +273,12 @@ void subscribeShouldThrowExceptionWhenStreamDoesNotExist() { assertThatThrownBy( () -> coordinator.subscribe( - consumer, "stream", OffsetSpecification.first(), null, (offset, message) -> {})) + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {})) .isInstanceOf(StreamDoesNotExistException.class); } @@ -259,7 +289,12 @@ void subscribeShouldThrowExceptionWhenMetadataResponseIsNotOk() { assertThatThrownBy( () -> coordinator.subscribe( - consumer, "stream", OffsetSpecification.first(), null, (offset, message) -> {})) + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {})) .isInstanceOf(IllegalStateException.class); } @@ -269,7 +304,12 @@ void subscribeShouldThrowExceptionIfNoNodeAvailableForStream() { assertThatThrownBy( () -> coordinator.subscribe( - consumer, "stream", OffsetSpecification.first(), null, (offset, message) -> {})) + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {})) .isInstanceOf(IllegalStateException.class); } @@ -305,6 +345,7 @@ void subscribeShouldSubscribeToStreamAndDispatchesMessage_UnsubscribeShouldUnsub "stream", OffsetSpecification.first(), null, + NO_OP_SUBSCRIPTION_LISTENER, (offset, message) -> messageHandlerCalls.incrementAndGet()); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) @@ -349,6 +390,7 @@ void subscribeShouldSubscribeToStreamAndDispatchesMessageWithManySubscriptions() "stream", OffsetSpecification.first(), null, + NO_OP_SUBSCRIPTION_LISTENER, (offset, message) -> messageHandlerCalls.compute(subId, (k, v) -> (v == null) ? 1 : ++v)); closingRunnables.add(closingRunnable); @@ -416,6 +458,7 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception { "stream", OffsetSpecification.first(), null, + NO_OP_SUBSCRIPTION_LISTENER, (offset, message) -> messageHandlerCalls.incrementAndGet()); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) @@ -431,6 +474,7 @@ void shouldRedistributeConsumerIfConnectionIsLost() throws Exception { "stream", OffsetSpecification.first(), null, + NO_OP_SUBSCRIPTION_LISTENER, (offset, message) -> {}); verify(client, times(1 + 1)) @@ -490,6 +534,7 @@ void shouldRedistributeConsumerOnMetadataUpdate() throws Exception { "stream", OffsetSpecification.first(), null, + NO_OP_SUBSCRIPTION_LISTENER, (offset, message) -> messageHandlerCalls.incrementAndGet()); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) @@ -500,6 +545,7 @@ void shouldRedistributeConsumerOnMetadataUpdate() throws Exception { "stream", OffsetSpecification.first(), null, + NO_OP_SUBSCRIPTION_LISTENER, (offset, message) -> {}); verify(client, times(1 + 1)) @@ -568,6 +614,7 @@ void shouldRetryRedistributionIfMetadataIsNotUpdatedImmediately() throws Excepti "stream", OffsetSpecification.first(), null, + NO_OP_SUBSCRIPTION_LISTENER, (offset, message) -> messageHandlerCalls.incrementAndGet()); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) @@ -629,6 +676,7 @@ void metadataUpdate_shouldCloseConsumerIfStreamIsDeleted() throws Exception { "stream", OffsetSpecification.first(), null, + NO_OP_SUBSCRIPTION_LISTENER, (offset, message) -> messageHandlerCalls.incrementAndGet()); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) @@ -678,6 +726,7 @@ void metadataUpdate_shouldCloseConsumerIfRetryTimeoutIsReached() throws Exceptio "stream", OffsetSpecification.first(), null, + NO_OP_SUBSCRIPTION_LISTENER, (offset, message) -> messageHandlerCalls.incrementAndGet()); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) @@ -728,6 +777,7 @@ void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscrip "stream", OffsetSpecification.first(), null, + NO_OP_SUBSCRIPTION_LISTENER, (offset, message) -> {})) .collect(Collectors.toList()); @@ -781,7 +831,12 @@ void shouldRemoveClientSubscriptionManagerFromPoolAfterConnectionDies() throws E .forEach( i -> { coordinator.subscribe( - consumer, "stream", OffsetSpecification.first(), null, (offset, message) -> {}); + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {}); }); // the extra is allocated on another client from the same pool verify(clientFactory, times(2)).client(any()); @@ -798,7 +853,12 @@ void shouldRemoveClientSubscriptionManagerFromPoolAfterConnectionDies() throws E // the MAX consumers must have been re-allocated to the existing client and a new one // let's add a new subscription to make sure we are still using the same pool coordinator.subscribe( - consumer, "stream", OffsetSpecification.first(), null, (offset, message) -> {}); + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {}); verify(clientFactory, times(2 + 1)).client(any()); verify(client, times(subscriptionCount + ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT + 1)) @@ -830,7 +890,12 @@ void shouldRemoveClientSubscriptionManagerFromPoolIfEmptyAfterMetadataUpdate() t .forEach( i -> { coordinator.subscribe( - consumer, "stream", OffsetSpecification.first(), null, (offset, message) -> {}); + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {}); }); // the extra is allocated on another client from the same pool verify(clientFactory, times(2)).client(any()); @@ -859,7 +924,12 @@ void shouldRemoveClientSubscriptionManagerFromPoolIfEmptyAfterMetadataUpdate() t // the MAX consumers must have been re-allocated to the existing client and a new one // let's add a new subscription to make sure we are still using the same pool coordinator.subscribe( - consumer, "stream", OffsetSpecification.first(), null, (offset, message) -> {}); + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {}); verify(clientFactory, times(2 + 1)).client(any()); verify(client, times(subscriptionCount + ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT + 1)) @@ -903,7 +973,12 @@ void shouldRestartWhereItLeftOffAfterDisruption(Consumer {}); + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {}); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -966,7 +1041,12 @@ void shouldReUseInitialOffsetSpecificationAfterDisruptionIfNoMessagesReceived( Runnable closingRunnable = coordinator.subscribe( - consumer, "stream", OffsetSpecification.next(), null, (offset, message) -> {}); + consumer, + "stream", + OffsetSpecification.next(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {}); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); @@ -1031,7 +1111,13 @@ void shouldUseStoredOffsetOnRecovery(Consumer configur .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK)); Runnable closingRunnable = - coordinator.subscribe(consumer, "stream", null, consumerName, (offset, message) -> {}); + coordinator.subscribe( + consumer, + "stream", + null, + consumerName, + NO_OP_SUBSCRIPTION_LISTENER, + (offset, message) -> {}); verify(clientFactory, times(1)).client(any()); verify(client, times(1)) .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index 335f634496..99532e8e35 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.IntConsumer; import java.util.function.UnaryOperator; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -515,4 +516,161 @@ void autoTrackingShouldStorePeriodicallyAndAfterInactivity() throws Exception { waitAtMost(5, () -> client.queryOffset(reference, stream) == lastReceivedOffset.get()); } + + @Test + @DisabledIfRabbitMqCtlNotSet + void externalOffsetTrackingWithSubscriptionListener() throws Exception { + AtomicInteger subscriptionListenerCallCount = new AtomicInteger(0); + AtomicInteger receivedMessages = new AtomicInteger(0); + AtomicLong offsetTracking = new AtomicLong(0); + AtomicBoolean started = new AtomicBoolean(false); + environment.consumerBuilder().stream(stream) + .offset(OffsetSpecification.first()) + .subscriptionListener( + subscriptionContext -> { + subscriptionListenerCallCount.incrementAndGet(); + OffsetSpecification offsetSpecification = + started.get() + ? OffsetSpecification.offset(offsetTracking.get() + 1) + : subscriptionContext.offsetSpecification(); + subscriptionContext.offsetSpecification(offsetSpecification); + }) + .messageHandler( + (context, message) -> { + receivedMessages.incrementAndGet(); + offsetTracking.set(context.offset()); + started.set(true); + }) + .build(); + + int messageCount = 10_000; + Producer producer = environment.producerBuilder().stream(stream).build(); + Runnable publish = + () -> + IntStream.range(0, messageCount) + .forEach( + i -> + producer.send( + producer.messageBuilder().addData("".getBytes()).build(), + confirmationStatus -> {})); + + publish.run(); + + waitAtMost(5, () -> receivedMessages.get() == messageCount); + assertThat(offsetTracking.get()).isGreaterThanOrEqualTo(messageCount - 1); + + Host.killConnection("rabbitmq-stream-consumer"); + waitAtMost(RECOVERY_DELAY.multipliedBy(2), () -> subscriptionListenerCallCount.get() == 2); + + publish.run(); + waitAtMost(5, () -> receivedMessages.get() == messageCount * 2); + assertThat(offsetTracking.get()).isGreaterThanOrEqualTo(messageCount * 2 - 1); + } + + @Test + @DisabledIfRabbitMqCtlNotSet + void duplicatesWhenResubscribeAfterDisconnectionWithLongFlushInterval() throws Exception { + AtomicInteger receivedMessages = new AtomicInteger(0); + int storeEvery = 10_000; + String reference = "ref-1"; + CountDownLatch poisonLatch = new CountDownLatch(1); + environment.consumerBuilder().name(reference).stream(stream) + .offset(OffsetSpecification.first()) + .messageHandler( + (context, message) -> { + receivedMessages.incrementAndGet(); + if ("poison".equals(new String(message.getBodyAsBinary()))) { + poisonLatch.countDown(); + } + }) + .autoTrackingStrategy() + .flushInterval(Duration.ofMinutes(60)) // long flush interval + .messageCountBeforeStorage(storeEvery) + .builder() + .build(); + + AtomicInteger publishedMessages = new AtomicInteger(0); + Producer producer = environment.producerBuilder().stream(stream).build(); + IntConsumer publish = + messagesToPublish -> { + publishedMessages.addAndGet(messagesToPublish); + IntStream.range(0, messagesToPublish) + .forEach( + i -> + producer.send( + producer.messageBuilder().addData("".getBytes()).build(), + confirmationStatus -> {})); + }; + publish.accept(storeEvery * 2 - 100); + waitAtMost(5, () -> receivedMessages.get() == publishedMessages.get()); + Host.killConnection("rabbitmq-stream-consumer"); + + publish.accept(storeEvery * 2); + producer.send( + producer.messageBuilder().addData("poison".getBytes()).build(), confirmationStatus -> {}); + latchAssert(poisonLatch).completes(); + // we have duplicates because the last stored value is behind and the re-subscription uses it + assertThat(receivedMessages).hasValueGreaterThan(publishedMessages.get()); + } + + @Test + @DisabledIfRabbitMqCtlNotSet + void useSubscriptionListenerToRestartExactlyWhereDesired() throws Exception { + AtomicInteger subscriptionListenerCallCount = new AtomicInteger(0); + AtomicInteger receivedMessages = new AtomicInteger(0); + AtomicLong offsetTracking = new AtomicLong(0); + AtomicBoolean started = new AtomicBoolean(false); + int storeEvery = 10_000; + String reference = "ref-1"; + CountDownLatch poisonLatch = new CountDownLatch(1); + environment.consumerBuilder().name(reference).stream(stream) + .offset(OffsetSpecification.first()) + .subscriptionListener( + subscriptionContext -> { + subscriptionListenerCallCount.getAndIncrement(); + OffsetSpecification offsetSpecification = + started.get() + ? OffsetSpecification.offset(offsetTracking.get() + 1) + : subscriptionContext.offsetSpecification(); + subscriptionContext.offsetSpecification(offsetSpecification); + }) + .messageHandler( + (context, message) -> { + receivedMessages.incrementAndGet(); + offsetTracking.set(context.offset()); + started.set(true); + if ("poison".equals(new String(message.getBodyAsBinary()))) { + poisonLatch.countDown(); + } + }) + .autoTrackingStrategy() + .flushInterval(Duration.ofMinutes(60)) // long flush interval + .messageCountBeforeStorage(storeEvery) + .builder() + .build(); + + AtomicInteger publishedMessages = new AtomicInteger(0); + Producer producer = environment.producerBuilder().stream(stream).build(); + IntConsumer publish = + messagesToPublish -> { + publishedMessages.addAndGet(messagesToPublish); + IntStream.range(0, messagesToPublish) + .forEach( + i -> + producer.send( + producer.messageBuilder().addData("".getBytes()).build(), + confirmationStatus -> {})); + }; + publish.accept(storeEvery * 2 - 100); + waitAtMost(5, () -> receivedMessages.get() == publishedMessages.get()); + Host.killConnection("rabbitmq-stream-consumer"); + + publish.accept(storeEvery * 2); + producer.send( + producer.messageBuilder().addData("poison".getBytes()).build(), confirmationStatus -> {}); + latchAssert(poisonLatch).completes(); + // no duplicates because the custom offset tracking overrides the stored offset in the + // subscription listener + assertThat(receivedMessages).hasValue(publishedMessages.get() + 1); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 41ca4c26a3..e09b07dbad 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -87,6 +87,11 @@ public static Duration waitAtMost(CallableBooleanSupplier condition) throws Exce return waitAtMost(10, condition, null); } + public static Duration waitAtMost(Duration timeout, CallableBooleanSupplier condition) + throws Exception { + return waitAtMost((int) timeout.getSeconds(), condition, null); + } + public static Duration waitAtMost(int timeoutInSeconds, CallableBooleanSupplier condition) throws Exception { return waitAtMost(timeoutInSeconds, condition, null); From 0bb1db37893618f7421390afe9ad49867fc770c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Oct 2021 15:07:03 +0200 Subject: [PATCH 2/5] Add Javadoc for SubscriptionListener References #38 --- .../com/rabbitmq/stream/ConsumerBuilder.java | 9 +++++ .../rabbitmq/stream/SubscriptionListener.java | 39 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java index e9f2977e30..4f9e2d44af 100644 --- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java @@ -64,6 +64,15 @@ public interface ConsumerBuilder { */ ConsumerBuilder name(String name); + /** + * Callback on subscription. + * + *

Can be used to set the offset specification before subscribing to the stream. + * + * @see SubscriptionListener + * @param subscriptionListener the listener + * @return this builder instance + */ ConsumerBuilder subscriptionListener(SubscriptionListener subscriptionListener); /** diff --git a/src/main/java/com/rabbitmq/stream/SubscriptionListener.java b/src/main/java/com/rabbitmq/stream/SubscriptionListener.java index e1be84c966..137d885d64 100644 --- a/src/main/java/com/rabbitmq/stream/SubscriptionListener.java +++ b/src/main/java/com/rabbitmq/stream/SubscriptionListener.java @@ -13,14 +13,53 @@ // info@rabbitmq.com. package com.rabbitmq.stream; +/** + * Callback interface to customize a subscription. + * + *

It is possible to change the computed {@link OffsetSpecification} in {@link + * #preSubscribe(SubscriptionContext)} by using a custom offset tracking strategy. + */ public interface SubscriptionListener { + /** + * Callback called before the subscription is created. + * + *

The method is called when a {@link Consumer} is created and it registers to broker, and also + * when the subscription must be re-created (after a disconnection or when the subscription must + * moved because the stream member it was connection becomes unavailable). + * + *

Application code can set the {@link OffsetSpecification} that will be used with the {@link + * SubscriptionContext#offsetSpecification(OffsetSpecification)} method. + * + * @param subscriptionContext + */ void preSubscribe(SubscriptionContext subscriptionContext); + /** Context object for the subscription. */ interface SubscriptionContext { + /** + * The offset specification computed by the library. + * + *

If the consumer has no name, the value is the value set with {@link + * ConsumerBuilder#offset(OffsetSpecification)} on the first subscription and the offset of the + * last dispatched message on subsequent calls (e.g. when the client re-subscribes after a + * disconnection). + * + *

If the consumer has a name, the value is the last stored if any. + * + * @see ConsumerBuilder#name(String) + * @return the computed offset specification + */ OffsetSpecification offsetSpecification(); + /** + * Set the offset specification to use for the subscription. + * + *

It overrides the value computed by the client. + * + * @param offsetSpecification the offset specification to use + */ void offsetSpecification(OffsetSpecification offsetSpecification); } } From 91d61cbec8e72f3c69bcc4d3cddbbafdd2593e45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Oct 2021 15:10:03 +0200 Subject: [PATCH 3/5] Wait for recovery accordingly in test References #38 --- src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index 99532e8e35..33da1504c9 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -560,7 +560,7 @@ void externalOffsetTrackingWithSubscriptionListener() throws Exception { assertThat(offsetTracking.get()).isGreaterThanOrEqualTo(messageCount - 1); Host.killConnection("rabbitmq-stream-consumer"); - waitAtMost(RECOVERY_DELAY.multipliedBy(2), () -> subscriptionListenerCallCount.get() == 2); + waitAtMost(recoveryInitialDelay.multipliedBy(2), () -> subscriptionListenerCallCount.get() == 2); publish.run(); waitAtMost(5, () -> receivedMessages.get() == messageCount * 2); From ee5465aa41b199c0fd171bfe4a075bf157be17fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Oct 2021 16:31:16 +0200 Subject: [PATCH 4/5] Document subscription listener Fixes #38 --- src/docs/asciidoc/api.adoc | 40 +++++++++++++++++++ .../rabbitmq/stream/docs/ConsumerUsage.java | 26 ++++++++++++ 2 files changed, 66 insertions(+) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 1cf3e9035a..40a418fc0a 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -881,3 +881,43 @@ entry, which has its own offset. This means one must be careful when basing some decision on offset values, like a modulo to perform an operation every X messages. As the message offsets have no guarantee to be contiguous, the operation may not happen exactly every X messages. + +====== Subscription Listener + +The library provides a `SubscriptionListener` interface callback to add behavior before a subscription is created. +This callback can be used to customize the offset the library computed for the subscription. +The callback is called when the consumer is first created and when the library has to re-subscribe (e.g. after a disconnection or a topology change). + +It is possible to use the callback to get the last processed offset from an external store, that is not using the server-side offset tracking feature RabbitMQ Stream provides. +The following code snippet shows how this can be done (note the interaction with the external store is not detailed): + +.Using an external store for offset tracking with a subscription listener +[source,java,indent=0] +-------- +include::{test-examples}/ConsumerUsage.java[tag=subscription-listener] +-------- +<1> Set subscription listener +<2> Get offset from external store +<3> Set offset to use for the subscription +<4> Store the offset in the external store after processing + +When using an external store for offset tracking, it is no longer necessary to set a name and an offset strategy, as these only apply when server-side offset tracking is in use. + +Using a subscription listener can be useful to have more accurate offset tracking on re-subscription, at the cost of making the application code slightly more complex. +This requires a good understanding on how and when subscription occurs in the library, and so when the subscription listener is called: + +* for a consumer with no name (server-side offset tracking _disabled_) +** on the first subscription (when the consumer is created): the offset specification is the one specified with `ConsumerBuilder#offset(OffsetSpecification)`, the default being `OffsetSpecification#next()` +** on re-subscription (after a disconnection or topology change): the offset specification is the offset of the last dispatched message +* for a consumer with a name (server-side offset tracking _enabled_) +** on the first subscription (when the consumer is created): the server-side stored offset (if any) overrides the value specified with `ConsumerBuilder#offset(OffsetSpecification)` +** on re-subscription (after a disconnection or topology change): the server-side stored offset is used + +The subscription listener comes in handy on re-subscription. +The application can track the last processed offset in-memory, with an `AtomicLong` for example. +The application knows exactly when a message is processed and updates its in-memory tracking accordingly, whereas the value computed by the library may not be perfectly appropriate on re-subscription. + +If we take the example of a named consumer with an offset tracking strategy that is lagging because of bad timing and a long flush interval. +When the glitch happens and triggers the re-subscription, the server-side stored offset can be quite behind what the application actually processed. +Using this server-side stored offset can lead to duplicates, whereas using the in-memory, application-specific offset tracking variable is more accurate. +A custom `SubscriptionListener` lets the application developer uses what's best for the application if the computed value is not optimal. \ No newline at end of file diff --git a/src/test/java/com/rabbitmq/stream/docs/ConsumerUsage.java b/src/test/java/com/rabbitmq/stream/docs/ConsumerUsage.java index 6cc884536f..59e8f0f889 100644 --- a/src/test/java/com/rabbitmq/stream/docs/ConsumerUsage.java +++ b/src/test/java/com/rabbitmq/stream/docs/ConsumerUsage.java @@ -127,4 +127,30 @@ void manualTrackingWithSettings() { // end::manual-tracking-with-settings[] } + void subscriptionListener() { + Environment environment = Environment.builder().build(); + // tag::subscription-listener[] + Consumer consumer = environment.consumerBuilder() + .stream("my-stream") + .subscriptionListener(subscriptionContext -> { // <1> + long offset = getOffsetFromExternalStore(); // <2> + subscriptionContext.offsetSpecification(OffsetSpecification.offset(offset)); // <3> + }) + .messageHandler((context, message) -> { + // message handling code... + + storeOffsetInExternalStore(context.offset()); // <4> + }) + .build(); + // end::subscription-listener[] + } + + void storeOffsetInExternalStore(long offset) { + + } + + long getOffsetFromExternalStore() { + return 0L; + } + } From 5862193182418843134b10ae7562c8a562066c35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 2 Nov 2021 15:44:51 +0100 Subject: [PATCH 5/5] Polish subscription listener documentation References #38 --- src/docs/asciidoc/api.adoc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 40a418fc0a..e8133cbd32 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -884,9 +884,9 @@ no guarantee to be contiguous, the operation may not happen exactly every X mess ====== Subscription Listener -The library provides a `SubscriptionListener` interface callback to add behavior before a subscription is created. -This callback can be used to customize the offset the library computed for the subscription. -The callback is called when the consumer is first created and when the library has to re-subscribe (e.g. after a disconnection or a topology change). +The client provides a `SubscriptionListener` interface callback to add behavior before a subscription is created. +This callback can be used to customize the offset the client computed for the subscription. +The callback is called when the consumer is first created and when the client has to re-subscribe (e.g. after a disconnection or a topology change). It is possible to use the callback to get the last processed offset from an external store, that is not using the server-side offset tracking feature RabbitMQ Stream provides. The following code snippet shows how this can be done (note the interaction with the external store is not detailed): @@ -903,8 +903,8 @@ include::{test-examples}/ConsumerUsage.java[tag=subscription-listener] When using an external store for offset tracking, it is no longer necessary to set a name and an offset strategy, as these only apply when server-side offset tracking is in use. -Using a subscription listener can be useful to have more accurate offset tracking on re-subscription, at the cost of making the application code slightly more complex. -This requires a good understanding on how and when subscription occurs in the library, and so when the subscription listener is called: +Using a subscription listener can also be useful to have more accurate offset tracking on re-subscription, at the cost of making the application code slightly more complex. +This requires a good understanding on how and when subscription occurs in the client, and so when the subscription listener is called: * for a consumer with no name (server-side offset tracking _disabled_) ** on the first subscription (when the consumer is created): the offset specification is the one specified with `ConsumerBuilder#offset(OffsetSpecification)`, the default being `OffsetSpecification#next()` @@ -915,9 +915,9 @@ This requires a good understanding on how and when subscription occurs in the li The subscription listener comes in handy on re-subscription. The application can track the last processed offset in-memory, with an `AtomicLong` for example. -The application knows exactly when a message is processed and updates its in-memory tracking accordingly, whereas the value computed by the library may not be perfectly appropriate on re-subscription. +The application knows exactly when a message is processed and updates its in-memory tracking accordingly, whereas the value computed by the client may not be perfectly appropriate on re-subscription. -If we take the example of a named consumer with an offset tracking strategy that is lagging because of bad timing and a long flush interval. -When the glitch happens and triggers the re-subscription, the server-side stored offset can be quite behind what the application actually processed. +Let's take the example of a named consumer with an offset tracking strategy that is lagging because of bad timing and a long flush interval. +When a glitch happens and triggers the re-subscription, the server-side stored offset can be quite behind what the application actually processed. Using this server-side stored offset can lead to duplicates, whereas using the in-memory, application-specific offset tracking variable is more accurate. A custom `SubscriptionListener` lets the application developer uses what's best for the application if the computed value is not optimal. \ No newline at end of file