Skip to content

Commit cffafcc

Browse files
author
Matthew Sackman
committed
Merging bug23410 to default
2 parents f448f28 + f873872 commit cffafcc

File tree

9 files changed

+115
-59
lines changed

9 files changed

+115
-59
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,16 +135,16 @@ public interface Channel extends ShutdownNotifier {
135135
void setFlowListener(FlowListener listener);
136136

137137
/**
138-
* Return the current {@link AckListener}.
138+
* Return the current {@link ConfirmListener}.
139139
* @return an interface to the current ack listener.
140140
*/
141-
AckListener getAckListener();
141+
ConfirmListener getConfirmListener();
142142

143143
/**
144-
* Set the current {@link AckListener}.
144+
* Set the current {@link ConfirmListener}.
145145
* @param listener the listener to use, or null indicating "don't use one".
146146
*/
147-
void setAckListener(AckListener listener);
147+
void setConfirmListener(ConfirmListener listener);
148148

149149
/**
150150
* Get the current default consumer. @see setDefaultConsumer for rationale.

src/com/rabbitmq/client/AckListener.java renamed to src/com/rabbitmq/client/ConfirmListener.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,16 @@
2020
import java.io.IOException;
2121

2222
/**
23-
* Implement this interface in order to be notified of Basic.Ack
24-
* events.
23+
* Implement this interface in order to be notified of Confirm events.
24+
* Acks represent messages handled succesfully; Nacks represent
25+
* messages lost by the broker. Note, the lost messages could still
26+
* have been delivered to consumers, but the broker cannot guarantee
27+
* this.
2528
*/
26-
public interface AckListener {
29+
public interface ConfirmListener {
2730
void handleAck(long deliveryTag, boolean multiple)
2831
throws IOException;
32+
33+
void handleNack(long deliveryTag, boolean multiple)
34+
throws IOException;
2935
}

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package com.rabbitmq.client.impl;
1919

20-
import com.rabbitmq.client.AckListener;
20+
import com.rabbitmq.client.ConfirmListener;
2121
import com.rabbitmq.client.AMQP.BasicProperties;
2222
import com.rabbitmq.client.AMQP;
2323
import com.rabbitmq.client.Command;
@@ -87,9 +87,9 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
8787
*/
8888
public volatile FlowListener flowListener = null;
8989

90-
/** Reference to the currently-active AckListener, or null if there is none.
90+
/** Reference to the currently-active ConfirmListener, or null if there is none.
9191
*/
92-
public volatile AckListener ackListener = null;
92+
public volatile ConfirmListener confirmListener = null;
9393

9494
/** Sequence number of next published message requiring confirmation.
9595
*/
@@ -150,17 +150,17 @@ public void setFlowListener(FlowListener listener) {
150150
flowListener = listener;
151151
}
152152

153-
/** Returns the current AckListener. */
154-
public AckListener getAckListener() {
155-
return ackListener;
153+
/** Returns the current ConfirmkListener. */
154+
public ConfirmListener getConfirmListener() {
155+
return confirmListener;
156156
}
157157

158158
/**
159-
* Sets the current AckListener.
160-
* A null argument is interpreted to mean "do not use an ack listener".
159+
* Sets the current ConfirmListener.
160+
* A null argument is interpreted to mean "do not use a confirm listener".
161161
*/
162-
public void setAckListener(AckListener listener) {
163-
ackListener = listener;
162+
public void setConfirmListener(ConfirmListener listener) {
163+
confirmListener = listener;
164164
}
165165

166166
/** Returns the current default consumer. */
@@ -322,12 +322,23 @@ public void releaseChannelNumber() {
322322
return true;
323323
} else if (method instanceof Basic.Ack) {
324324
Basic.Ack ack = (Basic.Ack) method;
325-
AckListener l = getAckListener();
325+
ConfirmListener l = getConfirmListener();
326326
if (l != null) {
327327
try {
328328
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
329329
} catch (Throwable ex) {
330-
_connection.getExceptionHandler().handleAckListenerException(this, ex);
330+
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
331+
}
332+
}
333+
return true;
334+
} else if (method instanceof Basic.Nack) {
335+
Basic.Nack nack = (Basic.Nack) method;
336+
ConfirmListener l = getConfirmListener();
337+
if (l != null) {
338+
try {
339+
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
340+
} catch (Throwable ex) {
341+
_connection.getExceptionHandler().handleConfirmListenerException(this, ex);
331342
}
332343
}
333344
return true;

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ public void handleFlowListenerException(Channel channel, Throwable exception) {
4141
handleChannelKiller(channel, exception, "FlowListener.handleFlow");
4242
}
4343

44-
public void handleAckListenerException(Channel channel, Throwable exception) {
45-
handleChannelKiller(channel, exception, "AckListener.handleAck");
44+
public void handleConfirmListenerException(Channel channel, Throwable exception) {
45+
handleChannelKiller(channel, exception, "ConfirmListener.handle{N,A}ck");
4646
}
4747

4848
public void handleConsumerException(Channel channel, Throwable exception,

src/com/rabbitmq/client/impl/ExceptionHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@ public interface ExceptionHandler {
5555

5656
/**
5757
* Perform any required exception processing for the situation
58-
* when the driver thread for the connection has called an
59-
* AckListener's handleAck method, and that method has
60-
* thrown an exeption.
61-
* @param channel the ChannelN that held the AckListener
62-
* @param exception the exception thrown by AckListener.handleAck
58+
* when the driver thread for the connection has called a
59+
* ConfirmListener's handleAck or handleNack method, and that
60+
* method has thrown an exception.
61+
* @param channel the ChannelN that held the ConfirmListener
62+
* @param exception the exception thrown by ConfirmListener.handleAck
6363
*/
64-
void handleAckListenerException(Channel channel, Throwable exception);
64+
void handleConfirmListenerException(Channel channel, Throwable exception);
6565

6666
/**
6767
* Perform any required exception processing for the situation

test/src/com/rabbitmq/client/test/AMQConnectionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,8 @@ public void handleFlowListenerException(Channel ch, Throwable ex) {
184184
fail("handleFlowListenerException: " + ex);
185185
}
186186

187-
public void handleAckListenerException(Channel ch, Throwable ex) {
188-
fail("handleAckListenerException: " + ex);
187+
public void handleConfirmListenerException(Channel ch, Throwable ex) {
188+
fail("handleConfirmListenerException: " + ex);
189189
}
190190

191191
public void handleConsumerException(Channel ch,

test/src/com/rabbitmq/client/test/functional/Confirm.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import com.rabbitmq.client.test.BrokerTestCase;
2121
import com.rabbitmq.client.AMQP;
22-
import com.rabbitmq.client.AckListener;
22+
import com.rabbitmq.client.ConfirmListener;
2323
import com.rabbitmq.client.Channel;
2424
import com.rabbitmq.client.DefaultConsumer;
2525
import com.rabbitmq.client.GetResponse;
@@ -35,17 +35,21 @@ public class Confirm extends BrokerTestCase
3535
{
3636
final static int NUM_MESSAGES = 1000;
3737
private static final String TTL_ARG = "x-message-ttl";
38-
private SortedSet<Long> ackSet;
38+
private SortedSet<Long> unconfirmedSet;
3939

4040
@Override
4141
protected void setUp() throws IOException {
4242
super.setUp();
43-
ackSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
44-
channel.setAckListener(new AckListener() {
45-
public void handleAck(long seqNo,
46-
boolean multiple) {
43+
unconfirmedSet =
44+
Collections.synchronizedSortedSet(new TreeSet<Long>());
45+
channel.setConfirmListener(new ConfirmListener() {
46+
public void handleAck(long seqNo, boolean multiple) {
4747
Confirm.this.handleAck(seqNo, multiple);
4848
}
49+
50+
public void handleNack(long seqNo, boolean multiple) {
51+
Confirm.this.fail("got a nack");
52+
}
4953
});
5054
channel.confirmSelect();
5155
channel.queueDeclare("confirm-test", true, true, false, null);
@@ -236,7 +240,7 @@ private void publishN(String exchangeName, String queueName,
236240
throws IOException
237241
{
238242
for (long i = 0; i < NUM_MESSAGES; i++) {
239-
ackSet.add(channel.getNextPublishSeqNo());
243+
unconfirmedSet.add(channel.getNextPublishSeqNo());
240244
publish(exchangeName, queueName, persistent, mandatory, immediate);
241245
}
242246
}
@@ -254,13 +258,13 @@ private void publish(String exchangeName, String queueName,
254258
}
255259

256260
private void handleAck(long msgSeqNo, boolean multiple) {
257-
if (!ackSet.contains(msgSeqNo)) {
261+
if (!unconfirmedSet.contains(msgSeqNo)) {
258262
fail("got duplicate ack: " + msgSeqNo);
259263
}
260264
if (multiple) {
261-
ackSet.headSet(msgSeqNo + 1).clear();
265+
unconfirmedSet.headSet(msgSeqNo + 1).clear();
262266
} else {
263-
ackSet.remove(msgSeqNo);
267+
unconfirmedSet.remove(msgSeqNo);
264268
}
265269
}
266270

@@ -278,7 +282,7 @@ private void basicRejectCommon(boolean requeue)
278282
}
279283

280284
private void waitAcks() throws InterruptedException {
281-
while (ackSet.size() > 0)
285+
while (unconfirmedSet.size() > 0)
282286
Thread.sleep(10);
283287
}
284288
}

test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package com.rabbitmq.examples;
1919

2020
import com.rabbitmq.client.AMQP;
21-
import com.rabbitmq.client.AckListener;
21+
import com.rabbitmq.client.ConfirmListener;
2222
import com.rabbitmq.client.Channel;
2323
import com.rabbitmq.client.Connection;
2424
import com.rabbitmq.client.ConnectionFactory;
@@ -48,7 +48,7 @@ public static void main(String[] args)
4848
}
4949

5050
static class Publisher implements Runnable {
51-
private volatile SortedSet<Long> ackSet =
51+
private volatile SortedSet<Long> unconfirmedSet =
5252
Collections.synchronizedSortedSet(new TreeSet<Long>());
5353

5454
public void run() {
@@ -60,27 +60,41 @@ public void run() {
6060
Channel ch = conn.createChannel();
6161
ch.queueDeclare(QUEUE_NAME, true, false, true, null);
6262
ch.confirmSelect();
63-
ch.setAckListener(new AckListener() {
64-
public void handleAck(long seqNo,
65-
boolean multiple) {
63+
ch.setConfirmListener(new ConfirmListener() {
64+
public void handleAck(long seqNo, boolean multiple) {
6665
if (multiple) {
67-
ackSet.headSet(seqNo+1).clear();
66+
unconfirmedSet.headSet(seqNo+1).clear();
6867
} else {
69-
ackSet.remove(seqNo);
68+
unconfirmedSet.remove(seqNo);
7069
}
7170
}
71+
72+
public void handleNack(long seqNo, boolean multiple) {
73+
int lost = 0;
74+
if (multiple) {
75+
SortedSet<Long> nackd =
76+
unconfirmedSet.headSet(seqNo+1);
77+
lost = nackd.size();
78+
nackd.clear();
79+
} else {
80+
lost = 1;
81+
unconfirmedSet.remove(seqNo);
82+
}
83+
System.out.printf("Probably lost %d messages.\n",
84+
lost);
85+
}
7286
});
7387

7488
// Publish
7589
for (long i = 0; i < MSG_COUNT; ++i) {
76-
ackSet.add(ch.getNextPublishSeqNo());
90+
unconfirmedSet.add(ch.getNextPublishSeqNo());
7791
ch.basicPublish("", QUEUE_NAME,
7892
MessageProperties.PERSISTENT_BASIC,
7993
"nop".getBytes());
8094
}
8195

8296
// Wait
83-
while (ackSet.size() > 0)
97+
while (unconfirmedSet.size() > 0)
8498
Thread.sleep(10);
8599

86100
// Cleanup

0 commit comments

Comments
 (0)