Skip to content

Commit b9d07e8

Browse files
committed
Add lambda-based methods to consume messages
Only basic.deliver and basic.cancel callbacks are covered. They're likely to be the most commonly used. This can provide a straightforward syntax when callbacks are simple (e.g. one line). The Delivery wrapper class from RpcServer has been extracted as a top level class: it's used in the deliver callback to avoid having a lambda with 4 parameters. Fixes #247
1 parent e55a1f9 commit b9d07e8

File tree

11 files changed

+396
-43
lines changed

11 files changed

+396
-43
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client;
17+
18+
import java.io.IOException;
19+
20+
/**
21+
* Callback interface to be notified of the cancellation of a consumer.
22+
* Prefer it over {@link Consumer} for a lambda-oriented syntax,
23+
* if you don't need to implement all the application callbacks.
24+
* @since 5.0
25+
* @see DeliverCallback
26+
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
27+
* @since 5.0
28+
*/
29+
@FunctionalInterface
30+
public interface CancelCallback {
31+
32+
/**
33+
* Called when the consumer is cancelled for reasons <i>other than</i> by a call to
34+
* {@link Channel#basicCancel}. For example, the queue has been deleted.
35+
* See {@link Consumer#handleCancelOk} for notification of consumer
36+
* cancellation due to {@link Channel#basicCancel}.
37+
* @param consumerTag the <i>consumer tag</i> associated with the consumer
38+
* @throws IOException
39+
*/
40+
void handle(String consumerTag) throws IOException;
41+
42+
}

src/main/java/com/rabbitmq/client/Channel.java

+115
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,26 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
796796
*/
797797
String basicConsume(String queue, Consumer callback) throws IOException;
798798

799+
/**
800+
* Start a non-nolocal, non-exclusive consumer, with
801+
* explicit acknowledgement and a server-generated consumerTag.
802+
* Provide access only to <code>basic.deliver</code> and
803+
* <code>basic.cancel</code> AMQP methods (which is sufficient
804+
* for most cases). See methods with a {@link Consumer} argument
805+
* to have access to all the application callbacks.
806+
* @param queue the name of the queue
807+
* @param deliverCallback callback when a message is delivered
808+
* @param cancelCallback callback when the consumer is cancelled
809+
* @return the consumerTag generated by the server
810+
* @throws IOException if an error is encountered
811+
* @see com.rabbitmq.client.AMQP.Basic.Consume
812+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
813+
* @see #basicAck
814+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
815+
* @since 5.0
816+
*/
817+
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
818+
799819
/**
800820
* Start a non-nolocal, non-exclusive consumer, with
801821
* a server-generated consumerTag.
@@ -812,6 +832,29 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
812832
*/
813833
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
814834

835+
/**
836+
* Start a non-nolocal, non-exclusive consumer, with
837+
* a server-generated consumerTag.
838+
* Provide access only to <code>basic.deliver</code> and
839+
* <code>basic.cancel</code> AMQP methods (which is sufficient
840+
* for most cases). See methods with a {@link Consumer} argument
841+
* to have access to all the application callbacks.
842+
* @param queue the name of the queue
843+
* @param autoAck true if the server should consider messages
844+
* acknowledged once delivered; false if the server should expect
845+
* explicit acknowledgements
846+
* @param deliverCallback callback when a message is delivered
847+
* @param cancelCallback callback when the consumer is cancelled
848+
* @return the consumerTag generated by the server
849+
* @throws IOException if an error is encountered
850+
* @see com.rabbitmq.client.AMQP.Basic.Consume
851+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
852+
* @see #basicAck
853+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
854+
* @since 5.0
855+
*/
856+
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
857+
815858
/**
816859
* Start a non-nolocal, non-exclusive consumer, with
817860
* a server-generated consumerTag and specified arguments.
@@ -829,6 +872,30 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
829872
*/
830873
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
831874

875+
/**
876+
* Start a non-nolocal, non-exclusive consumer, with
877+
* a server-generated consumerTag and specified arguments.
878+
* Provide access only to <code>basic.deliver</code> and
879+
* <code>basic.cancel</code> AMQP methods (which is sufficient
880+
* for most cases). See methods with a {@link Consumer} argument
881+
* to have access to all the application callbacks.
882+
* @param queue the name of the queue
883+
* @param autoAck true if the server should consider messages
884+
* acknowledged once delivered; false if the server should expect
885+
* explicit acknowledgements
886+
* @param arguments a set of arguments for the consume
887+
* @param deliverCallback callback when a message is delivered
888+
* @param cancelCallback callback when the consumer is cancelled
889+
* @return the consumerTag generated by the server
890+
* @throws IOException if an error is encountered
891+
* @see com.rabbitmq.client.AMQP.Basic.Consume
892+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
893+
* @see #basicAck
894+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
895+
* @since 5.0
896+
*/
897+
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
898+
832899
/**
833900
* Start a non-nolocal, non-exclusive consumer.
834901
* @param queue the name of the queue
@@ -845,6 +912,28 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
845912
*/
846913
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
847914

