Skip to content

Commit 0d3f9e1

Browse files
authored
Merge pull request #478 from rabbitmq/allocate-producer-consumer-ids-in-sequence
Allocate producer/consumer IDs with a sequence
2 parents 8f5e9e8 + c973d01 commit 0d3f9e1

File tree

4 files changed

+164
-66
lines changed

4 files changed

+164
-66
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -576,16 +576,16 @@ private class ClientSubscriptionsManager implements Comparable<ClientSubscriptio
576576
new ConcurrentHashMap<>();
577577
// trackers and tracker count must be kept in sync
578578
private volatile List<SubscriptionTracker> subscriptionTrackers =
579-
new ArrayList<>(maxConsumersByConnection);
580-
private volatile int trackerCount = 0;
579+
createSubscriptionTrackerList();
580+
private final AtomicInteger consumerIndexSequence = new AtomicInteger(0);
581+
private volatile int trackerCount;
581582
private final AtomicBoolean closed = new AtomicBoolean(false);
582583

583584
private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientParameters) {
584585
this.id = managerIdSequence.getAndIncrement();
585586
this.node = node;
586587
this.name = keyForClientSubscription(node);
587588
LOGGER.debug("creating subscription manager on {}", name);
588-
IntStream.range(0, maxConsumersByConnection).forEach(i -> subscriptionTrackers.add(null));
589589
this.trackerCount = 0;
590590
AtomicBoolean clientInitializedInManager = new AtomicBoolean(false);
591591
ChunkListener chunkListener =
@@ -729,10 +729,9 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
729729
synchronized (this) {
730730
Set<SubscriptionTracker> subscriptions = streamToStreamSubscriptions.remove(stream);
731731
if (subscriptions != null && !subscriptions.isEmpty()) {
732-
List<SubscriptionTracker> newSubscriptions =
733-
new ArrayList<>(maxConsumersByConnection);
734-
for (int i = 0; i < maxConsumersByConnection; i++) {
735-
newSubscriptions.add(subscriptionTrackers.get(i));
732+
List<SubscriptionTracker> newSubscriptions = createSubscriptionTrackerList();
733+
for (int i = 0; i < MAX_SUBSCRIPTIONS_PER_CLIENT; i++) {
734+
newSubscriptions.set(i, subscriptionTrackers.get(i));
736735
}
737736
for (SubscriptionTracker subscription : subscriptions) {
738737
LOGGER.debug(
@@ -864,6 +863,12 @@ private void assignConsumersToStream(
864863
});
865864
}
866865

866+
private List<SubscriptionTracker> createSubscriptionTrackerList() {
867+
List<SubscriptionTracker> newSubscriptions = new ArrayList<>(MAX_SUBSCRIPTIONS_PER_CLIENT);
868+
IntStream.range(0, MAX_SUBSCRIPTIONS_PER_CLIENT).forEach(i -> newSubscriptions.add(null));
869+
return newSubscriptions;
870+
}
871+
867872
private void maybeRecoverSubscription(List<Broker> candidates, SubscriptionTracker tracker) {
868873
if (tracker.compareAndSet(SubscriptionState.ACTIVE, SubscriptionState.RECOVERING)) {
869874
try {
@@ -958,13 +963,7 @@ synchronized void add(
958963

959964
checkNotClosed();
960965

961-
byte subscriptionId = 0;
962-
for (int i = 0; i < MAX_SUBSCRIPTIONS_PER_CLIENT; i++) {
963-
if (subscriptionTrackers.get(i) == null) {
964-
subscriptionId = (byte) i;
965-
break;
966-
}
967-
}
966+
byte subscriptionId = (byte) pickSlot(this.subscriptionTrackers, this.consumerIndexSequence);
968967

969968
List<SubscriptionTracker> previousSubscriptions = this.subscriptionTrackers;
970969

@@ -1121,15 +1120,14 @@ synchronized void remove(SubscriptionTracker subscriptionTracker) {
11211120
}
11221121
});
11231122
closeIfEmpty();
1124-
// this.owner.maybeDisposeManager(this);
11251123
}
11261124

11271125
private List<SubscriptionTracker> update(
11281126
List<SubscriptionTracker> original, byte index, SubscriptionTracker newValue) {
1129-
List<SubscriptionTracker> newSubcriptions = new ArrayList<>(maxConsumersByConnection);
1127+
List<SubscriptionTracker> newSubcriptions = createSubscriptionTrackerList();
11301128
int intIndex = index & 0xFF;
1131-
for (int i = 0; i < maxConsumersByConnection; i++) {
1132-
newSubcriptions.add(i == intIndex ? newValue : original.get(i));
1129+
for (int i = 0; i < MAX_SUBSCRIPTIONS_PER_CLIENT; i++) {
1130+
newSubcriptions.set(i, i == intIndex ? newValue : original.get(i));
11331131
}
11341132
return newSubcriptions;
11351133
}
@@ -1280,4 +1278,12 @@ public long messageCount() {
12801278
return messageCount;
12811279
}
12821280
}
1281+
1282+
static <T> int pickSlot(List<T> list, AtomicInteger sequence) {
1283+
int index = Integer.remainderUnsigned(sequence.getAndIncrement(), MAX_SUBSCRIPTIONS_PER_CLIENT);
1284+
while (list.get(index) != null) {
1285+
index = Integer.remainderUnsigned(sequence.getAndIncrement(), MAX_SUBSCRIPTIONS_PER_CLIENT);
1286+
}
1287+
return index;
1288+
}
12831289
}

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.concurrent.ConcurrentSkipListSet;
5050
import java.util.concurrent.CopyOnWriteArrayList;
5151
import java.util.concurrent.atomic.AtomicBoolean;
52+
import java.util.concurrent.atomic.AtomicInteger;
5253
import java.util.concurrent.atomic.AtomicLong;
5354
import java.util.concurrent.atomic.AtomicReference;
5455
import java.util.function.Function;
@@ -564,6 +565,7 @@ private class ClientProducersManager implements Comparable<ClientProducersManage
564565
private final Broker node;
565566
private final ConcurrentMap<Byte, ProducerTracker> producers =
566567
new ConcurrentHashMap<>(maxProducersByClient);
568+
private final AtomicInteger producerIndexSequence = new AtomicInteger(0);
567569
private final Set<AgentTracker> trackingConsumerTrackers =
568570
ConcurrentHashMap.newKeySet(maxTrackingConsumersByClient);
569571
private final Map<String, Set<AgentTracker>> streamToTrackers = new ConcurrentHashMap<>();
@@ -802,33 +804,26 @@ private synchronized void register(AgentTracker tracker) {
802804
checkNotClosed();
803805
if (tracker.identifiable()) {
804806
ProducerTracker producerTracker = (ProducerTracker) tracker;
805-
// using the next available slot
806-
for (int i = 0; i < maxProducersByClient; i++) {
807-
ProducerTracker previousValue = producers.putIfAbsent((byte) i, producerTracker);
808-
if (previousValue == null) {
809-
this.checkNotClosed();
810-
int index = i;
811-
Response response =
812-
callAndMaybeRetry(
813-
() ->
814-
this.client.declarePublisher(
815-
(byte) index, tracker.reference(), tracker.stream()),
816-
RETRY_ON_TIMEOUT,
817-
"Declare publisher request for publisher %d on stream '%s'",
818-
producerTracker.uniqueId(),
819-
producerTracker.stream());
820-
if (response.isOk()) {
821-
tracker.assign((byte) i, this.client, this);
822-
} else {
823-
String message =
824-
"Error while declaring publisher: "
825-
+ formatConstant(response.getResponseCode())
826-
+ ". Could not assign producer to client.";
827-
LOGGER.info(message);
828-
throw new StreamException(message, response.getResponseCode());
829-
}
830-
break;
831-
}
807+
int index = pickSlot(this.producers, producerTracker, this.producerIndexSequence);
808+
this.checkNotClosed();
809+
Response response =
810+
callAndMaybeRetry(
811+
() ->
812+
this.client.declarePublisher(
813+
(byte) index, tracker.reference(), tracker.stream()),
814+
RETRY_ON_TIMEOUT,
815+
"Declare publisher request for publisher %d on stream '%s'",
816+
producerTracker.uniqueId(),
817+
producerTracker.stream());
818+
if (response.isOk()) {
819+
tracker.assign((byte) index, this.client, this);
820+
} else {
821+
String message =
822+
"Error while declaring publisher: "
823+
+ formatConstant(response.getResponseCode())
824+
+ ". Could not assign producer to client.";
825+
LOGGER.info(message);
826+
throw new StreamException(message, response.getResponseCode());
832827
}
833828
producers.put(tracker.id(), producerTracker);
834829
} else {
@@ -944,4 +939,14 @@ public ClientClosedException() {
944939
super("Client already closed");
945940
}
946941
}
942+
943+
static <T> int pickSlot(ConcurrentMap<Byte, T> map, T tracker, AtomicInteger sequence) {
944+
int index = -1;
945+
T previousValue = tracker;
946+
while (previousValue != null) {
947+
index = Integer.remainderUnsigned(sequence.getAndIncrement(), MAX_PRODUCERS_PER_CLIENT);
948+
previousValue = map.putIfAbsent((byte) index, tracker);
949+
}
950+
return index;
951+
}
947952
}

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package com.rabbitmq.stream.impl;
1616

1717
import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay;
18+
import static com.rabbitmq.stream.impl.ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT;
19+
import static com.rabbitmq.stream.impl.ConsumersCoordinator.pickSlot;
1820
import static com.rabbitmq.stream.impl.TestUtils.b;
1921
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
2022
import static com.rabbitmq.stream.impl.TestUtils.metadata;
@@ -45,12 +47,7 @@
4547
import java.util.Collections;
4648
import java.util.List;
4749
import java.util.Map;
48-
import java.util.concurrent.ConcurrentHashMap;
49-
import java.util.concurrent.CopyOnWriteArrayList;
50-
import java.util.concurrent.CountDownLatch;
51-
import java.util.concurrent.ExecutorService;
52-
import java.util.concurrent.Executors;
53-
import java.util.concurrent.ScheduledExecutorService;
50+
import java.util.concurrent.*;
5451
import java.util.concurrent.atomic.AtomicInteger;
5552
import java.util.concurrent.atomic.AtomicReference;
5653
import java.util.function.Consumer;
@@ -62,6 +59,7 @@
6259
import org.junit.jupiter.api.Test;
6360
import org.junit.jupiter.params.ParameterizedTest;
6461
import org.junit.jupiter.params.provider.MethodSource;
62+
import org.junit.jupiter.params.provider.ValueSource;
6563
import org.mockito.ArgumentCaptor;
6664
import org.mockito.Captor;
6765
import org.mockito.Mock;
@@ -1074,8 +1072,10 @@ void metadataUpdate_shouldCloseConsumerIfRetryTimeoutIsReached() throws Exceptio
10741072
assertThat(coordinator.managerCount()).isZero();
10751073
}
10761074

1077-
@Test
1078-
void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscriptions() {
1075+
@ParameterizedTest
1076+
@ValueSource(ints = {50, ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT})
1077+
void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscriptions(
1078+
int maxConsumersByConnection) {
10791079
when(locator.metadata("stream")).thenReturn(metadata(leader(), null));
10801080

10811081
when(clientFactory.client(any())).thenReturn(client);
@@ -1089,9 +1089,16 @@ void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscrip
10891089
.thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
10901090
when(client.isOpen()).thenReturn(true);
10911091

1092-
int extraSubscriptionCount = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT / 5;
1093-
int subscriptionCount =
1094-
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT + extraSubscriptionCount;
1092+
int extraSubscriptionCount = maxConsumersByConnection / 5;
1093+
int subscriptionCount = maxConsumersByConnection + extraSubscriptionCount;
1094+
1095+
coordinator =
1096+
new ConsumersCoordinator(
1097+
environment,
1098+
maxConsumersByConnection,
1099+
type -> "consumer-connection",
1100+
clientFactory,
1101+
false);
10951102

10961103
List<Runnable> closingRunnables =
10971104
IntStream.range(0, subscriptionCount)
@@ -1129,7 +1136,7 @@ void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscrip
11291136

11301137
verify(client, times(1)).close();
11311138

1132-
closingRunnables.forEach(closingRunnable -> closingRunnable.run());
1139+
closingRunnables.forEach(Runnable::run);
11331140

11341141
verify(client, times(2)).close();
11351142
}
@@ -1927,6 +1934,47 @@ void shouldRetryUntilReplicaIsAvailableWhenForceReplicaIsOn() throws Exception {
19271934
assertThat(messageHandlerCalls.get()).isEqualTo(2);
19281935
}
19291936

1937+
@Test
1938+
void pickSlotTest() {
1939+
List<String> list = new ArrayList<>(ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT);
1940+
IntStream.range(0, MAX_SUBSCRIPTIONS_PER_CLIENT).forEach(ignored -> list.add(null));
1941+
AtomicInteger sequence = new AtomicInteger(0);
1942+
int index = pickSlot(list, sequence);
1943+
assertThat(index).isZero();
1944+
list.set(index, "0");
1945+
1946+
index = pickSlot(list, sequence);
1947+
assertThat(index).isEqualTo(1);
1948+
list.set(index, "1");
1949+
index = pickSlot(list, sequence);
1950+
assertThat(index).isEqualTo(2);
1951+
list.set(index, "2");
1952+
index = pickSlot(list, sequence);
1953+
assertThat(index).isEqualTo(3);
1954+
list.set(index, "3");
1955+
1956+
list.set(1, null);
1957+
index = pickSlot(list, sequence);
1958+
assertThat(index).isEqualTo(4);
1959+
list.set(index, "4");
1960+
1961+
sequence.set(MAX_SUBSCRIPTIONS_PER_CLIENT - 2);
1962+
1963+
index = pickSlot(list, sequence);
1964+
assertThat(index).isEqualTo(254);
1965+
list.set(index, "254");
1966+
index = pickSlot(list, sequence);
1967+
assertThat(index).isEqualTo(255);
1968+
list.set(index, "255");
1969+
1970+
// 0 is already taken, so we should get index 1 when we overflow
1971+
index = pickSlot(list, sequence);
1972+
assertThat(index).isEqualTo(1);
1973+
list.set(index, "256");
1974+
index = pickSlot(list, sequence);
1975+
assertThat(index).isEqualTo(5);
1976+
}
1977+
19301978
Client.Broker leader() {
19311979
return new Client.Broker("leader", -1);
19321980
}

0 commit comments

Comments
 (0)