Skip to content

Commit cc7eb6c

Browse files
committed
Add deliver/cancel/shutdown lambda consume methods
References #247
1 parent 9753b88 commit cc7eb6c

File tree

7 files changed

+263
-3
lines changed

7 files changed

+263
-3
lines changed

src/main/java/com/rabbitmq/client/CancelCallback.java

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
* @see ConsumerShutdownSignalCallback
2727
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
2828
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback)
29+
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback, ConsumerShutdownSignalCallback)
2930
* @since 5.0
3031
*/
3132
@FunctionalInterface

src/main/java/com/rabbitmq/client/Channel.java

+120
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,27 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
836836
*/
837837
String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
838838

839+
/**
840+
* Start a non-nolocal, non-exclusive consumer, with
841+
* explicit acknowledgement and a server-generated consumerTag.
842+
* Provide access to <code>basic.deliver</code>, <code>basic.cancel</code>
843+
* and shutdown signal callbacks (which is sufficient
844+
* for most cases). See methods with a {@link Consumer} argument
845+
* to have access to all the application callbacks.
846+
* @param queue the name of the queue
847+
* @param deliverCallback callback when a message is delivered
848+
* @param cancelCallback callback when the consumer is cancelled
849+
* @param shutdownSignalCallback callback when the channel/connection is shut down
850+
* @return the consumerTag generated by the server
851+
* @throws IOException if an error is encountered
852+
* @see com.rabbitmq.client.AMQP.Basic.Consume
853+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
854+
* @see #basicAck
855+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
856+
* @since 5.0
857+
*/
858+
String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
859+
839860
/**
840861
* Start a non-nolocal, non-exclusive consumer, with
841862
* a server-generated consumerTag.
@@ -898,6 +919,30 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
898919
*/
899920
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
900921

922+
/**
923+
* Start a non-nolocal, non-exclusive consumer, with
924+
* a server-generated consumerTag.
925+
* Provide access to <code>basic.deliver</code>, <code>basic.cancel</code>
926+
* and shutdown signal callbacks (which is sufficient
927+
* for most cases). See methods with a {@link Consumer} argument
928+
* to have access to all the application callbacks.
929+
* @param queue the name of the queue
930+
* @param autoAck true if the server should consider messages
931+
* acknowledged once delivered; false if the server should expect
932+
* explicit acknowledgements
933+
* @param deliverCallback callback when a message is delivered
934+
* @param cancelCallback callback when the consumer is cancelled
935+
* @param shutdownSignalCallback callback when the channel/connection is shut down
936+
* @return the consumerTag generated by the server
937+
* @throws IOException if an error is encountered
938+
* @see com.rabbitmq.client.AMQP.Basic.Consume
939+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
940+
* @see #basicAck
941+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
942+
* @since 5.0
943+
*/
944+
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
945+
901946
/**
902947
* Start a non-nolocal, non-exclusive consumer, with
903948
* a server-generated consumerTag and specified arguments.
@@ -963,6 +1008,31 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
9631008
*/
9641009
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
9651010

1011+
/**
1012+
* Start a non-nolocal, non-exclusive consumer, with
1013+
* a server-generated consumerTag and specified arguments.
1014+
* Provide access to <code>basic.deliver</code>, <code>basic.cancel</code>
1015+
* and shutdown signal callbacks (which is sufficient
1016+
* for most cases). See methods with a {@link Consumer} argument
1017+
* to have access to all the application callbacks.
1018+
* @param queue the name of the queue
1019+
* @param autoAck true if the server should consider messages
1020+
* acknowledged once delivered; false if the server should expect
1021+
* explicit acknowledgements
1022+
* @param arguments a set of arguments for the consume
1023+
* @param deliverCallback callback when a message is delivered
1024+
* @param cancelCallback callback when the consumer is cancelled
1025+
* @param shutdownSignalCallback callback when the channel/connection is shut down
1026+
* @return the consumerTag generated by the server
1027+
* @throws IOException if an error is encountered
1028+
* @see com.rabbitmq.client.AMQP.Basic.Consume
1029+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
1030+
* @see #basicAck
1031+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
1032+
* @since 5.0
1033+
*/
1034+
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
1035+
9661036
/**
9671037
* Start a non-nolocal, non-exclusive consumer.
9681038
* @param queue the name of the queue
@@ -1023,6 +1093,29 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
10231093
*/
10241094
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
10251095

