diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 21c370e804..d9061799eb 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -55,6 +55,7 @@ import com.rabbitmq.stream.ChunkChecksum; import com.rabbitmq.stream.Codec; import com.rabbitmq.stream.Codec.EncodedMessage; +import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.MessageBuilder; @@ -68,6 +69,7 @@ import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason; import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandler; import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo; +import com.rabbitmq.stream.impl.Utils.NamedThreadFactory; import com.rabbitmq.stream.metrics.MetricsCollector; import com.rabbitmq.stream.metrics.NoOpMetricsCollector; import com.rabbitmq.stream.sasl.CredentialsProvider; @@ -179,6 +181,7 @@ public class Client implements AutoCloseable { new ConcurrentHashMap<>(); final List subscriptionOffsets = new CopyOnWriteArrayList<>(); final ExecutorService executorService; + final ExecutorService dispatchingExecutorService; final TuneState tuneState; final AtomicBoolean closing = new AtomicBoolean(false); final ChunkChecksum chunkChecksum; @@ -348,8 +351,37 @@ public void initChannel(SocketChannel ch) { this.channel = f.channel(); this.nettyClosing = Utils.makeIdempotent(this::closeNetty); - this.executorService = Executors.newSingleThreadExecutor(); - this.executorServiceClosing = Utils.makeIdempotent(this.executorService::shutdownNow); + ExecutorServiceFactory executorServiceFactory = parameters.executorServiceFactory; + if (executorServiceFactory == null) { + this.executorService = + Executors.newSingleThreadExecutor(new NamedThreadFactory(clientConnectionName + "-")); + } else { + this.executorService = executorServiceFactory.get(); + } + ExecutorServiceFactory dispatchingExecutorServiceFactory = + parameters.dispatchingExecutorServiceFactory; + if (dispatchingExecutorServiceFactory == null) { + this.dispatchingExecutorService = + Executors.newSingleThreadExecutor( + new NamedThreadFactory("dispatching-" + clientConnectionName + "-")); + } else { + this.dispatchingExecutorService = dispatchingExecutorServiceFactory.get(); + } + this.executorServiceClosing = + Utils.makeIdempotent( + () -> { + this.dispatchingExecutorService.shutdownNow(); + if (dispatchingExecutorServiceFactory == null) { + this.dispatchingExecutorService.shutdownNow(); + } else { + dispatchingExecutorServiceFactory.clientClosed(this.dispatchingExecutorService); + } + if (executorServiceFactory == null) { + this.executorService.shutdownNow(); + } else { + executorServiceFactory.clientClosed(this.executorService); + } + }); try { this.tuneState = new TuneState( @@ -2204,6 +2236,10 @@ public static class ClientParameters { private Duration rpcTimeout; private Consumer channelCustomizer = noOpConsumer(); private Consumer bootstrapCustomizer = noOpConsumer(); + // for messages + private ExecutorServiceFactory dispatchingExecutorServiceFactory; + // for other server frames + private ExecutorServiceFactory executorServiceFactory; public ClientParameters host(String host) { this.host = host; @@ -2363,6 +2399,17 @@ public ClientParameters rpcTimeout(Duration rpcTimeout) { return this; } + public ClientParameters dispatchingExecutorServiceFactory( + ExecutorServiceFactory dispatchingExecutorServiceFactory) { + this.dispatchingExecutorServiceFactory = dispatchingExecutorServiceFactory; + return this; + } + + public ClientParameters executorServiceFactory(ExecutorServiceFactory executorServiceFactory) { + this.executorServiceFactory = executorServiceFactory; + return this; + } + String host() { return this.host; } @@ -2585,7 +2632,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } if (task != null) { - executorService.submit(task); + if (commandId == Constants.COMMAND_DELIVER) { + dispatchingExecutorService.submit(task); + } else { + executorService.submit(task); + } } } diff --git a/src/main/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactory.java b/src/main/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactory.java new file mode 100644 index 0000000000..9eb974d999 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactory.java @@ -0,0 +1,201 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import com.rabbitmq.stream.impl.Utils.NamedThreadFactory; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.IntStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class DefaultExecutorServiceFactory implements ExecutorServiceFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultExecutorServiceFactory.class); + private static final Comparator EXECUTOR_COMPARATOR = + Comparator.comparingInt(Executor::usage); + + private final List executors; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final ThreadFactory threadFactory; + private final int minSize; + private final int clientPerExecutor; + private final Supplier executorFactory; + + DefaultExecutorServiceFactory() { + this(Runtime.getRuntime().availableProcessors(), 10); + } + + DefaultExecutorServiceFactory(int minSize, int clientPerExecutor) { + this.minSize = minSize; + this.clientPerExecutor = clientPerExecutor; + this.threadFactory = new NamedThreadFactory("rabbitmq-stream-connection-"); + this.executorFactory = () -> newExecutor(); + List l = new ArrayList<>(Runtime.getRuntime().availableProcessors()); + IntStream.range(0, Runtime.getRuntime().availableProcessors()) + .forEach(ignored -> l.add(this.executorFactory.get())); + executors = new CopyOnWriteArrayList<>(l); + } + + static void maybeResize( + List current, int min, int clientsPerResource, Supplier factory) { + LOGGER.debug( + "Resizing {}, with min = {}, clients per resource = {}", current, min, clientsPerResource); + int clientCount = 0; + for (Executor executor : current) { + clientCount += executor.usage(); + } + LOGGER.debug("Total usage is {}", clientCount); + + int target = Math.max((clientCount / clientsPerResource) + 1, min); + LOGGER.debug("Target size is {}, current size is {}", target, current.size()); + if (target > current.size()) { + LOGGER.debug("Upsizing..."); + List l = new ArrayList<>(); + for (int i = 0; i < target; i++) { + if (i < current.size()) { + l.add(current.get(i)); + } else { + l.add(factory.get()); + } + } + current.clear(); + current.addAll(l); + LOGGER.debug("New list is {}", current); + } else if (target < current.size()) { + LOGGER.debug("Downsizing..."); + boolean hasUnusedExecutors = current.stream().filter(ex -> ex.usage() == 0).count() > 0; + if (!hasUnusedExecutors) { + LOGGER.debug("No downsizing, there is no unused executor"); + } + if (hasUnusedExecutors) { + List l = new ArrayList<>(target); + for (int i = 0; i < current.size(); i++) { + Executor executor = current.get(i); + if (executor.usage() == 0) { + executor.close(); + } else { + l.add(executor); + } + } + if (l.size() < target) { + for (int i = l.size(); i < target; i++) { + l.add(factory.get()); + } + } + current.clear(); + current.addAll(l); + LOGGER.debug("New list is {}", current); + } + } + } + + private Executor newExecutor() { + return new Executor(Executors.newSingleThreadExecutor(threadFactory)); + } + + @Override + public synchronized ExecutorService get() { + if (closed.get()) { + throw new IllegalStateException("Executor service factory is closed"); + } else { + maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory); + LOGGER.debug("Looking least used executor in {}", this.executors); + Executor executor = this.executors.stream().min(EXECUTOR_COMPARATOR).get(); + LOGGER.debug("Least used executor is {}", executor); + executor.incrementUsage(); + return executor.executorService; + } + } + + @Override + public synchronized void clientClosed(ExecutorService executorService) { + if (!closed.get()) { + Executor executor = find(executorService); + if (executor == null) { + LOGGER.info("Could not find executor service wrapper"); + } else { + executor.decrementUsage(); + maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory); + } + } + } + + private Executor find(ExecutorService executorService) { + for (Executor executor : this.executors) { + if (executor.executorService.equals(executorService)) { + return executor; + } + } + return null; + } + + @Override + public synchronized void close() { + if (closed.compareAndSet(false, true)) { + this.executors.forEach(executor -> executor.executorService.shutdownNow()); + } + } + + static class Executor { + + private final ExecutorService executorService; + private AtomicInteger usage = new AtomicInteger(0); + + Executor(ExecutorService executorService) { + this.executorService = executorService; + } + + Executor incrementUsage() { + this.usage.incrementAndGet(); + return this; + } + + Executor decrementUsage() { + this.usage.decrementAndGet(); + return this; + } + + Executor addUsage(int delta) { + this.usage.addAndGet(delta); + return this; + } + + Executor substractUsage(int delta) { + this.usage.addAndGet(-delta); + return this; + } + + private int usage() { + return this.usage.get(); + } + + private void close() { + this.executorService.shutdownNow(); + } + + @Override + public String toString() { + return "Executor{" + "usage=" + usage.get() + '}'; + } + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/ExecutorServiceFactory.java b/src/main/java/com/rabbitmq/stream/impl/ExecutorServiceFactory.java new file mode 100644 index 0000000000..436669a639 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/ExecutorServiceFactory.java @@ -0,0 +1,26 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import java.util.concurrent.ExecutorService; + +interface ExecutorServiceFactory extends AutoCloseable { + + ExecutorService get(); + + void clientClosed(ExecutorService executorService); + + @Override + void close(); +} diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java index 1c83c6c1b4..43994a93aa 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java @@ -36,6 +36,7 @@ import com.rabbitmq.stream.impl.Utils.ClientConnectionType; import com.rabbitmq.stream.impl.Utils.ClientFactory; import com.rabbitmq.stream.impl.Utils.ClientFactoryContext; +import com.rabbitmq.stream.impl.Utils.NamedThreadFactory; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -47,6 +48,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -70,6 +73,7 @@ class ProducersCoordinator { private final AtomicLong trackerIdSequence = new AtomicLong(0); private final boolean debug = false; private final List producerTrackers = new CopyOnWriteArrayList<>(); + private final ExecutorServiceFactory executorServiceFactory; ProducersCoordinator( StreamEnvironment environment, @@ -82,6 +86,27 @@ class ProducersCoordinator { this.maxProducersByClient = maxProducersByClient; this.maxTrackingConsumersByClient = maxTrackingConsumersByClient; this.connectionNamingStrategy = connectionNamingStrategy; + // use the same single-threaded executor for all client connections + // it's meant for message dispatching, so it should not be used + this.executorServiceFactory = + new ExecutorServiceFactory() { + private final ExecutorService executorService = + Executors.newSingleThreadExecutor( + new NamedThreadFactory("rabbitmq-stream-producers-coordinator-dispatcher-")); + + @Override + public ExecutorService get() { + return executorService; + } + + @Override + public void clientClosed(ExecutorService executorService) {} + + @Override + public void close() { + executorService.shutdownNow(); + } + }; } private static String keyForNode(Client.Broker broker) { @@ -127,7 +152,11 @@ private Runnable registerAgentTracker(AgentTracker tracker, String stream) { private void addToManager(Broker node, AgentTracker tracker) { ClientParameters clientParameters = - environment.clientParametersCopy().host(node.getHost()).port(node.getPort()); + environment + .clientParametersCopy() + .host(node.getHost()) + .port(node.getPort()) + .dispatchingExecutorServiceFactory(this.executorServiceFactory); ClientProducersManager pickedManager = null; while (pickedManager == null) { Iterator iterator = this.managers.iterator(); @@ -230,6 +259,11 @@ void close() { e.getMessage()); } } + try { + this.executorServiceFactory.close(); + } catch (Exception e) { + LOGGER.info("Error while closing executor service factory: {}", e.getMessage()); + } } int clientCount() { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 427009d575..8a916ca844 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -99,6 +99,7 @@ class StreamEnvironment implements Environment { private final AtomicBoolean locatorsInitialized = new AtomicBoolean(false); private final Runnable locatorInitializationSequence; private final List locators = new CopyOnWriteArrayList<>(); + private final ExecutorServiceFactory executorServiceFactory = new DefaultExecutorServiceFactory(); StreamEnvironment( ScheduledExecutorService scheduledExecutorService, @@ -149,7 +150,10 @@ class StreamEnvironment implements Environment { this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy; this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy; this.byteBufAllocator = byteBufAllocator; - clientParametersPrototype.byteBufAllocator(byteBufAllocator); + clientParametersPrototype = + clientParametersPrototype + .byteBufAllocator(byteBufAllocator) + .executorServiceFactory(this.executorServiceFactory); clientParametersPrototype = maybeSetUpClientParametersFromUris(uris, clientParametersPrototype); this.addressResolver = addressResolver; @@ -601,6 +605,12 @@ public void close() { } } + try { + this.executorServiceFactory.close(); + } catch (Exception e) { + LOGGER.info("Error while closing executor service factory: {}", e.getMessage()); + } + this.clockRefreshFuture.cancel(false); if (privateScheduleExecutorService) { this.scheduledExecutorService.shutdownNow(); diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientFlowControlTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientFlowControlTest.java new file mode 100644 index 0000000000..c8ee1500e6 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/ClientFlowControlTest.java @@ -0,0 +1,79 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok; +import static com.rabbitmq.stream.impl.TestUtils.b; +import static com.rabbitmq.stream.impl.TestUtils.latchAssert; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.impl.Client.ClientParameters; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public class ClientFlowControlTest { + + String stream; + TestUtils.ClientFactory cf; + + @Test + void longProcessingShouldNotBlockOtherServerFrames(TestInfo info) { + int messageCount = 100_000; + AtomicInteger processedCount = new AtomicInteger(0); + TestUtils.publishAndWaitForConfirms(cf, messageCount, stream); + CountDownLatch metadataLatch = new CountDownLatch(1); + AtomicReference deletedStream = new AtomicReference<>(); + Client consumerClient = + cf.get( + new ClientParameters() + .chunkListener( + (client, subscriptionId, offset, msgCount, dataSize) -> + client.credit(subscriptionId, 1)) + .messageListener( + (subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + processedCount.incrementAndGet(); + }) + .metadataListener( + (stream, code) -> { + deletedStream.set(stream); + metadataLatch.countDown(); + })); + + String toBeDeletedStream = TestUtils.streamName(info); + assertThat(consumerClient.create(toBeDeletedStream)).is(ok()); + assertThat( + consumerClient.subscribe( + TestUtils.b(0), toBeDeletedStream, OffsetSpecification.first(), 1)) + .is(ok()); + assertThat(consumerClient.subscribe(TestUtils.b(1), stream, OffsetSpecification.first(), 1)) + .is(ok()); + + Client deletionClient = cf.get(); + assertThat(deletionClient.delete(toBeDeletedStream)).is(ok()); + assertThat(latchAssert(metadataLatch)).completes(); + assertThat(deletedStream).hasValue(toBeDeletedStream); + assertThat(consumerClient.unsubscribe(b(1))).is(ok()); + assertThat(processedCount).hasValueLessThan(messageCount); + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java index 1a2d886fd1..3e1c383912 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java @@ -143,6 +143,7 @@ public Client.ClientParameters shutdownListener( return super.shutdownListener(shutdownListener); } }; + clientParameters.executorServiceFactory(new DefaultExecutorServiceFactory()); mocks = MockitoAnnotations.openMocks(this); when(environment.locator()).thenReturn(locator); when(environment.locatorOperation(any())).thenCallRealMethod(); diff --git a/src/test/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactoryTest.java b/src/test/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactoryTest.java new file mode 100644 index 0000000000..e97dc401b4 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactoryTest.java @@ -0,0 +1,126 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.impl.DefaultExecutorServiceFactory.maybeResize; +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.rabbitmq.stream.impl.DefaultExecutorServiceFactory.Executor; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class DefaultExecutorServiceFactoryTest { + + int minSize = 4; + int clientsPerExecutor = 10; + + @Mock Supplier factory; + @Mock ExecutorService executorService; + AutoCloseable mocks; + List executors; + + @BeforeEach + void init() { + mocks = MockitoAnnotations.openMocks(this); + when(factory.get()).thenReturn(new Executor(executorService)); + } + + @AfterEach + void tearDown() throws Exception { + mocks.close(); + } + + @Test + void resizeShouldDoNothingWhenClientsPerExecutorIsOkForAll() { + executors = + executors( + executor().addUsage(5), + executor().addUsage(5), + executor().addUsage(5), + executor().addUsage(5)); + maybeResize(executors, minSize, clientsPerExecutor, factory); + assertThat(executors).hasSize(4); + verify(factory, never()).get(); + verify(executorService, never()).shutdownNow(); + } + + @Test + void resizeShouldIncreaseIfMoreClientsPerExecutorForOneExecutor() { + executors = + executors( + executor().addUsage(clientsPerExecutor), + executor().addUsage(clientsPerExecutor), + executor().addUsage(clientsPerExecutor + 1), + executor().addUsage(clientsPerExecutor)); + maybeResize(executors, minSize, clientsPerExecutor, factory); + assertThat(executors).hasSize(5); + verify(factory, times(1)).get(); + verify(executorService, never()).shutdownNow(); + } + + @RepeatedTest(10) + void resizeShouldDecreaseIfFewerClientsPerExecutorAndUnusedExecutor() { + // repeated because the initial list is shuffled + executors = + executors( + executor().addUsage(5), + executor().addUsage(5), + executor().addUsage(5), + executor(), + executor().addUsage(5)); + maybeResize(executors, minSize, clientsPerExecutor, factory); + assertThat(executors).hasSize(4); + verify(factory, never()).get(); + verify(executorService, times(1)).shutdownNow(); + } + + @Test + void resizeShouldNotResizeIfNoUnusedExecutor() { + executors = + executors( + executor().addUsage(5), + executor().addUsage(5), + executor().addUsage(5), + executor().addUsage(1), + executor().addUsage(5)); + maybeResize(executors, minSize, clientsPerExecutor, factory); + assertThat(executors).hasSize(5); + verify(factory, never()).get(); + verify(executorService, never()).shutdownNow(); + } + + private Executor executor() { + return new Executor(executorService); + } + + private List executors(Executor... executors) { + List l = asList(executors); + Collections.shuffle(l); + return new CopyOnWriteArrayList<>(l); + } +}