Skip to content

Add lambda-based methods to consume messages #252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/main/java/com/rabbitmq/client/CancelCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client;

import java.io.IOException;
import java.util.Map;

/**
* Callback interface to be notified of the cancellation of a consumer.
* Prefer it over {@link Consumer} for a lambda-oriented syntax,
* if you don't need to implement all the application callbacks.
* @see DeliverCallback
* @see ConsumerShutdownSignalCallback
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback)
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback, ConsumerShutdownSignalCallback)
* @since 5.0
*/
@FunctionalInterface
public interface CancelCallback {

/**
* Called when the consumer is cancelled for reasons <i>other than</i> by a call to
* {@link Channel#basicCancel}. For example, the queue has been deleted.
* See {@link Consumer#handleCancelOk} for notification of consumer
* cancellation due to {@link Channel#basicCancel}.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @throws IOException
*/
void handle(String consumerTag) throws IOException;

}
350 changes: 350 additions & 0 deletions src/main/java/com/rabbitmq/client/Channel.java

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/main/java/com/rabbitmq/client/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
* because this will delay dispatch of messages to other {@link Consumer}s on the same
* {@link Channel}.
*
* For a lambda-oriented syntax, use {@link DeliverCallback},
* {@link CancelCallback}, and {@link ConsumerShutdownSignalCallback}.
*
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer)
* @see Channel#basicCancel
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client;

import java.util.Map;

/**
* Callback interface to be notified when either the consumer channel
* or the underlying connection has been shut down.
* Prefer it over {@link Consumer} for a lambda-oriented syntax,
* if you don't need to implement all the application callbacks.
* @see CancelCallback
* @see DeliverCallback
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback)
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback, ConsumerShutdownSignalCallback)
* @since 5.0
*/
@FunctionalInterface
public interface ConsumerShutdownSignalCallback {

/**
* Called when either the channel or the underlying connection has been shut down.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param sig a {@link ShutdownSignalException} indicating the reason for the shut down
*/
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

}
43 changes: 43 additions & 0 deletions src/main/java/com/rabbitmq/client/DeliverCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client;

import java.io.IOException;
import java.util.Map;

/**
* Callback interface to be notified when a message is delivered.
* Prefer it over {@link Consumer} for a lambda-oriented syntax,
* if you don't need to implement all the application callbacks.
* @see CancelCallback
* @see ConsumerShutdownSignalCallback
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback)
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, ConsumerShutdownSignalCallback)
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, DeliverCallback, CancelCallback, ConsumerShutdownSignalCallback)
* @since 5.0
*/
@FunctionalInterface
public interface DeliverCallback {

/**
* Called when a <code><b>basic.deliver</b></code> is received for this consumer.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param message the delivered message
* @throws IOException if the consumer encounters an I/O error while processing the message
*/
void handle(String consumerTag, Delivery message) throws IOException;

}
55 changes: 55 additions & 0 deletions src/main/java/com/rabbitmq/client/Delivery.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client;

/**
* Encapsulates an arbitrary message - simple "bean" holder structure.
*/
public class Delivery {
private final Envelope _envelope;
private final AMQP.BasicProperties _properties;
private final byte[] _body;

public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
_envelope = envelope;
_properties = properties;
_body = body;
}

/**
* Retrieve the message envelope.
* @return the message envelope
*/
public Envelope getEnvelope() {
return _envelope;
}

/**
* Retrieve the message properties.
* @return the message properties
*/
public AMQP.BasicProperties getProperties() {
return _properties;
}

/**
* Retrieve the message body.
* @return the message body
*/
public byte[] getBody() {
return _body;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can equals and hashCode be defined in here to support mocking? The same question could be said for https://github.com/rabbitmq/rabbitmq-java-client/blob/master/src/main/java/com/rabbitmq/client/Return.java which was recently added.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's a good idea but we can do it in a follow-up PR (feel free to look into a pull request ;))

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I'll try and put one together.

}
39 changes: 0 additions & 39 deletions src/main/java/com/rabbitmq/client/RpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,45 +235,6 @@ public String getQueueName() {
return _queueName;
}

/**
* Encapsulates an arbitrary message - simple "bean" holder structure.
*/
public static class Delivery {
private final Envelope _envelope;
private final AMQP.BasicProperties _properties;
private final byte[] _body;

public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
_envelope = envelope;
_properties = properties;
_body = body;
}

/**
* Retrieve the message envelope.
* @return the message envelope
*/
public Envelope getEnvelope() {
return _envelope;
}

/**
* Retrieve the message properties.
* @return the message properties
*/
public AMQP.BasicProperties getProperties() {
return _properties;
}

/**
* Retrieve the message body.
* @return the message body
*/
public byte[] getBody() {
return _body;
}
}

public interface RpcConsumer extends Consumer {

Delivery nextDelivery() throws InterruptedException, ShutdownSignalException, ConsumerCancelledException;
Expand Down
Loading