From 16ffff663c4f71d83565c543dce2007e9b6bcb52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Mon, 23 Sep 2024 17:19:50 +0200 Subject: [PATCH] Notify SAC when it is no longer active A single active consumer may not always receive a notification from the broker when it gets inactive. An obvious reason is the consumer connection goes down. It is still possible to call the consumer update listener from the library, which can help applications take an appropriate action when a consumer goes from active to inactive. This commit implements the call to the listener under such circumstances (connection closed, stream unavailable because restarted, normal consumer closing). --- .../java/com/rabbitmq/stream/impl/Client.java | 11 +- .../rabbitmq/stream/impl/StreamConsumer.java | 23 ++++ .../stream/impl/SuperStreamConsumer.java | 4 + src/test/java/com/rabbitmq/stream/Host.java | 77 ++++++++---- .../com/rabbitmq/stream/impl/Assertions.java | 62 ++++++++++ .../stream/impl/SacStreamConsumerTest.java | 80 ++++++++++++- .../impl/SacSuperStreamConsumerTest.java | 111 ++++++++++++++++-- .../com/rabbitmq/stream/impl/TestUtils.java | 58 ++++++++- 8 files changed, 374 insertions(+), 52 deletions(-) create mode 100644 src/test/java/com/rabbitmq/stream/impl/Assertions.java diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 37c3967fa9..bb8b5bc635 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -303,12 +303,7 @@ public void initChannel(SocketChannel ch) { }); ChannelFuture f; - String clientConnectionName = - parameters.clientProperties == null - ? "" - : (parameters.clientProperties.containsKey("connection_name") - ? parameters.clientProperties.get("connection_name") - : ""); + String clientConnectionName = parameters.clientProperties.getOrDefault("connection_name", ""); try { LOGGER.debug( "Trying to create stream connection to {}:{}, with client connection name '{}'", @@ -1505,6 +1500,10 @@ String connectionName() { return builder.append(serverAddress()).toString(); } + String clientConnectionName() { + return this.clientConnectionName; + } + private String serverAddress() { SocketAddress remoteAddress = remoteAddress(); if (remoteAddress instanceof InetSocketAddress) { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java index d252c777a5..b62dad93d6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java @@ -481,6 +481,7 @@ public void close() { } void closeFromEnvironment() { + this.maybeNotifyActiveToInactiveSac(); LOGGER.debug("Calling consumer {} closing callback (stream {})", this.id, this.stream); this.closingCallback.run(); closed.set(true); @@ -490,6 +491,7 @@ void closeFromEnvironment() { void closeAfterStreamDeletion() { if (closed.compareAndSet(false, true)) { + this.maybeNotifyActiveToInactiveSac(); this.environment.removeConsumer(this); this.status = Status.CLOSED; } @@ -506,11 +508,23 @@ void setTrackingClient(Client client) { void setSubscriptionClient(Client client) { this.subscriptionClient = client; if (client == null && this.isSac()) { + maybeNotifyActiveToInactiveSac(); // we lost the connection this.sacActive = false; } } + private void maybeNotifyActiveToInactiveSac() { + if (this.isSac() && this.sacActive) { + LOGGER.debug( + "Single active consumer {} from stream {} with name {} is unavailable, calling consumer update listener", + this.id, + this.stream, + this.name); + this.consumerUpdate(false); + } + } + synchronized void unavailable() { this.status = Status.NOT_AVAILABLE; this.trackingClient = null; @@ -623,4 +637,13 @@ private void checkNotClosed() { long id() { return this.id; } + + String subscriptionConnectionName() { + Client client = this.subscriptionClient; + if (client == null) { + return ""; + } else { + return client.clientConnectionName(); + } + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java index ce417b6b91..dac6a2e9eb 100644 --- a/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java +++ b/src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java @@ -187,6 +187,10 @@ public void store(long offset) { "Consumer#store(long) does not work for super streams, use MessageHandler.Context#storeOffset() instead"); } + Consumer consumer(String partition) { + return this.consumers.get(partition); + } + @Override public long storedOffset() { throw new UnsupportedOperationException( diff --git a/src/test/java/com/rabbitmq/stream/Host.java b/src/test/java/com/rabbitmq/stream/Host.java index 28b63de828..f1267075ed 100644 --- a/src/test/java/com/rabbitmq/stream/Host.java +++ b/src/test/java/com/rabbitmq/stream/Host.java @@ -47,30 +47,34 @@ public static String capture(InputStream is) throws IOException { return buff.toString(); } - private static Process executeCommand(String command) throws IOException { + private static Process executeCommand(String command) { return executeCommand(command, false); } - private static Process executeCommand(String command, boolean ignoreError) throws IOException { - Process pr = executeCommandProcess(command); - - int ev = waitForExitValue(pr); - if (ev != 0 && !ignoreError) { - String stdout = capture(pr.getInputStream()); - String stderr = capture(pr.getErrorStream()); - throw new IOException( - "unexpected command exit value: " - + ev - + "\ncommand: " - + command - + "\n" - + "\nstdout:\n" - + stdout - + "\nstderr:\n" - + stderr - + "\n"); + private static Process executeCommand(String command, boolean ignoreError) { + try { + Process pr = executeCommandProcess(command); + + int ev = waitForExitValue(pr); + if (ev != 0 && !ignoreError) { + String stdout = capture(pr.getInputStream()); + String stderr = capture(pr.getErrorStream()); + throw new IOException( + "unexpected command exit value: " + + ev + + "\ncommand: " + + command + + "\n" + + "\nstdout:\n" + + stdout + + "\nstderr:\n" + + stderr + + "\n"); + } + return pr; + } catch (IOException e) { + throw new RuntimeException(e); } - return pr; } public static String hostname() throws IOException { @@ -110,6 +114,10 @@ public static Process rabbitmqctl(String command) throws IOException { return executeCommand(rabbitmqctlCommand() + " " + command); } + static Process rabbitmqStreams(String command) { + return executeCommand(rabbitmqStreamsCommand() + " " + command); + } + public static Process rabbitmqctlIgnoreError(String command) throws IOException { return executeCommand(rabbitmqctlCommand() + " " + command, true); } @@ -189,11 +197,19 @@ static List toConnectionInfoList(String json) { return GSON.fromJson(json, new TypeToken>() {}.getType()); } - public static Process killStreamLeaderProcess(String stream) throws IOException { - return rabbitmqctl( - "eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\"" - + stream - + "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'"); + public static void restartStream(String stream) { + rabbitmqStreams(" restart_stream " + stream); + } + + public static Process killStreamLeaderProcess(String stream) { + try { + return rabbitmqctl( + "eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\"" + + stream + + "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'"); + } catch (IOException e) { + throw new RuntimeException(e); + } } public static void addUser(String username, String password) throws IOException { @@ -243,7 +259,7 @@ public static void setEnv(String parameter, String value) throws IOException { public static String rabbitmqctlCommand() { String rabbitmqCtl = System.getProperty("rabbitmqctl.bin"); if (rabbitmqCtl == null) { - throw new IllegalStateException("Please define the rabbitmqctl.bin system property"); + rabbitmqCtl = DOCKER_PREFIX + "rabbitmq"; } if (rabbitmqCtl.startsWith(DOCKER_PREFIX)) { String containerId = rabbitmqCtl.split(":")[1]; @@ -253,6 +269,15 @@ public static String rabbitmqctlCommand() { } } + private static String rabbitmqStreamsCommand() { + String rabbitmqctl = rabbitmqctlCommand(); + int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl"); + if (lastIndex == -1) { + throw new IllegalArgumentException("Not a valid rabbitqmctl command: " + rabbitmqctl); + } + return rabbitmqctl.substring(0, lastIndex) + "rabbitmq-streams"; + } + public static AutoCloseable diskAlarm() throws Exception { return new CallableAutoCloseable( () -> { diff --git a/src/test/java/com/rabbitmq/stream/impl/Assertions.java b/src/test/java/com/rabbitmq/stream/impl/Assertions.java new file mode 100644 index 0000000000..b6f884d622 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/Assertions.java @@ -0,0 +1,62 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static org.assertj.core.api.Assertions.fail; + +import java.time.Duration; +import org.assertj.core.api.AbstractObjectAssert; + +final class Assertions { + + private Assertions() {} + + static SyncAssert assertThat(TestUtils.Sync sync) { + return new SyncAssert(sync); + } + + static class SyncAssert extends AbstractObjectAssert { + + private SyncAssert(TestUtils.Sync sync) { + super(sync, SyncAssert.class); + } + + SyncAssert completes() { + return this.completes(TestUtils.DEFAULT_CONDITION_TIMEOUT); + } + + SyncAssert completes(Duration timeout) { + boolean completed = actual.await(timeout); + if (!completed) { + fail("Sync timed out after %d ms", timeout.toMillis()); + } + return this; + } + + SyncAssert hasCompleted() { + if (!this.actual.hasCompleted()) { + fail("Sync should have completed but has not"); + } + return this; + } + + SyncAssert hasNotCompleted() { + if (this.actual.hasCompleted()) { + fail("Sync should have not completed"); + } + return this; + } + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java index af108a9659..698cbc5ede 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SacStreamConsumerTest.java @@ -14,24 +14,24 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.TestUtils.publishAndWaitForConfirms; -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static com.rabbitmq.stream.impl.Assertions.assertThat; +import static com.rabbitmq.stream.impl.TestUtils.*; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.stream.Consumer; -import com.rabbitmq.stream.Environment; -import com.rabbitmq.stream.EnvironmentBuilder; -import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.*; import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast311Condition; import io.netty.channel.EventLoopGroup; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; @ExtendWith({ TestUtils.StreamTestInfrastructureExtension.class, @@ -237,4 +237,72 @@ void externalTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throw // nothing stored on the server side assertThat(cf.get().queryOffset(consumerName, stream).getOffset()).isZero(); } + + public static Stream> + activeConsumerShouldGetUpdateNotificationAfterDisruption() { + return Stream.of( + namedConsumer(consumer -> Host.killConnection(connectionName(consumer)), "kill connection"), + namedConsumer(consumer -> Host.restartStream(stream(consumer)), "restart stream"), + namedConsumer(Consumer::close, "close consumer")); + } + + @ParameterizedTest + @MethodSource + @TestUtils.DisabledIfRabbitMqCtlNotSet + void activeConsumerShouldGetUpdateNotificationAfterDisruption( + java.util.function.Consumer disruption) { + String consumerName = "foo"; + Sync consumer1Active = sync(); + Sync consumer1Inactive = sync(); + Consumer consumer1 = + environment.consumerBuilder().stream(stream) + .name(consumerName) + .noTrackingStrategy() + .singleActiveConsumer() + .consumerUpdateListener( + context -> { + if (context.isActive()) { + consumer1Active.down(); + } else { + consumer1Inactive.down(); + } + return OffsetSpecification.next(); + }) + .messageHandler((context, message) -> {}) + .build(); + + Sync consumer2Active = sync(); + Sync consumer2Inactive = sync(); + environment.consumerBuilder().stream(stream) + .name(consumerName) + .noTrackingStrategy() + .singleActiveConsumer() + .consumerUpdateListener( + context -> { + if (!context.isActive()) { + consumer2Inactive.down(); + } + return OffsetSpecification.next(); + }) + .messageHandler((context, message) -> {}) + .build(); + + assertThat(consumer1Active).completes(); + assertThat(consumer2Inactive).hasNotCompleted(); + assertThat(consumer1Inactive).hasNotCompleted(); + assertThat(consumer2Active).hasNotCompleted(); + + disruption.accept(consumer1); + + assertThat(consumer2Inactive).hasNotCompleted(); + assertThat(consumer1Inactive).completes(); + } + + private static String connectionName(Consumer consumer) { + return ((StreamConsumer) consumer).subscriptionConnectionName(); + } + + private static String stream(Consumer consumer) { + return ((StreamConsumer) consumer).stream(); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java index afb8e86131..3180bbbea6 100644 --- a/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java @@ -14,18 +14,12 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; -import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology; -import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology; -import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static com.rabbitmq.stream.impl.Assertions.assertThat; +import static com.rabbitmq.stream.impl.TestUtils.*; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.stream.Consumer; -import com.rabbitmq.stream.ConsumerUpdateListener; -import com.rabbitmq.stream.Environment; -import com.rabbitmq.stream.EnvironmentBuilder; -import com.rabbitmq.stream.NoOffsetException; -import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.*; import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast311Condition; import com.rabbitmq.stream.impl.TestUtils.CallableBooleanSupplier; import com.rabbitmq.stream.impl.TestUtils.SingleActiveConsumer; @@ -39,11 +33,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; @ExtendWith({ TestUtils.StreamTestInfrastructureExtension.class, @@ -287,7 +284,8 @@ void sacAutoOffsetTrackingShouldStoreOnRelanbancing() throws Exception { && consumerStates.get("1" + partitions.get(2))); assertThat(consumerStates) - .containsEntry("0" + partitions.get(0), true) // not changed after closing + .containsEntry( + "0" + partitions.get(0), false) // client library notifies the listener on closing .containsEntry("0" + partitions.get(1), false) // not changed after closing .containsEntry("0" + partitions.get(2), false) // not changed after closing .containsEntry("1" + partitions.get(0), true) // now active @@ -314,12 +312,15 @@ void sacAutoOffsetTrackingShouldStoreOnRelanbancing() throws Exception { && consumerStates.get("2" + partitions.get(2))); assertThat(consumerStates) - .containsEntry("0" + partitions.get(0), true) // not changed after closing + .containsEntry( + "0" + partitions.get(0), false) // client library notifies the listener on closing .containsEntry("0" + partitions.get(1), false) // not changed after closing .containsEntry("0" + partitions.get(2), false) // not changed after closing - .containsEntry("1" + partitions.get(0), true) // not changed after closing + .containsEntry( + "1" + partitions.get(0), false) // client library notifies the listener on closing .containsEntry("1" + partitions.get(1), false) // not changed after closing - .containsEntry("1" + partitions.get(2), true) // not changed after closing + .containsEntry( + "1" + partitions.get(2), false) // client library notifies the listener on closing .containsEntry("2" + partitions.get(0), true) // now active .containsEntry("2" + partitions.get(1), true) // now active .containsEntry("2" + partitions.get(2), true); // now active @@ -809,6 +810,85 @@ void consumerGroupsOnSameSuperStreamShouldBeIndependent() { }); } + public static Stream> + activeConsumerShouldGetUpdateNotificationAfterDisruption() { + return Stream.of( + namedBiConsumer((s, c) -> Host.killConnection(connectionName(s, c)), "kill connection"), + namedBiConsumer((s, c) -> Host.restartStream(s), "restart stream"), + namedBiConsumer((s, c) -> c.close(), "close consumer")); + } + + @ParameterizedTest + @MethodSource + @TestUtils.DisabledIfRabbitMqCtlNotSet + void activeConsumerShouldGetUpdateNotificationAfterDisruption( + java.util.function.BiConsumer disruption) { + declareSuperStreamTopology(configurationClient, superStream, partitionCount); + String partition = superStream + "-0"; + + String consumerName = "foo"; + Function, ConsumerUpdateListener> + filteringListener = + action -> + (ConsumerUpdateListener) + context -> { + if (partition.equals(context.stream())) { + action.accept(context); + } + return OffsetSpecification.next(); + }; + + Sync consumer1Active = sync(); + Sync consumer1Inactive = sync(); + + Consumer consumer1 = + environment + .consumerBuilder() + .singleActiveConsumer() + .superStream(superStream) + .name(consumerName) + .noTrackingStrategy() + .consumerUpdateListener( + filteringListener.apply( + context -> { + if (context.isActive()) { + consumer1Active.down(); + } else { + consumer1Inactive.down(); + } + })) + .messageHandler((context, message) -> {}) + .build(); + + Sync consumer2Active = sync(); + Sync consumer2Inactive = sync(); + environment + .consumerBuilder() + .singleActiveConsumer() + .superStream(superStream) + .name(consumerName) + .noTrackingStrategy() + .consumerUpdateListener( + filteringListener.apply( + context -> { + if (!context.isActive()) { + consumer2Inactive.down(); + } + })) + .messageHandler((context, message) -> {}) + .build(); + + assertThat(consumer1Active).completes(); + assertThat(consumer2Inactive).hasNotCompleted(); + assertThat(consumer1Inactive).hasNotCompleted(); + assertThat(consumer2Active).hasNotCompleted(); + + disruption.accept(partition, consumer1); + + assertThat(consumer2Inactive).hasNotCompleted(); + assertThat(consumer1Inactive).completes(); + } + private static void waitUntil(CallableBooleanSupplier action) { try { waitAtMost(action); @@ -816,4 +896,9 @@ private static void waitUntil(CallableBooleanSupplier action) { throw new RuntimeException(e); } } + + private static String connectionName(String partition, Consumer consumer) { + return ((StreamConsumer) ((SuperStreamConsumer) consumer).consumer(partition)) + .subscriptionConnectionName(); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 56e862d7ec..9ab50aadf7 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -83,7 +83,7 @@ public final class TestUtils { private static final Logger LOGGER = LoggerFactory.getLogger(TestUtils.class); - private static final Duration DEFAULT_CONDITION_TIMEOUT = Duration.ofSeconds(10); + static final Duration DEFAULT_CONDITION_TIMEOUT = Duration.ofSeconds(10); private static final ConnectionFactory AMQP_CF = new ConnectionFactory(); @@ -257,6 +257,20 @@ public String toString() { }; } + static BiConsumer namedBiConsumer(BiConsumer delegate, String description) { + return new BiConsumer() { + @Override + public void accept(T t, U s) { + delegate.accept(t, s); + } + + @Override + public String toString() { + return description; + } + }; + } + static Answer answer(Runnable task) { return invocationOnMock -> { task.run(); @@ -1103,4 +1117,46 @@ static void repeatIfFailure(RunnableWithException test) throws Exception { private static Connection connection() throws IOException, TimeoutException { return AMQP_CF.newConnection(); } + + static Sync sync() { + return sync(1); + } + + static Sync sync(int count) { + return new Sync(count); + } + + static class Sync { + + private final AtomicReference latch = new AtomicReference<>(); + + private Sync(int count) { + this.latch.set(new CountDownLatch(count)); + } + + void down() { + this.latch.get().countDown(); + } + + boolean await(Duration timeout) { + try { + return this.latch.get().await(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + } + + void reset(int count) { + this.latch.set(new CountDownLatch(count)); + } + + void reset() { + this.reset(1); + } + + boolean hasCompleted() { + return this.latch.get().getCount() == 0; + } + } }