Skip to content

Commit a4bd7da

Browse files
authored
Merge pull request #486 from rabbitmq/issue-464-enforce-per-connection-limits
Enforce entity by connection limit more strictly
2 parents 89865ee + 5647c2e commit a4bd7da

File tree

4 files changed

+121
-70
lines changed

4 files changed

+121
-70
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+47-46
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,7 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17-
import static com.rabbitmq.stream.impl.Utils.convertCodeToException;
18-
import static com.rabbitmq.stream.impl.Utils.formatConstant;
19-
import static com.rabbitmq.stream.impl.Utils.isSac;
20-
import static com.rabbitmq.stream.impl.Utils.jsonField;
21-
import static com.rabbitmq.stream.impl.Utils.namedFunction;
22-
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
23-
import static com.rabbitmq.stream.impl.Utils.quote;
17+
import static com.rabbitmq.stream.impl.Utils.*;
2418
import static java.lang.String.format;
2519

2620
import com.rabbitmq.stream.*;
@@ -51,6 +45,8 @@
5145
import java.util.concurrent.atomic.AtomicInteger;
5246
import java.util.concurrent.atomic.AtomicLong;
5347
import java.util.concurrent.atomic.AtomicReference;
48+
import java.util.concurrent.locks.Lock;
49+
import java.util.concurrent.locks.ReentrantLock;
5450
import java.util.function.*;
5551
import java.util.stream.Collectors;
5652
import java.util.stream.IntStream;
@@ -80,6 +76,7 @@ class ConsumersCoordinator {
8076
new DefaultExecutorServiceFactory(
8177
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-consumer-connection-");
8278
private final boolean forceReplica;
79+
private final Lock coordinatorLock = new ReentrantLock();
8380

8481
ConsumersCoordinator(
8582
StreamEnvironment environment,
@@ -116,47 +113,51 @@ Runnable subscribe(
116113
MessageHandler messageHandler,
117114
Map<String, String> subscriptionProperties,
118115
ConsumerFlowStrategy flowStrategy) {
119-
List<Client.Broker> candidates = findBrokersForStream(stream, forceReplica);
120-
Client.Broker newNode = pickBroker(candidates);
121-
if (newNode == null) {
122-
throw new IllegalStateException("No available node to subscribe to");
123-
}
124-
125-
// create stream subscription to track final and changing state of this very subscription
126-
// we keep this instance when we move the subscription from a client to another one
127-
SubscriptionTracker subscriptionTracker =
128-
new SubscriptionTracker(
129-
this.trackerIdSequence.getAndIncrement(),
130-
consumer,
131-
stream,
132-
offsetSpecification,
133-
trackingReference,
134-
subscriptionListener,
135-
trackingClosingCallback,
136-
messageHandler,
137-
subscriptionProperties,
138-
flowStrategy);
116+
return lock(
117+
this.coordinatorLock,
118+
() -> {
119+
List<Client.Broker> candidates = findBrokersForStream(stream, forceReplica);
120+
Client.Broker newNode = pickBroker(candidates);
121+
if (newNode == null) {
122+
throw new IllegalStateException("No available node to subscribe to");
123+
}
139124

140-
try {
141-
addToManager(newNode, subscriptionTracker, offsetSpecification, true);
142-
} catch (ConnectionStreamException e) {
143-
// these exceptions are not public
144-
throw new StreamException(e.getMessage());
145-
}
125+
// create stream subscription to track final and changing state of this very subscription
126+
// we keep this instance when we move the subscription from a client to another one
127+
SubscriptionTracker subscriptionTracker =
128+
new SubscriptionTracker(
129+
this.trackerIdSequence.getAndIncrement(),
130+
consumer,
131+
stream,
132+
offsetSpecification,
133+
trackingReference,
134+
subscriptionListener,
135+
trackingClosingCallback,
136+
messageHandler,
137+
subscriptionProperties,
138+
flowStrategy);
139+
140+
try {
141+
addToManager(newNode, subscriptionTracker, offsetSpecification, true);
142+
} catch (ConnectionStreamException e) {
143+
// these exceptions are not public
144+
throw new StreamException(e.getMessage());
145+
}
146146

147-
if (debug) {
148-
this.trackers.add(subscriptionTracker);
149-
return () -> {
150-
try {
151-
this.trackers.remove(subscriptionTracker);
152-
} catch (Exception e) {
153-
LOGGER.debug("Error while removing subscription tracker from list");
154-
}
155-
subscriptionTracker.cancel();
156-
};
157-
} else {
158-
return subscriptionTracker::cancel;
159-
}
147+
if (debug) {
148+
this.trackers.add(subscriptionTracker);
149+
return () -> {
150+
try {
151+
this.trackers.remove(subscriptionTracker);
152+
} catch (Exception e) {
153+
LOGGER.debug("Error while removing subscription tracker from list");
154+
}
155+
subscriptionTracker.cancel();
156+
};
157+
} else {
158+
return subscriptionTracker::cancel;
159+
}
160+
});
160161
}
161162

162163
private void addToManager(

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

+21-16
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,7 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17-
import static com.rabbitmq.stream.impl.Utils.callAndMaybeRetry;
18-
import static com.rabbitmq.stream.impl.Utils.formatConstant;
19-
import static com.rabbitmq.stream.impl.Utils.jsonField;
20-
import static com.rabbitmq.stream.impl.Utils.namedFunction;
21-
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
22-
import static com.rabbitmq.stream.impl.Utils.quote;
17+
import static com.rabbitmq.stream.impl.Utils.*;
2318
import static java.util.stream.Collectors.toSet;
2419

2520
import com.rabbitmq.stream.BackOffDelayPolicy;
@@ -52,6 +47,8 @@
5247
import java.util.concurrent.atomic.AtomicInteger;
5348
import java.util.concurrent.atomic.AtomicLong;
5449
import java.util.concurrent.atomic.AtomicReference;
50+
import java.util.concurrent.locks.Lock;
51+
import java.util.concurrent.locks.ReentrantLock;
5552
import java.util.function.Function;
5653
import java.util.function.Predicate;
5754
import java.util.stream.Collectors;
@@ -75,6 +72,7 @@ class ProducersCoordinator {
7572
private final ExecutorServiceFactory executorServiceFactory =
7673
new DefaultExecutorServiceFactory(
7774
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-producer-connection-");
75+
private final Lock coordinatorLock = new ReentrantLock();
7876

7977
ProducersCoordinator(
8078
StreamEnvironment environment,
@@ -94,19 +92,26 @@ private static String keyForNode(Client.Broker broker) {
9492
}
9593

9694
Runnable registerProducer(StreamProducer producer, String reference, String stream) {
97-
ProducerTracker tracker =
98-
new ProducerTracker(trackerIdSequence.getAndIncrement(), reference, stream, producer);
99-
if (debug) {
100-
this.producerTrackers.add(tracker);
101-
}
102-
return registerAgentTracker(tracker, stream);
95+
return lock(
96+
this.coordinatorLock,
97+
() -> {
98+
ProducerTracker tracker =
99+
new ProducerTracker(trackerIdSequence.getAndIncrement(), reference, stream, producer);
100+
if (debug) {
101+
this.producerTrackers.add(tracker);
102+
}
103+
return registerAgentTracker(tracker, stream);
104+
});
103105
}
104106

105107
Runnable registerTrackingConsumer(StreamConsumer consumer) {
106-
return registerAgentTracker(
107-
new TrackingConsumerTracker(
108-
trackerIdSequence.getAndIncrement(), consumer.stream(), consumer),
109-
consumer.stream());
108+
return lock(
109+
this.coordinatorLock,
110+
() ->
111+
registerAgentTracker(
112+
new TrackingConsumerTracker(
113+
trackerIdSequence.getAndIncrement(), consumer.stream(), consumer),
114+
consumer.stream()));
110115
}
111116

112117
private Runnable registerAgentTracker(AgentTracker tracker, String stream) {

src/main/java/com/rabbitmq/stream/impl/Utils.java

+10
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.concurrent.TimeUnit;
4040
import java.util.concurrent.atomic.AtomicBoolean;
4141
import java.util.concurrent.atomic.AtomicLong;
42+
import java.util.concurrent.locks.Lock;
4243
import java.util.function.Consumer;
4344
import java.util.function.Function;
4445
import java.util.function.LongConsumer;
@@ -662,4 +663,13 @@ boolean get() {
662663
return this.value;
663664
}
664665
}
666+
667+
static <T> T lock(Lock lock, Supplier<T> action) {
668+
lock.lock();
669+
try {
670+
return action.get();
671+
} finally {
672+
lock.unlock();
673+
}
674+
}
665675
}

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

+43-8
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17+
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
1718
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
1819
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
1920
import static com.rabbitmq.stream.impl.TestUtils.localhost;
@@ -69,9 +70,7 @@
6970
import java.util.Random;
7071
import java.util.Set;
7172
import java.util.UUID;
72-
import java.util.concurrent.ConcurrentHashMap;
73-
import java.util.concurrent.CopyOnWriteArrayList;
74-
import java.util.concurrent.CountDownLatch;
73+
import java.util.concurrent.*;
7574
import java.util.concurrent.atomic.AtomicBoolean;
7675
import java.util.concurrent.atomic.AtomicInteger;
7776
import java.util.concurrent.atomic.AtomicLong;
@@ -82,11 +81,7 @@
8281
import javax.net.ssl.SNIHostName;
8382
import javax.net.ssl.SSLParameters;
8483
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
85-
import org.junit.jupiter.api.AfterAll;
86-
import org.junit.jupiter.api.BeforeAll;
87-
import org.junit.jupiter.api.BeforeEach;
88-
import org.junit.jupiter.api.Test;
89-
import org.junit.jupiter.api.TestInfo;
84+
import org.junit.jupiter.api.*;
9085
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
9186
import org.junit.jupiter.api.condition.EnabledOnOs;
9287
import org.junit.jupiter.api.condition.OS;
@@ -778,4 +773,44 @@ void nativeEpollWorksOnLinux() {
778773
epollEventLoopGroup.shutdownGracefully(0, 0, SECONDS);
779774
}
780775
}
776+
777+
@Test
778+
void enforceEntityPerConnectionLimits() {
779+
int entityCount = 10;
780+
int limit = 3;
781+
ExecutorService executor = Executors.newCachedThreadPool();
782+
try (Environment env =
783+
environmentBuilder
784+
.maxProducersByConnection(limit)
785+
.maxConsumersByConnection(limit)
786+
.maxTrackingConsumersByConnection(limit)
787+
.build()) {
788+
CountDownLatch latch = new CountDownLatch(entityCount * 2);
789+
IntStream.range(0, entityCount)
790+
.forEach(
791+
i -> {
792+
executor.execute(
793+
() -> {
794+
env.producerBuilder().stream(stream).name(String.valueOf(i)).build();
795+
latch.countDown();
796+
});
797+
});
798+
IntStream.range(0, entityCount)
799+
.forEach(
800+
i -> {
801+
executor.execute(
802+
() -> {
803+
env.consumerBuilder().stream(stream).messageHandler((ctx, msg) -> {}).build();
804+
latch.countDown();
805+
});
806+
});
807+
assertThat(latch).is(completed());
808+
EnvironmentInfo envInfo = MonitoringTestUtils.extract(env);
809+
int expectedConnectionCount = entityCount / limit + 1;
810+
assertThat(envInfo.getProducers().clientCount()).isEqualTo(expectedConnectionCount);
811+
assertThat(envInfo.getConsumers().clients()).hasSize(expectedConnectionCount);
812+
} finally {
813+
executor.shutdownNow();
814+
}
815+
}
781816
}

0 commit comments

Comments
 (0)