Skip to content

Commit 0eb471f

Browse files
committed
Harden candidate lookup for producers
1 parent 949c150 commit 0eb471f

File tree

6 files changed

+190
-35
lines changed

6 files changed

+190
-35
lines changed

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

+29-11
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17+
import static com.rabbitmq.stream.impl.Tuples.pair;
1718
import static com.rabbitmq.stream.impl.Utils.*;
1819
import static java.util.stream.Collectors.toList;
1920
import static java.util.stream.Collectors.toSet;
@@ -30,6 +31,7 @@
3031
import com.rabbitmq.stream.impl.Client.PublishErrorListener;
3132
import com.rabbitmq.stream.impl.Client.Response;
3233
import com.rabbitmq.stream.impl.Client.ShutdownListener;
34+
import com.rabbitmq.stream.impl.Tuples.Pair;
3335
import com.rabbitmq.stream.impl.Utils.ClientConnectionType;
3436
import com.rabbitmq.stream.impl.Utils.ClientFactory;
3537
import com.rabbitmq.stream.impl.Utils.ClientFactoryContext;
@@ -219,10 +221,13 @@ List<BrokerWrapper> findCandidateNodes(String stream, boolean forceLeader) {
219221

220222
List<BrokerWrapper> candidates = new ArrayList<>();
221223
Client.Broker leader = streamMetadata.getLeader();
222-
if (leader == null && forceLeader) {
223-
throw new IllegalStateException("Not leader available for stream " + stream);
224+
if (leader == null) {
225+
if (forceLeader) {
226+
throw new IllegalStateException("Not leader available for stream " + stream);
227+
}
228+
} else {
229+
candidates.add(new BrokerWrapper(leader, true));
224230
}
225-
candidates.add(new BrokerWrapper(leader, true));
226231

227232
if (!forceLeader && streamMetadata.hasReplicas()) {
228233
candidates.addAll(
@@ -231,7 +236,11 @@ List<BrokerWrapper> findCandidateNodes(String stream, boolean forceLeader) {
231236
.collect(toList()));
232237
}
233238

234-
LOGGER.debug("Candidates to publish to {}: {}", stream, candidates);
239+
if (candidates.isEmpty()) {
240+
throw new IllegalStateException("No stream member available to publish for stream " + stream);
241+
} else {
242+
LOGGER.debug("Candidates to publish to {}: {}", stream, candidates);
243+
}
235244

236245
return List.copyOf(candidates);
237246
}
@@ -721,15 +730,20 @@ private ClientProducersManager(
721730

722731
private void assignProducersToNewManagers(
723732
Collection<AgentTracker> trackers, String stream, BackOffDelayPolicy delayPolicy) {
724-
AsyncRetry.asyncRetry(() -> findCandidateNodes(stream, forceLeader))
733+
AsyncRetry.asyncRetry(
734+
() -> {
735+
List<BrokerWrapper> candidates = findCandidateNodes(stream, forceLeader);
736+
return pair(pickBroker(candidates), candidates);
737+
})
725738
.description("Candidate lookup to publish to " + stream)
726739
.scheduler(environment.scheduledExecutorService())
727740
.retry(ex -> !(ex instanceof StreamDoesNotExistException))
728741
.delayPolicy(delayPolicy)
729742
.build()
730743
.thenAccept(
731-
candidates -> {
732-
Broker broker = pickBroker(candidates);
744+
brokerAndCandidates -> {
745+
Broker broker = brokerAndCandidates.v1();
746+
List<BrokerWrapper> candidates = brokerAndCandidates.v2();
733747
String key = keyForNode(broker);
734748
LOGGER.debug(
735749
"Assigning {} producer(s) and consumer tracker(s) to {}", trackers.size(), key);
@@ -805,15 +819,19 @@ private void recoverAgent(Broker node, List<BrokerWrapper> candidates, AgentTrac
805819
tracker.identifiable() ? tracker.id() : "N/A",
806820
tracker.stream());
807821
// maybe not a good candidate, let's refresh and retry for this one
808-
candidates =
809-
Utils.callAndMaybeRetry(
810-
() -> findCandidateNodes(tracker.stream(), forceLeader),
822+
Pair<Broker, List<BrokerWrapper>> brokerAndCandidates =
823+
callAndMaybeRetry(
824+
() -> {
825+
List<BrokerWrapper> cs = findCandidateNodes(tracker.stream(), forceLeader);
826+
return pair(pickBroker(cs), cs);
827+
},
811828
ex -> !(ex instanceof StreamDoesNotExistException),
812829
environment.recoveryBackOffDelayPolicy(),
813830
"Candidate lookup for %s on stream '%s'",
814831
tracker.type(),
815832
tracker.stream());
816-
node = pickBroker(candidates);
833+
node = brokerAndCandidates.v1();
834+
candidates = brokerAndCandidates.v2();
817835
} catch (Exception e) {
818836
LOGGER.warn(
819837
"Error while re-assigning {} (stream '{}')", tracker.type(), tracker.stream(), e);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
final class Tuples {
18+
19+
private Tuples() {}
20+
21+
static <A, B> Pair<A, B> pair(A v1, B v2) {
22+
return new Pair<>(v1, v2);
23+
}
24+
25+
static class Pair<A, B> {
26+
27+
private final A v1;
28+
private final B v2;
29+
30+
private Pair(A v1, B v2) {
31+
this.v1 = v1;
32+
this.v2 = v2;
33+
}
34+
35+
A v1() {
36+
return this.v1;
37+
}
38+
39+
B v2() {
40+
return this.v2;
41+
}
42+
}
43+
}

src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public class LoadBalancerClusterTest {
5757
String stream;
5858
EventLoopGroup eventLoopGroup;
5959
Client locator;
60-
Address loadBalancerAddress = new Address("localhost", LB_PORT);
60+
static final Address LOAD_BALANCER_ADDRESS = new Address("localhost", LB_PORT);
6161

6262
@BeforeEach
6363
void init() {
@@ -66,7 +66,7 @@ void init() {
6666
when(environment.locator()).thenReturn(locator);
6767
when(environment.clientParametersCopy())
6868
.thenReturn(new Client.ClientParameters().eventLoopGroup(eventLoopGroup).port(LB_PORT));
69-
when(environment.addressResolver()).thenReturn(address -> loadBalancerAddress);
69+
when(environment.addressResolver()).thenReturn(address -> LOAD_BALANCER_ADDRESS);
7070
when(environment.locatorOperation(any())).thenCallRealMethod();
7171
}
7272

@@ -175,7 +175,7 @@ void producersConsumersShouldSpreadAccordingToDataLocalitySettings(boolean force
175175
.port(LB_PORT)
176176
.forceReplicaForConsumers(forceLocality)
177177
.forceReplicaForConsumers(forceLocality)
178-
.addressResolver(addr -> loadBalancerAddress)
178+
.addressResolver(addr -> LOAD_BALANCER_ADDRESS)
179179
.maxProducersByConnection(maxPerConnection)
180180
.maxConsumersByConnection(maxPerConnection)
181181
.forceLeaderForProducers(forceLocality)

src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -750,12 +750,27 @@ void findCandidateNodesShouldReturnLeaderAndReplicasWhenForceLeaderIsFalse() {
750750
}
751751

752752
@Test
753-
void findCandidateNodesShouldThrowIfThereIsNoLeader() {
753+
void findCandidateNodesShouldThrowIfThereIsNoLeaderAndForceLeaderIsTrue() {
754754
when(locator.metadata("stream")).thenReturn(metadata(null, replicas()));
755755
assertThatThrownBy(() -> coordinator.findCandidateNodes("stream", true))
756756
.isInstanceOf(IllegalStateException.class);
757757
}
758758

759+
@Test
760+
void findCandidateNodesShouldThrowIfNoMembersAndForceLeaderIsFalse() {
761+
when(locator.metadata("stream")).thenReturn(metadata(null, List.of()));
762+
assertThatThrownBy(() -> coordinator.findCandidateNodes("stream", false))
763+
.isInstanceOf(IllegalStateException.class);
764+
}
765+
766+
@Test
767+
void findCandidateNodesShouldReturnOnlyReplicasIfNoLeaderAndForceLeaderIsFalse() {
768+
when(locator.metadata("stream")).thenReturn(metadata(null, replicas()));
769+
assertThat(coordinator.findCandidateNodes("stream", false))
770+
.hasSize(2)
771+
.containsAll(replicaWrappers());
772+
}
773+
759774
private static ScheduledExecutorService createScheduledExecutorService() {
760775
return new ScheduledExecutorServiceWrapper(Executors.newSingleThreadScheduledExecutor());
761776
}

0 commit comments

Comments
 (0)