Skip to content

Commit 8327c6a

Browse files
committed
Remove executor service argument from Channel#asyncCompletableRpc
Even if CompletableFuture is completed in the reading thread, client API (e.g. CompletableStage#then*, Reactor Flux/Mono) provide ways to control the threading behavior. References #215
1 parent b6c3f96 commit 8327c6a

File tree

6 files changed

+21
-55
lines changed

6 files changed

+21
-55
lines changed

src/main/java/com/rabbitmq/client/Channel.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.io.IOException;
1919
import java.util.Map;
2020
import java.util.concurrent.CompletableFuture;
21-
import java.util.concurrent.ExecutorService;
2221
import java.util.concurrent.TimeoutException;
2322

2423
import com.rabbitmq.client.AMQP.BasicProperties;
@@ -1368,10 +1367,9 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
13681367
/**
13691368
* Asynchronously send a method over this channel.
13701369
* @param method method to transmit over this channel.
1371-
* @param executorService executor used to complete the operation, can be null
13721370
* @return a completable future that completes when the result is received
13731371
* @throws IOException Problem transmitting method.
13741372
*/
1375-
CompletableFuture<Command> asyncCompletableRpc(Method method, ExecutorService executorService) throws IOException;
1373+
CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException;
13761374

13771375
}

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

+10-11
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
import java.io.IOException;
3131
import java.util.concurrent.CompletableFuture;
32-
import java.util.concurrent.ExecutorService;
3332
import java.util.concurrent.TimeoutException;
3433
import java.util.function.Supplier;
3534

@@ -146,11 +145,11 @@ public AMQCommand exnWrappingRpc(Method m)
146145
}
147146
}
148147

149-
public CompletableFuture<Command> exnWrappingAsyncRpc(Method m, ExecutorService executorService)
148+
public CompletableFuture<Command> exnWrappingAsyncRpc(Method m)
150149
throws IOException
151150
{
152151
try {
153-
return privateAsyncRpc(m, executorService);
152+
return privateAsyncRpc(m);
154153
} catch (AlreadyClosedException ace) {
155154
// Do not wrap it since it means that connection/channel
156155
// was closed in some action in the past
@@ -205,8 +204,8 @@ public void enqueueRpc(RpcContinuation k)
205204
doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k));
206205
}
207206

208-
public void enqueueAsyncRpc(Method method, CompletableFuture<Command> future, ExecutorService executorService) {
209-
doEnqueueRpc(() -> new CompletableFutureRpcWrapper(method, future, executorService));
207+
public void enqueueAsyncRpc(Method method, CompletableFuture<Command> future) {
208+
doEnqueueRpc(() -> new CompletableFutureRpcWrapper(method, future));
210209
}
211210

212211
private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
@@ -309,11 +308,11 @@ protected ChannelContinuationTimeoutException wrapTimeoutException(final Method
309308
return new ChannelContinuationTimeoutException(e, this, this._channelNumber, m);
310309
}
311310

312-
private CompletableFuture<Command> privateAsyncRpc(Method m, ExecutorService executorService)
311+
private CompletableFuture<Command> privateAsyncRpc(Method m)
313312
throws IOException, ShutdownSignalException
314313
{
315314
CompletableFuture<Command> future = new CompletableFuture<>();
316-
asyncRpc(m, future, executorService);
315+
asyncRpc(m, future);
317316
return future;
318317
}
319318

@@ -348,20 +347,20 @@ public void quiescingRpc(Method m, RpcContinuation k)
348347
}
349348
}
350349

351-
public void asyncRpc(Method m, CompletableFuture<Command> future, ExecutorService executorService)
350+
public void asyncRpc(Method m, CompletableFuture<Command> future)
352351
throws IOException
353352
{
354353
synchronized (_channelMutex) {
355354
ensureIsOpen();
356-
quiescingAsyncRpc(m, future, executorService);
355+
quiescingAsyncRpc(m, future);
357356
}
358357
}
359358

360-
public void quiescingAsyncRpc(Method m, CompletableFuture<Command> future, ExecutorService executorService)
359+
public void quiescingAsyncRpc(Method m, CompletableFuture<Command> future)
361360
throws IOException
362361
{
363362
synchronized (_channelMutex) {
364-
enqueueAsyncRpc(m, future, executorService);
363+
enqueueAsyncRpc(m, future);
365364
quiescingTransmit(m);
366365
}
367366
}

src/main/java/com/rabbitmq/client/impl/ChannelN.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1555,8 +1555,8 @@ public AMQCommand rpc(Method method) throws IOException {
15551555
}
15561556

