Skip to content

Commit 6d0bc60

Browse files
author
Alexandru Scvortov
committed
merge AckListener and NackListener into ConfirmListener
1 parent 01460e8 commit 6d0bc60

File tree

10 files changed

+66
-118
lines changed

10 files changed

+66
-118
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -135,28 +135,16 @@ public interface Channel extends ShutdownNotifier {
135135
void setFlowListener(FlowListener listener);
136136

137137
/**
138-
* Return the current {@link AckListener}.
138+
* Return the current {@link ConfirmListener}.
139139
* @return an interface to the current ack listener.
140140
*/
141-
AckListener getAckListener();
141+
ConfirmListener getConfirmListener();
142142

143143
/**
144-
* Set the current {@link AckListener}.
144+
* Set the current {@link ConfirmListener}.
145145
* @param listener the listener to use, or null indicating "don't use one".
146146
*/
147-
void setAckListener(AckListener listener);
148-
149-
/**
150-
* Return the current {@link NackListener}.
151-
* @return an interface to the current nack listener.
152-
*/
153-
NackListener getNackListener();
154-
155-
/**
156-
* Set the current {@link NackListener}.
157-
* @param listener the listener to use, or null indicating "don't use one".
158-
*/
159-
void setNackListener(NackListener listener);
147+
void setConfirmListener(ConfirmListener listener);
160148

161149
/**
162150
* Get the current default consumer. @see setDefaultConsumer for rationale.

src/com/rabbitmq/client/AckListener.java renamed to src/com/rabbitmq/client/ConfirmListener.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,16 @@
2020
import java.io.IOException;
2121

2222
/**
23-
* Implement this interface in order to be notified of Basic.Ack
24-
* events.
23+
* Implement this interface in order to be notified of Confirm events.
24+
* Acks represent messages handled succesfully; Nacks represent
25+
* messages lost by the broker. Note, the lost messages could still
26+
* have been delivered to consumers, but the broker cannot guarantee
27+
* this.
2528
*/
26-
public interface AckListener {
29+
public interface ConfirmListener {
2730
void handleAck(long deliveryTag, boolean multiple)
2831
throws IOException;
32+
33+
void handleNack(long deliveryTag, boolean multiple)
34+
throws IOException;
2935
}

src/com/rabbitmq/client/NackListener.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 14 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717

1818
package com.rabbitmq.client.impl;
1919

20-
import com.rabbitmq.client.AckListener;
21-
import com.rabbitmq.client.NackListener;
20+
import com.rabbitmq.client.ConfirmListener;
2221
import com.rabbitmq.client.AMQP.BasicProperties;
2322
import com.rabbitmq.client.AMQP;
2423
import com.rabbitmq.client.Command;
@@ -88,13 +87,9 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
8887
*/
8988
public volatile FlowListener flowListener = null;
9089

91-
/** Reference to the currently-active AckListener, or null if there is none.
90+
/** Reference to the currently-active ConfirmListener, or null if there is none.
9291
*/
93-
public volatile AckListener ackListener = null;
94-
95-
/** Reference to the currently-active NackListener, or null if there is none.
96-
*/
97-
public volatile NackListener nackListener = null;
92+
public volatile ConfirmListener confirmListener = null;
9893

9994
/** Sequence number of next published message requiring confirmation.
10095
*/
@@ -155,30 +150,17 @@ public void setFlowListener(FlowListener listener) {
155150
flowListener = listener;
156151
}
157152

158-
/** Returns the current AckListener. */
159-
public AckListener getAckListener() {
160-
return ackListener;
161-
}
162-
163-
/**
164-
* Sets the current AckListener.
165-
* A null argument is interpreted to mean "do not use an ack listener".
166-
*/
167-
public void setAckListener(AckListener listener) {
168-
ackListener = listener;
169-
}
170-
171-
/** Returns the current NackListener. */
172-
public NackListener getNackListener() {
173-
return nackListener;
153+
/** Returns the current ConfirmkListener. */
154+
public ConfirmListener getConfirmListener() {
155+
return confirmListener;
174156
}
175157

176158
/**
177-
* Sets the current NackListener.
178-
* A null argument is interpreted to mean "do not use a nack listener".
159+
* Sets the current ConfirmListener.
160+
* A null argument is interpreted to mean "do not use a confirm listener".
179161
*/
180-
public void setNackListener(NackListener listener) {
181-
nackListener = listener;
162+
public void setConfirmListener(ConfirmListener listener) {
163+
confirmListener = listener;
182164
}
183165

184166
/** Returns the current default consumer. */
@@ -340,23 +322,23 @@ public void releaseChannelNumber() {
340322
return true;
341323
} else if (method instanceof Basic.Ack) {
342324
Basic.Ack ack = (Basic.Ack) method;
343-
AckListener l = getAckListener();
325+
ConfirmListener l = getConfirmListener();
344326
if (l != null) {
345327
try {
346328
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
347329
} catch (Throwable ex) {
348-
_connection.getExceptionHandler().handleAckListenerException(this, ex);
330+
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
349331
}
350332
}
351333
return true;
352334
} else if (method instanceof Basic.Nack) {
353335
Basic.Nack nack = (Basic.Nack) method;
354-
NackListener l = getNackListener();
336+
ConfirmListener l = getConfirmListener();
355337
if (l != null) {
356338
try {
357339
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
358340
} catch (Throwable ex) {
359-
_connection.getExceptionHandler().handleNackListenerException(this, ex);
341+
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
360342
}
361343
}
362344
return true;

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,8 @@ public void handleFlowListenerException(Channel channel, Throwable exception) {
4141
handleChannelKiller(channel, exception, "FlowListener.handleFlow");
4242
}
4343

44-
public void handleAckListenerException(Channel channel, Throwable exception) {
45-
handleChannelKiller(channel, exception, "AckListener.handleAck");
46-
}
47-
48-
public void handleNackListenerException(Channel channel, Throwable exception) {
49-
handleChannelKiller(channel, exception, "NackListener.handleNack");
44+
public void handleConfirmListenerException(Channel channel, Throwable exception) {
45+
handleChannelKiller(channel, exception, "ConfirmListener.handle{N,A}ck");
5046
}
5147

5248
public void handleConsumerException(Channel channel, Throwable exception,

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -56,22 +56,12 @@ public interface ExceptionHandler {
5656
/**
5757
* Perform any required exception processing for the situation
5858
* when the driver thread for the connection has called an
59-
* AckListener's handleAck method, and that method has
60-
* thrown an exeption.
61-
* @param channel the ChannelN that held the AckListener
62-
* @param exception the exception thrown by AckListener.handleAck
63-
*/
64-
void handleAckListenerException(Channel channel, Throwable exception);
65-
66-
/**
67-
* Perform any required exception processing for the situation
68-
* when the driver thread for the connection has called an
69-
* NackListener's handleNack method, and that method has
70-
* thrown an exeption.
71-
* @param channel the ChannelN that held the NackListener
72-
* @param exception the exception thrown by NackListener.handleNack
59+
* ConfirmListener's handleAck or handleNack method, and that
60+
* method has thrown an exeption.
61+
* @param channel the ChannelN that held the ConfirmListener
62+
* @param exception the exception thrown by ConfirmListener.handleAck
7363
*/
74-
void handleNackListenerException(Channel channel, Throwable exception);
64+
void handleConfirmListenerException(Channel channel, Throwable exception);
7565

7666
/**
7767
* Perform any required exception processing for the situation

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,8 @@ public void handleFlowListenerException(Channel ch, Throwable ex) {
184184
fail("handleFlowListenerException: " + ex);
185185
}
186186

187-
public void handleAckListenerException(Channel ch, Throwable ex) {
188-
fail("handleAckListenerException: " + ex);
187+
public void handleConfirmListenerException(Channel ch, Throwable ex) {
188+
fail("handleConfirmListenerException: " + ex);
189189
}
190190

191191
public void handleConsumerException(Channel ch,

test/src/com/rabbitmq/client/test/functional/Confirm.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import com.rabbitmq.client.test.BrokerTestCase;
2121
import com.rabbitmq.client.AMQP;
22-
import com.rabbitmq.client.AckListener;
22+
import com.rabbitmq.client.ConfirmListener;
2323
import com.rabbitmq.client.Channel;
2424
import com.rabbitmq.client.DefaultConsumer;
2525
import com.rabbitmq.client.GetResponse;
@@ -41,11 +41,14 @@ public class Confirm extends BrokerTestCase
4141
protected void setUp() throws IOException {
4242
super.setUp();
4343
ackSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
44-
channel.setAckListener(new AckListener() {
45-
public void handleAck(long seqNo,
46-
boolean multiple) {
44+
channel.setConfirmListener(new ConfirmListener() {
45+
public void handleAck(long seqNo, boolean multiple) {
4746
Confirm.this.handleAck(seqNo, multiple);
4847
}
48+
49+
public void handleNack(long seqNo, boolean multiple) {
50+
Confirm.this.fail("got a nack");
51+
}
4952
});
5053
channel.confirmSelect();
5154
channel.queueDeclare("confirm-test", true, true, false, null);

test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package com.rabbitmq.examples;
1919

2020
import com.rabbitmq.client.AMQP;
21-
import com.rabbitmq.client.AckListener;
21+
import com.rabbitmq.client.ConfirmListener;
2222
import com.rabbitmq.client.Channel;
2323
import com.rabbitmq.client.Connection;
2424
import com.rabbitmq.client.ConnectionFactory;
@@ -60,15 +60,29 @@ public void run() {
6060
Channel ch = conn.createChannel();
6161
ch.queueDeclare(QUEUE_NAME, true, false, true, null);
6262
ch.confirmSelect();
63-
ch.setAckListener(new AckListener() {
64-
public void handleAck(long seqNo,
65-
boolean multiple) {
63+
ch.setConfirmListener(new ConfirmListener() {
64+
public void handleAck(long seqNo, boolean multiple) {
6665
if (multiple) {
6766
ackSet.headSet(seqNo+1).clear();
6867
} else {
6968
ackSet.remove(seqNo);
7069
}
7170
}
71+
72+
public void handleNack(long seqNo, boolean multiple) {
73+
int lost = 0;
74+
if (multiple) {
75+
SortedSet<Long> nackd =
76+
ackSet.headSet(seqNo+1);
77+
lost = nackd.size();
78+
nackd.clear();
79+
} else {
80+
lost = 1;
81+
ackSet.remove(seqNo);
82+
}
83+
System.out.printf("Probably lost %d messages.\n",
84+
lost);
85+
}
7286
});
7387

7488
// Publish

test/src/com/rabbitmq/examples/MulticastMain.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,14 @@
3838
import org.apache.commons.cli.Options;
3939
import org.apache.commons.cli.ParseException;
4040

41-
import com.rabbitmq.client.AckListener;
41+
import com.rabbitmq.client.ConfirmListener;
4242
import com.rabbitmq.client.AMQP;
4343
import com.rabbitmq.client.Address;
4444
import com.rabbitmq.client.Channel;
4545
import com.rabbitmq.client.Connection;
4646
import com.rabbitmq.client.ConnectionFactory;
4747
import com.rabbitmq.client.Envelope;
4848
import com.rabbitmq.client.MessageProperties;
49-
import com.rabbitmq.client.NackListener;
5049
import com.rabbitmq.client.QueueingConsumer;
5150
import com.rabbitmq.client.ReturnListener;
5251
import com.rabbitmq.client.ShutdownSignalException;
@@ -140,8 +139,7 @@ public static void main(String[] args) {
140139
rateLimit, minMsgSize, timeLimit,
141140
confirm, confirmMax);
142141
channel.setReturnListener(p);
143-
channel.setAckListener(p);
144-
channel.setNackListener(p);
142+
channel.setConfirmListener(p);
145143
Thread t = new Thread(p);
146144
producerThreads[i] = t;
147145
t.start();
@@ -216,9 +214,9 @@ private static List lstArg(CommandLine cmd, char opt) {
216214
return Arrays.asList(vals);
217215
}
218216

219-
public static class Producer implements Runnable, ReturnListener, AckListener,
220-
NackListener {
221-
217+
public static class Producer implements Runnable, ReturnListener,
218+
ConfirmListener
219+
{
222220
private Channel channel;
223221
private String exchangeName;
224222
private String id;

0 commit comments

Comments
 (0)