Skip to content

Commit def5523

Browse files
committed
Use lock in executor service factory
Fixes #690
1 parent e424739 commit def5523

File tree

1 file changed

+44
-22
lines changed

1 file changed

+44
-22
lines changed

Diff for: src/main/java/com/rabbitmq/stream/impl/DefaultExecutorServiceFactory.java

+44-22
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.concurrent.ThreadFactory;
2626
import java.util.concurrent.atomic.AtomicBoolean;
2727
import java.util.concurrent.atomic.AtomicInteger;
28+
import java.util.concurrent.locks.Lock;
29+
import java.util.concurrent.locks.ReentrantLock;
2830
import java.util.function.Supplier;
2931
import java.util.stream.IntStream;
3032
import org.slf4j.Logger;
@@ -42,12 +44,13 @@ class DefaultExecutorServiceFactory implements ExecutorServiceFactory {
4244
private final int minSize;
4345
private final int clientPerExecutor;
4446
private final Supplier<Executor> executorFactory;
47+
private final Lock lock = new ReentrantLock();
4548

4649
DefaultExecutorServiceFactory(int minSize, int clientPerExecutor, String prefix) {
4750
this.minSize = minSize;
4851
this.clientPerExecutor = clientPerExecutor;
4952
this.threadFactory = threadFactory(prefix);
50-
this.executorFactory = () -> newExecutor();
53+
this.executorFactory = this::newExecutor;
5154
List<Executor> l = new ArrayList<>(this.minSize);
5255
IntStream.range(0, this.minSize).forEach(ignored -> l.add(this.executorFactory.get()));
5356
executors = new CopyOnWriteArrayList<>(l);
@@ -111,29 +114,39 @@ private Executor newExecutor() {
111114
}
112115

113116
@Override
114-
public synchronized ExecutorService get() {
115-
if (closed.get()) {
116-
throw new IllegalStateException("Executor service factory is closed");
117-
} else {
118-
maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory);
119-
LOGGER.debug("Looking least used executor in {}", this.executors);
120-
Executor executor = this.executors.stream().min(EXECUTOR_COMPARATOR).get();
121-
LOGGER.debug("Least used executor is {}", executor);
122-
executor.incrementUsage();
123-
return executor.executorService;
117+
public ExecutorService get() {
118+
this.lock.lock();
119+
try {
120+
if (closed.get()) {
121+
throw new IllegalStateException("Executor service factory is closed");
122+
} else {
123+
maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory);
124+
LOGGER.debug("Looking least used executor in {}", this.executors);
125+
Executor executor = this.executors.stream().min(EXECUTOR_COMPARATOR).get();
126+
LOGGER.debug("Least used executor is {}", executor);
127+
executor.incrementUsage();
128+
return executor.executorService;
129+
}
130+
} finally {
131+
this.lock.unlock();
124132
}
125133
}
126134

127135
@Override
128-
public synchronized void clientClosed(ExecutorService executorService) {
129-
if (!closed.get()) {
130-
Executor executor = find(executorService);
131-
if (executor == null) {
132-
LOGGER.info("Could not find executor service wrapper");
133-
} else {
134-
executor.decrementUsage();
135-
maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory);
136+
public void clientClosed(ExecutorService executorService) {
137+
this.lock.lock();
138+
try {
139+
if (!closed.get()) {
140+
Executor executor = find(executorService);
141+
if (executor == null) {
142+
LOGGER.info("Could not find executor service wrapper");
143+
} else {
144+
executor.decrementUsage();
145+
maybeResize(this.executors, this.minSize, this.clientPerExecutor, this.executorFactory);
146+
}
136147
}
148+
} finally {
149+
this.lock.unlock();
137150
}
138151
}
139152

@@ -148,17 +161,26 @@ private Executor find(ExecutorService executorService) {
148161

149162
@Override
150163
public synchronized void close() {
151-
if (closed.compareAndSet(false, true)) {
152-
this.executors.forEach(executor -> executor.executorService.shutdownNow());
164+
this.lock.lock();
165+
try {
166+
if (closed.compareAndSet(false, true)) {
167+
this.executors.forEach(executor -> executor.executorService.shutdownNow());
168+
}
169+
} finally {
170+
this.lock.unlock();
153171
}
154172
}
155173

156174
static class Executor {
157175

176+
private static final AtomicInteger ID_SEQUENCE = new AtomicInteger();
177+
158178
private final ExecutorService executorService;
159179
private AtomicInteger usage = new AtomicInteger(0);
180+
private final int id;
160181

161182
Executor(ExecutorService executorService) {
183+
this.id = ID_SEQUENCE.getAndIncrement();
162184
this.executorService = executorService;
163185
}
164186

@@ -192,7 +214,7 @@ private void close() {
192214

193215
@Override
194216
public String toString() {
195-
return "Executor{" + "usage=" + usage.get() + '}';
217+
return "Executor{" + "id=" + id + ", usage=" + usage.get() + '}';
196218
}
197219
}
198220
}

0 commit comments

Comments
 (0)