Skip to content

Commit f79da9e

Browse files
committed
Use lambdas for *Listener objects
A proposal to use lambdas in listeners objects. Some callback interfaces have been introduced when necessary (not necessary for e.g. ShutdownListener). Lambdas are wrapped into the original listener interfaces. Provide a simple test to experiment with the API. References #246
1 parent aaa6f90 commit f79da9e

14 files changed

+318
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client;
17+
18+
import java.io.IOException;
19+
20+
/**
21+
*
22+
*/
23+
public interface BlockedCallback {
24+
25+
void handle(String reason) throws IOException;
26+
27+
}

src/main/java/com/rabbitmq/client/Channel.java

+10
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ public interface Channel extends ShutdownNotifier {
107107
*/
108108
void addReturnListener(ReturnListener listener);
109109

110+
ReturnListener addReturnListener(ReturnCallback returnCallback);
111+
110112
/**
111113
* Remove a {@link ReturnListener}.
112114
* @param listener the listener to remove
@@ -126,6 +128,14 @@ public interface Channel extends ShutdownNotifier {
126128
*/
127129
void addConfirmListener(ConfirmListener listener);
128130

131+
/**
132+
*
133+
* @param ackCallback
134+
* @param nackCallback
135+
* @return
136+
*/
137+
ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);
138+
129139
/**
130140
* Remove a {@link ConfirmListener}.
131141
* @param listener the listener to remove
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
17+
package com.rabbitmq.client;
18+
19+
import java.io.IOException;
20+
21+
/**
22+
*
23+
*/
24+
public interface ConfirmCallback {
25+
26+
void handle(long deliveryTag, boolean multiple) throws IOException;
27+
28+
}

