Skip to content

Commit 6d953de

Browse files
author
Matthew Sackman
committed
Merging default to bug21647
2 parents 70e0b5c + 6a7dd42 commit 6d953de

File tree

6 files changed

+102
-8
lines changed

6 files changed

+102
-8
lines changed

src/com/rabbitmq/client/Consumer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ public interface Consumer {
4949
*/
5050
void handleCancelOk(String consumerTag);
5151

52+
/**
53+
* Called when the consumer is cancelled for reasons other than by a
54+
* basicCancel: e.g. the queue has been deleted (either by this channel or
55+
* by any other channel). See handleCancelOk for notification of consumer
56+
* cancellation due to basicCancel.
57+
*
58+
* @throws IOException
59+
*/
60+
void handleCancel(String consumerTag) throws IOException;
61+
5262
/**
5363
* Called to the consumer that either the channel or the undelying connection has been shut down.
5464
* @param consumerTag the defined consumerTag (either client- or server-generated)

src/com/rabbitmq/client/DefaultConsumer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ public void handleCancelOk(String consumerTag) {
5353
// no work to do
5454
}
5555

56+
/**
57+
* No-op implementation of {@link Consumer#handleCancel(String)}
58+
* @param consumerTag the defined consumer tag (client- or server-generated)
59+
*/
60+
public void handleCancel(String consumerTag) throws IOException {
61+
// no work to do
62+
}
63+
5664
/**
5765
* No-op implementation of {@link Consumer#handleShutdownSignal}.
5866
*/

src/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public static Map<String, Object> defaultClientProperties() {
6565
capabilities.put("publisher_confirms", true);
6666
capabilities.put("exchange_exchange_bindings", true);
6767
capabilities.put("basic.nack", true);
68+
capabilities.put("consumer_cancel_notify", true);
6869
return Frame.buildTable(new Object[] {
6970
"product", LongStringHelper.asLongString("RabbitMQ"),
7071
"version", LongStringHelper.asLongString(ClientVersion.VERSION),

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@
1717

1818
package com.rabbitmq.client.impl;
1919

20-
import com.rabbitmq.client.ConfirmListener;
21-
import com.rabbitmq.client.AMQP.BasicProperties;
20+
import java.io.IOException;
21+
import java.util.Collections;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import java.util.concurrent.TimeoutException;
25+
2226
import com.rabbitmq.client.AMQP;
2327
import com.rabbitmq.client.Command;
28+
import com.rabbitmq.client.ConfirmListener;
2429
import com.rabbitmq.client.Connection;
2530
import com.rabbitmq.client.Consumer;
2631
import com.rabbitmq.client.Envelope;
@@ -30,6 +35,7 @@
3035
import com.rabbitmq.client.ReturnListener;
3136
import com.rabbitmq.client.ShutdownSignalException;
3237
import com.rabbitmq.client.UnexpectedMethodError;
38+
import com.rabbitmq.client.AMQP.BasicProperties;
3339
import com.rabbitmq.client.impl.AMQImpl.Basic;
3440
import com.rabbitmq.client.impl.AMQImpl.Channel;
3541
import com.rabbitmq.client.impl.AMQImpl.Confirm;
@@ -38,12 +44,6 @@
3844
import com.rabbitmq.client.impl.AMQImpl.Tx;
3945
import com.rabbitmq.utility.Utility;
4046

41-
import java.io.IOException;
42-
import java.util.Collections;
43-
import java.util.HashMap;
44-
import java.util.Map;
45-
import java.util.concurrent.TimeoutException;
46-
4747

4848
/**
4949
* Main interface to AMQP protocol functionality. Public API -
@@ -339,6 +339,24 @@ public void releaseChannelNumber() {
339339
// be handled by whichever RPC continuation invoked Recover,
340340
// so return false
341341
return false;
342+
} else if (method instanceof Basic.Cancel) {
343+
Basic.Cancel m = (Basic.Cancel)method;
344+
Consumer callback = _consumers.remove(m.consumerTag);
345+
if (callback == null) {
346+
callback = defaultConsumer;
347+
}
348+
if (callback != null) {
349+
try {
350+
callback.handleCancel(m.getConsumerTag());
351+
} catch (Throwable ex) {
352+
_connection.getExceptionHandler().handleConsumerException(this,
353+
ex,
354+
callback,
355+
m.getConsumerTag(),
356+
"handleCancel");
357+
}
358+
}
359+
return true;
342360
} else {
343361
return false;
344362
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is VMware, Inc.
14+
// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
15+
//
16+
17+
package com.rabbitmq.client.test.functional;
18+
19+
import java.io.IOException;
20+
21+
import com.rabbitmq.client.Consumer;
22+
import com.rabbitmq.client.QueueingConsumer;
23+
import com.rabbitmq.client.test.BrokerTestCase;
24+
25+
public class ConsumerCancelNotificiation extends BrokerTestCase {
26+
27+
private final String queue = "cancel_notification_queue";
28+
29+
private final Object lock = new Object();
30+
31+
private boolean notified = false;
32+
33+
public void testConsumerCancellationNotification() throws IOException {
34+
channel.queueDeclare(queue, false, true, false, null);
35+
Consumer consumer = new QueueingConsumer(channel) {
36+
@Override
37+
public void handleCancel(String consumerTag) throws IOException {
38+
synchronized (lock) {
39+
notified = !notified;
40+
lock.notifyAll();
41+
}
42+
}
43+
};
44+
channel.basicConsume(queue, consumer);
45+
channel.queueDelete(queue);
46+
synchronized (lock) {
47+
if (!notified) {
48+
try {
49+
lock.wait();
50+
} catch (InterruptedException e) {
51+
}
52+
}
53+
assertTrue(notified);
54+
}
55+
}
56+
}

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public static TestSuite suite() {
5454
suite.addTestSuite(DefaultExchange.class);
5555
suite.addTestSuite(UnbindAutoDeleteExchange.class);
5656
suite.addTestSuite(Confirm.class);
57+
suite.addTestSuite(ConsumerCancelNotificiation.class);
5758
suite.addTestSuite(UnexpectedFrames.class);
5859
suite.addTestSuite(PerQueueTTL.class);
5960
suite.addTestSuite(SaslMechanisms.class);

0 commit comments

Comments
 (0)