15571557
@Override
1558-
public CompletableFuture<Command> asyncCompletableRpc(Method method, ExecutorService executorService) throws IOException {
1559-
return exnWrappingAsyncRpc(method, executorService);
1558+
public CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException {
1559+
return exnWrappingAsyncRpc(method);
15601560
}
15611561

15621562
@Override

src/main/java/com/rabbitmq/client/impl/CompletableFutureRpcWrapper.java

+2-11
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import com.rabbitmq.client.Method;
2020

2121
import java.util.concurrent.CompletableFuture;
22-
import java.util.concurrent.ExecutorService;
23-
import java.util.concurrent.Executors;
2422

2523
/**
2624
*
@@ -31,12 +29,9 @@ public class CompletableFutureRpcWrapper implements RpcWrapper {
3129

3230
private final CompletableFuture<Command> completableFuture;
3331

34-
private final ExecutorService executorService;
35-
36-
public CompletableFutureRpcWrapper(Method method, CompletableFuture<Command> completableFuture, ExecutorService executorService) {
32+
public CompletableFutureRpcWrapper(Method method, CompletableFuture<Command> completableFuture) {
3733
this.request = method;
3834
this.completableFuture = completableFuture;
39-
this.executorService = executorService;
4035
}
4136

4237
@Override
@@ -46,11 +41,7 @@ public boolean canHandleReply(AMQCommand command) {
4641

4742
@Override
4843
public void complete(AMQCommand command) {
49-
if (executorService == null) {
50-
completableFuture.complete(command);
51-
} else {
52-
executorService.submit(() -> completableFuture.complete(command));
53-
}
44+
completableFuture.complete(command);
5445
}
5546

5647
@Override

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.*;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.CopyOnWriteArrayList;
25-
import java.util.concurrent.ExecutorService;
2625
import java.util.concurrent.TimeoutException;
2726

2827
/**
@@ -898,8 +897,8 @@ void updateConsumerTag(String tag, String newTag) {
898897
}
899898

900899
@Override
901-
public CompletableFuture<Command> asyncCompletableRpc(Method method, ExecutorService executorService) throws IOException {
902-
return this.delegate.asyncCompletableRpc(method, executorService);
900+
public CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException {
901+
return this.delegate.asyncCompletableRpc(method);
903902
}
904903

905904
@Override

src/test/java/com/rabbitmq/client/test/ChannelAsyncCompletableFutureTest.java

+4-25
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,8 @@
2222
import org.junit.After;
2323
import org.junit.Before;
2424
import org.junit.Test;
25-
import org.junit.runner.RunWith;
26-
import org.junit.runners.Parameterized;
2725

2826
import java.io.IOException;
29-
import java.util.ArrayList;
30-
import java.util.Collection;
31-
import java.util.List;
3227
import java.util.UUID;
3328
import java.util.concurrent.CountDownLatch;
3429
import java.util.concurrent.ExecutorService;
@@ -37,22 +32,9 @@
3732

3833
import static org.junit.Assert.assertTrue;
3934

40-
@RunWith(Parameterized.class)
4135
public class ChannelAsyncCompletableFutureTest extends BrokerTestCase {
4236

43-
ExecutorService executor, methodArgumentExecutor;
44-
45-
@Parameterized.Parameters
46-
public static Collection<ExecutorService> params() {
47-
List<ExecutorService> executors = new ArrayList<>();
48-
executors.add(null);
49-
executors.add(Executors.newSingleThreadExecutor());
50-
return executors;
51-
}
52-
53-
public ChannelAsyncCompletableFutureTest(ExecutorService methodArgumentExecutor) {
54-
this.methodArgumentExecutor = methodArgumentExecutor;
55-
}
37+
ExecutorService executor;
5638

5739
String queue;
5840
String exchange;
@@ -67,9 +49,6 @@ public ChannelAsyncCompletableFutureTest(ExecutorService methodArgumentExecutor)
6749
executor.shutdownNow();
6850
channel.queueDelete(queue);
6951
channel.exchangeDelete(exchange);
70-
if (methodArgumentExecutor != null) {
71-
methodArgumentExecutor.shutdownNow();
72-
}
7352
}
7453

7554
@Test
@@ -85,7 +64,7 @@ public void async() throws Exception {
8564
.arguments(null)
8665
.build();
8766

88-
channel.asyncCompletableRpc(queueDeclare, null)
67+
channel.asyncCompletableRpc(queueDeclare)
8968
.thenComposeAsync(action -> {
9069
try {
9170
return channel.asyncCompletableRpc(new AMQImpl.Exchange.Declare.Builder()
@@ -94,7 +73,7 @@ public void async() throws Exception {
9473
.durable(false)
9574
.autoDelete(false)
9675
.arguments(null)
97-
.build(), methodArgumentExecutor);
76+
.build());
9877
} catch (IOException e) {
9978
throw new RuntimeException(e);
10079
}
@@ -105,7 +84,7 @@ public void async() throws Exception {
10584
.exchange(exchange)
10685
.routingKey("")
10786
.arguments(null)
108-
.build(), methodArgumentExecutor);
87+
.build());
10988
} catch (IOException e) {
11089
throw new RuntimeException(e);
11190
}

0 commit comments

Comments
 (0)