From 22e1e532ddd71fdceccb8a2aa063800740da93a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 7 Mar 2023 15:16:13 +0100 Subject: [PATCH] Dispatch messages in dedicated thread Each connection has now a dedicated single-threaded executor to dispatch messages. This is especially suited for long consumers, as they could block the handling of other frames sent by the server. Other server frames are now handled by a shared executor service. The default implementation maintains a list of executor services shared between all connections maintained by the environment instances. The list grows and shrinks depending on the usage. --- .../java/com/rabbitmq/stream/impl/Client.java | 57 ++++- .../impl/DefaultExecutorServiceFactory.java | 201 ++++++++++++++++++ .../stream/impl/ExecutorServiceFactory.java | 26 +++ .../stream/impl/ProducersCoordinator.java | 36 +++- .../stream/impl/StreamEnvironment.java | 12 +- .../stream/impl/ClientFlowControlTest.java | 79 +++++++ .../stream/impl/ConsumersCoordinatorTest.java | 1 + .../DefaultExecutorServiceFactoryTest.java | 126 +++++++++++ 8 files changed, 533 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactory.java create mode 100644 src/main/java/com/rabbitmq/stream/impl/ExecutorServiceFactory.java create mode 100644 src/test/java/com/rabbitmq/stream/impl/ClientFlowControlTest.java create mode 100644 src/test/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactoryTest.java diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 21c370e804..d9061799eb 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -55,6 +55,7 @@ import com.rabbitmq.stream.ChunkChecksum; import com.rabbitmq.stream.Codec; import com.rabbitmq.stream.Codec.EncodedMessage; +import com.rabbitmq.stream.Constants; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Message; import com.rabbitmq.stream.MessageBuilder; @@ -68,6 +69,7 @@ import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason; import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandler; import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo; +import com.rabbitmq.stream.impl.Utils.NamedThreadFactory; import com.rabbitmq.stream.metrics.MetricsCollector; import com.rabbitmq.stream.metrics.NoOpMetricsCollector; import com.rabbitmq.stream.sasl.CredentialsProvider; @@ -179,6 +181,7 @@ public class Client implements AutoCloseable { new ConcurrentHashMap<>(); final List subscriptionOffsets = new CopyOnWriteArrayList<>(); final ExecutorService executorService; + final ExecutorService dispatchingExecutorService; final TuneState tuneState; final AtomicBoolean closing = new AtomicBoolean(false); final ChunkChecksum chunkChecksum; @@ -348,8 +351,37 @@ public void initChannel(SocketChannel ch) { this.channel = f.channel(); this.nettyClosing = Utils.makeIdempotent(this::closeNetty); - this.executorService = Executors.newSingleThreadExecutor(); - this.executorServiceClosing = Utils.makeIdempotent(this.executorService::shutdownNow); + ExecutorServiceFactory executorServiceFactory = parameters.executorServiceFactory; + if (executorServiceFactory == null) { + this.executorService = + Executors.newSingleThreadExecutor(new NamedThreadFactory(clientConnectionName + "-")); + } else { + this.executorService = executorServiceFactory.get(); + } + ExecutorServiceFactory dispatchingExecutorServiceFactory = + parameters.dispatchingExecutorServiceFactory; + if (dispatchingExecutorServiceFactory == null) { + this.dispatchingExecutorService = + Executors.newSingleThreadExecutor( + new NamedThreadFactory("dispatching-" + clientConnectionName + "-")); + } else { + this.dispatchingExecutorService = dispatchingExecutorServiceFactory.get(); + } + this.executorServiceClosing = + Utils.makeIdempotent( + () -> { + this.dispatchingExecutorService.shutdownNow(); + if (dispatchingExecutorServiceFactory == null) { + this.dispatchingExecutorService.shutdownNow(); + } else { + dispatchingExecutorServiceFactory.clientClosed(this.dispatchingExecutorService); + } + if (executorServiceFactory == null) { + this.executorService.shutdownNow(); + } else { + executorServiceFactory.clientClosed(this.executorService); + } + }); try { this.tuneState = new TuneState( @@ -2204,6 +2236,10 @@ public static class ClientParameters { private Duration rpcTimeout; private Consumer channelCustomizer = noOpConsumer(); private Consumer bootstrapCustomizer = noOpConsumer(); + // for messages + private ExecutorServiceFactory dispatchingExecutorServiceFactory; + // for other server frames + private ExecutorServiceFactory executorServiceFactory; public ClientParameters host(String host) { this.host = host; @@ -2363,6 +2399,17 @@ public ClientParameters rpcTimeout(Duration rpcTimeout) { return this; } + public ClientParameters dispatchingExecutorServiceFactory( + ExecutorServiceFactory dispatchingExecutorServiceFactory) { + this.dispatchingExecutorServiceFactory = dispatchingExecutorServiceFactory; + return this; + } + + public ClientParameters executorServiceFactory(ExecutorServiceFactory executorServiceFactory) { + this.executorServiceFactory = executorServiceFactory; + return this; + } + String host() { return this.host; } @@ -2585,7 +2632,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } if (task != null) { - executorService.submit(task); + if (commandId == Constants.COMMAND_DELIVER) { + dispatchingExecutorService.submit(task); + } else { + executorService.submit(task); + } } } diff --git a/src/main/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactory.java b/src/main/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactory.java new file mode 100644 index 0000000000..9eb974d999 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactory.java @@ -0,0 +1,201 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import com.rabbitmq.stream.impl.Utils.NamedThreadFactory; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.IntStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class DefaultExecutorServiceFactory implements ExecutorServiceFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultExecutorServiceFactory.class); + private static final Comparator EXECUTOR_COMPARATOR = + Comparator.comparingInt(Executor::usage); + + private final List executors; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final ThreadFactory threadFactory; + private final int minSize; + private final int clientPerExecutor; + private final Supplier executorFactory; + + DefaultExecutorServiceFactory() { + this(Runtime.getRuntime().availableProcessors(), 10); + } + + DefaultExecutorServiceFactory(int minSize, int clientPerExecutor) { + this.minSize = minSize; + this.clientPerExecutor = clientPerExecutor; + this.threadFactory = new NamedThreadFactory("rabbitmq-stream-connection-"); + this.executorFactory = () -> newExecutor(); + List l = new ArrayList<>(Runtime.getRuntime().availableProcessors()); + IntStream.range(0, Runtime.getRuntime().availableProcessors()) + .forEach(ignored -> l.add(this.executorFactory.get())); + executors = new CopyOnWriteArrayList<>(l); + } + + static void maybeResize( + List current, int min, int clientsPerResource, Supplier factory) { + LOGGER.debug( + "Resizing {}, with min = {}, clients per resource = {}", current, min, clientsPerResource); + int clientCount = 0; + for (Executor executor : current) { + clientCount += executor.usage(); + } + LOGGER.debug("Total usage is {}", clientCount); + + int target = Math.max((clientCount / clientsPerResource) + 1, min); + LOGGER.debug("Target size is {}, current size is {}", target, current.size()); + if (target > current.size()) { + LOGGER.debug("Upsizing..."); + List l = new ArrayList<>(); + for (int i = 0; i < target; i++) { + if (i < current.size()) { + l.add(current.get(i)); + } else { + l.add(factory.get()); + } + } + current.clear(); + current.addAll(l); + LOGGER.debug("New list is {}", current); + } else if (target < current.size()) { + LOGGER.debug("Downsizing..."); + boolean hasUnusedExecutors = current.stream().filter(ex -> ex.usage() == 0).count() > 0; + if (!hasUnusedExecutors) { + LOGGER.debug("No downsizing, there is no unused executor"); + } + if (hasUnusedExecutors) { + List l = new ArrayList<>(target); + for (int i = 0; i < current.size(); i++) { + Executor executor = current.get(i); + if (executor.usage() == 0) { + executor.close(); + } else { + l.add(executor); + } + } + if (l.size() < target) { + for (int i = l.size(); i < target; i++) { + l.add(factory.get()); + } + } + current.clear(); + current.addAll(l); + LOGGER.debug("New list is {}", current); + } + } + } + + private Executor newExecutor() { + return new Executor(Executors.newSingleThreadExecutor(threadFactory)); + } + + @Override + public synchronized ExecutorService get() { + if (closed.get()) { + throw new IllegalStateException("Executor service factory is closed"); + } else { + maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory); + LOGGER.debug("Looking least used executor in {}", this.executors); + Executor executor = this.executors.stream().min(EXECUTOR_COMPARATOR).get(); + LOGGER.debug("Least used executor is {}", executor); + executor.incrementUsage(); + return executor.executorService; + } + } + + @Override + public synchronized void clientClosed(ExecutorService executorService) { + if (!closed.get()) { + Executor executor = find(executorService); + if (executor == null) { + LOGGER.info("Could not find executor service wrapper"); + } else { + executor.decrementUsage(); + maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory); + } + } + } + + private Executor find(ExecutorService executorService) { + for (Executor executor : this.executors) { + if (executor.executorService.equals(executorService)) { + return executor; + } + } + return null; + } + + @Override + public synchronized void close() { + if (closed.compareAndSet(false, true)) { + this.executors.forEach(executor -> executor.executorService.shutdownNow()); + } + } + + static class Executor { + + private final ExecutorService executorService; + private AtomicInteger usage = new AtomicInteger(0); + + Executor(ExecutorService executorService) { + this.executorService = executorService; + } + + Executor incrementUsage() { + this.usage.incrementAndGet(); + return this; + } + + Executor decrementUsage() { + this.usage.decrementAndGet(); + return this; + } + + Executor addUsage(int delta) { + this.usage.addAndGet(delta); + return this; + } + + Executor substractUsage(int delta) { + this.usage.addAndGet(-delta); + return this; + } + + private int usage() { + return this.usage.get(); + } + + private void close() { + this.executorService.shutdownNow(); + } + + @Override + public String toString() { + return "Executor{" + "usage=" + usage.get() + '}'; + } + } +} diff --git a/src/main/java/com/rabbitmq/stream/impl/ExecutorServiceFactory.java b/src/main/java/com/rabbitmq/stream/impl/ExecutorServiceFactory.java new file mode 100644 index 0000000000..436669a639 --- /dev/null +++ b/src/main/java/com/rabbitmq/stream/impl/ExecutorServiceFactory.java @@ -0,0 +1,26 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import java.util.concurrent.ExecutorService; + +interface ExecutorServiceFactory extends AutoCloseable { + + ExecutorService get(); + + void clientClosed(ExecutorService executorService); + + @Override + void close(); +} diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java index 1c83c6c1b4..43994a93aa 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java @@ -36,6 +36,7 @@ import com.rabbitmq.stream.impl.Utils.ClientConnectionType; import com.rabbitmq.stream.impl.Utils.ClientFactory; import com.rabbitmq.stream.impl.Utils.ClientFactoryContext; +import com.rabbitmq.stream.impl.Utils.NamedThreadFactory; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -47,6 +48,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -70,6 +73,7 @@ class ProducersCoordinator { private final AtomicLong trackerIdSequence = new AtomicLong(0); private final boolean debug = false; private final List producerTrackers = new CopyOnWriteArrayList<>(); + private final ExecutorServiceFactory executorServiceFactory; ProducersCoordinator( StreamEnvironment environment, @@ -82,6 +86,27 @@ class ProducersCoordinator { this.maxProducersByClient = maxProducersByClient; this.maxTrackingConsumersByClient = maxTrackingConsumersByClient; this.connectionNamingStrategy = connectionNamingStrategy; + // use the same single-threaded executor for all client connections + // it's meant for message dispatching, so it should not be used + this.executorServiceFactory = + new ExecutorServiceFactory() { + private final ExecutorService executorService = + Executors.newSingleThreadExecutor( + new NamedThreadFactory("rabbitmq-stream-producers-coordinator-dispatcher-")); + + @Override + public ExecutorService get() { + return executorService; + } + + @Override + public void clientClosed(ExecutorService executorService) {} + + @Override + public void close() { + executorService.shutdownNow(); + } + }; } private static String keyForNode(Client.Broker broker) { @@ -127,7 +152,11 @@ private Runnable registerAgentTracker(AgentTracker tracker, String stream) { private void addToManager(Broker node, AgentTracker tracker) { ClientParameters clientParameters = - environment.clientParametersCopy().host(node.getHost()).port(node.getPort()); + environment + .clientParametersCopy() + .host(node.getHost()) + .port(node.getPort()) + .dispatchingExecutorServiceFactory(this.executorServiceFactory); ClientProducersManager pickedManager = null; while (pickedManager == null) { Iterator iterator = this.managers.iterator(); @@ -230,6 +259,11 @@ void close() { e.getMessage()); } } + try { + this.executorServiceFactory.close(); + } catch (Exception e) { + LOGGER.info("Error while closing executor service factory: {}", e.getMessage()); + } } int clientCount() { diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 427009d575..8a916ca844 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -99,6 +99,7 @@ class StreamEnvironment implements Environment { private final AtomicBoolean locatorsInitialized = new AtomicBoolean(false); private final Runnable locatorInitializationSequence; private final List locators = new CopyOnWriteArrayList<>(); + private final ExecutorServiceFactory executorServiceFactory = new DefaultExecutorServiceFactory(); StreamEnvironment( ScheduledExecutorService scheduledExecutorService, @@ -149,7 +150,10 @@ class StreamEnvironment implements Environment { this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy; this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy; this.byteBufAllocator = byteBufAllocator; - clientParametersPrototype.byteBufAllocator(byteBufAllocator); + clientParametersPrototype = + clientParametersPrototype + .byteBufAllocator(byteBufAllocator) + .executorServiceFactory(this.executorServiceFactory); clientParametersPrototype = maybeSetUpClientParametersFromUris(uris, clientParametersPrototype); this.addressResolver = addressResolver; @@ -601,6 +605,12 @@ public void close() { } } + try { + this.executorServiceFactory.close(); + } catch (Exception e) { + LOGGER.info("Error while closing executor service factory: {}", e.getMessage()); + } + this.clockRefreshFuture.cancel(false); if (privateScheduleExecutorService) { this.scheduledExecutorService.shutdownNow(); diff --git a/src/test/java/com/rabbitmq/stream/impl/ClientFlowControlTest.java b/src/test/java/com/rabbitmq/stream/impl/ClientFlowControlTest.java new file mode 100644 index 0000000000..c8ee1500e6 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/ClientFlowControlTest.java @@ -0,0 +1,79 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok; +import static com.rabbitmq.stream.impl.TestUtils.b; +import static com.rabbitmq.stream.impl.TestUtils.latchAssert; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.stream.OffsetSpecification; +import com.rabbitmq.stream.impl.Client.ClientParameters; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public class ClientFlowControlTest { + + String stream; + TestUtils.ClientFactory cf; + + @Test + void longProcessingShouldNotBlockOtherServerFrames(TestInfo info) { + int messageCount = 100_000; + AtomicInteger processedCount = new AtomicInteger(0); + TestUtils.publishAndWaitForConfirms(cf, messageCount, stream); + CountDownLatch metadataLatch = new CountDownLatch(1); + AtomicReference deletedStream = new AtomicReference<>(); + Client consumerClient = + cf.get( + new ClientParameters() + .chunkListener( + (client, subscriptionId, offset, msgCount, dataSize) -> + client.credit(subscriptionId, 1)) + .messageListener( + (subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + processedCount.incrementAndGet(); + }) + .metadataListener( + (stream, code) -> { + deletedStream.set(stream); + metadataLatch.countDown(); + })); + + String toBeDeletedStream = TestUtils.streamName(info); + assertThat(consumerClient.create(toBeDeletedStream)).is(ok()); + assertThat( + consumerClient.subscribe( + TestUtils.b(0), toBeDeletedStream, OffsetSpecification.first(), 1)) + .is(ok()); + assertThat(consumerClient.subscribe(TestUtils.b(1), stream, OffsetSpecification.first(), 1)) + .is(ok()); + + Client deletionClient = cf.get(); + assertThat(deletionClient.delete(toBeDeletedStream)).is(ok()); + assertThat(latchAssert(metadataLatch)).completes(); + assertThat(deletedStream).hasValue(toBeDeletedStream); + assertThat(consumerClient.unsubscribe(b(1))).is(ok()); + assertThat(processedCount).hasValueLessThan(messageCount); + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java index 1a2d886fd1..3e1c383912 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java @@ -143,6 +143,7 @@ public Client.ClientParameters shutdownListener( return super.shutdownListener(shutdownListener); } }; + clientParameters.executorServiceFactory(new DefaultExecutorServiceFactory()); mocks = MockitoAnnotations.openMocks(this); when(environment.locator()).thenReturn(locator); when(environment.locatorOperation(any())).thenCallRealMethod(); diff --git a/src/test/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactoryTest.java b/src/test/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactoryTest.java new file mode 100644 index 0000000000..e97dc401b4 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactoryTest.java @@ -0,0 +1,126 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.impl.DefaultExecutorServiceFactory.maybeResize; +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.rabbitmq.stream.impl.DefaultExecutorServiceFactory.Executor; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class DefaultExecutorServiceFactoryTest { + + int minSize = 4; + int clientsPerExecutor = 10; + + @Mock Supplier factory; + @Mock ExecutorService executorService; + AutoCloseable mocks; + List executors; + + @BeforeEach + void init() { + mocks = MockitoAnnotations.openMocks(this); + when(factory.get()).thenReturn(new Executor(executorService)); + } + + @AfterEach + void tearDown() throws Exception { + mocks.close(); + } + + @Test + void resizeShouldDoNothingWhenClientsPerExecutorIsOkForAll() { + executors = + executors( + executor().addUsage(5), + executor().addUsage(5), + executor().addUsage(5), + executor().addUsage(5)); + maybeResize(executors, minSize, clientsPerExecutor, factory); + assertThat(executors).hasSize(4); + verify(factory, never()).get(); + verify(executorService, never()).shutdownNow(); + } + + @Test + void resizeShouldIncreaseIfMoreClientsPerExecutorForOneExecutor() { + executors = + executors( + executor().addUsage(clientsPerExecutor), + executor().addUsage(clientsPerExecutor), + executor().addUsage(clientsPerExecutor + 1), + executor().addUsage(clientsPerExecutor)); + maybeResize(executors, minSize, clientsPerExecutor, factory); + assertThat(executors).hasSize(5); + verify(factory, times(1)).get(); + verify(executorService, never()).shutdownNow(); + } + + @RepeatedTest(10) + void resizeShouldDecreaseIfFewerClientsPerExecutorAndUnusedExecutor() { + // repeated because the initial list is shuffled + executors = + executors( + executor().addUsage(5), + executor().addUsage(5), + executor().addUsage(5), + executor(), + executor().addUsage(5)); + maybeResize(executors, minSize, clientsPerExecutor, factory); + assertThat(executors).hasSize(4); + verify(factory, never()).get(); + verify(executorService, times(1)).shutdownNow(); + } + + @Test + void resizeShouldNotResizeIfNoUnusedExecutor() { + executors = + executors( + executor().addUsage(5), + executor().addUsage(5), + executor().addUsage(5), + executor().addUsage(1), + executor().addUsage(5)); + maybeResize(executors, minSize, clientsPerExecutor, factory); + assertThat(executors).hasSize(5); + verify(factory, never()).get(); + verify(executorService, never()).shutdownNow(); + } + + private Executor executor() { + return new Executor(executorService); + } + + private List executors(Executor... executors) { + List l = asList(executors); + Collections.shuffle(l); + return new CopyOnWriteArrayList<>(l); + } +}