Skip to content

Add lambda-based methods to consume messages #252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 1, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/main/java/com/rabbitmq/client/CancelCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client;

import java.io.IOException;

/**
* Callback interface to be notified of the cancellation of a consumer.
* Prefer it over {@link Consumer} for a lambda-oriented syntax,
* if you don't need to implement all the application callbacks.
* @since 5.0
* @see DeliverCallback
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
* @since 5.0
*/
@FunctionalInterface
public interface CancelCallback {

/**
* Called when the consumer is cancelled for reasons <i>other than</i> by a call to
* {@link Channel#basicCancel}. For example, the queue has been deleted.
* See {@link Consumer#handleCancelOk} for notification of consumer
* cancellation due to {@link Channel#basicCancel}.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @throws IOException
*/
void handle(String consumerTag) throws IOException;

}
115 changes: 115 additions & 0 deletions src/main/java/com/rabbitmq/client/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,26 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
*/
String basicConsume(String queue, Consumer callback) throws IOException;

/**
* Start a non-nolocal, non-exclusive consumer, with
* explicit acknowledgement and a server-generated consumerTag.
* Provide access only to <code>basic.deliver</code> and
* <code>basic.cancel</code> AMQP methods (which is sufficient
* for most cases). See methods with a {@link Consumer} argument
* to have access to all the application callbacks.
* @param queue the name of the queue
* @param deliverCallback callback when a message is delivered
* @param cancelCallback callback when the consumer is cancelled
* @return the consumerTag generated by the server
* @throws IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicAck
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
* @since 5.0
*/
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

/**
* Start a non-nolocal, non-exclusive consumer, with
* a server-generated consumerTag.
Expand All @@ -812,6 +832,29 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

/**
* Start a non-nolocal, non-exclusive consumer, with
* a server-generated consumerTag.
* Provide access only to <code>basic.deliver</code> and
* <code>basic.cancel</code> AMQP methods (which is sufficient
* for most cases). See methods with a {@link Consumer} argument
* to have access to all the application callbacks.
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param deliverCallback callback when a message is delivered
* @param cancelCallback callback when the consumer is cancelled
* @return the consumerTag generated by the server
* @throws IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicAck
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
* @since 5.0
*/
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

/**
* Start a non-nolocal, non-exclusive consumer, with
* a server-generated consumerTag and specified arguments.
Expand All @@ -829,6 +872,30 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
*/
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;

/**
* Start a non-nolocal, non-exclusive consumer, with
* a server-generated consumerTag and specified arguments.
* Provide access only to <code>basic.deliver</code> and
* <code>basic.cancel</code> AMQP methods (which is sufficient
* for most cases). See methods with a {@link Consumer} argument
* to have access to all the application callbacks.
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param arguments a set of arguments for the consume
* @param deliverCallback callback when a message is delivered
* @param cancelCallback callback when the consumer is cancelled
* @return the consumerTag generated by the server
* @throws IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicAck
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
* @since 5.0
*/
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

/**
* Start a non-nolocal, non-exclusive consumer.
* @param queue the name of the queue
Expand All @@ -845,6 +912,28 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
*/
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;

/**
* Start a non-nolocal, non-exclusive consumer.
* Provide access only to <code>basic.deliver</code> and
* <code>basic.cancel</code> AMQP methods (which is sufficient
* for most cases). See methods with a {@link Consumer} argument
* to have access to all the application callbacks.
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param consumerTag a client-generated consumer tag to establish context
* @param deliverCallback callback when a message is delivered
* @param cancelCallback callback when the consumer is cancelled
* @return the consumerTag associated with the new consumer
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
* @since 5.0
*/
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

/**
* Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
* method.
Expand All @@ -865,6 +954,32 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
*/
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

/**
* Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
* method.
* Provide access only to <code>basic.deliver</code> and
* <code>basic.cancel</code> AMQP methods (which is sufficient
* for most cases). See methods with a {@link Consumer} argument
* to have access to all the application callbacks.
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param consumerTag a client-generated consumer tag to establish context
* @param noLocal true if the server should not deliver to this consumer
* messages published on this channel's connection
* @param exclusive true if this is an exclusive consumer
* @param arguments a set of arguments for the consume
* @param deliverCallback callback when a message is delivered
* @param cancelCallback callback when the consumer is cancelled
* @return the consumerTag associated with the new consumer
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @since 5.0
*/
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

/**
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}
* method.
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/rabbitmq/client/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
* because this will delay dispatch of messages to other {@link Consumer}s on the same
* {@link Channel}.
*
* For a lambda-oriented syntax, use {@link DeliverCallback} and
* {@link CancelCallback}.
*
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer)
* @see Channel#basicCancel
*/
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/com/rabbitmq/client/DeliverCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client;

import java.io.IOException;
import java.util.Map;

/**
* Callback interface to be notified when a message is delivered.
* Prefer it over {@link Consumer} for a lambda-oriented syntax,
* if you don't need to implement all the application callbacks.
* @see CancelCallback
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
* @since 5.0
*/
@FunctionalInterface
public interface DeliverCallback {

/**
* Called when a <code><b>basic.deliver</b></code> is received for this consumer.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param message the delivered message
* @throws IOException if the consumer encounters an I/O error while processing the message
*/
void handle(String consumerTag, Delivery message) throws IOException;

}
55 changes: 55 additions & 0 deletions src/main/java/com/rabbitmq/client/Delivery.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client;

/**
* Encapsulates an arbitrary message - simple "bean" holder structure.
*/
public class Delivery {
private final Envelope _envelope;
private final AMQP.BasicProperties _properties;
private final byte[] _body;

public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
_envelope = envelope;
_properties = properties;
_body = body;
}

/**
* Retrieve the message envelope.
* @return the message envelope
*/
public Envelope getEnvelope() {
return _envelope;
}

/**
* Retrieve the message properties.
* @return the message properties
*/
public AMQP.BasicProperties getProperties() {
return _properties;
}

/**
* Retrieve the message body.
* @return the message body
*/
public byte[] getBody() {
return _body;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can equals and hashCode be defined in here to support mocking? The same question could be said for https://github.com/rabbitmq/rabbitmq-java-client/blob/master/src/main/java/com/rabbitmq/client/Return.java which was recently added.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a good idea but we can do it in a follow-up PR (feel free to look into a pull request ;))

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I'll try and put one together.

}
39 changes: 0 additions & 39 deletions src/main/java/com/rabbitmq/client/RpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,45 +235,6 @@ public String getQueueName() {
return _queueName;
}

/**
* Encapsulates an arbitrary message - simple "bean" holder structure.
*/
public static class Delivery {
private final Envelope _envelope;
private final AMQP.BasicProperties _properties;
private final byte[] _body;

public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
_envelope = envelope;
_properties = properties;
_body = body;
}

/**
* Retrieve the message envelope.
* @return the message envelope
*/
public Envelope getEnvelope() {
return _envelope;
}

/**
* Retrieve the message properties.
* @return the message properties
*/
public AMQP.BasicProperties getProperties() {
return _properties;
}

/**
* Retrieve the message body.
* @return the message body
*/
public byte[] getBody() {
return _body;
}
}

public interface RpcConsumer extends Consumer {

Delivery nextDelivery() throws InterruptedException, ShutdownSignalException, ConsumerCancelledException;
Expand Down
Loading