diff --git a/src/main/java/com/rabbitmq/client/Channel.java b/src/main/java/com/rabbitmq/client/Channel.java index 52546d2dd0..1b2cfd33dc 100644 --- a/src/main/java/com/rabbitmq/client/Channel.java +++ b/src/main/java/com/rabbitmq/client/Channel.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; @@ -1363,4 +1364,12 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ long consumerCount(String queue) throws IOException; + /** + * Asynchronously send a method over this channel. + * @param method method to transmit over this channel. + * @return a completable future that completes when the result is received + * @throws IOException Problem transmitting method. + */ + CompletableFuture asyncCompletableRpc(Method method) throws IOException; + } diff --git a/src/main/java/com/rabbitmq/client/impl/AMQChannel.java b/src/main/java/com/rabbitmq/client/impl/AMQChannel.java index 6726f728f8..986a59c210 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQChannel.java @@ -16,15 +16,17 @@ package com.rabbitmq.client.impl; -import java.io.IOException; -import java.util.concurrent.TimeoutException; - import com.rabbitmq.client.*; import com.rabbitmq.client.Method; import com.rabbitmq.utility.BlockingValueOrException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + /** * Base class modelling an AMQ channel. Subclasses implement * {@link com.rabbitmq.client.Channel#close} and @@ -58,7 +60,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent { private AMQCommand _command = new AMQCommand(); /** The current outstanding RPC request, if any. (Could become a queue in future.) */ - private RpcContinuation _activeRpc = null; + private RpcWrapper _activeRpc = null; /** Whether transmission of content-bearing methods should be blocked */ public volatile boolean _blockContent = false; @@ -135,6 +137,20 @@ public AMQCommand exnWrappingRpc(Method m) } } + public CompletableFuture exnWrappingAsyncRpc(Method m) + throws IOException + { + try { + return privateAsyncRpc(m); + } catch (AlreadyClosedException ace) { + // Do not wrap it since it means that connection/channel + // was closed in some action in the past + throw ace; + } catch (ShutdownSignalException ex) { + throw wrap(ex); + } + } + /** * Private API - handle a command which has been assembled * @throws IOException if there's any problem @@ -154,10 +170,10 @@ public void handleCompleteInboundCommand(AMQCommand command) throws IOException if (!processAsync(command)) { // The filter decided not to handle/consume the command, // so it must be some reply to an earlier RPC. - RpcContinuation nextOutstandingRpc = nextOutstandingRpc(); + RpcWrapper nextOutstandingRpc = nextOutstandingRpc(); // the outstanding RPC can be null when calling Channel#asyncRpc if(nextOutstandingRpc != null) { - nextOutstandingRpc.handleCommand(command); + nextOutstandingRpc.complete(command); markRpcFinished(); } } @@ -165,6 +181,14 @@ public void handleCompleteInboundCommand(AMQCommand command) throws IOException public void enqueueRpc(RpcContinuation k) { + doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k)); + } + + public void enqueueAsyncRpc(CompletableFuture future) { + doEnqueueRpc(() -> new CompletableFutureRpcWrapper(future)); + } + + private void doEnqueueRpc(Supplier rpcWrapperSupplier) { synchronized (_channelMutex) { boolean waitClearedInterruptStatus = false; while (_activeRpc != null) { @@ -177,7 +201,7 @@ public void enqueueRpc(RpcContinuation k) if (waitClearedInterruptStatus) { Thread.currentThread().interrupt(); } - _activeRpc = k; + _activeRpc = rpcWrapperSupplier.get(); } } @@ -188,10 +212,10 @@ public boolean isOutstandingRpc() } } - public RpcContinuation nextOutstandingRpc() + public RpcWrapper nextOutstandingRpc() { synchronized (_channelMutex) { - RpcContinuation result = _activeRpc; + RpcWrapper result = _activeRpc; _activeRpc = null; _channelMutex.notifyAll(); return result; @@ -255,6 +279,14 @@ private AMQCommand privateRpc(Method m) } } + private CompletableFuture privateAsyncRpc(Method m) + throws IOException, ShutdownSignalException + { + CompletableFuture future = new CompletableFuture<>(); + asyncRpc(m, future); + return future; + } + private AMQCommand privateRpc(Method m, int timeout) throws IOException, ShutdownSignalException, TimeoutException { SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(); @@ -281,6 +313,24 @@ public void quiescingRpc(Method m, RpcContinuation k) } } + public void asyncRpc(Method m, CompletableFuture future) + throws IOException + { + synchronized (_channelMutex) { + ensureIsOpen(); + quiescingAsyncRpc(m, future); + } + } + + public void quiescingAsyncRpc(Method m, CompletableFuture future) + throws IOException + { + synchronized (_channelMutex) { + enqueueAsyncRpc(future); + quiescingTransmit(m); + } + } + /** * Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method * returns true, the command is considered handled and is not passed back to nextCommand's caller; if it returns false, nextCommand returns the command as @@ -321,9 +371,9 @@ public void processShutdownSignal(ShutdownSignalException signal, } public void notifyOutstandingRpc(ShutdownSignalException signal) { - RpcContinuation k = nextOutstandingRpc(); + RpcWrapper k = nextOutstandingRpc(); if (k != null) { - k.handleShutdownSignal(signal); + k.shutdown(signal); } } diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index e51816747b..99d917b2dc 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; @@ -1531,6 +1532,11 @@ public AMQCommand rpc(Method method) throws IOException { return exnWrappingRpc(method); } + @Override + public CompletableFuture asyncCompletableRpc(Method method) throws IOException { + return exnWrappingAsyncRpc(method); + } + @Override public void enqueueRpc(RpcContinuation k) { synchronized (_channelMutex) { diff --git a/src/main/java/com/rabbitmq/client/impl/CompletableFutureRpcWrapper.java b/src/main/java/com/rabbitmq/client/impl/CompletableFutureRpcWrapper.java new file mode 100644 index 0000000000..780c3d6036 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/CompletableFutureRpcWrapper.java @@ -0,0 +1,43 @@ +// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl; + +import com.rabbitmq.client.Command; +import com.rabbitmq.client.ShutdownSignalException; + +import java.util.concurrent.CompletableFuture; + +/** + * + */ +public class CompletableFutureRpcWrapper implements RpcWrapper { + + private final CompletableFuture completableFuture; + + public CompletableFutureRpcWrapper(CompletableFuture completableFuture) { + this.completableFuture = completableFuture; + } + + @Override + public void complete(AMQCommand command) { + completableFuture.complete(command); + } + + @Override + public void shutdown(ShutdownSignalException signal) { + completableFuture.completeExceptionally(signal); + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/RpcContinuationRpcWrapper.java b/src/main/java/com/rabbitmq/client/impl/RpcContinuationRpcWrapper.java new file mode 100644 index 0000000000..180b3c4db7 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/RpcContinuationRpcWrapper.java @@ -0,0 +1,40 @@ +// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl; + +import com.rabbitmq.client.ShutdownSignalException; + +/** + * + */ +public class RpcContinuationRpcWrapper implements RpcWrapper { + + private final AMQChannel.RpcContinuation continuation; + + public RpcContinuationRpcWrapper(AMQChannel.RpcContinuation continuation) { + this.continuation = continuation; + } + + @Override + public void complete(AMQCommand command) { + continuation.handleCommand(command); + } + + @Override + public void shutdown(ShutdownSignalException signal) { + continuation.handleShutdownSignal(signal); + } +} diff --git a/src/main/java/com/rabbitmq/client/impl/RpcWrapper.java b/src/main/java/com/rabbitmq/client/impl/RpcWrapper.java new file mode 100644 index 0000000000..636019ea5c --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/RpcWrapper.java @@ -0,0 +1,29 @@ +// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl; + +import com.rabbitmq.client.ShutdownSignalException; + +/** + * + */ +public interface RpcWrapper { + + void complete(AMQCommand command); + + void shutdown(ShutdownSignalException signal); + +} diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index bbcacaae8c..5c66c38014 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeoutException; @@ -895,6 +896,11 @@ void updateConsumerTag(String tag, String newTag) { } } + @Override + public CompletableFuture asyncCompletableRpc(Method method) throws IOException { + return this.delegate.asyncCompletableRpc(method); + } + @Override public String toString() { return this.delegate.toString(); diff --git a/src/test/java/com/rabbitmq/client/test/ChannelAsyncCompletableFutureTest.java b/src/test/java/com/rabbitmq/client/test/ChannelAsyncCompletableFutureTest.java new file mode 100644 index 0000000000..d3beffc25e --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/ChannelAsyncCompletableFutureTest.java @@ -0,0 +1,110 @@ +// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.impl.AMQImpl; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.*; + +import static org.junit.Assert.assertTrue; + +public class ChannelAsyncCompletableFutureTest extends BrokerTestCase { + + ExecutorService executor; + + String queue; + String exchange; + + @Before public void init() { + executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + queue = UUID.randomUUID().toString(); + exchange = UUID.randomUUID().toString(); + } + + @After public void tearDown() throws IOException { + executor.shutdownNow(); + channel.queueDelete(queue); + channel.exchangeDelete(exchange); + } + + @Test + public void async() throws Exception { + channel.confirmSelect(); + + CountDownLatch latch = new CountDownLatch(1); + AMQP.Queue.Declare queueDeclare = new AMQImpl.Queue.Declare.Builder() + .queue(queue) + .durable(true) + .exclusive(false) + .autoDelete(false) + .arguments(null) + .build(); + + channel.asyncCompletableRpc(queueDeclare) + .thenComposeAsync(action -> { + try { + return channel.asyncCompletableRpc(new AMQImpl.Exchange.Declare.Builder() + .exchange(exchange) + .type("fanout") + .durable(false) + .autoDelete(false) + .arguments(null) + .build()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, executor).thenComposeAsync(action -> { + try { + return channel.asyncCompletableRpc(new AMQImpl.Queue.Bind.Builder() + .queue(queue) + .exchange(exchange) + .routingKey("") + .arguments(null) + .build()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, executor).thenAcceptAsync(action -> { + try { + channel.basicPublish("", queue, null, "dummy".getBytes()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, executor).thenAcceptAsync((whatever) -> { + try { + channel.basicConsume(queue, true, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + latch.countDown(); + } + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, executor); + channel.waitForConfirmsOrDie(1000); + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } + +} diff --git a/src/test/java/com/rabbitmq/client/test/ClientTests.java b/src/test/java/com/rabbitmq/client/test/ClientTests.java index de2b189d9d..84c901c494 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTests.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTests.java @@ -50,7 +50,8 @@ RecoveryAwareAMQConnectionFactoryTest.class, RpcTest.class, SslContextFactoryTest.class, - LambdaCallbackTest.class + LambdaCallbackTest.class, + ChannelAsyncCompletableFutureTest.class }) public class ClientTests {