15
15
package com .rabbitmq .stream .impl ;
16
16
17
17
import static com .rabbitmq .stream .impl .Utils .*;
18
+ import static java .util .stream .Collectors .toList ;
18
19
import static java .util .stream .Collectors .toSet ;
19
20
20
21
import com .rabbitmq .stream .BackOffDelayPolicy ;
49
50
import org .slf4j .Logger ;
50
51
import org .slf4j .LoggerFactory ;
51
52
52
- class ProducersCoordinator {
53
+ final class ProducersCoordinator implements AutoCloseable {
53
54
54
55
static final int MAX_PRODUCERS_PER_CLIENT = 256 ;
55
56
static final int MAX_TRACKING_CONSUMERS_PER_CLIENT = 50 ;
@@ -67,18 +68,21 @@ class ProducersCoordinator {
67
68
new DefaultExecutorServiceFactory (
68
69
Runtime .getRuntime ().availableProcessors (), 10 , "rabbitmq-stream-producer-connection-" );
69
70
private final Lock coordinatorLock = new ReentrantLock ();
71
+ private final boolean forceLeader ;
70
72
71
73
ProducersCoordinator (
72
74
StreamEnvironment environment ,
73
75
int maxProducersByClient ,
74
76
int maxTrackingConsumersByClient ,
75
77
Function <ClientConnectionType , String > connectionNamingStrategy ,
76
- ClientFactory clientFactory ) {
78
+ ClientFactory clientFactory ,
79
+ boolean forceLeader ) {
77
80
this .environment = environment ;
78
81
this .clientFactory = clientFactory ;
79
82
this .maxProducersByClient = maxProducersByClient ;
80
83
this .maxTrackingConsumersByClient = maxTrackingConsumersByClient ;
81
84
this .connectionNamingStrategy = connectionNamingStrategy ;
85
+ this .forceLeader = forceLeader ;
82
86
}
83
87
84
88
Runnable registerProducer (StreamProducer producer , String reference , String stream ) {
@@ -105,9 +109,10 @@ Runnable registerTrackingConsumer(StreamConsumer consumer) {
105
109
}
106
110
107
111
private Runnable registerAgentTracker (AgentTracker tracker , String stream ) {
108
- Client .Broker broker = getBrokerForProducer (stream );
112
+ List <BrokerWrapper > candidates = findCandidateNodes (stream , this .forceLeader );
113
+ Broker broker = pickBroker (candidates );
109
114
110
- addToManager (broker , tracker );
115
+ addToManager (broker , candidates , tracker );
111
116
112
117
if (DEBUG ) {
113
118
return () -> {
@@ -125,7 +130,7 @@ private Runnable registerAgentTracker(AgentTracker tracker, String stream) {
125
130
}
126
131
}
127
132
128
- private void addToManager (Broker node , AgentTracker tracker ) {
133
+ private void addToManager (Broker node , List < BrokerWrapper > candidates , AgentTracker tracker ) {
129
134
ClientParameters clientParameters =
130
135
environment
131
136
.clientParametersCopy ()
@@ -153,7 +158,8 @@ private void addToManager(Broker node, AgentTracker tracker) {
153
158
if (pickedManager == null ) {
154
159
String name = keyForNode (node );
155
160
LOGGER .debug ("Trying to create producer manager on {}" , name );
156
- pickedManager = new ClientProducersManager (node , this .clientFactory , clientParameters );
161
+ pickedManager =
162
+ new ClientProducersManager (node , candidates , this .clientFactory , clientParameters );
157
163
LOGGER .debug ("Created producer manager on {}, id {}" , name , pickedManager .id );
158
164
}
159
165
try {
@@ -192,11 +198,12 @@ private void addToManager(Broker node, AgentTracker tracker) {
192
198
}
193
199
}
194
200
195
- private Client .Broker getBrokerForProducer (String stream ) {
201
+ // package protected for testing
202
+ List <BrokerWrapper > findCandidateNodes (String stream , boolean forceLeader ) {
196
203
Map <String , Client .StreamMetadata > metadata =
197
204
this .environment .locatorOperation (
198
205
namedFunction (c -> c .metadata (stream ), "Candidate lookup to publish to '%s'" , stream ));
199
- if (metadata .size () == 0 || metadata .get (stream ) == null ) {
206
+ if (metadata .isEmpty () || metadata .get (stream ) == null ) {
200
207
throw new StreamDoesNotExistException (stream );
201
208
}
202
209
@@ -210,17 +217,34 @@ private Client.Broker getBrokerForProducer(String stream) {
210
217
}
211
218
}
212
219
220
+ List <BrokerWrapper > candidates = new ArrayList <>();
213
221
Client .Broker leader = streamMetadata .getLeader ();
214
- if (leader == null ) {
222
+ if (leader == null && forceLeader ) {
215
223
throw new IllegalStateException ("Not leader available for stream " + stream );
216
224
}
217
- LOGGER .debug (
218
- "Using client on {}:{} to publish to {}" , leader .getHost (), leader .getPort (), stream );
225
+ candidates .add (new BrokerWrapper (leader , true ));
219
226
220
- return leader ;
227
+ if (!forceLeader && !streamMetadata .getReplicas ().isEmpty ()) {
228
+ candidates .addAll (
229
+ streamMetadata .getReplicas ().stream ()
230
+ .map (b -> new BrokerWrapper (b , false ))
231
+ .collect (toList ()));
232
+ }
233
+
234
+ LOGGER .debug ("Candidates to publish to {}: {}" , stream , candidates );
235
+
236
+ return Collections .unmodifiableList (candidates );
237
+ }
238
+
239
+ static Broker pickBroker (List <BrokerWrapper > candidates ) {
240
+ return candidates .stream ()
241
+ .filter (BrokerWrapper ::isLeader )
242
+ .findFirst ()
243
+ .map (BrokerWrapper ::broker )
244
+ .orElseThrow (() -> new IllegalStateException ("Not leader available" ));
221
245
}
222
246
223
- void close () {
247
+ public void close () {
224
248
Iterator <ClientProducersManager > iterator = this .managers .iterator ();
225
249
while (iterator .hasNext ()) {
226
250
ClientProducersManager manager = iterator .next ();
@@ -568,7 +592,10 @@ private class ClientProducersManager implements Comparable<ClientProducersManage
568
592
private final AtomicBoolean closed = new AtomicBoolean (false );
569
593
570
594
private ClientProducersManager (
571
- Broker targetNode , ClientFactory cf , Client .ClientParameters clientParameters ) {
595
+ Broker targetNode ,
596
+ List <BrokerWrapper > candidates ,
597
+ ClientFactory cf ,
598
+ Client .ClientParameters clientParameters ) {
572
599
this .id = managerIdSequence .getAndIncrement ();
573
600
AtomicReference <String > nameReference = new AtomicReference <>();
574
601
AtomicReference <Client > ref = new AtomicReference <>();
@@ -682,7 +709,7 @@ private ClientProducersManager(
682
709
.metadataListener (metadataListener )
683
710
.clientProperty ("connection_name" , connectionName ),
684
711
keyForNode (targetNode ),
685
- Collections . emptyList ( ));
712
+ candidates . stream (). map ( BrokerWrapper :: broker ). collect ( toList () ));
686
713
this .client = cf .client (connectionFactoryContext );
687
714
this .node = Utils .brokerFromClient (this .client );
688
715
this .name = keyForNode (this .node );
@@ -694,18 +721,19 @@ private ClientProducersManager(
694
721
695
722
private void assignProducersToNewManagers (
696
723
Collection <AgentTracker > trackers , String stream , BackOffDelayPolicy delayPolicy ) {
697
- AsyncRetry .asyncRetry (() -> getBrokerForProducer (stream ))
724
+ AsyncRetry .asyncRetry (() -> findCandidateNodes (stream , forceLeader ))
698
725
.description ("Candidate lookup to publish to " + stream )
699
726
.scheduler (environment .scheduledExecutorService ())
700
727
.retry (ex -> !(ex instanceof StreamDoesNotExistException ))
701
728
.delayPolicy (delayPolicy )
702
729
.build ()
703
730
.thenAccept (
704
- broker -> {
731
+ candidates -> {
732
+ Broker broker = pickBroker (candidates );
705
733
String key = keyForNode (broker );
706
734
LOGGER .debug (
707
735
"Assigning {} producer(s) and consumer tracker(s) to {}" , trackers .size (), key );
708
- trackers .forEach (tracker -> maybeRecoverAgent (broker , tracker ));
736
+ trackers .forEach (tracker -> maybeRecoverAgent (broker , candidates , tracker ));
709
737
})
710
738
.exceptionally (
711
739
ex -> {
@@ -730,10 +758,11 @@ private void assignProducersToNewManagers(
730
758
});
731
759
}
732
760
733
- private void maybeRecoverAgent (Broker broker , AgentTracker tracker ) {
761
+ private void maybeRecoverAgent (
762
+ Broker broker , List <BrokerWrapper > candidates , AgentTracker tracker ) {
734
763
if (tracker .markRecoveryInProgress ()) {
735
764
try {
736
- recoverAgent (broker , tracker );
765
+ recoverAgent (broker , candidates , tracker );
737
766
} catch (Exception e ) {
738
767
LOGGER .warn (
739
768
"Error while recovering {} tracker {} (stream '{}'). Reason: {}" ,
@@ -750,14 +779,14 @@ private void maybeRecoverAgent(Broker broker, AgentTracker tracker) {
750
779
}
751
780
}
752
781
753
- private void recoverAgent (Broker node , AgentTracker tracker ) {
782
+ private void recoverAgent (Broker node , List < BrokerWrapper > candidates , AgentTracker tracker ) {
754
783
boolean reassignmentCompleted = false ;
755
784
while (!reassignmentCompleted ) {
756
785
try {
757
786
if (tracker .isOpen ()) {
758
787
LOGGER .debug (
759
788
"Using {} to resume {} to {}" , node .label (), tracker .type (), tracker .stream ());
760
- addToManager (node , tracker );
789
+ addToManager (node , candidates , tracker );
761
790
tracker .running ();
762
791
} else {
763
792
LOGGER .debug (
@@ -776,14 +805,15 @@ private void recoverAgent(Broker node, AgentTracker tracker) {
776
805
tracker .identifiable () ? tracker .id () : "N/A" ,
777
806
tracker .stream ());
778
807
// maybe not a good candidate, let's refresh and retry for this one
779
- node =
808
+ candidates =
780
809
Utils .callAndMaybeRetry (
781
- () -> getBrokerForProducer (tracker .stream ()),
810
+ () -> findCandidateNodes (tracker .stream (), forceLeader ),
782
811
ex -> !(ex instanceof StreamDoesNotExistException ),
783
812
environment .recoveryBackOffDelayPolicy (),
784
813
"Candidate lookup for %s on stream '%s'" ,
785
814
tracker .type (),
786
815
tracker .stream ());
816
+ node = pickBroker (candidates );
787
817
} catch (Exception e ) {
788
818
LOGGER .warn (
789
819
"Error while re-assigning {} (stream '{}')" , tracker .type (), tracker .stream (), e );
0 commit comments