Skip to content

Dispatch messages in dedicated thread #298

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import com.rabbitmq.stream.ChunkChecksum;
import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.Codec.EncodedMessage;
import com.rabbitmq.stream.Constants;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageBuilder;
Expand All @@ -68,6 +69,7 @@
import com.rabbitmq.stream.impl.Client.ShutdownContext.ShutdownReason;
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandler;
import com.rabbitmq.stream.impl.ServerFrameHandler.FrameHandlerInfo;
import com.rabbitmq.stream.impl.Utils.NamedThreadFactory;
import com.rabbitmq.stream.metrics.MetricsCollector;
import com.rabbitmq.stream.metrics.NoOpMetricsCollector;
import com.rabbitmq.stream.sasl.CredentialsProvider;
Expand Down Expand Up @@ -179,6 +181,7 @@ public class Client implements AutoCloseable {
new ConcurrentHashMap<>();
final List<SubscriptionOffset> subscriptionOffsets = new CopyOnWriteArrayList<>();
final ExecutorService executorService;
final ExecutorService dispatchingExecutorService;
final TuneState tuneState;
final AtomicBoolean closing = new AtomicBoolean(false);
final ChunkChecksum chunkChecksum;
Expand Down Expand Up @@ -348,8 +351,37 @@ public void initChannel(SocketChannel ch) {

this.channel = f.channel();
this.nettyClosing = Utils.makeIdempotent(this::closeNetty);
this.executorService = Executors.newSingleThreadExecutor();
this.executorServiceClosing = Utils.makeIdempotent(this.executorService::shutdownNow);
ExecutorServiceFactory executorServiceFactory = parameters.executorServiceFactory;
if (executorServiceFactory == null) {
this.executorService =
Executors.newSingleThreadExecutor(new NamedThreadFactory(clientConnectionName + "-"));
} else {
this.executorService = executorServiceFactory.get();
}
ExecutorServiceFactory dispatchingExecutorServiceFactory =
parameters.dispatchingExecutorServiceFactory;
if (dispatchingExecutorServiceFactory == null) {
this.dispatchingExecutorService =
Executors.newSingleThreadExecutor(
new NamedThreadFactory("dispatching-" + clientConnectionName + "-"));
} else {
this.dispatchingExecutorService = dispatchingExecutorServiceFactory.get();
}
this.executorServiceClosing =
Utils.makeIdempotent(
() -> {
this.dispatchingExecutorService.shutdownNow();
if (dispatchingExecutorServiceFactory == null) {
this.dispatchingExecutorService.shutdownNow();
} else {
dispatchingExecutorServiceFactory.clientClosed(this.dispatchingExecutorService);
}
if (executorServiceFactory == null) {
this.executorService.shutdownNow();
} else {
executorServiceFactory.clientClosed(this.executorService);
}
});
try {
this.tuneState =
new TuneState(
Expand Down Expand Up @@ -2204,6 +2236,10 @@ public static class ClientParameters {
private Duration rpcTimeout;
private Consumer<Channel> channelCustomizer = noOpConsumer();
private Consumer<Bootstrap> bootstrapCustomizer = noOpConsumer();
// for messages
private ExecutorServiceFactory dispatchingExecutorServiceFactory;
// for other server frames
private ExecutorServiceFactory executorServiceFactory;

public ClientParameters host(String host) {
this.host = host;
Expand Down Expand Up @@ -2363,6 +2399,17 @@ public ClientParameters rpcTimeout(Duration rpcTimeout) {
return this;
}

public ClientParameters dispatchingExecutorServiceFactory(
ExecutorServiceFactory dispatchingExecutorServiceFactory) {
this.dispatchingExecutorServiceFactory = dispatchingExecutorServiceFactory;
return this;
}

public ClientParameters executorServiceFactory(ExecutorServiceFactory executorServiceFactory) {
this.executorServiceFactory = executorServiceFactory;
return this;
}

String host() {
return this.host;
}
Expand Down Expand Up @@ -2585,7 +2632,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
}

if (task != null) {
executorService.submit(task);
if (commandId == Constants.COMMAND_DELIVER) {
dispatchingExecutorService.submit(task);
} else {
executorService.submit(task);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.impl.Utils.NamedThreadFactory;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DefaultExecutorServiceFactory implements ExecutorServiceFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultExecutorServiceFactory.class);
private static final Comparator<Executor> EXECUTOR_COMPARATOR =
Comparator.comparingInt(Executor::usage);

private final List<Executor> executors;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ThreadFactory threadFactory;
private final int minSize;
private final int clientPerExecutor;
private final Supplier<Executor> executorFactory;

DefaultExecutorServiceFactory() {
this(Runtime.getRuntime().availableProcessors(), 10);
}

DefaultExecutorServiceFactory(int minSize, int clientPerExecutor) {
this.minSize = minSize;
this.clientPerExecutor = clientPerExecutor;
this.threadFactory = new NamedThreadFactory("rabbitmq-stream-connection-");
this.executorFactory = () -> newExecutor();
List<Executor> l = new ArrayList<>(Runtime.getRuntime().availableProcessors());
IntStream.range(0, Runtime.getRuntime().availableProcessors())
.forEach(ignored -> l.add(this.executorFactory.get()));
executors = new CopyOnWriteArrayList<>(l);
}

static void maybeResize(
List<Executor> current, int min, int clientsPerResource, Supplier<Executor> factory) {
LOGGER.debug(
"Resizing {}, with min = {}, clients per resource = {}", current, min, clientsPerResource);
int clientCount = 0;
for (Executor executor : current) {
clientCount += executor.usage();
}
LOGGER.debug("Total usage is {}", clientCount);

int target = Math.max((clientCount / clientsPerResource) + 1, min);
LOGGER.debug("Target size is {}, current size is {}", target, current.size());
if (target > current.size()) {
LOGGER.debug("Upsizing...");
List<Executor> l = new ArrayList<>();
for (int i = 0; i < target; i++) {
if (i < current.size()) {
l.add(current.get(i));
} else {
l.add(factory.get());
}
}
current.clear();
current.addAll(l);
LOGGER.debug("New list is {}", current);
} else if (target < current.size()) {
LOGGER.debug("Downsizing...");
boolean hasUnusedExecutors = current.stream().filter(ex -> ex.usage() == 0).count() > 0;
if (!hasUnusedExecutors) {
LOGGER.debug("No downsizing, there is no unused executor");
}
if (hasUnusedExecutors) {
List<Executor> l = new ArrayList<>(target);
for (int i = 0; i < current.size(); i++) {
Executor executor = current.get(i);
if (executor.usage() == 0) {
executor.close();
} else {
l.add(executor);
}
}
if (l.size() < target) {
for (int i = l.size(); i < target; i++) {
l.add(factory.get());
}
}
current.clear();
current.addAll(l);
LOGGER.debug("New list is {}", current);
}
}
}

private Executor newExecutor() {
return new Executor(Executors.newSingleThreadExecutor(threadFactory));
}

@Override
public synchronized ExecutorService get() {
if (closed.get()) {
throw new IllegalStateException("Executor service factory is closed");
} else {
maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory);
LOGGER.debug("Looking least used executor in {}", this.executors);
Executor executor = this.executors.stream().min(EXECUTOR_COMPARATOR).get();
LOGGER.debug("Least used executor is {}", executor);
executor.incrementUsage();
return executor.executorService;
}
}

@Override
public synchronized void clientClosed(ExecutorService executorService) {
if (!closed.get()) {
Executor executor = find(executorService);
if (executor == null) {
LOGGER.info("Could not find executor service wrapper");
} else {
executor.decrementUsage();
maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory);
}
}
}

private Executor find(ExecutorService executorService) {
for (Executor executor : this.executors) {
if (executor.executorService.equals(executorService)) {
return executor;
}
}
return null;
}

@Override
public synchronized void close() {
if (closed.compareAndSet(false, true)) {
this.executors.forEach(executor -> executor.executorService.shutdownNow());
}
}

static class Executor {

private final ExecutorService executorService;
private AtomicInteger usage = new AtomicInteger(0);

Executor(ExecutorService executorService) {
this.executorService = executorService;
}

Executor incrementUsage() {
this.usage.incrementAndGet();
return this;
}

Executor decrementUsage() {
this.usage.decrementAndGet();
return this;
}

Executor addUsage(int delta) {
this.usage.addAndGet(delta);
return this;
}

Executor substractUsage(int delta) {
this.usage.addAndGet(-delta);
return this;
}

private int usage() {
return this.usage.get();
}

private void close() {
this.executorService.shutdownNow();
}

@Override
public String toString() {
return "Executor{" + "usage=" + usage.get() + '}';
}
}
}
26 changes: 26 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/ExecutorServiceFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.stream.impl;

import java.util.concurrent.ExecutorService;

interface ExecutorServiceFactory extends AutoCloseable {

ExecutorService get();

void clientClosed(ExecutorService executorService);

@Override
void close();
}
Loading