diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index 7543f2f175..2019fc656a 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -33,10 +33,16 @@ jobs: -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem - name: Test (dynamic-batch publishing) run: | - ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + ./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \ -Drabbitmq.stream.producer.dynamic.batch=true \ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem - name: Stop broker run: docker stop rabbitmq && docker rm rabbitmq + - name: Start cluster + run: ci/start-cluster.sh + - name: Test against cluster + run: ./mvnw test -Dtest="*ClusterTest" -Drabbitmqctl.bin=DOCKER:rabbitmq0 + - name: Stop cluster + run: docker compose --file ci/cluster/docker-compose.yml down diff --git a/ci/cluster/configuration-0/enabled_plugins b/ci/cluster/configuration-0/enabled_plugins new file mode 100644 index 0000000000..244c8f60e8 --- /dev/null +++ b/ci/cluster/configuration-0/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_stream_management]. diff --git a/ci/cluster/configuration-0/rabbitmq.conf b/ci/cluster/configuration-0/rabbitmq.conf new file mode 100644 index 0000000000..9216e03142 --- /dev/null +++ b/ci/cluster/configuration-0/rabbitmq.conf @@ -0,0 +1,8 @@ +cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config +cluster_formation.classic_config.nodes.1 = rabbit@node0 +cluster_formation.classic_config.nodes.2 = rabbit@node1 +cluster_formation.classic_config.nodes.3 = rabbit@node2 +loopback_users = none + +stream.advertised_host = localhost +stream.advertised_port = 5552 diff --git a/ci/cluster/configuration-1/enabled_plugins b/ci/cluster/configuration-1/enabled_plugins new file mode 100644 index 0000000000..244c8f60e8 --- /dev/null +++ b/ci/cluster/configuration-1/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_stream_management]. diff --git a/ci/cluster/configuration-1/rabbitmq.conf b/ci/cluster/configuration-1/rabbitmq.conf new file mode 100644 index 0000000000..2184b5e340 --- /dev/null +++ b/ci/cluster/configuration-1/rabbitmq.conf @@ -0,0 +1,8 @@ +cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config +cluster_formation.classic_config.nodes.1 = rabbit@node0 +cluster_formation.classic_config.nodes.2 = rabbit@node1 +cluster_formation.classic_config.nodes.3 = rabbit@node2 +loopback_users = none + +stream.advertised_host = localhost +stream.advertised_port = 5553 diff --git a/ci/cluster/configuration-2/enabled_plugins b/ci/cluster/configuration-2/enabled_plugins new file mode 100644 index 0000000000..244c8f60e8 --- /dev/null +++ b/ci/cluster/configuration-2/enabled_plugins @@ -0,0 +1 @@ +[rabbitmq_stream_management]. diff --git a/ci/cluster/configuration-2/rabbitmq.conf b/ci/cluster/configuration-2/rabbitmq.conf new file mode 100644 index 0000000000..ff57480c50 --- /dev/null +++ b/ci/cluster/configuration-2/rabbitmq.conf @@ -0,0 +1,8 @@ +cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config +cluster_formation.classic_config.nodes.1 = rabbit@node0 +cluster_formation.classic_config.nodes.2 = rabbit@node1 +cluster_formation.classic_config.nodes.3 = rabbit@node2 +loopback_users = none + +stream.advertised_host = localhost +stream.advertised_port = 5554 diff --git a/ci/cluster/docker-compose.yml b/ci/cluster/docker-compose.yml new file mode 100644 index 0000000000..39345d3e8d --- /dev/null +++ b/ci/cluster/docker-compose.yml @@ -0,0 +1,64 @@ +services: + node0: + environment: + - RABBITMQ_ERLANG_COOKIE='secret_cookie' + networks: + - rabbitmq-cluster + hostname: node0 + container_name: rabbitmq0 + image: ${RABBITMQ_IMAGE:-rabbitmq:4.0} + pull_policy: always + ports: + - "5672:5672" + - "5552:5552" + - "15672:15672" + tty: true + volumes: + - ./configuration-0/:/etc/rabbitmq/ + node1: + environment: + - RABBITMQ_ERLANG_COOKIE='secret_cookie' + networks: + - rabbitmq-cluster + hostname: node1 + container_name: rabbitmq1 + image: ${RABBITMQ_IMAGE:-rabbitmq:4.0} + pull_policy: always + ports: + - "5673:5672" + - "5553:5552" + - "15673:15672" + tty: true + volumes: + - ./configuration-1/:/etc/rabbitmq/ + node2: + environment: + - RABBITMQ_ERLANG_COOKIE='secret_cookie' + networks: + - rabbitmq-cluster + hostname: node2 + container_name: rabbitmq2 + image: ${RABBITMQ_IMAGE:-rabbitmq:4.0} + pull_policy: always + ports: + - "5674:5672" + - "5554:5552" + - "15674:15672" + tty: true + volumes: + - ./configuration-2/:/etc/rabbitmq/ + load-balander: + networks: + - rabbitmq-cluster + hostname: load-balancer + container_name: haproxy + image: haproxy:3.0 + pull_policy: always + ports: + - "5555:5555" + - "8100:8100" + tty: true + volumes: + - ./load-balancer/:/usr/local/etc/haproxy:ro +networks: + rabbitmq-cluster: diff --git a/ci/cluster/load-balancer/haproxy.cfg b/ci/cluster/load-balancer/haproxy.cfg new file mode 100644 index 0000000000..e5c628bec1 --- /dev/null +++ b/ci/cluster/load-balancer/haproxy.cfg @@ -0,0 +1,31 @@ +global + log 127.0.0.1 local0 info + maxconn 512 + +defaults + log global + mode tcp + option tcplog + option dontlognull + retries 3 + option redispatch + maxconn 512 + timeout connect 5s + timeout client 120s + timeout server 120s + +listen stream + bind :5555 + mode tcp + balance roundrobin + server rabbitmq-0 node0:5552 + server rabbitmq-1 node1:5552 + server rabbitmq-2 node2:5552 + +listen stats + bind :8100 + mode http + option httplog + stats enable + stats uri /stats + stats refresh 5s diff --git a/ci/start-cluster.sh b/ci/start-cluster.sh new file mode 100755 index 0000000000..c7a5a9f321 --- /dev/null +++ b/ci/start-cluster.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +export RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-rabbitmq:4.0} + +wait_for_message() { + while ! docker logs "$1" | grep -q "$2"; + do + sleep 2 + echo "Waiting 2 seconds for $1 to start..." + done +} + +docker compose --file ci/cluster/docker-compose.yml down +docker compose --file ci/cluster/docker-compose.yml up --detach + +wait_for_message rabbitmq0 "completed with" + +docker exec rabbitmq0 rabbitmqctl await_online_nodes 3 + +docker exec rabbitmq0 rabbitmqctl enable_feature_flag --opt-in khepri_db +docker exec rabbitmq1 rabbitmqctl enable_feature_flag --opt-in khepri_db +docker exec rabbitmq2 rabbitmqctl enable_feature_flag --opt-in khepri_db + +docker exec rabbitmq0 rabbitmqctl cluster_status + +docker compose --file ci/cluster/docker-compose.yml ps diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index d563ffb3b1..d60f4ff943 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -16,6 +16,7 @@ import static com.rabbitmq.stream.impl.Utils.*; import static java.lang.String.format; +import static java.util.stream.Collectors.toList; import com.rabbitmq.stream.*; import com.rabbitmq.stream.Consumer; @@ -35,7 +36,6 @@ import java.util.Map.Entry; import java.util.NavigableSet; import java.util.Objects; -import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -53,7 +53,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class ConsumersCoordinator { +final class ConsumersCoordinator implements AutoCloseable { static final int MAX_SUBSCRIPTIONS_PER_CLIENT = 256; static final int MAX_ATTEMPT_BEFORE_FALLING_BACK_TO_LEADER = 5; @@ -62,7 +62,6 @@ class ConsumersCoordinator { static final OffsetSpecification DEFAULT_OFFSET_SPECIFICATION = OffsetSpecification.next(); private static final Logger LOGGER = LoggerFactory.getLogger(ConsumersCoordinator.class); - private final Random random = new Random(); private final StreamEnvironment environment; private final ClientFactory clientFactory; private final int maxConsumersByConnection; @@ -70,6 +69,7 @@ class ConsumersCoordinator { private final AtomicLong managerIdSequence = new AtomicLong(0); private final NavigableSet managers = new ConcurrentSkipListSet<>(); private final AtomicLong trackerIdSequence = new AtomicLong(0); + private final Function, Broker> brokerPicker; private final List trackers = new CopyOnWriteArrayList<>(); private final ExecutorServiceFactory executorServiceFactory = @@ -83,16 +83,14 @@ class ConsumersCoordinator { int maxConsumersByConnection, Function connectionNamingStrategy, ClientFactory clientFactory, - boolean forceReplica) { + boolean forceReplica, + Function, Broker> brokerPicker) { this.environment = environment; this.clientFactory = clientFactory; this.maxConsumersByConnection = maxConsumersByConnection; this.connectionNamingStrategy = connectionNamingStrategy; this.forceReplica = forceReplica; - } - - private static String keyForClientSubscription(Client.Broker broker) { - return broker.getHost() + ":" + broker.getPort(); + this.brokerPicker = brokerPicker; } private BackOffDelayPolicy recoveryBackOffDelayPolicy() { @@ -116,8 +114,8 @@ Runnable subscribe( return lock( this.coordinatorLock, () -> { - List candidates = findBrokersForStream(stream, forceReplica); - Client.Broker newNode = pickBroker(candidates); + List candidates = findCandidateNodes(stream, forceReplica); + Broker newNode = pickBroker(this.brokerPicker, candidates); if (newNode == null) { throw new IllegalStateException("No available node to subscribe to"); } @@ -138,7 +136,7 @@ Runnable subscribe( flowStrategy); try { - addToManager(newNode, subscriptionTracker, offsetSpecification, true); + addToManager(newNode, candidates, subscriptionTracker, offsetSpecification, true); } catch (ConnectionStreamException e) { // these exceptions are not public throw new StreamException(e.getMessage()); @@ -162,6 +160,7 @@ Runnable subscribe( private void addToManager( Broker node, + List candidates, SubscriptionTracker tracker, OffsetSpecification offsetSpecification, boolean isInitialSubscription) { @@ -189,9 +188,9 @@ private void addToManager( } } if (pickedManager == null) { - String name = keyForClientSubscription(node); + String name = keyForNode(node); LOGGER.debug("Creating subscription manager on {}", name); - pickedManager = new ClientSubscriptionsManager(node, clientParameters); + pickedManager = new ClientSubscriptionsManager(node, candidates, clientParameters); LOGGER.debug("Created subscription manager on {}, id {}", name, pickedManager.id); } try { @@ -231,7 +230,7 @@ int managerCount() { } // package protected for testing - List findBrokersForStream(String stream, boolean forceReplica) { + List findCandidateNodes(String stream, boolean forceReplica) { LOGGER.debug( "Candidate lookup to consumer from '{}', forcing replica? {}", stream, forceReplica); Map metadata = @@ -254,12 +253,13 @@ List findBrokersForStream(String stream, boolean forceReplica) { } } - List replicas = streamMetadata.getReplicas(); - if ((replicas == null || replicas.isEmpty()) && streamMetadata.getLeader() == null) { + Broker leader = streamMetadata.getLeader(); + List replicas = streamMetadata.getReplicas(); + if ((replicas == null || replicas.isEmpty()) && leader == null) { throw new IllegalStateException("No node available to consume from stream " + stream); } - List brokers; + List brokers; if (replicas == null || replicas.isEmpty()) { if (forceReplica) { throw new IllegalStateException( @@ -268,13 +268,18 @@ List findBrokersForStream(String stream, boolean forceReplica) { + "consuming from leader has been deactivated for this consumer", stream)); } else { - brokers = Collections.singletonList(streamMetadata.getLeader()); - LOGGER.debug( - "Only leader node {} for consuming from {}", streamMetadata.getLeader(), stream); + brokers = Collections.singletonList(new BrokerWrapper(leader, true)); + LOGGER.debug("Only leader node {} for consuming from {}", leader, stream); } } else { LOGGER.debug("Replicas for consuming from {}: {}", stream, replicas); - brokers = new ArrayList<>(replicas); + brokers = + replicas.stream() + .map(b -> new BrokerWrapper(b, false)) + .collect(Collectors.toCollection(ArrayList::new)); + if (!forceReplica && leader != null) { + brokers.add(new BrokerWrapper(leader, true)); + } } LOGGER.debug("Candidates to consume from {}: {}", stream, brokers); @@ -282,7 +287,7 @@ List findBrokersForStream(String stream, boolean forceReplica) { return brokers; } - private Callable> findBrokersForStream(String stream) { + private Callable> findCandidateNodes(String stream) { AtomicInteger attemptNumber = new AtomicInteger(); return () -> { boolean mustUseReplica; @@ -294,20 +299,10 @@ private Callable> findBrokersForStream(String stream) { } LOGGER.debug( "Looking for broker(s) for stream {}, forcing replica {}", stream, mustUseReplica); - return findBrokersForStream(stream, mustUseReplica); + return findCandidateNodes(stream, mustUseReplica); }; } - private Client.Broker pickBroker(List brokers) { - if (brokers.isEmpty()) { - return null; - } else if (brokers.size() == 1) { - return brokers.get(0); - } else { - return brokers.get(random.nextInt(brokers.size())); - } - } - public void close() { Iterator iterator = this.managers.iterator(); while (iterator.hasNext()) { @@ -571,6 +566,7 @@ private class ClientSubscriptionsManager implements Comparable: (actual or advertised) private final String name; // the 2 data structures track the subscriptions, they must remain consistent private final Map> streamToStreamSubscriptions = @@ -582,12 +578,14 @@ private class ClientSubscriptionsManager implements Comparable candidates, + Client.ClientParameters clientParameters) { this.id = managerIdSequence.getAndIncrement(); - this.node = node; - this.name = keyForClientSubscription(node); - LOGGER.debug("creating subscription manager on {}", name); this.trackerCount = 0; + AtomicReference nameReference = new AtomicReference<>(); + AtomicBoolean clientInitializedInManager = new AtomicBoolean(false); ChunkListener chunkListener = (client, subscriptionId, offset, messageCount, dataSize) -> { @@ -639,7 +637,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa "Could not find stream subscription {} in manager {}, node {} for message listener", subscriptionId, this.id, - this.name); + nameReference.get()); } }; MessageIgnoredListener messageIgnoredListener = @@ -663,7 +661,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa "Could not find stream subscription {} in manager {}, node {} for message ignored listener", subscriptionId, this.id, - this.name); + nameReference.get()); } }; ShutdownListener shutdownListener = @@ -675,7 +673,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa if (shutdownContext.isShutdownUnexpected()) { LOGGER.debug( "Unexpected shutdown notification on subscription connection {}, scheduling consumers re-assignment", - name); + nameReference.get()); LOGGER.debug( "Subscription connection has {} consumer(s) over {} stream(s) to recover", this.subscriptionTrackers.stream().filter(Objects::nonNull).count(), @@ -718,7 +716,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa } }, "Consumers re-assignment after disconnection from %s", - name)); + nameReference.get())); } }; MetadataListener metadataListener = @@ -792,18 +790,23 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa }; String connectionName = connectionNamingStrategy.apply(ClientConnectionType.CONSUMER); ClientFactoryContext clientFactoryContext = - ClientFactoryContext.fromParameters( - clientParameters - .clientProperty("connection_name", connectionName) - .chunkListener(chunkListener) - .creditNotification(creditNotification) - .messageListener(messageListener) - .messageIgnoredListener(messageIgnoredListener) - .shutdownListener(shutdownListener) - .metadataListener(metadataListener) - .consumerUpdateListener(consumerUpdateListener)) - .key(name); + new ClientFactoryContext( + clientParameters + .clientProperty("connection_name", connectionName) + .chunkListener(chunkListener) + .creditNotification(creditNotification) + .messageListener(messageListener) + .messageIgnoredListener(messageIgnoredListener) + .shutdownListener(shutdownListener) + .metadataListener(metadataListener) + .consumerUpdateListener(consumerUpdateListener), + keyForNode(targetNode), + candidates.stream().map(BrokerWrapper::broker).collect(toList())); this.client = clientFactory.client(clientFactoryContext); + this.node = brokerFromClient(this.client); + this.name = keyForNode(this.node); + nameReference.set(this.name); + LOGGER.debug("creating subscription manager on {}", name); LOGGER.debug("Created consumer connection '{}'", connectionName); clientInitializedInManager.set(true); } @@ -828,7 +831,7 @@ private void assignConsumersToStream( } }; - AsyncRetry.asyncRetry(findBrokersForStream(stream)) + AsyncRetry.asyncRetry(findCandidateNodes(stream)) .description("Candidate lookup to consume from '%s'", stream) .scheduler(environment.scheduledExecutorService()) .retry(ex -> !(ex instanceof StreamDoesNotExistException)) @@ -836,7 +839,7 @@ private void assignConsumersToStream( .build() .thenAccept( candidateNodes -> { - List candidates = candidateNodes; + List candidates = candidateNodes; if (candidates == null) { LOGGER.debug("No candidate nodes to consume from '{}'", stream); consumersClosingCallback.run(); @@ -870,7 +873,8 @@ private List createSubscriptionTrackerList() { return newSubscriptions; } - private void maybeRecoverSubscription(List candidates, SubscriptionTracker tracker) { + private void maybeRecoverSubscription( + List candidates, SubscriptionTracker tracker) { if (tracker.compareAndSet(SubscriptionState.ACTIVE, SubscriptionState.RECOVERING)) { try { recoverSubscription(candidates, tracker); @@ -891,12 +895,12 @@ private void maybeRecoverSubscription(List candidates, SubscriptionTrack } } - private void recoverSubscription(List candidates, SubscriptionTracker tracker) { + private void recoverSubscription(List candidates, SubscriptionTracker tracker) { boolean reassignmentCompleted = false; while (!reassignmentCompleted) { try { if (tracker.consumer.isOpen()) { - Broker broker = pickBroker(candidates); + Broker broker = pickBroker(brokerPicker, candidates); LOGGER.debug("Using {} to resume consuming from {}", broker, tracker.stream); synchronized (tracker.consumer) { if (tracker.consumer.isOpen()) { @@ -906,7 +910,7 @@ private void recoverSubscription(List candidates, SubscriptionTracker tr } else { offsetSpecification = tracker.initialOffsetSpecification; } - addToManager(broker, tracker, offsetSpecification, false); + addToManager(broker, candidates, tracker, offsetSpecification, false); } } } else { @@ -927,7 +931,7 @@ private void recoverSubscription(List candidates, SubscriptionTracker tr // maybe not a good candidate, let's refresh and retry for this one candidates = Utils.callAndMaybeRetry( - findBrokersForStream(tracker.stream), + findCandidateNodes(tracker.stream), ex -> !(ex instanceof StreamDoesNotExistException), recoveryBackOffDelayPolicy(), "Candidate lookup to consume from '%s' (subscription recovery)", @@ -1295,4 +1299,20 @@ static int pickSlot(List list, AtomicInteger sequence) { } return index; } + + private static List keepReplicasIfPossible(Collection brokers) { + if (brokers.size() > 1) { + return brokers.stream() + .filter(w -> !w.isLeader()) + .map(BrokerWrapper::broker) + .collect(toList()); + } else { + return brokers.stream().map(BrokerWrapper::broker).collect(toList()); + } + } + + static Broker pickBroker( + Function, Broker> picker, Collection candidates) { + return picker.apply(keepReplicasIfPossible(candidates)); + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java index 56143a0547..14a1f3adc0 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -32,13 +32,7 @@ import com.rabbitmq.stream.impl.Utils.ClientConnectionType; import com.rabbitmq.stream.impl.Utils.ClientFactory; import com.rabbitmq.stream.impl.Utils.ClientFactoryContext; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableSet; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; @@ -87,10 +81,6 @@ class ProducersCoordinator { this.connectionNamingStrategy = connectionNamingStrategy; } - private static String keyForNode(Client.Broker broker) { - return broker.getHost() + ":" + broker.getPort(); - } - Runnable registerProducer(StreamProducer producer, String reference, String stream) { return lock( this.coordinatorLock, @@ -162,7 +152,7 @@ private void addToManager(Broker node, AgentTracker tracker) { } if (pickedManager == null) { String name = keyForNode(node); - LOGGER.debug("Creating producer manager on {}", name); + LOGGER.debug("Trying to create producer manager on {}", name); pickedManager = new ClientProducersManager(node, this.clientFactory, clientParameters); LOGGER.debug("Created producer manager on {}, id {}", name, pickedManager.id); } @@ -578,10 +568,9 @@ private class ClientProducersManager implements Comparable nameReference = new AtomicReference<>(); AtomicReference ref = new AtomicReference<>(); AtomicBoolean clientInitializedInManager = new AtomicBoolean(false); PublishConfirmListener publishConfirmListener = @@ -636,7 +625,7 @@ private ClientProducersManager( }); }, "Producer recovery after disconnection from %s", - name)); + nameReference.get())); } }; MetadataListener metadataListener = @@ -685,15 +674,19 @@ private ClientProducersManager( }; String connectionName = connectionNamingStrategy.apply(ClientConnectionType.PRODUCER); ClientFactoryContext connectionFactoryContext = - ClientFactoryContext.fromParameters( - clientParameters - .publishConfirmListener(publishConfirmListener) - .publishErrorListener(publishErrorListener) - .shutdownListener(shutdownListener) - .metadataListener(metadataListener) - .clientProperty("connection_name", connectionName)) - .key(name); + new ClientFactoryContext( + clientParameters + .publishConfirmListener(publishConfirmListener) + .publishErrorListener(publishErrorListener) + .shutdownListener(shutdownListener) + .metadataListener(metadataListener) + .clientProperty("connection_name", connectionName), + keyForNode(targetNode), + Collections.emptyList()); this.client = cf.client(connectionFactoryContext); + this.node = Utils.brokerFromClient(this.client); + this.name = keyForNode(this.node); + nameReference.set(this.name); LOGGER.debug("Created producer connection '{}'", connectionName); clientInitializedInManager.set(true); ref.set(this.client); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index ed951c38b7..3b66e0d3ca 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -219,7 +219,8 @@ class StreamEnvironment implements Environment { maxConsumersByConnection, connectionNamingStrategy, Utils.coordinatorClientFactory(this), - forceReplicaForConsumers); + forceReplicaForConsumers, + Utils.brokerPicker()); this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this); ClientParameters clientParametersForInit = locatorParametersCopy(); Runnable locatorInitSequence = diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index 745c7778a1..558d2a347b 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -15,6 +15,7 @@ package com.rabbitmq.stream.impl; import static java.lang.String.format; +import static java.util.Collections.unmodifiableList; import com.rabbitmq.stream.*; import com.rabbitmq.stream.impl.Client.ClientParameters; @@ -22,13 +23,7 @@ import java.net.UnknownHostException; import java.security.cert.X509Certificate; import java.time.Duration; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -40,12 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; +import java.util.function.*; import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.LongConsumer; -import java.util.function.LongSupplier; -import java.util.function.Predicate; -import java.util.function.Supplier; import javax.net.ssl.X509TrustManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,6 +136,11 @@ static short encodeResponseCode(Short code) { } static ClientFactory coordinatorClientFactory(StreamEnvironment environment) { + return coordinatorClientFactory(environment, ConditionalClientFactory.RETRY_INTERVAL); + } + + static ClientFactory coordinatorClientFactory( + StreamEnvironment environment, Duration retryInterval) { String messageFormat = "%s. %s. " + "This may be due to the usage of a load balancer that makes topology discovery fail. " @@ -157,14 +153,16 @@ static ClientFactory coordinatorClientFactory(StreamEnvironment environment) { address = environment.addressResolver().resolve(address); parametersCopy.host(address.host()).port(address.port()); - if (context.key() == null) { + if (context.targetKey() == null) { throw new IllegalArgumentException("A key is necessary to create the client connection"); } try { - return Utils.connectToAdvertisedNodeClientFactory( - context.key(), context1 -> new Client(context1.parameters())) - .client(Utils.ClientFactoryContext.fromParameters(parametersCopy).key(context.key())); + ClientFactory delegate = context1 -> new Client(context1.parameters()); + ClientFactoryContext clientFactoryContext = + new ClientFactoryContext(parametersCopy, context.targetKey(), context.candidates()); + return Utils.connectToAdvertisedNodeClientFactory(delegate, retryInterval) + .client(clientFactoryContext); } catch (TimeoutStreamException e) { throw new TimeoutStreamException( format(messageFormat, e.getMessage(), e.getCause().getMessage(), e.getCause())); @@ -182,28 +180,47 @@ static ClientFactory coordinatorClientFactory(StreamEnvironment environment) { } static ClientFactory connectToAdvertisedNodeClientFactory( - String expectedAdvertisedHostPort, ClientFactory clientFactory) { - return connectToAdvertisedNodeClientFactory( - expectedAdvertisedHostPort, clientFactory, ExactNodeRetryClientFactory.RETRY_INTERVAL); - } - - static ClientFactory connectToAdvertisedNodeClientFactory( - String expectedAdvertisedHostPort, ClientFactory clientFactory, Duration retryInterval) { - return new ExactNodeRetryClientFactory( + ClientFactory clientFactory, Duration retryInterval) { + return new ConditionalClientFactory( clientFactory, - client -> { + (ctx, client) -> { String currentKey = client.serverAdvertisedHost() + ":" + client.serverAdvertisedPort(); - boolean success = expectedAdvertisedHostPort.equals(currentKey); + boolean success = ctx.targetKey().equals(currentKey); + if (!success && !ctx.candidates().isEmpty()) { + success = ctx.candidates().stream().anyMatch(b -> currentKey.equals(keyForNode(b))); + } LOGGER.debug( - "Expected client {}, got {}: {}", - expectedAdvertisedHostPort, + "Expected client {}, got {}, viable candidates {}: {}", + ctx.targetKey(), currentKey, + ctx.candidates(), success ? "success" : "failure"); return success; }, retryInterval); } + static String keyForNode(Client.Broker broker) { + return broker.getHost() + ":" + broker.getPort(); + } + + static Client.Broker brokerFromClient(Client client) { + return new Client.Broker(client.serverAdvertisedHost(), client.serverAdvertisedPort()); + } + + static Function, Client.Broker> brokerPicker() { + Random random = new Random(); + return brokers -> { + if (brokers.isEmpty()) { + return null; + } else if (brokers.size() == 1) { + return brokers.get(0); + } else { + return brokers.get(random.nextInt(brokers.size())); + } + }; + } + static Runnable namedRunnable(Runnable task, String format, Object... args) { return new NamedRunnable(format(format, args), task); } @@ -295,16 +312,18 @@ interface ClientFactory { Client client(ClientFactoryContext context); } - static class ExactNodeRetryClientFactory implements ClientFactory { + static class ConditionalClientFactory implements ClientFactory { private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1); private final ClientFactory delegate; - private final Predicate condition; + private final BiPredicate condition; private final Duration retryInterval; - ExactNodeRetryClientFactory( - ClientFactory delegate, Predicate condition, Duration retryInterval) { + ConditionalClientFactory( + ClientFactory delegate, + BiPredicate condition, + Duration retryInterval) { this.delegate = delegate; this.condition = condition; this.retryInterval = retryInterval; @@ -314,7 +333,7 @@ static class ExactNodeRetryClientFactory implements ClientFactory { public Client client(ClientFactoryContext context) { while (true) { Client client = this.delegate.client(context); - if (condition.test(client)) { + if (condition.test(context, client)) { return client; } else { try { @@ -335,29 +354,30 @@ public Client client(ClientFactoryContext context) { static class ClientFactoryContext { - private ClientParameters parameters; - private String key; + private final ClientParameters parameters; + private final String targetKey; + private final List candidates; - static ClientFactoryContext fromParameters(ClientParameters parameters) { - return new ClientFactoryContext().parameters(parameters); + ClientFactoryContext( + ClientParameters parameters, String targetKey, List candidates) { + this.parameters = parameters; + this.targetKey = targetKey; + this.candidates = + candidates == null + ? Collections.emptyList() + : unmodifiableList(new ArrayList<>(candidates)); } ClientParameters parameters() { return parameters; } - ClientFactoryContext parameters(ClientParameters parameters) { - this.parameters = parameters; - return this; - } - - String key() { - return key; + String targetKey() { + return targetKey; } - ClientFactoryContext key(String key) { - this.key = key; - return this; + List candidates() { + return candidates; } } @@ -673,4 +693,40 @@ static T lock(Lock lock, Supplier action) { lock.unlock(); } } + + static class BrokerWrapper { + + private final Client.Broker broker; + private final boolean leader; + + BrokerWrapper(Client.Broker broker, boolean leader) { + this.broker = broker; + this.leader = leader; + } + + Client.Broker broker() { + return broker; + } + + boolean isLeader() { + return this.leader; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + BrokerWrapper that = (BrokerWrapper) o; + return leader == that.leader && Objects.equals(broker, that.broker); + } + + @Override + public int hashCode() { + return Objects.hash(broker, leader); + } + + @Override + public String toString() { + return "BrokerWrapper{" + "broker=" + broker + ", leader=" + leader + '}'; + } + } } diff --git a/src/test/java/com/rabbitmq/stream/Host.java b/src/test/java/com/rabbitmq/stream/Host.java index f1267075ed..48c8c0d972 100644 --- a/src/test/java/com/rabbitmq/stream/Host.java +++ b/src/test/java/com/rabbitmq/stream/Host.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -118,7 +118,7 @@ static Process rabbitmqStreams(String command) { return executeCommand(rabbitmqStreamsCommand() + " " + command); } - public static Process rabbitmqctlIgnoreError(String command) throws IOException { + public static Process rabbitmqctlIgnoreError(String command) { return executeCommand(rabbitmqctlCommand() + " " + command, true); } diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java index 6b655e9141..acb40d7314 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -15,15 +15,18 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay; -import static com.rabbitmq.stream.impl.ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT; -import static com.rabbitmq.stream.impl.ConsumersCoordinator.pickSlot; +import static com.rabbitmq.stream.impl.ConsumersCoordinator.*; import static com.rabbitmq.stream.impl.TestUtils.b; import static com.rabbitmq.stream.impl.TestUtils.latchAssert; import static com.rabbitmq.stream.impl.TestUtils.metadata; import static com.rabbitmq.stream.impl.TestUtils.namedConsumer; import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; +import static com.rabbitmq.stream.impl.Utils.brokerPicker; import static java.lang.String.format; import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; +import static java.util.stream.IntStream.*; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; @@ -52,8 +55,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.IntStream; +import java.util.function.Function; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -151,6 +153,7 @@ public Client.ClientParameters shutdownListener( when(environment.addressResolver()).thenReturn(address -> address); when(client.brokerVersion()).thenReturn("3.11.0"); when(client.isOpen()).thenReturn(true); + clientAdvertises(replica().get(0)); coordinator = new ConsumersCoordinator( @@ -158,7 +161,8 @@ public Client.ClientParameters shutdownListener( ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, type -> "consumer-connection", clientFactory, - false); + false, + brokerPicker()); } @AfterEach @@ -179,8 +183,7 @@ void tearDown() throws Exception { shouldRetryUntilGettingExactNodeWithAdvertisedHostNameClientFactoryAndNotExactNodeOnFirstTime() { ClientFactory cf = context -> - Utils.connectToAdvertisedNodeClientFactory( - context.key(), clientFactory, Duration.ofMillis(1)) + Utils.connectToAdvertisedNodeClientFactory(clientFactory, Duration.ofMillis(1)) .client(context); ConsumersCoordinator c = new ConsumersCoordinator( @@ -188,7 +191,8 @@ void tearDown() throws Exception { ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, type -> "consumer-connection", cf, - false); + false, + brokerPicker()); when(locator.metadata("stream")).thenReturn(metadata(null, replica())); when(clientFactory.client(any())).thenReturn(client); @@ -221,8 +225,7 @@ void tearDown() throws Exception { void shouldGetExactNodeImmediatelyWithAdvertisedHostNameClientFactoryAndExactNodeOnFirstTime() { ClientFactory cf = context -> - Utils.connectToAdvertisedNodeClientFactory( - context.key(), clientFactory, Duration.ofMillis(1)) + Utils.connectToAdvertisedNodeClientFactory(clientFactory, Duration.ofMillis(1)) .client(context); ConsumersCoordinator c = new ConsumersCoordinator( @@ -230,7 +233,8 @@ void shouldGetExactNodeImmediatelyWithAdvertisedHostNameClientFactoryAndExactNod ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, type -> "consumer-connection", cf, - false); + false, + brokerPicker()); when(locator.metadata("stream")).thenReturn(metadata(null, replica())); when(clientFactory.client(any())).thenReturn(client); @@ -259,6 +263,48 @@ void shouldGetExactNodeImmediatelyWithAdvertisedHostNameClientFactoryAndExactNod .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); } + @Test + void shouldAcceptCandidateNode() { + ClientFactory cf = + context -> + Utils.connectToAdvertisedNodeClientFactory(clientFactory, Duration.ofMillis(1)) + .client(context); + ConsumersCoordinator c = + new ConsumersCoordinator( + environment, + ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, + type -> "consumer-connection", + cf, + false, + brokers -> brokers.get(0)); + + when(locator.metadata("stream")).thenReturn(metadata(null, replicas())); + when(clientFactory.client(any())).thenReturn(client); + when(client.subscribe( + subscriptionIdCaptor.capture(), + anyString(), + any(OffsetSpecification.class), + anyInt(), + anyMap())) + .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK)); + when(client.serverAdvertisedHost()).thenReturn("foo").thenReturn(replicas().get(1).getHost()); + when(client.serverAdvertisedPort()).thenReturn(42).thenReturn(replicas().get(1).getPort()); + + c.subscribe( + consumer, + "stream", + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + NO_OP_TRACKING_CLOSING_CALLBACK, + (offset, message) -> {}, + Collections.emptyMap(), + flowStrategy()); + verify(clientFactory, times(2)).client(any()); + verify(client, times(1)) + .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); + } + @Test @SuppressWarnings("unchecked") void shouldSubscribeWithEmptyPropertiesWithUnamedConsumer() { @@ -396,17 +442,58 @@ void subscribeShouldThrowExceptionIfNoNodeAvailableForStream() { } @Test - void findBrokersForStreamShouldReturnLeaderIfNoReplicas() { + void findCandidateNodesShouldReturnLeaderIfNoReplicas() { when(locator.metadata("stream")).thenReturn(metadata(leader(), null)); - assertThat(coordinator.findBrokersForStream("stream", false)).hasSize(1).contains(leader()); + assertThat(coordinator.findCandidateNodes("stream", false)) + .hasSize(1) + .contains(leaderWrapper()); } @Test - void findBrokersForStreamShouldReturnReplicasIfThereAreSome() { + void findCandidateNodesShouldReturnReplicasIfThereAreSome() { when(locator.metadata("stream")).thenReturn(metadata(null, replicas())); - assertThat(coordinator.findBrokersForStream("stream", false)) + assertThat(coordinator.findCandidateNodes("stream", false)) .hasSize(2) - .hasSameElementsAs(replicas()); + .hasSameElementsAs(replicaWrappers()); + } + + @Test + void findCandidateNodesShouldReturnLeaderAndReplicasIfForceReplicaIsFalse() { + when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas())); + assertThat(coordinator.findCandidateNodes("stream", false)) + .hasSize(3) + .contains(leaderWrapper()) + .containsAll(replicaWrappers()); + } + + @Test + void findCandidateNodesShouldReturnOnlyReplicasIfForceReplicaIsTrue() { + when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas())); + assertThat(coordinator.findCandidateNodes("stream", true)) + .hasSize(2) + .containsAll(replicaWrappers()); + } + + @Test + void pickBrokerShouldPreferReplicas() { + Client.Broker leader = leader(); + List replicas = replicas(); + AtomicInteger sequence = new AtomicInteger(); + Function, Client.Broker> picker = + candidates -> candidates.get(sequence.getAndIncrement() % candidates.size()); + // never pick a leader if replicas are available + List leaderAndOneReplica = + Arrays.asList(leaderWrapper(), replicaWrappers().get(0)); + range(0, 10) + .forEach( + ignored -> { + Client.Broker picked = pickBroker(picker, nodeWrappers()); + assertThat(picked).isNotEqualTo(leader).isIn(replicas); + picked = pickBroker(picker, leaderAndOneReplica); + assertThat(picked).isNotEqualTo(leader).isIn(replicas); + }); + // pick the leader if it is the only one + assertThat(pickBroker(picker, singletonList(leaderWrapper()))).isEqualTo(leader); } @Test @@ -495,6 +582,7 @@ void shouldNotUnsubscribeIfClientIsClosed() { @Test void subscribeShouldSubscribeToStreamAndDispatchMessageWithManySubscriptions() { when(locator.metadata("stream")).thenReturn(metadata(leader(), null)); + clientAdvertises(leader()); when(clientFactory.client(any())).thenReturn(client); when(client.subscribe( @@ -1078,6 +1166,7 @@ void metadataUpdate_shouldCloseConsumerIfRetryTimeoutIsReached() throws Exceptio void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscriptions( int maxConsumersByConnection) { when(locator.metadata("stream")).thenReturn(metadata(leader(), null)); + clientAdvertises(leader()); when(clientFactory.client(any())).thenReturn(client); @@ -1099,10 +1188,11 @@ void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscrip maxConsumersByConnection, type -> "consumer-connection", clientFactory, - false); + false, + brokerPicker()); List closingRunnables = - IntStream.range(0, subscriptionCount) + range(0, subscriptionCount) .mapToObj( i -> coordinator.subscribe( @@ -1115,7 +1205,7 @@ void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscrip (offset, message) -> {}, Collections.emptyMap(), flowStrategy())) - .collect(Collectors.toList()); + .collect(toList()); verify(clientFactory, times(2)).client(any()); verify(client, times(subscriptionCount)) @@ -1163,7 +1253,7 @@ void shouldRemoveClientSubscriptionManagerFromPoolAfterConnectionDies() throws E int extraSubscriptionCount = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT / 5; int subscriptionCount = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT + extraSubscriptionCount; - IntStream.range(0, subscriptionCount) + range(0, subscriptionCount) .forEach( i -> { coordinator.subscribe( @@ -1228,7 +1318,7 @@ void shouldRemoveClientSubscriptionManagerFromPoolIfEmptyAfterMetadataUpdate() t int extraSubscriptionCount = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT / 5; int subscriptionCount = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT + extraSubscriptionCount; - IntStream.range(0, subscriptionCount) + range(0, subscriptionCount) .forEach( i -> { coordinator.subscribe( @@ -1919,7 +2009,8 @@ void shouldRetryUntilReplicaIsAvailableWhenForceReplicaIsOn() throws Exception { ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, type -> "consumer-connection", clientFactory, - true); + true, + brokerPicker()); AtomicInteger messageHandlerCalls = new AtomicInteger(); Runnable closingRunnable = @@ -1990,7 +2081,7 @@ void shouldRetryUntilReplicaIsAvailableWhenForceReplicaIsOn() throws Exception { @Test void pickSlotTest() { List list = new ArrayList<>(ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT); - IntStream.range(0, MAX_SUBSCRIPTIONS_PER_CLIENT).forEach(ignored -> list.add(null)); + range(0, MAX_SUBSCRIPTIONS_PER_CLIENT).forEach(ignored -> list.add(null)); AtomicInteger sequence = new AtomicInteger(0); int index = pickSlot(list, sequence); assertThat(index).isZero(); @@ -2028,14 +2119,28 @@ void pickSlotTest() { assertThat(index).isEqualTo(5); } - Client.Broker leader() { + static Client.Broker leader() { return new Client.Broker("leader", -1); } - List replicas() { + static Utils.BrokerWrapper leaderWrapper() { + return new Utils.BrokerWrapper(leader(), true); + } + + static List replicas() { return Arrays.asList(new Client.Broker("replica1", -1), new Client.Broker("replica2", -1)); } + static List nodeWrappers() { + List wrappers = new ArrayList<>(replicaWrappers()); + wrappers.add(leaderWrapper()); + return wrappers; + } + + static List replicaWrappers() { + return replicas().stream().map(b -> new Utils.BrokerWrapper(b, false)).collect(toList()); + } + List replica() { return replicas().subList(0, 1); } @@ -2066,4 +2171,9 @@ private static Response responseOk() { private static ConsumerFlowStrategy flowStrategy() { return ConsumerFlowStrategy.creditOnChunkArrival(10); } + + private void clientAdvertises(Client.Broker broker) { + when(client.serverAdvertisedHost()).thenReturn(broker.getHost()); + when(client.serverAdvertisedPort()).thenReturn(broker.getPort()); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java new file mode 100644 index 0000000000..727ccc58e4 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java @@ -0,0 +1,124 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static java.lang.Integer.parseInt; +import static java.util.stream.Collectors.toSet; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import com.rabbitmq.stream.Address; +import com.rabbitmq.stream.ConsumerFlowStrategy; +import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.SubscriptionListener; +import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerCoordinatorInfo; +import com.rabbitmq.stream.impl.TestUtils.DisabledIfNotCluster; +import io.netty.channel.EventLoopGroup; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.IntStream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +@DisabledIfNotCluster +@StreamTestInfrastructure +public class LoadBalancerClusterTest { + + private static final int LB_PORT = 5555; + private static final SubscriptionListener NO_OP_SUBSCRIPTION_LISTENER = subscriptionContext -> {}; + private static final Runnable NO_OP_TRACKING_CLOSING_CALLBACK = () -> {}; + + @Mock StreamEnvironment environment; + @Mock StreamConsumer consumer; + AutoCloseable mocks; + TestUtils.ClientFactory cf; + String stream; + EventLoopGroup eventLoopGroup; + Client locator; + + @BeforeEach + void init() { + mocks = MockitoAnnotations.openMocks(this); + locator = cf.get(new Client.ClientParameters().port(LB_PORT)); + when(environment.locator()).thenReturn(locator); + when(environment.clientParametersCopy()) + .thenReturn(new Client.ClientParameters().eventLoopGroup(eventLoopGroup).port(LB_PORT)); + Address loadBalancerAddress = new Address("localhost", LB_PORT); + when(environment.addressResolver()).thenReturn(address -> loadBalancerAddress); + when(environment.locatorOperation(any())).thenCallRealMethod(); + } + + @AfterEach + void tearDown() throws Exception { + mocks.close(); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void pickConsumersAmongCandidates(boolean forceReplica) { + int maxSubscriptionsPerClient = 2; + int subscriptionCount = maxSubscriptionsPerClient * 10; + try (ConsumersCoordinator c = + new ConsumersCoordinator( + environment, + maxSubscriptionsPerClient, + type -> "consumer-connection", + Utils.coordinatorClientFactory(this.environment, Duration.ofMillis(10)), + forceReplica, + Utils.brokerPicker())) { + + IntStream.range(0, subscriptionCount) + .forEach( + ignored -> { + c.subscribe( + consumer, + stream, + OffsetSpecification.first(), + null, + NO_OP_SUBSCRIPTION_LISTENER, + NO_OP_TRACKING_CLOSING_CALLBACK, + (offset, message) -> {}, + Collections.emptyMap(), + flowStrategy()); + }); + + Client.StreamMetadata metadata = locator.metadata(stream).get(stream); + Set allowedNodes = new HashSet<>(metadata.getReplicas()); + if (!forceReplica) { + allowedNodes.add(metadata.getLeader()); + } + + ConsumerCoordinatorInfo info = MonitoringTestUtils.extract(c); + assertThat(info.consumerCount()).isEqualTo(subscriptionCount); + Set usedNodes = + info.clients().stream() + .map(m -> m.node().split(":")) + .map(np -> new Client.Broker(np[0], parseInt(np[1]))) + .collect(toSet()); + assertThat(usedNodes).hasSameSizeAs(allowedNodes).containsAll(allowedNodes); + } + } + + private static ConsumerFlowStrategy flowStrategy() { + return ConsumerFlowStrategy.creditOnChunkArrival(10); + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/MonitoringTestUtils.java b/src/test/java/com/rabbitmq/stream/impl/MonitoringTestUtils.java index 4fc9ae051a..e645f4eb93 100644 --- a/src/test/java/com/rabbitmq/stream/impl/MonitoringTestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/MonitoringTestUtils.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -130,7 +130,11 @@ public ConsumerManager(long id, String node, int consumer_count) { } public int getConsumerCount() { - return consumer_count; + return this.consumer_count; + } + + public String node() { + return this.node; } @Override diff --git a/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java index 238c645053..dc89e5ec4c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -117,6 +117,8 @@ public Client.ClientParameters metadataListener( when(trackingConsumer.stream()).thenReturn("stream"); when(client.declarePublisher(anyByte(), isNull(), anyString())) .thenReturn(new Response(Constants.RESPONSE_CODE_OK)); + when(client.serverAdvertisedHost()).thenReturn(leader().getHost()); + when(client.serverAdvertisedPort()).thenReturn(leader().getPort()); coordinator = new ProducersCoordinator( environment, @@ -184,8 +186,7 @@ void registerShouldAllowPublishing() { shouldRetryUntilGettingExactNodeWithAdvertisedHostNameClientFactoryAndNotExactNodeOnFirstTime() { ClientFactory cf = context -> - Utils.connectToAdvertisedNodeClientFactory( - context.key(), clientFactory, Duration.ofMillis(1)) + Utils.connectToAdvertisedNodeClientFactory(clientFactory, Duration.ofMillis(1)) .client(context); ProducersCoordinator c = new ProducersCoordinator( @@ -212,8 +213,7 @@ void registerShouldAllowPublishing() { void shouldGetExactNodeImmediatelyWithAdvertisedHostNameClientFactoryAndExactNodeOnFirstTime() { ClientFactory cf = context -> - Utils.connectToAdvertisedNodeClientFactory( - context.key(), clientFactory, Duration.ofMillis(1)) + Utils.connectToAdvertisedNodeClientFactory(clientFactory, Duration.ofMillis(1)) .client(context); ProducersCoordinator c = new ProducersCoordinator( @@ -391,6 +391,10 @@ void shouldRedistributeProducersAndTrackingConsumersOnMetadataUpdate() throws Ex .thenReturn(metadata(movingStream, null, replicas())) .thenReturn(metadata(movingStream, leader2(), replicas())); + // the created client is on leader1 + when(client.serverAdvertisedHost()).thenReturn(leader1().getHost()); + when(client.serverAdvertisedPort()).thenReturn(leader1().getPort()); + String fixedStream = "fixed-stream"; when(locator.metadata(fixedStream)).thenReturn(metadata(fixedStream, leader1(), replicas())); @@ -440,6 +444,10 @@ void shouldRedistributeProducersAndTrackingConsumersOnMetadataUpdate() throws Ex verify(fixedTrackingConsumer, times(1)).setTrackingClient(client); assertThat(coordinator.clientCount()).isEqualTo(1); + // the created client is on leader2 + when(client.serverAdvertisedHost()).thenReturn(leader2().getHost()); + when(client.serverAdvertisedPort()).thenReturn(leader2().getPort()); + metadataListener.handle(movingStream, Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE); assertThat(setClientLatch.await(5, TimeUnit.SECONDS)).isTrue(); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamTestInfrastructure.java b/src/test/java/com/rabbitmq/stream/impl/StreamTestInfrastructure.java new file mode 100644 index 0000000000..4c39066377 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/StreamTestInfrastructure.java @@ -0,0 +1,24 @@ +// Copyright (c) 2024 Broadcom. All Rights Reserved. +// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import java.lang.annotation.*; +import org.junit.jupiter.api.extension.ExtendWith; + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Documented +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public @interface StreamTestInfrastructure {} diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 76c1418d36..41e4689ef9 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -386,6 +386,16 @@ static boolean tlsAvailable() { } } + static boolean isCluster() { + try { + Process process = Host.rabbitmqctl("eval 'nodes().'"); + String content = capture(process.getInputStream()); + return !content.replace("[", "").replace("]", "").trim().isEmpty(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + private static String capture(InputStream is) throws IOException { BufferedReader br = new BufferedReader(new InputStreamReader(is)); String line; @@ -536,6 +546,12 @@ static boolean atLeastVersion(String expectedVersion, String currentVersion) { @ExtendWith(DisabledIfTlsNotEnabledCondition.class) public @interface DisabledIfTlsNotEnabled {} + @Target({ElementType.TYPE, ElementType.METHOD}) + @Retention(RetentionPolicy.RUNTIME) + @Documented + @ExtendWith(DisabledIfNotClusterCondition.class) + @interface DisabledIfNotCluster {} + @Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented @@ -898,6 +914,22 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con } } + static class DisabledIfNotClusterCondition implements ExecutionCondition { + + private static final String KEY = "isCluster"; + + @Override + public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { + ExtensionContext.Store store = context.getRoot().getStore(ExtensionContext.Namespace.GLOBAL); + boolean isCluster = store.getOrComputeIfAbsent(KEY, k -> isCluster(), Boolean.class); + if (isCluster) { + return ConditionEvaluationResult.enabled("Multi-node cluster"); + } else { + return ConditionEvaluationResult.disabled("Not a multi-node cluster"); + } + } + } + private static class BaseBrokerVersionAtLeastCondition implements ExecutionCondition { private final Function versionProvider; diff --git a/src/test/java/com/rabbitmq/stream/impl/UtilsTest.java b/src/test/java/com/rabbitmq/stream/impl/UtilsTest.java index b8b3e8c1b3..bd564968f4 100644 --- a/src/test/java/com/rabbitmq/stream/impl/UtilsTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/UtilsTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -20,6 +20,7 @@ import static com.rabbitmq.stream.impl.Utils.defaultConnectionNamingStrategy; import static com.rabbitmq.stream.impl.Utils.formatConstant; import static com.rabbitmq.stream.impl.Utils.offsetBefore; +import static java.util.Collections.emptyList; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -32,10 +33,10 @@ import com.rabbitmq.stream.impl.Utils.ClientConnectionType; import com.rabbitmq.stream.impl.Utils.ClientFactory; import com.rabbitmq.stream.impl.Utils.ClientFactoryContext; -import com.rabbitmq.stream.impl.Utils.ExactNodeRetryClientFactory; +import com.rabbitmq.stream.impl.Utils.ConditionalClientFactory; import java.time.Duration; +import java.util.function.BiPredicate; import java.util.function.Function; -import java.util.function.Predicate; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -53,14 +54,14 @@ void formatConstantOk() { } @Test - void exactNodeRetryClientFactoryShouldReturnImmediatelyIfConditionOk() { + void conditionalClientFactoryShouldReturnImmediatelyIfConditionOk() { Client client = mock(Client.class); ClientFactory cf = mock(ClientFactory.class); when(cf.client(any())).thenReturn(client); - Predicate condition = c -> true; + BiPredicate condition = (ctx, c) -> true; Client result = - new ExactNodeRetryClientFactory(cf, condition, Duration.ofMillis(1)) - .client(ClientFactoryContext.fromParameters(new ClientParameters())); + new ConditionalClientFactory(cf, condition, Duration.ofMillis(1)) + .client(new ClientFactoryContext(new ClientParameters(), "", emptyList())); assertThat(result).isEqualTo(client); verify(cf, times(1)).client(any()); verify(client, never()).close(); @@ -68,15 +69,15 @@ void exactNodeRetryClientFactoryShouldReturnImmediatelyIfConditionOk() { @Test @SuppressWarnings("unchecked") - void exactNodeRetryClientFactoryShouldRetryUntilConditionOk() { + void conditionalClientFactoryShouldRetryUntilConditionOk() { Client client = mock(Client.class); ClientFactory cf = mock(ClientFactory.class); when(cf.client(any())).thenReturn(client); - Predicate condition = mock(Predicate.class); - when(condition.test(any())).thenReturn(false).thenReturn(false).thenReturn(true); + BiPredicate condition = mock(BiPredicate.class); + when(condition.test(any(), any())).thenReturn(false).thenReturn(false).thenReturn(true); Client result = - new ExactNodeRetryClientFactory(cf, condition, Duration.ofMillis(1)) - .client(ClientFactoryContext.fromParameters(new ClientParameters())); + new ConditionalClientFactory(cf, condition, Duration.ofMillis(1)) + .client(new ClientFactoryContext(new ClientParameters(), "", emptyList())); assertThat(result).isEqualTo(client); verify(cf, times(3)).client(any()); verify(client, times(2)).close(); diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 4bec720537..d5a75a1ef7 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -6,6 +6,8 @@ + +