Skip to content

Add CompletableFuture async RPC to channel #257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 21, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/main/java/com/rabbitmq/client/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
Expand Down Expand Up @@ -1363,4 +1364,12 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
*/
long consumerCount(String queue) throws IOException;

/**
* Asynchronously send a method over this channel.
* @param method method to transmit over this channel.
* @return a completable future that completes when the result is received
* @throws IOException Problem transmitting method.
*/
CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException;

}
72 changes: 61 additions & 11 deletions src/main/java/com/rabbitmq/client/impl/AMQChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@

package com.rabbitmq.client.impl;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;
import com.rabbitmq.client.Method;
import com.rabbitmq.utility.BlockingValueOrException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/**
* Base class modelling an AMQ channel. Subclasses implement
* {@link com.rabbitmq.client.Channel#close} and
Expand Down Expand Up @@ -58,7 +60,7 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
private AMQCommand _command = new AMQCommand();

/** The current outstanding RPC request, if any. (Could become a queue in future.) */
private RpcContinuation _activeRpc = null;
private RpcWrapper _activeRpc = null;

/** Whether transmission of content-bearing methods should be blocked */
public volatile boolean _blockContent = false;
Expand Down Expand Up @@ -135,6 +137,20 @@ public AMQCommand exnWrappingRpc(Method m)
}
}

public CompletableFuture<Command> exnWrappingAsyncRpc(Method m)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a public method, I'd like to clarify the name. How about exceptionWrappingAsyncRpc?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I now see we already have a "non-async" method with a similar name from way back. Disregard the above.

throws IOException
{
try {
return privateAsyncRpc(m);
} catch (AlreadyClosedException ace) {
// Do not wrap it since it means that connection/channel
// was closed in some action in the past
throw ace;
} catch (ShutdownSignalException ex) {
throw wrap(ex);
}
}

/**
* Private API - handle a command which has been assembled
* @throws IOException if there's any problem
Expand All @@ -154,17 +170,25 @@ public void handleCompleteInboundCommand(AMQCommand command) throws IOException
if (!processAsync(command)) {
// The filter decided not to handle/consume the command,
// so it must be some reply to an earlier RPC.
RpcContinuation nextOutstandingRpc = nextOutstandingRpc();
RpcWrapper nextOutstandingRpc = nextOutstandingRpc();
// the outstanding RPC can be null when calling Channel#asyncRpc
if(nextOutstandingRpc != null) {
nextOutstandingRpc.handleCommand(command);
nextOutstandingRpc.complete(command);
markRpcFinished();
}
}
}

public void enqueueRpc(RpcContinuation k)
{
doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k));
}

public void enqueueAsyncRpc(CompletableFuture<Command> future) {
doEnqueueRpc(() -> new CompletableFutureRpcWrapper(future));
}

private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
synchronized (_channelMutex) {
boolean waitClearedInterruptStatus = false;
while (_activeRpc != null) {
Expand All @@ -177,7 +201,7 @@ public void enqueueRpc(RpcContinuation k)
if (waitClearedInterruptStatus) {
Thread.currentThread().interrupt();
}
_activeRpc = k;
_activeRpc = rpcWrapperSupplier.get();
}
}

Expand All @@ -188,10 +212,10 @@ public boolean isOutstandingRpc()
}
}

public RpcContinuation nextOutstandingRpc()
public RpcWrapper nextOutstandingRpc()
{
synchronized (_channelMutex) {
RpcContinuation result = _activeRpc;
RpcWrapper result = _activeRpc;
_activeRpc = null;
_channelMutex.notifyAll();
return result;
Expand Down Expand Up @@ -255,6 +279,14 @@ private AMQCommand privateRpc(Method m)
}
}

private CompletableFuture<Command> privateAsyncRpc(Method m)
throws IOException, ShutdownSignalException
{
CompletableFuture<Command> future = new CompletableFuture<>();
asyncRpc(m, future);
return future;
}

private AMQCommand privateRpc(Method m, int timeout)
throws IOException, ShutdownSignalException, TimeoutException {
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
Expand All @@ -281,6 +313,24 @@ public void quiescingRpc(Method m, RpcContinuation k)
}
}

public void asyncRpc(Method m, CompletableFuture<Command> future)
throws IOException
{
synchronized (_channelMutex) {
ensureIsOpen();
quiescingAsyncRpc(m, future);
}
}

public void quiescingAsyncRpc(Method m, CompletableFuture<Command> future)
throws IOException
{
synchronized (_channelMutex) {
enqueueAsyncRpc(future);
quiescingTransmit(m);
}
}

/**
* Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method
* returns true, the command is considered handled and is not passed back to nextCommand's caller; if it returns false, nextCommand returns the command as
Expand Down Expand Up @@ -321,9 +371,9 @@ public void processShutdownSignal(ShutdownSignalException signal,
}

public void notifyOutstandingRpc(ShutdownSignalException signal) {
RpcContinuation k = nextOutstandingRpc();
RpcWrapper k = nextOutstandingRpc();
if (k != null) {
k.handleShutdownSignal(signal);
k.shutdown(signal);
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/ChannelN.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -1531,6 +1532,11 @@ public AMQCommand rpc(Method method) throws IOException {
return exnWrappingRpc(method);
}

@Override
public CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException {
return exnWrappingAsyncRpc(method);
}

@Override
public void enqueueRpc(RpcContinuation k) {
synchronized (_channelMutex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client.impl;

import com.rabbitmq.client.Command;
import com.rabbitmq.client.ShutdownSignalException;

import java.util.concurrent.CompletableFuture;

/**
*
*/
public class CompletableFutureRpcWrapper implements RpcWrapper {

private final CompletableFuture<Command> completableFuture;

public CompletableFutureRpcWrapper(CompletableFuture<Command> completableFuture) {
this.completableFuture = completableFuture;
}

@Override
public void complete(AMQCommand command) {
completableFuture.complete(command);
}

@Override
public void shutdown(ShutdownSignalException signal) {
completableFuture.completeExceptionally(signal);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client.impl;

import com.rabbitmq.client.ShutdownSignalException;

/**
*
*/
public class RpcContinuationRpcWrapper implements RpcWrapper {

private final AMQChannel.RpcContinuation continuation;

public RpcContinuationRpcWrapper(AMQChannel.RpcContinuation continuation) {
this.continuation = continuation;
}

@Override
public void complete(AMQCommand command) {
continuation.handleCommand(command);
}

@Override
public void shutdown(ShutdownSignalException signal) {
continuation.handleShutdownSignal(signal);
}
}
29 changes: 29 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/RpcWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2017-Present Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client.impl;

import com.rabbitmq.client.ShutdownSignalException;

/**
*
*/
public interface RpcWrapper {

void complete(AMQCommand command);

void shutdown(ShutdownSignalException signal);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;

Expand Down Expand Up @@ -895,6 +896,11 @@ void updateConsumerTag(String tag, String newTag) {
}
}

@Override
public CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException {
return this.delegate.asyncCompletableRpc(method);
}

@Override
public String toString() {
return this.delegate.toString();
Expand Down
Loading