Skip to content

Commit 27e1e7f

Browse files
committed
Add executor service argument to Channel#asyncCompletableRpc
RPC response comes from the reading threads and this can cause deadlocks if the client doesn't use CompletableFuture#*async methods. References #215
1 parent f250c6e commit 27e1e7f

File tree

6 files changed

+57
-26
lines changed

6 files changed

+57
-26
lines changed

src/main/java/com/rabbitmq/client/Channel.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.io.IOException;
1919
import java.util.Map;
2020
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.ExecutorService;
2122
import java.util.concurrent.TimeoutException;
2223

2324
import com.rabbitmq.client.AMQP.BasicProperties;
@@ -1367,9 +1368,10 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
13671368
/**
13681369
* Asynchronously send a method over this channel.
13691370
* @param method method to transmit over this channel.
1371+
* @param executorService executor used to complete the operation, can be null
13701372
* @return a completable future that completes when the result is received
13711373
* @throws IOException Problem transmitting method.
13721374
*/
1373-
CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException;
1375+
CompletableFuture<Command> asyncCompletableRpc(Method method, ExecutorService executorService) throws IOException;
13741376

13751377
}

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

+11-10
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.io.IOException;
3131
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.ExecutorService;
3233
import java.util.concurrent.TimeoutException;
3334
import java.util.function.Supplier;
3435

@@ -145,11 +146,11 @@ public AMQCommand exnWrappingRpc(Method m)
145146
}
146147
}
147148

148-
public CompletableFuture<Command> exnWrappingAsyncRpc(Method m)
149+
public CompletableFuture<Command> exnWrappingAsyncRpc(Method m, ExecutorService executorService)
149150
throws IOException
150151
{
151152
try {
152-
return privateAsyncRpc(m);
153+
return privateAsyncRpc(m, executorService);
153154
} catch (AlreadyClosedException ace) {
154155
// Do not wrap it since it means that connection/channel
155156
// was closed in some action in the past
@@ -204,8 +205,8 @@ public void enqueueRpc(RpcContinuation k)
204205
doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k));
205206
}
206207

207-
public void enqueueAsyncRpc(Method method, CompletableFuture<Command> future) {
208-
doEnqueueRpc(() -> new CompletableFutureRpcWrapper(method, future));
208+
public void enqueueAsyncRpc(Method method, CompletableFuture<Command> future, ExecutorService executorService) {
209+
doEnqueueRpc(() -> new CompletableFutureRpcWrapper(method, future, executorService));
209210
}
210211

211212
private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
@@ -308,11 +309,11 @@ protected ChannelContinuationTimeoutException wrapTimeoutException(final Method
308309
return new ChannelContinuationTimeoutException(e, this, this._channelNumber, m);
309310
}
310311

311-
private CompletableFuture<Command> privateAsyncRpc(Method m)
312+
private CompletableFuture<Command> privateAsyncRpc(Method m, ExecutorService executorService)
312313
throws IOException, ShutdownSignalException
313314
{
314315
CompletableFuture<Command> future = new CompletableFuture<>();
315-
asyncRpc(m, future);
316+
asyncRpc(m, future, executorService);
316317
return future;
317318
}
318319

@@ -347,20 +348,20 @@ public void quiescingRpc(Method m, RpcContinuation k)
347348
}
348349
}
349350

350-
public void asyncRpc(Method m, CompletableFuture<Command> future)
351+
public void asyncRpc(Method m, CompletableFuture<Command> future, ExecutorService executorService)
351352
throws IOException
352353
{
353354
synchronized (_channelMutex) {
354355
ensureIsOpen();
355-
quiescingAsyncRpc(m, future);
356+
quiescingAsyncRpc(m, future, executorService);
356357
}
357358
}
358359

359-
public void quiescingAsyncRpc(Method m, CompletableFuture<Command> future)
360+
public void quiescingAsyncRpc(Method m, CompletableFuture<Command> future, ExecutorService executorService)
360361
throws IOException
361362
{
362363
synchronized (_channelMutex) {
363-
enqueueAsyncRpc(m, future);
364+
enqueueAsyncRpc(m, future, executorService);
364365
quiescingTransmit(m);
365366
}
366367
}

src/main/java/com/rabbitmq/client/impl/ChannelN.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,7 @@
2222
import java.util.Map;
2323
import java.util.SortedSet;
2424
import java.util.TreeSet;
25-
import java.util.concurrent.CompletableFuture;
26-
import java.util.concurrent.CopyOnWriteArrayList;
27-
import java.util.concurrent.CountDownLatch;
28-
import java.util.concurrent.TimeoutException;
25+
import java.util.concurrent.*;
2926

