1
- // Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
1
+ // Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
2
2
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3
3
//
4
4
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -70,6 +70,7 @@ class ConsumersCoordinator {
70
70
private final AtomicLong managerIdSequence = new AtomicLong (0 );
71
71
private final NavigableSet <ClientSubscriptionsManager > managers = new ConcurrentSkipListSet <>();
72
72
private final AtomicLong trackerIdSequence = new AtomicLong (0 );
73
+ private final Function <List <Broker >, Broker > brokerPicker ;
73
74
74
75
private final List <SubscriptionTracker > trackers = new CopyOnWriteArrayList <>();
75
76
private final ExecutorServiceFactory executorServiceFactory =
@@ -83,16 +84,14 @@ class ConsumersCoordinator {
83
84
int maxConsumersByConnection ,
84
85
Function <ClientConnectionType , String > connectionNamingStrategy ,
85
86
ClientFactory clientFactory ,
86
- boolean forceReplica ) {
87
+ boolean forceReplica ,
88
+ Function <List <Broker >, Broker > brokerPicker ) {
87
89
this .environment = environment ;
88
90
this .clientFactory = clientFactory ;
89
91
this .maxConsumersByConnection = maxConsumersByConnection ;
90
92
this .connectionNamingStrategy = connectionNamingStrategy ;
91
93
this .forceReplica = forceReplica ;
92
- }
93
-
94
- private static String keyForClientSubscription (Client .Broker broker ) {
95
- return broker .getHost () + ":" + broker .getPort ();
94
+ this .brokerPicker = brokerPicker ;
96
95
}
97
96
98
97
private BackOffDelayPolicy recoveryBackOffDelayPolicy () {
@@ -138,7 +137,7 @@ Runnable subscribe(
138
137
flowStrategy );
139
138
140
139
try {
141
- addToManager (newNode , subscriptionTracker , offsetSpecification , true );
140
+ addToManager (newNode , candidates , subscriptionTracker , offsetSpecification , true );
142
141
} catch (ConnectionStreamException e ) {
143
142
// these exceptions are not public
144
143
throw new StreamException (e .getMessage ());
@@ -162,6 +161,7 @@ Runnable subscribe(
162
161
163
162
private void addToManager (
164
163
Broker node ,
164
+ List <Broker > candidates ,
165
165
SubscriptionTracker tracker ,
166
166
OffsetSpecification offsetSpecification ,
167
167
boolean isInitialSubscription ) {
@@ -189,9 +189,9 @@ private void addToManager(
189
189
}
190
190
}
191
191
if (pickedManager == null ) {
192
- String name = keyForClientSubscription (node );
192
+ String name = keyForNode (node );
193
193
LOGGER .debug ("Creating subscription manager on {}" , name );
194
- pickedManager = new ClientSubscriptionsManager (node , clientParameters );
194
+ pickedManager = new ClientSubscriptionsManager (node , candidates , clientParameters );
195
195
LOGGER .debug ("Created subscription manager on {}, id {}" , name , pickedManager .id );
196
196
}
197
197
try {
@@ -571,6 +571,7 @@ private class ClientSubscriptionsManager implements Comparable<ClientSubscriptio
571
571
private final long id ;
572
572
private final Broker node ;
573
573
private final Client client ;
574
+ // <host>:<port> (actual or advertised)
574
575
private final String name ;
575
576
// the 2 data structures track the subscriptions, they must remain consistent
576
577
private final Map <String , Set <SubscriptionTracker >> streamToStreamSubscriptions =
@@ -582,12 +583,12 @@ private class ClientSubscriptionsManager implements Comparable<ClientSubscriptio
582
583
private volatile int trackerCount ;
583
584
private final AtomicBoolean closed = new AtomicBoolean (false );
584
585
585
- private ClientSubscriptionsManager (Broker node , Client .ClientParameters clientParameters ) {
586
+ private ClientSubscriptionsManager (
587
+ Broker targetNode , List <Broker > candidates , Client .ClientParameters clientParameters ) {
586
588
this .id = managerIdSequence .getAndIncrement ();
587
- this .node = node ;
588
- this .name = keyForClientSubscription (node );
589
- LOGGER .debug ("creating subscription manager on {}" , name );
590
589
this .trackerCount = 0 ;
590
+ AtomicReference <String > nameReference = new AtomicReference <>();
591
+
591
592
AtomicBoolean clientInitializedInManager = new AtomicBoolean (false );
592
593
ChunkListener chunkListener =
593
594
(client , subscriptionId , offset , messageCount , dataSize ) -> {
@@ -639,7 +640,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
639
640
"Could not find stream subscription {} in manager {}, node {} for message listener" ,
640
641
subscriptionId ,
641
642
this .id ,
642
- this . name );
643
+ nameReference . get () );
643
644
}
644
645
};
645
646
MessageIgnoredListener messageIgnoredListener =
@@ -663,7 +664,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
663
664
"Could not find stream subscription {} in manager {}, node {} for message ignored listener" ,
664
665
subscriptionId ,
665
666
this .id ,
666
- this . name );
667
+ nameReference . get () );
667
668
}
668
669
};
669
670
ShutdownListener shutdownListener =
@@ -675,7 +676,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
675
676
if (shutdownContext .isShutdownUnexpected ()) {
676
677
LOGGER .debug (
677
678
"Unexpected shutdown notification on subscription connection {}, scheduling consumers re-assignment" ,
678
- name );
679
+ nameReference . get () );
679
680
LOGGER .debug (
680
681
"Subscription connection has {} consumer(s) over {} stream(s) to recover" ,
681
682
this .subscriptionTrackers .stream ().filter (Objects ::nonNull ).count (),
@@ -718,7 +719,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
718
719
}
719
720
},
720
721
"Consumers re-assignment after disconnection from %s" ,
721
- name ));
722
+ nameReference . get () ));
722
723
}
723
724
};
724
725
MetadataListener metadataListener =
@@ -792,18 +793,23 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
792
793
};
793
794
String connectionName = connectionNamingStrategy .apply (ClientConnectionType .CONSUMER );
794
795
ClientFactoryContext clientFactoryContext =
795
- ClientFactoryContext .fromParameters (
796
- clientParameters
797
- .clientProperty ("connection_name" , connectionName )
798
- .chunkListener (chunkListener )
799
- .creditNotification (creditNotification )
800
- .messageListener (messageListener )
801
- .messageIgnoredListener (messageIgnoredListener )
802
- .shutdownListener (shutdownListener )
803
- .metadataListener (metadataListener )
804
- .consumerUpdateListener (consumerUpdateListener ))
805
- .key (name );
796
+ new ClientFactoryContext (
797
+ clientParameters
798
+ .clientProperty ("connection_name" , connectionName )
799
+ .chunkListener (chunkListener )
800
+ .creditNotification (creditNotification )
801
+ .messageListener (messageListener )
802
+ .messageIgnoredListener (messageIgnoredListener )
803
+ .shutdownListener (shutdownListener )
804
+ .metadataListener (metadataListener )
805
+ .consumerUpdateListener (consumerUpdateListener ),
806
+ keyForNode (targetNode ),
807
+ candidates );
806
808
this .client = clientFactory .client (clientFactoryContext );
809
+ this .node = brokerFromClient (this .client );
810
+ this .name = keyForNode (this .node );
811
+ nameReference .set (this .name );
812
+ LOGGER .debug ("creating subscription manager on {}" , name );
807
813
LOGGER .debug ("Created consumer connection '{}'" , connectionName );
808
814
clientInitializedInManager .set (true );
809
815
}
@@ -906,7 +912,7 @@ private void recoverSubscription(List<Broker> candidates, SubscriptionTracker tr
906
912
} else {
907
913
offsetSpecification = tracker .initialOffsetSpecification ;
908
914
}
909
- addToManager (broker , tracker , offsetSpecification , false );
915
+ addToManager (broker , candidates , tracker , offsetSpecification , false );
910
916
}
911
917
}
912
918
} else {
0 commit comments