Skip to content

Commit 7146b0b

Browse files
committed
Use number of available processors for stream count in test
Instead of 10. This decreases load for low-resource environments, like CI (usually ~ 4 available processors). The VM has to run the test suite itself, but also the 3-node cluster.
1 parent 6958880 commit 7146b0b

File tree

6 files changed

+14
-10
lines changed

6 files changed

+14
-10
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ final class ConsumersCoordinator implements AutoCloseable {
7575
private final List<SubscriptionTracker> trackers = new CopyOnWriteArrayList<>();
7676
private final ExecutorServiceFactory executorServiceFactory =
7777
new DefaultExecutorServiceFactory(
78-
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-consumer-connection-");
78+
AVAILABLE_PROCESSORS, 10, "rabbitmq-stream-consumer-connection-");
7979
private final boolean forceReplica;
8080
private final Lock coordinatorLock = new ReentrantLock();
8181

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ final class ProducersCoordinator implements AutoCloseable {
6868
private final List<ProducerTracker> producerTrackers = new CopyOnWriteArrayList<>();
6969
private final ExecutorServiceFactory executorServiceFactory =
7070
new DefaultExecutorServiceFactory(
71-
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-producer-connection-");
71+
AVAILABLE_PROCESSORS, 10, "rabbitmq-stream-producer-connection-");
7272
private final Lock coordinatorLock = new ReentrantLock();
7373
private final boolean forceLeader;
7474

@@ -750,7 +750,10 @@ private void assignProducersToNewManagers(
750750
List<BrokerWrapper> candidates = brokerAndCandidates.v2();
751751
String key = keyForNode(broker);
752752
LOGGER.debug(
753-
"Assigning {} producer(s) and consumer tracker(s) to {} (stream '{}')", trackers.size(), key, stream);
753+
"Assigning {} producer(s) and consumer tracker(s) to {} (stream '{}')",
754+
trackers.size(),
755+
key,
756+
stream);
754757
trackers.forEach(tracker -> maybeRecoverAgent(broker, candidates, tracker));
755758
})
756759
.exceptionally(

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ class StreamEnvironment implements Environment {
224224
}
225225
ScheduledExecutorService executorService;
226226
if (scheduledExecutorService == null) {
227-
int threads = Runtime.getRuntime().availableProcessors();
227+
int threads = AVAILABLE_PROCESSORS;
228228
LOGGER.debug("Creating scheduled executor service with {} thread(s)", threads);
229229
ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-scheduler-");
230230
executorService = Executors.newScheduledThreadPool(threads, threadFactory);

src/main/java/com/rabbitmq/stream/impl/Utils.java

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141

4242
final class Utils {
4343

44+
static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
45+
4446
@SuppressWarnings("rawtypes")
4547
private static final Consumer NO_OP_CONSUMER = o -> {};
4648

src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.common.collect.Streams;
2929
import com.google.common.util.concurrent.RateLimiter;
3030
import com.rabbitmq.stream.*;
31+
import com.rabbitmq.stream.impl.TestUtils.DisabledIfNotCluster;
3132
import com.rabbitmq.stream.impl.TestUtils.Sync;
3233
import com.rabbitmq.stream.impl.Tuples.Pair;
3334
import io.netty.channel.ChannelOption;
@@ -52,7 +53,7 @@
5253
import org.slf4j.Logger;
5354
import org.slf4j.LoggerFactory;
5455

55-
@TestUtils.DisabledIfNotCluster
56+
@DisabledIfNotCluster
5657
@StreamTestInfrastructure
5758
public class RecoveryClusterTest {
5859

@@ -87,7 +88,7 @@ static void initAll() {
8788

8889
@BeforeEach
8990
void init(TestInfo info) {
90-
int availableProcessors = Runtime.getRuntime().availableProcessors();
91+
int availableProcessors = Utils.AVAILABLE_PROCESSORS;
9192
LOGGER.info("Available processors: {}", availableProcessors);
9293
ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-scheduler-");
9394
scheduledExecutorService = Executors.newScheduledThreadPool(availableProcessors, threadFactory);
@@ -134,7 +135,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
134135
"Cluster restart test, use load balancer {}, force leader {}",
135136
useLoadBalancer,
136137
forceLeader);
137-
int streamCount = 10;
138+
int streamCount = Utils.AVAILABLE_PROCESSORS;
138139
int producerCount = streamCount * 2;
139140
int consumerCount = streamCount * 2;
140141

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import static com.rabbitmq.stream.ConsumerFlowStrategy.creditWhenHalfMessagesProcessed;
1818
import static com.rabbitmq.stream.impl.TestUtils.*;
1919
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
20-
import static java.lang.Runtime.getRuntime;
2120
import static java.lang.String.format;
2221
import static java.util.Collections.synchronizedList;
2322
import static org.assertj.core.api.Assertions.*;
@@ -243,8 +242,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
243242
void asynchronousProcessingWithFlowControl() {
244243
int messageCount = 100_000;
245244
publishAndWaitForConfirms(cf, messageCount, stream);
246-
ExecutorService executorService =
247-
Executors.newFixedThreadPool(getRuntime().availableProcessors());
245+
ExecutorService executorService = Executors.newFixedThreadPool(Utils.AVAILABLE_PROCESSORS);
248246
try {
249247
CountDownLatch latch = new CountDownLatch(messageCount);
250248
environment.consumerBuilder().stream(stream)

0 commit comments

Comments
 (0)