1096+
/**
1097+
* Start a non-nolocal, non-exclusive consumer.
1098+
* Provide access to <code>basic.deliver</code>, <code>basic.cancel</code>
1099+
* and shutdown signal callbacks (which is sufficient
1100+
* for most cases). See methods with a {@link Consumer} argument
1101+
* to have access to all the application callbacks.
1102+
* @param queue the name of the queue
1103+
* @param autoAck true if the server should consider messages
1104+
* acknowledged once delivered; false if the server should expect
1105+
* explicit acknowledgements
1106+
* @param consumerTag a client-generated consumer tag to establish context
1107+
* @param deliverCallback callback when a message is delivered
1108+
* @param cancelCallback callback when the consumer is cancelled
1109+
* @param shutdownSignalCallback callback when the channel/connection is shut down
1110+
* @return the consumerTag associated with the new consumer
1111+
* @throws java.io.IOException if an error is encountered
1112+
* @see com.rabbitmq.client.AMQP.Basic.Consume
1113+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
1114+
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
1115+
* @since 5.0
1116+
*/
1117+
String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
1118+
10261119
/**
10271120
* Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
10281121
* method.
@@ -1095,6 +1188,33 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
10951188
*/
10961189
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
10971190

1191+
/**
1192+
* Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
1193+
* method.
1194+
* Provide access to <code>basic.deliver</code>, <code>basic.cancel</code>
1195+
* and shutdown signal callbacks (which is sufficient
1196+
* for most cases). See methods with a {@link Consumer} argument
1197+
* to have access to all the application callbacks.
1198+
* @param queue the name of the queue
1199+
* @param autoAck true if the server should consider messages
1200+
* acknowledged once delivered; false if the server should expect
1201+
* explicit acknowledgements
1202+
* @param consumerTag a client-generated consumer tag to establish context
1203+
* @param noLocal true if the server should not deliver to this consumer
1204+
* messages published on this channel's connection
1205+
* @param exclusive true if this is an exclusive consumer
1206+
* @param arguments a set of arguments for the consume
1207+
* @param deliverCallback callback when a message is delivered
1208+
* @param cancelCallback callback when the consumer is cancelled
1209+
* @param shutdownSignalCallback callback when the channel/connection is shut down
1210+
* @return the consumerTag associated with the new consumer
1211+
* @throws java.io.IOException if an error is encountered
1212+
* @see com.rabbitmq.client.AMQP.Basic.Consume
1213+
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
1214+
* @since 5.0
1215+
*/
1216+
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
1217+
10981218
/**
10991219
* Cancel a consumer. Calls the consumer's {@link Consumer#handleCancelOk}
11001220
* method.

src/main/java/com/rabbitmq/client/ConsumerShutdownSignalCallback.java

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
* @see DeliverCallback
2727
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
2828
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback)
29+
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback, ConsumerShutdownSignalCallback)
2930
* @since 5.0
3031
*/
3132
@FunctionalInterface

src/main/java/com/rabbitmq/client/DeliverCallback.java

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
* @see ConsumerShutdownSignalCallback
2727
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
2828
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback)
29+
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback, ConsumerShutdownSignalCallback)
2930
* @since 5.0
3031
*/
3132
@FunctionalInterface

src/main/java/com/rabbitmq/client/impl/ChannelN.java

