Skip to content

Commit 5647c2e

Browse files
committed
Enforce entity by connection limit more strictly
Lock producer and consumer creation in respective coordinator. This adds some contention but should be acceptable as creations are not hot and concurrent operations. Fixes #464
1 parent 89865ee commit 5647c2e

File tree

4 files changed

+121
-70
lines changed

4 files changed

+121
-70
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+47-46
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,7 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17-
import static com.rabbitmq.stream.impl.Utils.convertCodeToException;
18-
import static com.rabbitmq.stream.impl.Utils.formatConstant;
19-
import static com.rabbitmq.stream.impl.Utils.isSac;
20-
import static com.rabbitmq.stream.impl.Utils.jsonField;
21-
import static com.rabbitmq.stream.impl.Utils.namedFunction;
22-
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
23-
import static com.rabbitmq.stream.impl.Utils.quote;
17+
import static com.rabbitmq.stream.impl.Utils.*;
2418
import static java.lang.String.format;
2519

2620
import com.rabbitmq.stream.*;
@@ -51,6 +45,8 @@
5145
import java.util.concurrent.atomic.AtomicInteger;
5246
import java.util.concurrent.atomic.AtomicLong;
5347
import java.util.concurrent.atomic.AtomicReference;
48+
import java.util.concurrent.locks.Lock;
49+
import java.util.concurrent.locks.ReentrantLock;
5450
import java.util.function.*;
5551
import java.util.stream.Collectors;
5652
import java.util.stream.IntStream;
@@ -80,6 +76,7 @@ class ConsumersCoordinator {
8076
new DefaultExecutorServiceFactory(
8177
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-consumer-connection-");
8278
private final boolean forceReplica;
79+
private final Lock coordinatorLock = new ReentrantLock();
8380

8481
ConsumersCoordinator(
8582
StreamEnvironment environment,
@@ -116,47 +113,51 @@ Runnable subscribe(
116113
MessageHandler messageHandler,
117114
Map<String, String> subscriptionProperties,
118115
ConsumerFlowStrategy flowStrategy) {
119-
List<Client.Broker> candidates = findBrokersForStream(stream, forceReplica);
120-
Client.Broker newNode = pickBroker(candidates);
121-
if (newNode == null) {
122-
throw new IllegalStateException("No available node to subscribe to");
123-
}
124-
125-
// create stream subscription to track final and changing state of this very subscription
126-
// we keep this instance when we move the subscription from a client to another one
127-
SubscriptionTracker subscriptionTracker =
128-
new SubscriptionTracker(
129-
this.trackerIdSequence.getAndIncrement(),
130-
consumer,
131-
stream,
132-
offsetSpecification,
133-
trackingReference,
134-
subscriptionListener,
135-
trackingClosingCallback,
136-
messageHandler,
137-
subscriptionProperties,
138-
flowStrategy);
116+
return lock(
117+
this.coordinatorLock,
118+
() -> {
119+
List<Client.Broker> candidates = findBrokersForStream(stream, forceReplica);
120+
Client.Broker newNode = pickBroker(candidates);
121+
if (newNode == null) {
122+
throw new IllegalStateException("No available node to subscribe to");
123+
}
139124

140-
try {
141-
addToManager(newNode, subscriptionTracker, offsetSpecification, true);
142-
} catch (ConnectionStreamException e) {
143-
// these exceptions are not public
144-
throw new StreamException(e.getMessage());
145-
}
125+
// create stream subscription to track final and changing state of this very subscription
126+
// we keep this instance when we move the subscription from a client to another one
127+
SubscriptionTracker subscriptionTracker =
128+
new SubscriptionTracker(
129+
this.trackerIdSequence.getAndIncrement(),
130+
consumer,
131+
stream,
132+
offsetSpecification,
133+
trackingReference,
134+
subscriptionListener,
135+
trackingClosingCallback,
136+
messageHandler,
137+
subscriptionProperties,
138+
flowStrategy);
139+
140+
try {
141+
addToManager(newNode, subscriptionTracker, offsetSpecification, true);
142+
} catch (ConnectionStreamException e) {
143+
// these exceptions are not public
144+
throw new StreamException(e.getMessage());
145+
}
146146

147-
if (debug) {
148-
this.trackers.add(subscriptionTracker);
149-
return () -> {
150-
try {
151-
this.trackers.remove(subscriptionTracker);
152-
} catch (Exception e) {
153-
LOGGER.debug("Error while removing subscription tracker from list");
154-
}
155-
subscriptionTracker.cancel();
156-
};
157-
} else {
158-
return subscriptionTracker::cancel;
159-
}
147+
if (debug) {
148+
this.trackers.add(subscriptionTracker);
149+
return () -> {
150+
try {
151+
this.trackers.remove(subscriptionTracker);
152+
} catch (Exception e) {
153+
LOGGER.debug("Error while removing subscription tracker from list");
154+
}
155+
subscriptionTracker.cancel();
156+
};
157+
} else {
158+
return subscriptionTracker::cancel;
159+
}
160+
});
160161
}
161162

162163
private void addToManager(

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

+21-16
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,7 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17-
import static com.rabbitmq.stream.impl.Utils.callAndMaybeRetry;
18-
import static com.rabbitmq.stream.impl.Utils.formatConstant;
19-
import static com.rabbitmq.stream.impl.Utils.jsonField;
20-
import static com.rabbitmq.stream.impl.Utils.namedFunction;
21-
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
22-
import static com.rabbitmq.stream.impl.Utils.quote;
17+
import static com.rabbitmq.stream.impl.Utils.*;
2318
import static java.util.stream.Collectors.toSet;
2419

2520
import com.rabbitmq.stream.BackOffDelayPolicy;
@@ -52,6 +47,8 @@
5247
import java.util.concurrent.atomic.AtomicInteger;
5348
import java.util.concurrent.atomic.AtomicLong;
5449
import java.util.concurrent.atomic.AtomicReference;
50+
import java.util.concurrent.locks.Lock;
51+
import java.util.concurrent.locks.ReentrantLock;
5552
import java.util.function.Function;
5653
import java.util.function.Predicate;
5754
import java.util.stream.Collectors;
@@ -75,6 +72,7 @@ class ProducersCoordinator {
7572
private final ExecutorServiceFactory executorServiceFactory =
7673
new DefaultExecutorServiceFactory(
7774
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-producer-connection-");
75+
private final Lock coordinatorLock = new ReentrantLock();
7876

7977
ProducersCoordinator(
8078
StreamEnvironment environment,
@@ -94,19 +92,26 @@ private static String keyForNode(Client.Broker broker) {
9492
}
9593

9694
Runnable registerProducer(StreamProducer producer, String reference, String stream) {
97-
ProducerTracker tracker =
98-
new ProducerTracker(trackerIdSequence.getAndIncrement(), reference, stream, producer);
99-
if (debug) {
100-
this.producerTrackers.add(tracker);
101-
}
102-
return registerAgentTracker(tracker, stream);
95+
return lock(
96+
this.coordinatorLock,
97+
() -> {
98+
ProducerTracker tracker =
99+
new ProducerTracker(trackerIdSequence.getAndIncrement(), reference, stream, producer);
100+
if (debug) {
101+
this.producerTrackers.add(tracker);
102+
}
103+
return registerAgentTracker(tracker, stream);
104+
});
103105
}
104106

105107
Runnable registerTrackingConsumer(StreamConsumer consumer) {
106-
return registerAgentTracker(
107-
new TrackingConsumerTracker(
108-
trackerIdSequence.getAndIncrement(), consumer.stream(), consumer),
109-
consumer.stream());
108+
return lock(
109+
this.coordinatorLock,
110+
() ->
111+
registerAgentTracker(
112+
new TrackingConsumerTracker(
113+
trackerIdSequence.getAndIncrement(), consumer.stream(), consumer),
114+
consumer.stream()));
110115
}
111116

112117
private Runnable registerAgentTracker(AgentTracker tracker, String stream) {

src/main/java/com/rabbitmq/stream/impl/Utils.java

+10
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.concurrent.TimeUnit;
4040
import java.util.concurrent.atomic.AtomicBoolean;
4141
import java.util.concurrent.atomic.AtomicLong;
42+
import java.util.concurrent.locks.Lock;
4243
import java.util.function.Consumer;
4344
import java.util.function.Function;
4445
import java.util.function.LongConsumer;
@@ -662,4 +663,13 @@ boolean get() {
662663
return this.value;
663664
}
664665
}
666+
667+
static <T> T lock(Lock lock, Supplier<T> action) {
668+
lock.lock();
669+
try {
670+
return action.get();
671+
} finally {
672+
lock.unlock();
673+
}
674+
}
665675
}

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

+43-8
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17+
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
1718
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
1819
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
1920
import static com.rabbitmq.stream.impl.TestUtils.localhost;
@@ -69,9 +70,7 @@
6970
import java.util.Random;
7071
import java.util.Set;
7172
import java.util.UUID;
72-
import java.util.concurrent.ConcurrentHashMap;
73-
import java.util.concurrent.CopyOnWriteArrayList;
74-
import java.util.concurrent.CountDownLatch;
73+
import java.util.concurrent.*;
7574
import java.util.concurrent.atomic.AtomicBoolean;
7675
import java.util.concurrent.atomic.AtomicInteger;
7776
import java.util.concurrent.atomic.AtomicLong;
@@ -82,11 +81,7 @@
8281
import javax.net.ssl.SNIHostName;
8382
import javax.net.ssl.SSLParameters;
8483
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
85-
import org.junit.jupiter.api.AfterAll;
86-
import org.junit.jupiter.api.BeforeAll;
87-
import org.junit.jupiter.api.BeforeEach;
88-
import org.junit.jupiter.api.Test;
89-
import org.junit.jupiter.api.TestInfo;
84+
import org.junit.jupiter.api.*;
9085
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
9186
import org.junit.jupiter.api.condition.EnabledOnOs;
9287
import org.junit.jupiter.api.condition.OS;
@@ -778,4 +773,44 @@ void nativeEpollWorksOnLinux() {
778773
epollEventLoopGroup.shutdownGracefully(0, 0, SECONDS);
779774
}
780775
}
776+
777+
@Test
778+
void enforceEntityPerConnectionLimits() {
779+
int entityCount = 10;
780+
int limit = 3;
781+
ExecutorService executor = Executors.newCachedThreadPool();
782+
try (Environment env =
783+
environmentBuilder
784+
.maxProducersByConnection(limit)
785+
.maxConsumersByConnection(limit)
786+
.maxTrackingConsumersByConnection(limit)
787+
.build()) {
788+
CountDownLatch latch = new CountDownLatch(entityCount * 2);
789+
IntStream.range(0, entityCount)
790+
.forEach(
791+
i -> {
792+
executor.execute(
793+
() -> {
794+
env.producerBuilder().stream(stream).name(String.valueOf(i)).build();
795+
latch.countDown();
796+
});
797+
});
798+
IntStream.range(0, entityCount)
799+
.forEach(
800+
i -> {
801+
executor.execute(
802+
() -> {
803+
env.consumerBuilder().stream(stream).messageHandler((ctx, msg) -> {}).build();
804+
latch.countDown();
805+
});
806+
});
807+
assertThat(latch).is(completed());
808+
EnvironmentInfo envInfo = MonitoringTestUtils.extract(env);
809+
int expectedConnectionCount = entityCount / limit + 1;
810+
assertThat(envInfo.getProducers().clientCount()).isEqualTo(expectedConnectionCount);
811+
assertThat(envInfo.getConsumers().clients()).hasSize(expectedConnectionCount);
812+
} finally {
813+
executor.shutdownNow();
814+
}
815+
}
781816
}

0 commit comments

Comments
 (0)