From b9d07e89e5e86f598ce296d02cc42f8ac039908e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 28 Feb 2017 14:49:32 +0100 Subject: [PATCH 1/3] Add lambda-based methods to consume messages Only basic.deliver and basic.cancel callbacks are covered. They're likely to be the most commonly used. This can provide a straightforward syntax when callbacks are simple (e.g. one line). The Delivery wrapper class from RpcServer has been extracted as a top level class: it's used in the deliver callback to avoid having a lambda with 4 parameters. Fixes #247 --- .../com/rabbitmq/client/CancelCallback.java | 42 +++++++ .../java/com/rabbitmq/client/Channel.java | 115 ++++++++++++++++++ .../java/com/rabbitmq/client/Consumer.java | 3 + .../com/rabbitmq/client/DeliverCallback.java | 37 ++++++ .../java/com/rabbitmq/client/Delivery.java | 55 +++++++++ .../java/com/rabbitmq/client/RpcServer.java | 39 ------ .../com/rabbitmq/client/impl/ChannelN.java | 60 +++++++++ .../impl/recovery/AutorecoveringChannel.java | 55 +++++++++ .../com/rabbitmq/client/test/ClientTests.java | 2 +- ...lbackTest.java => LambdaCallbackTest.java} | 29 ++++- .../com/rabbitmq/client/test/RpcTest.java | 2 - 11 files changed, 396 insertions(+), 43 deletions(-) create mode 100644 src/main/java/com/rabbitmq/client/CancelCallback.java create mode 100644 src/main/java/com/rabbitmq/client/DeliverCallback.java create mode 100644 src/main/java/com/rabbitmq/client/Delivery.java rename src/test/java/com/rabbitmq/client/test/{LambdaListenerCallbackTest.java => LambdaCallbackTest.java} (72%) diff --git a/src/main/java/com/rabbitmq/client/CancelCallback.java b/src/main/java/com/rabbitmq/client/CancelCallback.java new file mode 100644 index 0000000000..22c5d1f303 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/CancelCallback.java @@ -0,0 +1,42 @@ +// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client; + +import java.io.IOException; + +/** + * Callback interface to be notified of the cancellation of a consumer. + * Prefer it over {@link Consumer} for a lambda-oriented syntax, + * if you don't need to implement all the application callbacks. + * @since 5.0 + * @see DeliverCallback + * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback) + * @since 5.0 + */ +@FunctionalInterface +public interface CancelCallback { + + /** + * Called when the consumer is cancelled for reasons other than by a call to + * {@link Channel#basicCancel}. For example, the queue has been deleted. + * See {@link Consumer#handleCancelOk} for notification of consumer + * cancellation due to {@link Channel#basicCancel}. + * @param consumerTag the consumer tag associated with the consumer + * @throws IOException + */ + void handle(String consumerTag) throws IOException; + +} diff --git a/src/main/java/com/rabbitmq/client/Channel.java b/src/main/java/com/rabbitmq/client/Channel.java index 7218338d60..567599670c 100644 --- a/src/main/java/com/rabbitmq/client/Channel.java +++ b/src/main/java/com/rabbitmq/client/Channel.java @@ -796,6 +796,26 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, Consumer callback) throws IOException; + /** + * Start a non-nolocal, non-exclusive consumer, with + * explicit acknowledgement and a server-generated consumerTag. + * Provide access only to basic.deliver and + * basic.cancel AMQP methods (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param deliverCallback callback when a message is delivered + * @param cancelCallback callback when the consumer is cancelled + * @return the consumerTag generated by the server + * @throws IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicAck + * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) + * @since 5.0 + */ + String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; + /** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag. @@ -812,6 +832,29 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException; + /** + * Start a non-nolocal, non-exclusive consumer, with + * a server-generated consumerTag. + * Provide access only to basic.deliver and + * basic.cancel AMQP methods (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param deliverCallback callback when a message is delivered + * @param cancelCallback callback when the consumer is cancelled + * @return the consumerTag generated by the server + * @throws IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicAck + * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) + * @since 5.0 + */ + String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; + /** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag and specified arguments. @@ -829,6 +872,30 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, boolean autoAck, Map arguments, Consumer callback) throws IOException; + /** + * Start a non-nolocal, non-exclusive consumer, with + * a server-generated consumerTag and specified arguments. + * Provide access only to basic.deliver and + * basic.cancel AMQP methods (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param arguments a set of arguments for the consume + * @param deliverCallback callback when a message is delivered + * @param cancelCallback callback when the consumer is cancelled + * @return the consumerTag generated by the server + * @throws IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicAck + * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) + * @since 5.0 + */ + String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; + /** * Start a non-nolocal, non-exclusive consumer. * @param queue the name of the queue @@ -845,6 +912,28 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException; + /** + * Start a non-nolocal, non-exclusive consumer. + * Provide access only to basic.deliver and + * basic.cancel AMQP methods (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param consumerTag a client-generated consumer tag to establish context + * @param deliverCallback callback when a message is delivered + * @param cancelCallback callback when the consumer is cancelled + * @return the consumerTag associated with the new consumer + * @throws java.io.IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) + * @since 5.0 + */ + String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; + /** * Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk} * method. @@ -865,6 +954,32 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, Consumer callback) throws IOException; + /** + * Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk} + * method. + * Provide access only to basic.deliver and + * basic.cancel AMQP methods (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param consumerTag a client-generated consumer tag to establish context + * @param noLocal true if the server should not deliver to this consumer + * messages published on this channel's connection + * @param exclusive true if this is an exclusive consumer + * @param arguments a set of arguments for the consume + * @param deliverCallback callback when a message is delivered + * @param cancelCallback callback when the consumer is cancelled + * @return the consumerTag associated with the new consumer + * @throws java.io.IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @since 5.0 + */ + String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; + /** * Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk} * method. diff --git a/src/main/java/com/rabbitmq/client/Consumer.java b/src/main/java/com/rabbitmq/client/Consumer.java index f1844cc73f..da5923e7c9 100644 --- a/src/main/java/com/rabbitmq/client/Consumer.java +++ b/src/main/java/com/rabbitmq/client/Consumer.java @@ -33,6 +33,9 @@ * because this will delay dispatch of messages to other {@link Consumer}s on the same * {@link Channel}. * + * For a lambda-oriented syntax, use {@link DeliverCallback} and + * {@link CancelCallback}. + * * @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer) * @see Channel#basicCancel */ diff --git a/src/main/java/com/rabbitmq/client/DeliverCallback.java b/src/main/java/com/rabbitmq/client/DeliverCallback.java new file mode 100644 index 0000000000..2b8cca0d18 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/DeliverCallback.java @@ -0,0 +1,37 @@ +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client; + +import java.io.IOException; +import java.util.Map; + +/** + * Callback interface to be notified when a message is delivered. + * Prefer it over {@link Consumer} for a lambda-oriented syntax, + * if you don't need to implement all the application callbacks. + * @see CancelCallback + * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback) + * @since 5.0 + */ +@FunctionalInterface +public interface DeliverCallback { + + /** + * Called when a basic.deliver is received for this consumer. + * @param consumerTag the consumer tag associated with the consumer + * @param message the delivered message + * @throws IOException if the consumer encounters an I/O error while processing the message + */ + void handle(String consumerTag, Delivery message) throws IOException; + +} diff --git a/src/main/java/com/rabbitmq/client/Delivery.java b/src/main/java/com/rabbitmq/client/Delivery.java new file mode 100644 index 0000000000..eca6971be4 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/Delivery.java @@ -0,0 +1,55 @@ +// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client; + +/** + * Encapsulates an arbitrary message - simple "bean" holder structure. + */ +public class Delivery { + private final Envelope _envelope; + private final AMQP.BasicProperties _properties; + private final byte[] _body; + + public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) { + _envelope = envelope; + _properties = properties; + _body = body; + } + + /** + * Retrieve the message envelope. + * @return the message envelope + */ + public Envelope getEnvelope() { + return _envelope; + } + + /** + * Retrieve the message properties. + * @return the message properties + */ + public AMQP.BasicProperties getProperties() { + return _properties; + } + + /** + * Retrieve the message body. + * @return the message body + */ + public byte[] getBody() { + return _body; + } +} diff --git a/src/main/java/com/rabbitmq/client/RpcServer.java b/src/main/java/com/rabbitmq/client/RpcServer.java index cadf6ca767..eec9f490b4 100644 --- a/src/main/java/com/rabbitmq/client/RpcServer.java +++ b/src/main/java/com/rabbitmq/client/RpcServer.java @@ -235,45 +235,6 @@ public String getQueueName() { return _queueName; } - /** - * Encapsulates an arbitrary message - simple "bean" holder structure. - */ - public static class Delivery { - private final Envelope _envelope; - private final AMQP.BasicProperties _properties; - private final byte[] _body; - - public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) { - _envelope = envelope; - _properties = properties; - _body = body; - } - - /** - * Retrieve the message envelope. - * @return the message envelope - */ - public Envelope getEnvelope() { - return _envelope; - } - - /** - * Retrieve the message properties. - * @return the message properties - */ - public AMQP.BasicProperties getProperties() { - return _properties; - } - - /** - * Retrieve the message body. - * @return the message body - */ - public byte[] getBody() { - return _body; - } - } - public interface RpcConsumer extends Consumer { Delivery nextDelivery() throws InterruptedException, ShutdownSignalException, ConsumerCancelledException; diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index 5d1b25fe24..1c0240570c 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -1189,6 +1189,12 @@ public String basicConsume(String queue, Consumer callback) return basicConsume(queue, false, callback); } + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { + return basicConsume(queue, consumerFromCallbacks(deliverCallback, cancelCallback)); + } + /** Public API - {@inheritDoc} */ @Override public String basicConsume(String queue, boolean autoAck, Consumer callback) @@ -1197,6 +1203,12 @@ public String basicConsume(String queue, boolean autoAck, Consumer callback) return basicConsume(queue, autoAck, "", callback); } + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { + return basicConsume(queue, autoAck, "", consumerFromCallbacks(deliverCallback, cancelCallback)); + } + /** Public API - {@inheritDoc} */ @Override public String basicConsume(String queue, boolean autoAck, Map arguments, @@ -1206,6 +1218,13 @@ public String basicConsume(String queue, boolean autoAck, Map ar return basicConsume(queue, autoAck, "", false, false, arguments, callback); } + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) + throws IOException { + return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromCallbacks(deliverCallback, cancelCallback)); + } + /** Public API - {@inheritDoc} */ @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, @@ -1215,6 +1234,20 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, return basicConsume(queue, autoAck, consumerTag, false, false, null, callback); } + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) + throws IOException { + return basicConsume(queue, autoAck, consumerTag, false, false, null, consumerFromCallbacks(deliverCallback, cancelCallback)); + } + + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, + DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { + return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromCallbacks(deliverCallback, cancelCallback)); + } + /** Public API - {@inheritDoc} */ @Override public String basicConsume(String queue, final boolean autoAck, String consumerTag, @@ -1253,6 +1286,33 @@ public String transformReply(AMQCommand replyCommand) { } } + private Consumer consumerFromCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback) { + return new Consumer() { + + @Override + public void handleConsumeOk(String consumerTag) { } + + @Override + public void handleCancelOk(String consumerTag) { } + + @Override + public void handleCancel(String consumerTag) throws IOException { + cancelCallback.handle(consumerTag); + } + + @Override + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { } + + @Override + public void handleRecoverOk(String consumerTag) { } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { + deliverCallback.handle(consumerTag, new Delivery(envelope, properties, body)); + } + }; + } + /** Public API - {@inheritDoc} */ @Override public void basicCancel(final String consumerTag) diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index 31bf0c7074..2a9ae119a5 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -450,21 +450,43 @@ public String basicConsume(String queue, Consumer callback) throws IOException { return basicConsume(queue, false, callback); } + @Override + public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { + return basicConsume(queue, consumerFromCallbacks(deliverCallback, cancelCallback)); + } + @Override public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException { return basicConsume(queue, autoAck, "", callback); } + @Override + public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { + return basicConsume(queue, autoAck, "", consumerFromCallbacks(deliverCallback, cancelCallback)); + } + @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException { return basicConsume(queue, autoAck, consumerTag, false, false, null, callback); } + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) + throws IOException { + return basicConsume(queue, autoAck, consumerTag, consumerFromCallbacks(deliverCallback, cancelCallback)); + } + @Override public String basicConsume(String queue, boolean autoAck, Map arguments, Consumer callback) throws IOException { return basicConsume(queue, autoAck, "", false, false, arguments, callback); } + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) + throws IOException { + return basicConsume(queue, autoAck, arguments, consumerFromCallbacks(deliverCallback, cancelCallback)); + } + @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, Consumer callback) throws IOException { final String result = delegate.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, callback); @@ -472,6 +494,39 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, bo return result; } + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, + DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { + return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromCallbacks(deliverCallback, cancelCallback)); + } + + private Consumer consumerFromCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback) { + return new Consumer() { + + @Override + public void handleConsumeOk(String consumerTag) { } + + @Override + public void handleCancelOk(String consumerTag) { } + + @Override + public void handleCancel(String consumerTag) throws IOException { + cancelCallback.handle(consumerTag); + } + + @Override + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { } + + @Override + public void handleRecoverOk(String consumerTag) { } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + deliverCallback.handle(consumerTag, new Delivery(envelope, properties, body)); + } + }; + } + @Override public void basicCancel(String consumerTag) throws IOException { RecordedConsumer c = this.deleteRecordedConsumer(consumerTag); diff --git a/src/test/java/com/rabbitmq/client/test/ClientTests.java b/src/test/java/com/rabbitmq/client/test/ClientTests.java index 9efd2fe08c..6908650ef5 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTests.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTests.java @@ -47,7 +47,7 @@ DnsSrvRecordAddressResolverTest.class, JavaNioTest.class, RpcTest.class, - LambdaListenerCallbackTest.class + LambdaCallbackTest.class }) public class ClientTests { diff --git a/src/test/java/com/rabbitmq/client/test/LambdaListenerCallbackTest.java b/src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java similarity index 72% rename from src/test/java/com/rabbitmq/client/test/LambdaListenerCallbackTest.java rename to src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java index 0f63c3e0bc..be86d8ce61 100644 --- a/src/test/java/com/rabbitmq/client/test/LambdaListenerCallbackTest.java +++ b/src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java @@ -20,14 +20,25 @@ import org.junit.Test; import java.io.IOException; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertTrue; -public class LambdaListenerCallbackTest extends BrokerTestCase { +public class LambdaCallbackTest extends BrokerTestCase { + String queue; + + @Override + protected void createResources() throws IOException, TimeoutException { + queue = channel.queueDeclare(UUID.randomUUID().toString(), true, false, false, null).getQueue(); + } + + @Override protected void releaseResources() throws IOException { + channel.queueDelete(queue); try { unblock(); } catch (InterruptedException e) { @@ -83,4 +94,20 @@ protected void releaseResources() throws IOException { } } + @Test public void basicConsume() throws Exception { + final CountDownLatch cancelLatch = new CountDownLatch(1); + try(Connection connection = TestUtils.connectionFactory().newConnection()) { + final CountDownLatch consumingLatch = new CountDownLatch(1); + Channel consumingChannel = connection.createChannel(); + consumingChannel.basicConsume(queue, true, + (consumerTag, delivery) -> consumingLatch.countDown(), + consumerTag -> cancelLatch.countDown() + ); + this.channel.basicPublish("", queue, null, "dummy".getBytes()); + assertTrue("deliver callback should have been called", consumingLatch.await(1, TimeUnit.SECONDS)); + this.channel.queueDelete(queue); + assertTrue("cancel callback should have been called", cancelLatch.await(1, TimeUnit.SECONDS)); + } + } + } diff --git a/src/test/java/com/rabbitmq/client/test/RpcTest.java b/src/test/java/com/rabbitmq/client/test/RpcTest.java index bddee58e8e..160dd10ee3 100644 --- a/src/test/java/com/rabbitmq/client/test/RpcTest.java +++ b/src/test/java/com/rabbitmq/client/test/RpcTest.java @@ -18,12 +18,10 @@ import com.rabbitmq.client.*; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; From 9753b88634d1969682831edeb3fa68107d4af503 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 28 Feb 2017 17:07:30 +0100 Subject: [PATCH 2/3] Add deliver/shutdown lambda-based consume methods References #247 --- .../com/rabbitmq/client/CancelCallback.java | 4 +- .../java/com/rabbitmq/client/Channel.java | 115 ++++++++++++++++++ .../java/com/rabbitmq/client/Consumer.java | 4 +- .../ConsumerShutdownSignalCallback.java | 41 +++++++ .../com/rabbitmq/client/DeliverCallback.java | 5 + .../com/rabbitmq/client/impl/ChannelN.java | 72 ++++++++++- .../impl/recovery/AutorecoveringChannel.java | 67 +++++++++- .../client/test/LambdaCallbackTest.java | 19 ++- 8 files changed, 310 insertions(+), 17 deletions(-) create mode 100644 src/main/java/com/rabbitmq/client/ConsumerShutdownSignalCallback.java diff --git a/src/main/java/com/rabbitmq/client/CancelCallback.java b/src/main/java/com/rabbitmq/client/CancelCallback.java index 22c5d1f303..46e642cee8 100644 --- a/src/main/java/com/rabbitmq/client/CancelCallback.java +++ b/src/main/java/com/rabbitmq/client/CancelCallback.java @@ -16,14 +16,16 @@ package com.rabbitmq.client; import java.io.IOException; +import java.util.Map; /** * Callback interface to be notified of the cancellation of a consumer. * Prefer it over {@link Consumer} for a lambda-oriented syntax, * if you don't need to implement all the application callbacks. - * @since 5.0 * @see DeliverCallback + * @see ConsumerShutdownSignalCallback * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback) + * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback) * @since 5.0 */ @FunctionalInterface diff --git a/src/main/java/com/rabbitmq/client/Channel.java b/src/main/java/com/rabbitmq/client/Channel.java index 567599670c..d894eb66e2 100644 --- a/src/main/java/com/rabbitmq/client/Channel.java +++ b/src/main/java/com/rabbitmq/client/Channel.java @@ -816,6 +816,26 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; + /** + * Start a non-nolocal, non-exclusive consumer, with + * explicit acknowledgement and a server-generated consumerTag. + * Provide access only to basic.deliver and + * shutdown signal callbacks (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param deliverCallback callback when a message is delivered + * @param shutdownSignalCallback callback when the channel/connection is shut down + * @return the consumerTag generated by the server + * @throws IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicAck + * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) + * @since 5.0 + */ + String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag. @@ -855,6 +875,29 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; + /** + * Start a non-nolocal, non-exclusive consumer, with + * a server-generated consumerTag. + * Provide access only to basic.deliver and + * shutdown signal callbacks (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param deliverCallback callback when a message is delivered + * @param shutdownSignalCallback callback when the channel/connection is shut down + * @return the consumerTag generated by the server + * @throws IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicAck + * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) + * @since 5.0 + */ + String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag and specified arguments. @@ -896,6 +939,30 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; + /** + * Start a non-nolocal, non-exclusive consumer, with + * a server-generated consumerTag and specified arguments. + * Provide access only to basic.deliver and + * shutdown signal callbacks (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param arguments a set of arguments for the consume + * @param deliverCallback callback when a message is delivered + * @param shutdownSignalCallback callback when the channel/connection is shut down + * @return the consumerTag generated by the server + * @throws IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicAck + * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) + * @since 5.0 + */ + String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** * Start a non-nolocal, non-exclusive consumer. * @param queue the name of the queue @@ -934,6 +1001,28 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; + /** + * Start a non-nolocal, non-exclusive consumer. + * Provide access only to basic.deliver and + * shutdown signal callbacks (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param consumerTag a client-generated consumer tag to establish context + * @param deliverCallback callback when a message is delivered + * @param shutdownSignalCallback callback when the channel/connection is shut down + * @return the consumerTag associated with the new consumer + * @throws java.io.IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) + * @since 5.0 + */ + String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** * Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk} * method. @@ -980,6 +1069,32 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; + /** + * Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk} + * method. + * Provide access only to basic.deliver and + * shutdown signal callbacks (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param consumerTag a client-generated consumer tag to establish context + * @param noLocal true if the server should not deliver to this consumer + * messages published on this channel's connection + * @param exclusive true if this is an exclusive consumer + * @param arguments a set of arguments for the consume + * @param deliverCallback callback when a message is delivered + * @param shutdownSignalCallback callback when the channel/connection is shut down + * @return the consumerTag associated with the new consumer + * @throws java.io.IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @since 5.0 + */ + String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** * Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk} * method. diff --git a/src/main/java/com/rabbitmq/client/Consumer.java b/src/main/java/com/rabbitmq/client/Consumer.java index da5923e7c9..61e799ae85 100644 --- a/src/main/java/com/rabbitmq/client/Consumer.java +++ b/src/main/java/com/rabbitmq/client/Consumer.java @@ -33,8 +33,8 @@ * because this will delay dispatch of messages to other {@link Consumer}s on the same * {@link Channel}. * - * For a lambda-oriented syntax, use {@link DeliverCallback} and - * {@link CancelCallback}. + * For a lambda-oriented syntax, use {@link DeliverCallback}, + * {@link CancelCallback}, and {@link ConsumerShutdownSignalCallback}. * * @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer) * @see Channel#basicCancel diff --git a/src/main/java/com/rabbitmq/client/ConsumerShutdownSignalCallback.java b/src/main/java/com/rabbitmq/client/ConsumerShutdownSignalCallback.java new file mode 100644 index 0000000000..5bbf00f926 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/ConsumerShutdownSignalCallback.java @@ -0,0 +1,41 @@ +// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client; + +import java.util.Map; + +/** + * Callback interface to be notified when either the consumer channel + * or the underlying connection has been shut down. + * Prefer it over {@link Consumer} for a lambda-oriented syntax, + * if you don't need to implement all the application callbacks. + * @see CancelCallback + * @see DeliverCallback + * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback) + * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback) + * @since 5.0 + */ +@FunctionalInterface +public interface ConsumerShutdownSignalCallback { + + /** + * Called when either the channel or the underlying connection has been shut down. + * @param consumerTag the consumer tag associated with the consumer + * @param sig a {@link ShutdownSignalException} indicating the reason for the shut down + */ + void handleShutdownSignal(String consumerTag, ShutdownSignalException sig); + +} diff --git a/src/main/java/com/rabbitmq/client/DeliverCallback.java b/src/main/java/com/rabbitmq/client/DeliverCallback.java index 2b8cca0d18..6d2f8eb058 100644 --- a/src/main/java/com/rabbitmq/client/DeliverCallback.java +++ b/src/main/java/com/rabbitmq/client/DeliverCallback.java @@ -1,3 +1,6 @@ +// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 // ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see // LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, @@ -20,7 +23,9 @@ * Prefer it over {@link Consumer} for a lambda-oriented syntax, * if you don't need to implement all the application callbacks. * @see CancelCallback + * @see ConsumerShutdownSignalCallback * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback) + * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback) * @since 5.0 */ @FunctionalInterface diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index 1c0240570c..c2f10a7a9a 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -1192,7 +1192,13 @@ public String basicConsume(String queue, Consumer callback) /** Public API - {@inheritDoc} */ @Override public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - return basicConsume(queue, consumerFromCallbacks(deliverCallback, cancelCallback)); + return basicConsume(queue, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); + } + + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } /** Public API - {@inheritDoc} */ @@ -1203,10 +1209,17 @@ public String basicConsume(String queue, boolean autoAck, Consumer callback) return basicConsume(queue, autoAck, "", callback); } + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) + throws IOException { + return basicConsume(queue, autoAck, "", consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); + } + /** Public API - {@inheritDoc} */ @Override public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - return basicConsume(queue, autoAck, "", consumerFromCallbacks(deliverCallback, cancelCallback)); + return basicConsume(queue, autoAck, "", consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); } /** Public API - {@inheritDoc} */ @@ -1222,7 +1235,14 @@ public String basicConsume(String queue, boolean autoAck, Map ar @Override public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromCallbacks(deliverCallback, cancelCallback)); + return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); + } + + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } /** Public API - {@inheritDoc} */ @@ -1238,14 +1258,28 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - return basicConsume(queue, autoAck, consumerTag, false, false, null, consumerFromCallbacks(deliverCallback, cancelCallback)); + return basicConsume(queue, autoAck, consumerTag, false, false, null, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); + } + + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, consumerTag, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } /** Public API - {@inheritDoc} */ @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromCallbacks(deliverCallback, cancelCallback)); + return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); + } + + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, + DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } /** Public API - {@inheritDoc} */ @@ -1286,7 +1320,7 @@ public String transformReply(AMQCommand replyCommand) { } } - private Consumer consumerFromCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback) { + private Consumer consumerFromDeliverCancelCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback) { return new Consumer() { @Override @@ -1313,6 +1347,32 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie }; } + private Consumer consumerFromDeliverShutdownCallbacks(final DeliverCallback deliverCallback, final ConsumerShutdownSignalCallback shutdownSignalCallback) { + return new Consumer() { + @Override + public void handleConsumeOk(String consumerTag) { } + + @Override + public void handleCancelOk(String consumerTag) { } + + @Override + public void handleCancel(String consumerTag) throws IOException { } + + @Override + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + shutdownSignalCallback.handleShutdownSignal(consumerTag, sig); + } + + @Override + public void handleRecoverOk(String consumerTag) { } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { + deliverCallback.handle(consumerTag, new Delivery(envelope, properties, body)); + } + }; + } + /** Public API - {@inheritDoc} */ @Override public void basicCancel(final String consumerTag) diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index 2a9ae119a5..901644db2c 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -452,7 +452,12 @@ public String basicConsume(String queue, Consumer callback) throws IOException { @Override public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - return basicConsume(queue, consumerFromCallbacks(deliverCallback, cancelCallback)); + return basicConsume(queue, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); + } + + @Override + public String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } @Override @@ -462,7 +467,13 @@ public String basicConsume(String queue, boolean autoAck, Consumer callback) thr @Override public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - return basicConsume(queue, autoAck, "", consumerFromCallbacks(deliverCallback, cancelCallback)); + return basicConsume(queue, autoAck, "", consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); + } + + @Override + public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) + throws IOException { + return basicConsume(queue, autoAck, "", consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } @Override @@ -473,7 +484,13 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, Co @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - return basicConsume(queue, autoAck, consumerTag, consumerFromCallbacks(deliverCallback, cancelCallback)); + return basicConsume(queue, autoAck, consumerTag, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, consumerTag, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } @Override @@ -484,7 +501,13 @@ public String basicConsume(String queue, boolean autoAck, Map ar @Override public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - return basicConsume(queue, autoAck, arguments, consumerFromCallbacks(deliverCallback, cancelCallback)); + return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); + } + + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } @Override @@ -497,10 +520,16 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, bo @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromCallbacks(deliverCallback, cancelCallback)); + return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, + DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } - private Consumer consumerFromCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback) { + private Consumer consumerFromDeliverCancelCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback) { return new Consumer() { @Override @@ -527,6 +556,32 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp }; } + private Consumer consumerFromDeliverShutdownCallbacks(final DeliverCallback deliverCallback, final ConsumerShutdownSignalCallback shutdownSignalCallback) { + return new Consumer() { + @Override + public void handleConsumeOk(String consumerTag) { } + + @Override + public void handleCancelOk(String consumerTag) { } + + @Override + public void handleCancel(String consumerTag) throws IOException { } + + @Override + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + shutdownSignalCallback.handleShutdownSignal(consumerTag, sig); + } + + @Override + public void handleRecoverOk(String consumerTag) { } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + deliverCallback.handle(consumerTag, new Delivery(envelope, properties, body)); + } + }; + } + @Override public void basicCancel(String consumerTag) throws IOException { RecordedConsumer c = this.deleteRecordedConsumer(consumerTag); diff --git a/src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java b/src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java index be86d8ce61..6f2f19a667 100644 --- a/src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java +++ b/src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java @@ -94,10 +94,10 @@ protected void releaseResources() throws IOException { } } - @Test public void basicConsume() throws Exception { - final CountDownLatch cancelLatch = new CountDownLatch(1); + @Test public void basicConsumeDeliverCancel() throws Exception { try(Connection connection = TestUtils.connectionFactory().newConnection()) { final CountDownLatch consumingLatch = new CountDownLatch(1); + final CountDownLatch cancelLatch = new CountDownLatch(1); Channel consumingChannel = connection.createChannel(); consumingChannel.basicConsume(queue, true, (consumerTag, delivery) -> consumingLatch.countDown(), @@ -110,4 +110,19 @@ protected void releaseResources() throws IOException { } } + @Test public void basicConsumeDeliverShutdown() throws Exception { + final CountDownLatch shutdownLatch = new CountDownLatch(1); + try(Connection connection = TestUtils.connectionFactory().newConnection()) { + final CountDownLatch consumingLatch = new CountDownLatch(1); + Channel consumingChannel = connection.createChannel(); + consumingChannel.basicConsume(queue, true, + (consumerTag, delivery) -> consumingLatch.countDown(), + (consumerTag, sig) -> shutdownLatch.countDown() + ); + this.channel.basicPublish("", queue, null, "dummy".getBytes()); + assertTrue("deliver callback should have been called", consumingLatch.await(1, TimeUnit.SECONDS)); + } + assertTrue("shutdown callback should have been called", shutdownLatch.await(1, TimeUnit.SECONDS)); + } + } From cc7eb6c4fc4d1dced6f862b752411081f24303c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 1 Mar 2017 08:51:32 +0100 Subject: [PATCH 3/3] Add deliver/cancel/shutdown lambda consume methods References #247 --- .../com/rabbitmq/client/CancelCallback.java | 1 + .../java/com/rabbitmq/client/Channel.java | 120 ++++++++++++++++++ .../ConsumerShutdownSignalCallback.java | 1 + .../com/rabbitmq/client/DeliverCallback.java | 1 + .../com/rabbitmq/client/impl/ChannelN.java | 63 ++++++++- .../impl/recovery/AutorecoveringChannel.java | 62 ++++++++- .../client/test/LambdaCallbackTest.java | 18 +++ 7 files changed, 263 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/CancelCallback.java b/src/main/java/com/rabbitmq/client/CancelCallback.java index 46e642cee8..c2691f053e 100644 --- a/src/main/java/com/rabbitmq/client/CancelCallback.java +++ b/src/main/java/com/rabbitmq/client/CancelCallback.java @@ -26,6 +26,7 @@ * @see ConsumerShutdownSignalCallback * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback) * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback) + * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback, ConsumerShutdownSignalCallback) * @since 5.0 */ @FunctionalInterface diff --git a/src/main/java/com/rabbitmq/client/Channel.java b/src/main/java/com/rabbitmq/client/Channel.java index d894eb66e2..d5db3f709c 100644 --- a/src/main/java/com/rabbitmq/client/Channel.java +++ b/src/main/java/com/rabbitmq/client/Channel.java @@ -836,6 +836,27 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** + * Start a non-nolocal, non-exclusive consumer, with + * explicit acknowledgement and a server-generated consumerTag. + * Provide access to basic.deliver, basic.cancel + * and shutdown signal callbacks (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param deliverCallback callback when a message is delivered + * @param cancelCallback callback when the consumer is cancelled + * @param shutdownSignalCallback callback when the channel/connection is shut down + * @return the consumerTag generated by the server + * @throws IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicAck + * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) + * @since 5.0 + */ + String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag. @@ -898,6 +919,30 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** + * Start a non-nolocal, non-exclusive consumer, with + * a server-generated consumerTag. + * Provide access to basic.deliver, basic.cancel + * and shutdown signal callbacks (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param deliverCallback callback when a message is delivered + * @param cancelCallback callback when the consumer is cancelled + * @param shutdownSignalCallback callback when the channel/connection is shut down + * @return the consumerTag generated by the server + * @throws IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicAck + * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) + * @since 5.0 + */ + String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag and specified arguments. @@ -963,6 +1008,31 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** + * Start a non-nolocal, non-exclusive consumer, with + * a server-generated consumerTag and specified arguments. + * Provide access to basic.deliver, basic.cancel + * and shutdown signal callbacks (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param arguments a set of arguments for the consume + * @param deliverCallback callback when a message is delivered + * @param cancelCallback callback when the consumer is cancelled + * @param shutdownSignalCallback callback when the channel/connection is shut down + * @return the consumerTag generated by the server + * @throws IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicAck + * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) + * @since 5.0 + */ + String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** * Start a non-nolocal, non-exclusive consumer. * @param queue the name of the queue @@ -1023,6 +1093,29 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** + * Start a non-nolocal, non-exclusive consumer. + * Provide access to basic.deliver, basic.cancel + * and shutdown signal callbacks (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param consumerTag a client-generated consumer tag to establish context + * @param deliverCallback callback when a message is delivered + * @param cancelCallback callback when the consumer is cancelled + * @param shutdownSignalCallback callback when the channel/connection is shut down + * @return the consumerTag associated with the new consumer + * @throws java.io.IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) + * @since 5.0 + */ + String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** * Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk} * method. @@ -1095,6 +1188,33 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue) */ String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** + * Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk} + * method. + * Provide access to basic.deliver, basic.cancel + * and shutdown signal callbacks (which is sufficient + * for most cases). See methods with a {@link Consumer} argument + * to have access to all the application callbacks. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param consumerTag a client-generated consumer tag to establish context + * @param noLocal true if the server should not deliver to this consumer + * messages published on this channel's connection + * @param exclusive true if this is an exclusive consumer + * @param arguments a set of arguments for the consume + * @param deliverCallback callback when a message is delivered + * @param cancelCallback callback when the consumer is cancelled + * @param shutdownSignalCallback callback when the channel/connection is shut down + * @return the consumerTag associated with the new consumer + * @throws java.io.IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @since 5.0 + */ + String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; + /** * Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk} * method. diff --git a/src/main/java/com/rabbitmq/client/ConsumerShutdownSignalCallback.java b/src/main/java/com/rabbitmq/client/ConsumerShutdownSignalCallback.java index 5bbf00f926..27e0f8ad39 100644 --- a/src/main/java/com/rabbitmq/client/ConsumerShutdownSignalCallback.java +++ b/src/main/java/com/rabbitmq/client/ConsumerShutdownSignalCallback.java @@ -26,6 +26,7 @@ * @see DeliverCallback * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback) * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback) + * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback, ConsumerShutdownSignalCallback) * @since 5.0 */ @FunctionalInterface diff --git a/src/main/java/com/rabbitmq/client/DeliverCallback.java b/src/main/java/com/rabbitmq/client/DeliverCallback.java index 6d2f8eb058..ad44b7cf13 100644 --- a/src/main/java/com/rabbitmq/client/DeliverCallback.java +++ b/src/main/java/com/rabbitmq/client/DeliverCallback.java @@ -26,6 +26,7 @@ * @see ConsumerShutdownSignalCallback * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback) * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback) + * @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback, ConsumerShutdownSignalCallback) * @since 5.0 */ @FunctionalInterface diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index c2f10a7a9a..e51816747b 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -1201,6 +1201,13 @@ public String basicConsume(String queue, DeliverCallback deliverCallback, Consum return basicConsume(queue, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback)); + } + /** Public API - {@inheritDoc} */ @Override public String basicConsume(String queue, boolean autoAck, Consumer callback) @@ -1222,6 +1229,12 @@ public String basicConsume(String queue, boolean autoAck, DeliverCallback delive return basicConsume(queue, autoAck, "", consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); } + @Override + public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, "", consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback)); + } + /** Public API - {@inheritDoc} */ @Override public String basicConsume(String queue, boolean autoAck, Map arguments, @@ -1245,6 +1258,13 @@ public String basicConsume(String queue, boolean autoAck, Map ar return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback)); + } + /** Public API - {@inheritDoc} */ @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, @@ -1265,7 +1285,14 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, De @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { - return basicConsume(queue, autoAck, consumerTag, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); + return basicConsume(queue, autoAck, consumerTag, false, false, null, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); + } + + /** Public API - {@inheritDoc} */ + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, consumerTag , false, false, null, consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback)); } /** Public API - {@inheritDoc} */ @@ -1282,6 +1309,12 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, bo return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, + DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback)); + } + /** Public API - {@inheritDoc} */ @Override public String basicConsume(String queue, final boolean autoAck, String consumerTag, @@ -1373,6 +1406,34 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie }; } + private Consumer consumerFromDeliverCancelShutdownCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback, final ConsumerShutdownSignalCallback shutdownSignalCallback) { + return new Consumer() { + @Override + public void handleConsumeOk(String consumerTag) { } + + @Override + public void handleCancelOk(String consumerTag) { } + + @Override + public void handleCancel(String consumerTag) throws IOException { + cancelCallback.handle(consumerTag); + } + + @Override + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + shutdownSignalCallback.handleShutdownSignal(consumerTag, sig); + } + + @Override + public void handleRecoverOk(String consumerTag) { } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { + deliverCallback.handle(consumerTag, new Delivery(envelope, properties, body)); + } + }; + } + /** Public API - {@inheritDoc} */ @Override public void basicCancel(final String consumerTag) diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index 901644db2c..bbcacaae8c 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -460,6 +460,12 @@ public String basicConsume(String queue, DeliverCallback deliverCallback, Consum return basicConsume(queue, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } + @Override + public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, false, consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback)); + } + @Override public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException { return basicConsume(queue, autoAck, "", callback); @@ -476,6 +482,12 @@ public String basicConsume(String queue, boolean autoAck, DeliverCallback delive return basicConsume(queue, autoAck, "", consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } + @Override + public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, "", consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback)); + } + @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException { return basicConsume(queue, autoAck, consumerTag, false, false, null, callback); @@ -484,13 +496,19 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, Co @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException { - return basicConsume(queue, autoAck, consumerTag, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); + return basicConsume(queue, autoAck, consumerTag, false, false, null, consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback)); } @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { - return basicConsume(queue, autoAck, consumerTag, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); + return basicConsume(queue, autoAck, consumerTag, false, false, null, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); + } + + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, consumerTag, false, false, null, consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback)); } @Override @@ -510,6 +528,12 @@ public String basicConsume(String queue, boolean autoAck, Map ar return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } + @Override + public String basicConsume(String queue, boolean autoAck, Map arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, + ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback)); + } + @Override public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, Consumer callback) throws IOException { final String result = delegate.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, callback); @@ -529,6 +553,12 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, bo return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback)); } + @Override + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, + DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException { + return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback)); + } + private Consumer consumerFromDeliverCancelCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback) { return new Consumer() { @@ -582,6 +612,34 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp }; } + private Consumer consumerFromDeliverCancelShutdownCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback, final ConsumerShutdownSignalCallback shutdownSignalCallback) { + return new Consumer() { + @Override + public void handleConsumeOk(String consumerTag) { } + + @Override + public void handleCancelOk(String consumerTag) { } + + @Override + public void handleCancel(String consumerTag) throws IOException { + cancelCallback.handle(consumerTag); + } + + @Override + public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { + shutdownSignalCallback.handleShutdownSignal(consumerTag, sig); + } + + @Override + public void handleRecoverOk(String consumerTag) { } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + deliverCallback.handle(consumerTag, new Delivery(envelope, properties, body)); + } + }; + } + @Override public void basicCancel(String consumerTag) throws IOException { RecordedConsumer c = this.deleteRecordedConsumer(consumerTag); diff --git a/src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java b/src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java index 6f2f19a667..571824a73a 100644 --- a/src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java +++ b/src/test/java/com/rabbitmq/client/test/LambdaCallbackTest.java @@ -125,4 +125,22 @@ protected void releaseResources() throws IOException { assertTrue("shutdown callback should have been called", shutdownLatch.await(1, TimeUnit.SECONDS)); } + @Test public void basicConsumeCancelDeliverShutdown() throws Exception { + final CountDownLatch shutdownLatch = new CountDownLatch(1); + try(Connection connection = TestUtils.connectionFactory().newConnection()) { + final CountDownLatch consumingLatch = new CountDownLatch(1); + Channel consumingChannel = connection.createChannel(); + // not both cancel and shutdown callback can be called on the same consumer + // testing just shutdown + consumingChannel.basicConsume(queue, true, + (consumerTag, delivery) -> consumingLatch.countDown(), + (consumerTag) -> { }, + (consumerTag, sig) -> shutdownLatch.countDown() + ); + this.channel.basicPublish("", queue, null, "dummy".getBytes()); + assertTrue("deliver callback should have been called", consumingLatch.await(1, TimeUnit.SECONDS)); + } + assertTrue("shutdown callback should have been called", shutdownLatch.await(1, TimeUnit.SECONDS)); + } + }