3027
import com.rabbitmq.client.ConfirmCallback;
3128
import com.rabbitmq.client.*;
@@ -1558,8 +1555,8 @@ public AMQCommand rpc(Method method) throws IOException {
15581555
}
15591556

15601557
@Override
1561-
public CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException {
1562-
return exnWrappingAsyncRpc(method);
1558+
public CompletableFuture<Command> asyncCompletableRpc(Method method, ExecutorService executorService) throws IOException {
1559+
return exnWrappingAsyncRpc(method, executorService);
15631560
}
15641561

15651562
@Override

src/main/java/com/rabbitmq/client/impl/CompletableFutureRpcWrapper.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.rabbitmq.client.Method;
2020

2121
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executors;
2224

2325
/**
2426
*
@@ -29,9 +31,12 @@ public class CompletableFutureRpcWrapper implements RpcWrapper {
2931

3032
private final CompletableFuture<Command> completableFuture;
3133

32-
public CompletableFutureRpcWrapper(Method method, CompletableFuture<Command> completableFuture) {
34+
private final ExecutorService executorService;
35+
36+
public CompletableFutureRpcWrapper(Method method, CompletableFuture<Command> completableFuture, ExecutorService executorService) {
3337
this.request = method;
3438
this.completableFuture = completableFuture;
39+
this.executorService = executorService;
3540
}
3641

3742
@Override
@@ -41,7 +46,11 @@ public boolean canHandleReply(AMQCommand command) {
4146

4247
@Override
4348
public void complete(AMQCommand command) {
44-
completableFuture.complete(command);
49+
if (executorService == null) {
50+
completableFuture.complete(command);
51+
} else {
52+
executorService.submit(() -> completableFuture.complete(command));
53+
}
4554
}
4655

4756
@Override

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.*;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.CopyOnWriteArrayList;
25+
import java.util.concurrent.ExecutorService;
2526
import java.util.concurrent.TimeoutException;
2627

2728
/**
@@ -897,8 +898,8 @@ void updateConsumerTag(String tag, String newTag) {
897898
}
898899

899900
@Override
900-
public CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException {
901-
return this.delegate.asyncCompletableRpc(method);
901+
public CompletableFuture<Command> asyncCompletableRpc(Method method, ExecutorService executorService) throws IOException {
902+
return this.delegate.asyncCompletableRpc(method, executorService);
902903
}
903904

904905
@Override

src/test/java/com/rabbitmq/client/test/ChannelAsyncCompletableFutureTest.java

+26-5
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,37 @@
2222
import org.junit.After;
2323
import org.junit.Before;
2424
import org.junit.Test;
25+
import org.junit.runner.RunWith;
26+
import org.junit.runners.Parameterized;
2527

2628
import java.io.IOException;
29+
import java.util.ArrayList;
30+
import java.util.Collection;
31+
import java.util.List;
2732
import java.util.UUID;
28-
import java.util.concurrent.*;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
36+
import java.util.concurrent.TimeUnit;
2937

3038
import static org.junit.Assert.assertTrue;
3139

40+
@RunWith(Parameterized.class)
3241
public class ChannelAsyncCompletableFutureTest extends BrokerTestCase {
3342

34-
ExecutorService executor;
43+
ExecutorService executor, methodArgumentExecutor;
44+
45+
@Parameterized.Parameters
46+
public static Collection<ExecutorService> params() {
47+
List<ExecutorService> executors = new ArrayList<>();
48+
executors.add(null);
49+
executors.add(Executors.newSingleThreadExecutor());
50+
return executors;
51+
}
52+
53+
public ChannelAsyncCompletableFutureTest(ExecutorService methodArgumentExecutor) {
54+
this.methodArgumentExecutor = methodArgumentExecutor;
55+
}
3556

3657
String queue;
3758
String exchange;
@@ -61,7 +82,7 @@ public void async() throws Exception {
6182
.arguments(null)
6283
.build();
6384

64-
channel.asyncCompletableRpc(queueDeclare)
85+
channel.asyncCompletableRpc(queueDeclare, null)
6586
.thenComposeAsync(action -> {
6687
try {
6788
return channel.asyncCompletableRpc(new AMQImpl.Exchange.Declare.Builder()
@@ -70,7 +91,7 @@ public void async() throws Exception {
7091
.durable(false)
7192
.autoDelete(false)
7293
.arguments(null)
73-
.build());
94+
.build(), methodArgumentExecutor);
7495
} catch (IOException e) {
7596
throw new RuntimeException(e);
7697
}
@@ -81,7 +102,7 @@ public void async() throws Exception {
81102
.exchange(exchange)
82103
.routingKey("")
83104
.arguments(null)
84-
.build());
105+
.build(), methodArgumentExecutor);
85106
} catch (IOException e) {
86107
throw new RuntimeException(e);
87108
}

0 commit comments

Comments
 (0)