Skip to content

Commit 9b1038b

Browse files
author
Matthew Sackman
committed
Remove whitespace. Also make this far more useful by having it interrupt someone waiting in the queueingconsumer. Note that we don't throw an exception back to the main reader thread if a msg arrives for this consumer after the cancel. However, the logic in ChannelN shows this is impossible anyway. The point is that it's not a channel level or even connection level exception so the main thread should not be upset by this
1 parent 6d953de commit 9b1038b

File tree

4 files changed

+105
-10
lines changed

4 files changed

+105
-10
lines changed

src/com/rabbitmq/client/Consumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public interface Consumer {
5454
* basicCancel: e.g. the queue has been deleted (either by this channel or
5555
* by any other channel). See handleCancelOk for notification of consumer
5656
* cancellation due to basicCancel.
57-
*
57+
*
5858
* @throws IOException
5959
*/
6060
void handleCancel(String consumerTag) throws IOException;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is VMware, Inc.
14+
// Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
15+
//
16+
17+
package com.rabbitmq.client;
18+
19+
import com.rabbitmq.utility.SensibleClone;
20+
21+
public class ConsumerCancelledException extends RuntimeException implements
22+
SensibleClone<ConsumerCancelledException> {
23+
24+
@Override
25+
public ConsumerCancelledException sensibleClone() {
26+
try {
27+
return (ConsumerCancelledException) super.clone();
28+
} catch (CloneNotSupportedException e) {
29+
// You've got to be kidding me
30+
throw new Error(e);
31+
}
32+
}
33+
34+
}

src/com/rabbitmq/client/QueueingConsumer.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public class QueueingConsumer extends DefaultConsumer {
6464
// When this is non-null the queue is in shutdown mode and nextDelivery should
6565
// throw a shutdown signal exception.
6666
private volatile ShutdownSignalException _shutdown;
67+
private volatile ConsumerCancelledException _cancelled;
6768

6869
// Marker object used to signal the queue is in shutdown mode.
6970
// It is only there to wake up consumers. The canonical representation
@@ -86,6 +87,11 @@ public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q) {
8687
_queue.add(POISON);
8788
}
8889

90+
@Override public void handleCancel(String consumerTag) throws IOException {
91+
_cancelled = new ConsumerCancelledException();
92+
_queue.add(POISON);
93+
}
94+
8995
@Override public void handleDelivery(String consumerTag,
9096
Envelope envelope,
9197
AMQP.BasicProperties properties,
@@ -150,16 +156,19 @@ private void checkShutdown() {
150156
*/
151157
private Delivery handle(Delivery delivery) {
152158
if (delivery == POISON ||
153-
delivery == null && _shutdown != null) {
159+
delivery == null && (_shutdown != null || _cancelled != null)) {
154160
if (delivery == POISON) {
155161
_queue.add(POISON);
156-
if (_shutdown == null) {
162+
if (_shutdown == null && _cancelled == null) {
157163
throw new IllegalStateException(
158-
"POISON in queue, but null _shutdown. " +
164+
"POISON in queue, but null _shutdown and null _cancelled. " +
159165
"This should never happen, please report as a BUG");
160166
}
161167
}
162-
throw Utility.fixStackTrace(_shutdown);
168+
if (null != _shutdown)
169+
throw Utility.fixStackTrace(_shutdown);
170+
if (null != _cancelled)
171+
throw Utility.fixStackTrace(_cancelled);
163172
}
164173
return delivery;
165174
}
@@ -171,7 +180,7 @@ private Delivery handle(Delivery delivery) {
171180
* @throws ShutdownSignalException if the connection is shut down while waiting
172181
*/
173182
public Delivery nextDelivery()
174-
throws InterruptedException, ShutdownSignalException
183+
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
175184
{
176185
return handle(_queue.take());
177186
}
@@ -184,7 +193,7 @@ public Delivery nextDelivery()
184193
* @throws ShutdownSignalException if the connection is shut down while waiting
185194
*/
186195
public Delivery nextDelivery(long timeout)
187-
throws InterruptedException, ShutdownSignalException
196+
throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
188197
{
189198
return handle(_queue.poll(timeout, TimeUnit.MILLISECONDS));
190199
}

test/src/com/rabbitmq/client/test/functional/ConsumerCancelNotificiation.java

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,31 @@
1919
import java.io.IOException;
2020

2121
import com.rabbitmq.client.Consumer;
22+
import com.rabbitmq.client.ConsumerCancelledException;
2223
import com.rabbitmq.client.QueueingConsumer;
24+
import com.rabbitmq.client.ShutdownSignalException;
2325
import com.rabbitmq.client.test.BrokerTestCase;
2426

2527
public class ConsumerCancelNotificiation extends BrokerTestCase {
2628

2729
private final String queue = "cancel_notification_queue";
28-
30+
2931
private final Object lock = new Object();
30-
32+
3133
private boolean notified = false;
3234

35+
private boolean failed = false;
36+
3337
public void testConsumerCancellationNotification() throws IOException {
38+
synchronized (lock) {
39+
notified = false;
40+
}
3441
channel.queueDeclare(queue, false, true, false, null);
3542
Consumer consumer = new QueueingConsumer(channel) {
3643
@Override
3744
public void handleCancel(String consumerTag) throws IOException {
3845
synchronized (lock) {
39-
notified = !notified;
46+
notified = true;
4047
lock.notifyAll();
4148
}
4249
}
@@ -53,4 +60,49 @@ public void handleCancel(String consumerTag) throws IOException {
5360
assertTrue(notified);
5461
}
5562
}
63+
64+
public void testConsumerCancellationInterruptsQueuingConsumerWait()
65+
throws IOException, InterruptedException {
66+
synchronized (lock) {
67+
notified = false;
68+
failed = false;
69+
}
70+
channel.queueDeclare(queue, false, true, false, null);
71+
final QueueingConsumer consumer = new QueueingConsumer(channel);
72+
Runnable receiver = new Runnable() {
73+
74+
@Override
75+
public void run() {
76+
try {
77+
consumer.nextDelivery();
78+
} catch (ConsumerCancelledException e) {
79+
synchronized (lock) {
80+
notified = true;
81+
lock.notifyAll();
82+
return; // avoid fall through to failure
83+
}
84+
} catch (ShutdownSignalException e) {
85+
} catch (InterruptedException e) {
86+
}
87+
synchronized (lock) {
88+
failed = true;
89+
lock.notifyAll();
90+
}
91+
}
92+
};
93+
Thread t = new Thread(receiver);
94+
t.start();
95+
channel.basicConsume(queue, consumer);
96+
channel.queueDelete(queue);
97+
synchronized (lock) {
98+
if (!(notified || failed)) {
99+
try {
100+
lock.wait();
101+
} catch (InterruptedException e) {
102+
}
103+
}
104+
assertTrue(notified);
105+
}
106+
t.join();
107+
}
56108
}

0 commit comments

Comments
 (0)