915+
/**
916+
* Start a non-nolocal, non-exclusive consumer.
917+
* Provide access only to <code>basic.deliver</code> and
918+
* <code>basic.cancel</code> AMQP methods (which is sufficient
919+
* for most cases). See methods with a {@link Consumer} argument
920+
* to have access to all the application callbacks.
921+
* @param queue the name of the queue
922+
* @param autoAck true if the server should consider messages
923+
* acknowledged once delivered; false if the server should expect
924+
* explicit acknowledgements
925+
* @param consumerTag a client-generated consumer tag to establish context
926+
* @param deliverCallback callback when a message is delivered
927+
* @param cancelCallback callback when the consumer is cancelled
928+
* @return the consumerTag associated with the new consumer
929+
* @throws java.io.IOException if an error is encountered
930+
* @see com.rabbitmq.client.AMQP.Basic.Consume
931+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
932+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
933+
* @since 5.0
934+
*/
935+
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
936+
848937
/**
849938
* Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
850939
* method.
@@ -865,6 +954,32 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
865954
*/
866955
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
867956

957+
/**
958+
* Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
959+
* method.
960+
* Provide access only to <code>basic.deliver</code> and
961+
* <code>basic.cancel</code> AMQP methods (which is sufficient
962+
* for most cases). See methods with a {@link Consumer} argument
963+
* to have access to all the application callbacks.
964+
* @param queue the name of the queue
965+
* @param autoAck true if the server should consider messages
966+
* acknowledged once delivered; false if the server should expect
967+
* explicit acknowledgements
968+
* @param consumerTag a client-generated consumer tag to establish context
969+
* @param noLocal true if the server should not deliver to this consumer
970+
* messages published on this channel's connection
971+
* @param exclusive true if this is an exclusive consumer
972+
* @param arguments a set of arguments for the consume
973+
* @param deliverCallback callback when a message is delivered
974+
* @param cancelCallback callback when the consumer is cancelled
975+
* @return the consumerTag associated with the new consumer
976+
* @throws java.io.IOException if an error is encountered
977+
* @see com.rabbitmq.client.AMQP.Basic.Consume
978+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
979+
* @since 5.0
980+
*/
981+
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
982+
868983
/**
869984
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}
870985
* method.

src/main/java/com/rabbitmq/client/Consumer.java

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
* because this will delay dispatch of messages to other {@link Consumer}s on the same
3434
* {@link Channel}.
3535
*
36+
* For a lambda-oriented syntax, use {@link DeliverCallback} and
37+
* {@link CancelCallback}.
38+
*
3639
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer)
3740
* @see Channel#basicCancel
3841
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
2+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
3+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
4+
// please see LICENSE-APACHE2.
5+
//
6+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
7+
// either express or implied. See the LICENSE file for specific language governing
8+
// rights and limitations of this software.
9+
//
10+
// If you have any questions regarding licensing, please contact us at
11+
12+
13+
package com.rabbitmq.client;
14+
15+
import java.io.IOException;
16+
import java.util.Map;
17+
18+
/**
19+
* Callback interface to be notified when a message is delivered.
20+
* Prefer it over {@link Consumer} for a lambda-oriented syntax,
21+
* if you don't need to implement all the application callbacks.
22+
* @see CancelCallback
23+
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
24+
* @since 5.0
25+
*/
26+
@FunctionalInterface
27+
public interface DeliverCallback {
28+
29+
/**
30+
* Called when a <code><b>basic.deliver</b></code> is received for this consumer.
31+
* @param consumerTag the <i>consumer tag</i> associated with the consumer
32+
* @param message the delivered message
33+
* @throws IOException if the consumer encounters an I/O error while processing the message
34+
*/
35+
void handle(String consumerTag, Delivery message) throws IOException;
36+
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client;
17+
18+
/**
19+
* Encapsulates an arbitrary message - simple "bean" holder structure.
20+
*/
21+
public class Delivery {
22+
private final Envelope _envelope;
23+
private final AMQP.BasicProperties _properties;
24+
private final byte[] _body;
25+
26+
public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
27+
_envelope = envelope;
28+
_properties = properties;
29+
_body = body;
30+
}
31+
32+
/**
33+
* Retrieve the message envelope.
34+
* @return the message envelope
35+
*/
36+
public Envelope getEnvelope() {
37+
return _envelope;
38+
}
39+
40+
/**
41+
* Retrieve the message properties.
42+
* @return the message properties
43+
*/
44+
public AMQP.BasicProperties getProperties() {
45+
return _properties;
46+
}
47+
48+
/**
49+
* Retrieve the message body.
50+
* @return the message body
51+
*/
52+
public byte[] getBody() {
53+
return _body;
54+
}
55+
}

src/main/java/com/rabbitmq/client/RpcServer.java

-39
Original file line numberDiff line numberDiff line change
@@ -235,45 +235,6 @@ public String getQueueName() {
235235
return _queueName;
236236
}
237237

238-
/**
239-
* Encapsulates an arbitrary message - simple "bean" holder structure.
240-
*/
241-
public static class Delivery {
242-
private final Envelope _envelope;
243-
private final AMQP.BasicProperties _properties;
244-
private final byte[] _body;
245-
246-
public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
247-
_envelope = envelope;
248-
_properties = properties;
249-
_body = body;
250-
}
251-
252-
/**
253-
* Retrieve the message envelope.
254-
* @return the message envelope
255-
*/
256-
public Envelope getEnvelope() {
257-
return _envelope;
258-
}
259-
260-
/**
261-
* Retrieve the message properties.
262-
* @return the message properties
263-
*/
264-
public AMQP.BasicProperties getProperties() {
265-
return _properties;
266-
}
267-
268-
/**
269-
* Retrieve the message body.
270-
* @return the message body
271-
*/
272-
public byte[] getBody() {
273-
return _body;
274-
}
275-
}
276-
277238
public interface RpcConsumer extends Consumer {
278239

279240
Delivery nextDelivery() throws InterruptedException, ShutdownSignalException, ConsumerCancelledException;

0 commit comments

Comments
 (0)