+62-1
Original file line numberDiff line numberDiff line change
@@ -1201,6 +1201,13 @@ public String basicConsume(String queue, DeliverCallback deliverCallback, Consum
12011201
return basicConsume(queue, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback));
12021202
}
12031203

1204+
/** Public API - {@inheritDoc} */
1205+
@Override
1206+
public String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback,
1207+
ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
1208+
return basicConsume(queue, consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback));
1209+
}
1210+
12041211
/** Public API - {@inheritDoc} */
12051212
@Override
12061213
public String basicConsume(String queue, boolean autoAck, Consumer callback)
@@ -1222,6 +1229,12 @@ public String basicConsume(String queue, boolean autoAck, DeliverCallback delive
12221229
return basicConsume(queue, autoAck, "", consumerFromDeliverCancelCallbacks(deliverCallback, cancelCallback));
12231230
}
12241231

1232+
@Override
1233+
public String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback,
1234+
ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
1235+
return basicConsume(queue, autoAck, "", consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback));
1236+
}
1237+
12251238
/** Public API - {@inheritDoc} */
12261239
@Override
12271240
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments,
@@ -1245,6 +1258,13 @@ public String basicConsume(String queue, boolean autoAck, Map<String, Object> ar
12451258
return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback));
12461259
}
12471260

1261+
/** Public API - {@inheritDoc} */
1262+
@Override
1263+
public String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback,
1264+
ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
1265+
return basicConsume(queue, autoAck, "", false, false, arguments, consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback));
1266+
}
1267+
12481268
/** Public API - {@inheritDoc} */
12491269
@Override
12501270
public String basicConsume(String queue, boolean autoAck, String consumerTag,
@@ -1265,7 +1285,14 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, De
12651285
@Override
12661286
public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback,
12671287
ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
1268-
return basicConsume(queue, autoAck, consumerTag, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback));
1288+
return basicConsume(queue, autoAck, consumerTag, false, false, null, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback));
1289+
}
1290+
1291+
/** Public API - {@inheritDoc} */
1292+
@Override
1293+
public String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback,
1294+
ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
1295+
return basicConsume(queue, autoAck, consumerTag , false, false, null, consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback));
12691296
}
12701297

12711298
/** Public API - {@inheritDoc} */
@@ -1282,6 +1309,12 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, bo
12821309
return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromDeliverShutdownCallbacks(deliverCallback, shutdownSignalCallback));
12831310
}
12841311

1312+
@Override
1313+
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments,
1314+
DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException {
1315+
return basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumerFromDeliverCancelShutdownCallbacks(deliverCallback, cancelCallback, shutdownSignalCallback));
1316+
}
1317+
12851318
/** Public API - {@inheritDoc} */
12861319
@Override
12871320
public String basicConsume(String queue, final boolean autoAck, String consumerTag,
@@ -1373,6 +1406,34 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
13731406
};
13741407
}
13751408

1409+
private Consumer consumerFromDeliverCancelShutdownCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback, final ConsumerShutdownSignalCallback shutdownSignalCallback) {
1410+
return new Consumer() {
1411+
@Override
1412+
public void handleConsumeOk(String consumerTag) { }
1413+
1414+
@Override
1415+
public void handleCancelOk(String consumerTag) { }
1416+
1417+
@Override
1418+
public void handleCancel(String consumerTag) throws IOException {
1419+
cancelCallback.handle(consumerTag);
1420+
}
1421+
1422+
@Override
1423+
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
1424+
shutdownSignalCallback.handleShutdownSignal(consumerTag, sig);
1425+
}
1426+
1427+
@Override
1428+
public void handleRecoverOk(String consumerTag) { }
1429+
1430+
@Override
1431+
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
1432+
deliverCallback.handle(consumerTag, new Delivery(envelope, properties, body));
1433+
}
1434+
};
1435+
}
1436+
13761437
/** Public API - {@inheritDoc} */
13771438
@Override
13781439
public void basicCancel(final String consumerTag)

0 commit comments

Comments
 (0)