src/main/java/com/rabbitmq/client/Connection.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
1+
// Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
@@ -239,6 +239,8 @@ public interface Connection extends ShutdownNotifier, Closeable { // rename to A
239239
*/
240240
void addBlockedListener(BlockedListener listener);
241241

242+
BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback);
243+
242244
/**
243245
* Remove a {@link BlockedListener}.
244246
* @param listener the listener to remove
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client;
17+
18+
/**
19+
*
20+
*/
21+
public class Return {
22+
23+
private final int replyCode;
24+
private final String replyText;
25+
private final String exchange;
26+
private final String routingKey;
27+
private final AMQP.BasicProperties properties;
28+
private final byte[] body;
29+
30+
public Return(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) {
31+
this.replyCode = replyCode;
32+
this.replyText = replyText;
33+
this.exchange = exchange;
34+
this.routingKey = routingKey;
35+
this.properties = properties;
36+
this.body = body;
37+
}
38+
39+
public int getReplyCode() {
40+
return replyCode;
41+
}
42+
43+
public String getReplyText() {
44+
return replyText;
45+
}
46+
47+
public String getExchange() {
48+
return exchange;
49+
}
50+
51+
public String getRoutingKey() {
52+
return routingKey;
53+
}
54+
55+
public AMQP.BasicProperties getProperties() {
56+
return properties;
57+
}
58+
59+
public byte[] getBody() {
60+
return body;
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client;
17+
18+
/**
19+
*
20+
*/
21+
public interface ReturnCallback {
22+
23+
void handle(Return returnMessage);
24+
25+
}

src/main/java/com/rabbitmq/client/ShutdownListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,5 @@
3030
* @see ShutdownSignalException
3131
*/
3232
public interface ShutdownListener extends EventListener {
33-
public void shutdownCompleted(ShutdownSignalException cause);
33+
void shutdownCompleted(ShutdownSignalException cause);
3434
}

src/main/java/com/rabbitmq/client/ShutdownNotifier.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,26 @@ public interface ShutdownNotifier {
3030
*
3131
* @param listener {@link ShutdownListener} to the component
3232
*/
33-
public void addShutdownListener(ShutdownListener listener);
33+
void addShutdownListener(ShutdownListener listener);
3434

3535
/**
3636
* Remove shutdown listener for the component.
3737
*
3838
* @param listener {@link ShutdownListener} to be removed
3939
*/
40-
public void removeShutdownListener(ShutdownListener listener);
40+
void removeShutdownListener(ShutdownListener listener);
4141

4242
/**
4343
* Get the shutdown reason object
4444
* @return ShutdownSignalException if component is closed, null otherwise
4545
*/
46-
public ShutdownSignalException getCloseReason();
46+
ShutdownSignalException getCloseReason();
4747

4848
/**
4949
* Protected API - notify the listeners attached to the component
5050
* @see com.rabbitmq.client.ShutdownListener
5151
*/
52-
public void notifyListeners();
52+
void notifyListeners();
5353

5454
/**
5555
* Determine whether the component is currently open.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client;
17+
18+
import java.io.IOException;
19+
20+
/**
21+
*
22+
*/
23+
public interface UnblockedCallback {
24+
25+
void handle() throws IOException;
26+
27+
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

+18
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,24 @@ public void addBlockedListener(BlockedListener listener) {
10311031
blockedListeners.add(listener);
10321032
}
10331033

1034+
@Override
1035+
public BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback) {
1036+
BlockedListener blockedListener = new BlockedListener() {
1037+
1038+
@Override
1039+
public void handleBlocked(String reason) throws IOException {
1040+
blockedCallback.handle(reason);
1041+
}
1042+
1043+
@Override
1044+
public void handleUnblocked() throws IOException {
1045+
unblockedCallback.handle();
1046+
}
1047+
};
1048+
this.addBlockedListener(blockedListener);
1049+
return blockedListener;
1050+
}
1051+
10341052
@Override
10351053
public boolean removeBlockedListener(BlockedListener listener) {
10361054
return blockedListeners.remove(listener);

src/main/java/com/rabbitmq/client/impl/ChannelN.java

+28
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.CountDownLatch;
2727
import java.util.concurrent.TimeoutException;
2828

29+
import com.rabbitmq.client.ConfirmCallback;
2930
import com.rabbitmq.client.*;
3031
import com.rabbitmq.client.AMQP.BasicProperties;
3132
import com.rabbitmq.client.Method;
@@ -135,6 +136,15 @@ public void addReturnListener(ReturnListener listener) {
135136
returnListeners.add(listener);
136137
}
137138

139+
@Override
140+
public ReturnListener addReturnListener(ReturnCallback returnCallback) {
141+
ReturnListener returnListener = (replyCode, replyText, exchange, routingKey, properties, body) -> returnCallback.handle(new Return(
142+
replyCode, replyText, exchange, routingKey, properties, body
143+
));
144+
this.addReturnListener(returnListener);
145+
return returnListener;
146+
}
147+
138148
@Override
139149
public boolean removeReturnListener(ReturnListener listener) {
140150
return returnListeners.remove(listener);
@@ -150,6 +160,24 @@ public void addConfirmListener(ConfirmListener listener) {
150160
confirmListeners.add(listener);
151161
}
152162

163+
@Override
164+
public ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback) {
165+
ConfirmListener confirmListener = new ConfirmListener() {
166+
167+
@Override
168+
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
169+
ackCallback.handle(deliveryTag, multiple);
170+
}
171+
172+
@Override
173+
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
174+
nackCallback.handle(deliveryTag, multiple);
175+
}
176+
};
177+
this.addConfirmListener(confirmListener);
178+
return confirmListener;
179+
}
180+
153181
@Override
154182
public boolean removeConfirmListener(ConfirmListener listener) {
155183
return confirmListeners.remove(listener);

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

+27
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,15 @@ public void addReturnListener(ReturnListener listener) {
9898
delegate.addReturnListener(listener);
9999
}
100100

101+
@Override
102+
public ReturnListener addReturnListener(ReturnCallback returnCallback) {
103+
ReturnListener returnListener = (replyCode, replyText, exchange, routingKey, properties, body) -> returnCallback.handle(new Return(
104+
replyCode, replyText, exchange, routingKey, properties, body
105+
));
106+
this.addReturnListener(returnListener);
107+
return returnListener;
108+
}
109+
101110
@Override
102111
public boolean removeReturnListener(ReturnListener listener) {
103112
this.returnListeners.remove(listener);
@@ -116,6 +125,24 @@ public void addConfirmListener(ConfirmListener listener) {
116125
delegate.addConfirmListener(listener);
117126
}
118127

128+
@Override
129+
public ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback) {
130+
ConfirmListener confirmListener = new ConfirmListener() {
131+
132+
@Override
133+
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
134+
ackCallback.handle(deliveryTag, multiple);
135+
}
136+
137+
@Override
138+
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
139+
nackCallback.handle(deliveryTag, multiple);
140+
}
141+
};
142+
this.addConfirmListener(confirmListener);
143+
return confirmListener;
144+
}
145+
119146
@Override
120147
public boolean removeConfirmListener(ConfirmListener listener) {
121148
this.confirmListeners.remove(listener);

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

+18
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,24 @@ public void addBlockedListener(BlockedListener listener) {
306306
delegate.addBlockedListener(listener);
307307
}
308308

309+
@Override
310+
public BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback) {
311+
BlockedListener blockedListener = new BlockedListener() {
312+
313+
@Override
314+
public void handleBlocked(String reason) throws IOException {
315+
blockedCallback.handle(reason);
316+
}
317+
318+
@Override
319+
public void handleUnblocked() throws IOException {
320+
unblockedCallback.handle();
321+
}
322+
};
323+
this.addBlockedListener(blockedListener);
324+
return blockedListener;
325+
}
326+
309327
/**
310328
* @see Connection#removeBlockedListener(com.rabbitmq.client.BlockedListener)
311329
*/

0 commit comments

Comments
 (0)