Skip to content

Commit ee05871

Browse files
committed
Finalize lambdas for *Listener objects
Add functional interfaces for confirm, blocked, and return listeners. Fixes #246
1 parent b59af73 commit ee05871

13 files changed

+144
-49
lines changed

src/main/java/com/rabbitmq/client/BlockedCallback.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818
import java.io.IOException;
1919

2020
/**
21-
*
21+
* Implement this interface in order to be notified of connection block events.
22+
* Prefer it over {@link BlockedListener} for a lambda-oriented syntax.
23+
* @see BlockedListener
24+
* @see UnblockedCallback
2225
*/
26+
@FunctionalInterface
2327
public interface BlockedCallback {
2428

2529
void handle(String reason) throws IOException;

src/main/java/com/rabbitmq/client/BlockedListener.java

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
/**
2222
* Implement this interface in order to be notified of connection block and
2323
* unblock events.
24+
* For a lambda-oriented syntax, use {@link BlockedCallback} and
25+
* {@link UnblockedCallback}.
2426
*/
2527
public interface BlockedListener {
2628
void handleBlocked(String reason) throws IOException;

src/main/java/com/rabbitmq/client/Channel.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,14 @@ public interface Channel extends ShutdownNotifier {
107107
*/
108108
void addReturnListener(ReturnListener listener);
109109

110+
/**
111+
* Add a lambda-based {@link ReturnListener}.
112+
* @see ReturnListener
113+
* @see ReturnCallback
114+
* @see Return
115+
* @param returnCallback the callback when the message is returned
116+
* @return the listener that wraps the callback
117+
*/
110118
ReturnListener addReturnListener(ReturnCallback returnCallback);
111119

112120
/**
@@ -129,10 +137,12 @@ public interface Channel extends ShutdownNotifier {
129137
void addConfirmListener(ConfirmListener listener);
130138

131139
/**
132-
*
133-
* @param ackCallback
134-
* @param nackCallback
135-
* @return
140+
* Add a lambda-based {@link ConfirmListener}.
141+
* @see ConfirmListener
142+
* @see ConfirmCallback
143+
* @param ackCallback callback on ack
144+
* @param nackCallback call on ack
145+
* @return the listener that wraps the callbacks
136146
*/
137147
ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);
138148

src/main/java/com/rabbitmq/client/ConfirmCallback.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,16 @@
1919
import java.io.IOException;
2020

2121
/**
22-
*
22+
* Implement this interface in order to be notified of Confirm events.
23+
* Acks represent messages handled successfully; Nacks represent
24+
* messages lost by the broker. Note, the lost messages could still
25+
* have been delivered to consumers, but the broker cannot guarantee
26+
* this.
27+
* Prefer this interface over {@link ConfirmListener} for
28+
* a lambda-oriented syntax.
29+
* @see ConfirmListener
2330
*/
31+
@FunctionalInterface
2432
public interface ConfirmCallback {
2533

2634
void handle(long deliveryTag, boolean multiple) throws IOException;

src/main/java/com/rabbitmq/client/ConfirmListener.java

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
* messages lost by the broker. Note, the lost messages could still
2525
* have been delivered to consumers, but the broker cannot guarantee
2626
* this.
27+
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
2728
*/
2829
public interface ConfirmListener {
2930
void handleAck(long deliveryTag, boolean multiple)

src/main/java/com/rabbitmq/client/Connection.java

+9
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,15 @@ public interface Connection extends ShutdownNotifier, Closeable { // rename to A
239239
*/
240240
void addBlockedListener(BlockedListener listener);
241241

242+
/**
243+
* Add a lambda-based {@link BlockedListener}.
244+
* @see BlockedListener
245+
* @see BlockedCallback
246+
* @see UnblockedCallback
247+
* @param blockedCallback the callback when the connection is blocked
248+
* @param unblockedCallback the callback when the connection is unblocked
249+
* @return the listener that wraps the callback
250+
*/
242251
BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback);
243252

244253
/**

src/main/java/com/rabbitmq/client/ReturnCallback.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,16 @@
1616
package com.rabbitmq.client;
1717

1818
/**
19-
*
19+
* Implement this interface in order to be notified of failed
20+
* deliveries when basicPublish is called with "mandatory" or
21+
* "immediate" flags set.
22+
* Prefer this interface over {@link ReturnListener} for
23+
* a simpler, lambda-oriented syntax.
24+
* @see Channel#basicPublish
25+
* @see ReturnListener
26+
* @see Return
2027
*/
28+
@FunctionalInterface
2129
public interface ReturnCallback {
2230

2331
void handle(Return returnMessage);

src/main/java/com/rabbitmq/client/ReturnListener.java

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
* Implement this interface in order to be notified of failed
2323
* deliveries when basicPublish is called with "mandatory" or
2424
* "immediate" flags set.
25+
* For a lambda-oriented syntax, use {@link ReturnCallback}.
2526
* @see Channel#basicPublish
2627
*/
2728
public interface ReturnListener {

src/main/java/com/rabbitmq/client/ShutdownListener.java

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
* @see ShutdownNotifier
3030
* @see ShutdownSignalException
3131
*/
32+
@FunctionalInterface
3233
public interface ShutdownListener extends EventListener {
3334
void shutdownCompleted(ShutdownSignalException cause);
3435
}

src/main/java/com/rabbitmq/client/UnblockedCallback.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818
import java.io.IOException;
1919

2020
/**
21-
*
21+
* Implement this interface in order to be notified of connection unblock events.
22+
* Prefer it over {@link BlockedListener} for a lambda-oriented syntax.
23+
* @see BlockedListener
24+
* @see BlockedCallback
2225
*/
26+
@FunctionalInterface
2327
public interface UnblockedCallback {
2428

2529
void handle() throws IOException;

src/test/java/com/rabbitmq/client/test/ClientTests.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@
4646
StandardMetricsCollectorTest.class,
4747
DnsSrvRecordAddressResolverTest.class,
4848
JavaNioTest.class,
49-
RpcTest.class
49+
RpcTest.class,
50+
LambdaListenerCallbackTest.class
5051
})
5152
public class ClientTests {
5253

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.Channel;
19+
import com.rabbitmq.client.Connection;
20+
import org.junit.Test;
21+
22+
import java.io.IOException;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import static org.junit.Assert.assertTrue;
27+
28+
public class LambdaListenerCallbackTest extends BrokerTestCase {
29+
30+
protected void releaseResources() throws IOException {
31+
try {
32+
unblock();
33+
} catch (InterruptedException e) {
34+
e.printStackTrace();
35+
}
36+
}
37+
38+
@Test public void shutdownListener() throws Exception {
39+
CountDownLatch latch = new CountDownLatch(2);
40+
try(Connection connection = TestUtils.connectionFactory().newConnection()) {
41+
connection.addShutdownListener(cause -> latch.countDown());
42+
Channel channel = connection.createChannel();
43+
channel.addShutdownListener(cause -> latch.countDown());
44+
}
45+
assertTrue("Connection closed, shutdown listeners should have been called", latch.await(1, TimeUnit.SECONDS));
46+
}
47+
48+
@Test public void confirmListener() throws Exception {
49+
channel.confirmSelect();
50+
CountDownLatch latch = new CountDownLatch(1);
51+
channel.addConfirmListener(
52+
(deliveryTag, multiple) -> latch.countDown(),
53+
(deliveryTag, multiple) -> {}
54+
);
55+
channel.basicPublish("", "whatever", null, "dummy".getBytes());
56+
assertTrue("Should have received publisher confirm", latch.await(1, TimeUnit.SECONDS));
57+
}
58+
59+
@Test public void returnListener() throws Exception {
60+
CountDownLatch latch = new CountDownLatch(1);
61+
channel.addReturnListener(returnMessage -> latch.countDown());
62+
channel.basicPublish("", "notlikelytoexist", true, null, "dummy".getBytes());
63+
assertTrue("Should have received returned message", latch.await(1, TimeUnit.SECONDS));
64+
}
65+
66+
@Test public void blockedListener() throws Exception {
67+
final CountDownLatch latch = new CountDownLatch(1);
68+
try(Connection connection = TestUtils.connectionFactory().newConnection()) {
69+
connection.addBlockedListener(
70+
reason -> {
71+
try {
72+
unblock();
73+
} catch (InterruptedException e) {
74+
e.printStackTrace();
75+
}
76+
},
77+
() -> latch.countDown()
78+
);
79+
block();
80+
Channel ch = connection.createChannel();
81+
ch.basicPublish("", "", null, "dummy".getBytes());
82+
assertTrue("Should have been blocked and unblocked", latch.await(10, TimeUnit.SECONDS));
83+
}
84+
}
85+
86+
}

src/test/java/com/rabbitmq/client/test/ListenerCallbackTest.java

-40
This file was deleted.

0 commit comments

Comments
 (0)