Skip to content

Commit cfd4a2f

Browse files
author
Matthew Sackman
committed
Merge bug22236 ⇨ default
2 parents 680f98e + bfb2172 commit cfd4a2f

File tree

3 files changed

+119
-21
lines changed

3 files changed

+119
-21
lines changed

src/com/rabbitmq/client/QueueingConsumer.java

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,37 @@
3737
import java.util.concurrent.TimeUnit;
3838

3939
import com.rabbitmq.client.AMQP.BasicProperties;
40-
import com.rabbitmq.utility.ValueOrException;
40+
import com.rabbitmq.utility.Utility;
4141

4242
/**
4343
* Convenience class: an implementation of {@link Consumer} with straightforward blocking semantics
4444
*/
4545
public class QueueingConsumer extends DefaultConsumer {
46-
public BlockingQueue<ValueOrException<Delivery, ShutdownSignalException>> _queue;
46+
private final BlockingQueue<Delivery> _queue;
47+
48+
// When this is non-null the queue is in shutdown mode and nextDelivery should
49+
// throw a shutdown signal exception.
50+
private volatile ShutdownSignalException _shutdown;
51+
52+
// Marker object used to signal the queue is in shutdown mode.
53+
// It is only there to wake up consumers. The canonical representation
54+
// of shutting down is the presence of _shutdown.
55+
// Invariant: This is never on _queue unless _shutdown != null.
56+
private static final Delivery POISON = new Delivery(null, null, null);
4757

4858
public QueueingConsumer(Channel ch) {
49-
this(ch,
50-
new LinkedBlockingQueue<ValueOrException<Delivery, ShutdownSignalException>>());
59+
this(ch, new LinkedBlockingQueue<Delivery>());
5160
}
5261

53-
public QueueingConsumer(Channel ch,
54-
BlockingQueue<ValueOrException<Delivery, ShutdownSignalException>> q)
62+
public QueueingConsumer(Channel ch, BlockingQueue<Delivery> q)
5563
{
5664
super(ch);
5765
this._queue = q;
5866
}
5967

6068
@Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
61-
this._queue.add(ValueOrException. <Delivery, ShutdownSignalException> makeException(sig));
69+
_shutdown = sig;
70+
_queue.add(POISON);
6271
}
6372

6473
@Override public void handleDelivery(String consumerTag,
@@ -67,8 +76,8 @@ public QueueingConsumer(Channel ch,
6776
byte[] body)
6877
throws IOException
6978
{
70-
this._queue.add(ValueOrException. <Delivery, ShutdownSignalException> makeValue
71-
(new Delivery(envelope, properties, body)));
79+
checkShutdown();
80+
this._queue.add(new Delivery(envelope, properties, body));
7281
}
7382

7483
/**
@@ -110,6 +119,27 @@ public byte[] getBody() {
110119
}
111120
}
112121

122+
/**
123+
* Check if we are in shutdown mode and if so throw an exception.
124+
*/
125+
private void checkShutdown(){
126+
if(_shutdown != null) throw Utility.fixStackTrace(_shutdown);
127+
}
128+
129+
/**
130+
* If this is a non-POISON non-null delivery simply return it.
131+
* If this is POISON we are in shutdown mode, throw _shutdown
132+
* If this is null, we may be in shutdown mode. Check and see.
133+
*/
134+
private Delivery handle(Delivery delivery)
135+
{
136+
if(delivery == POISON || (delivery == null && _shutdown != null)){
137+
if(delivery == POISON) _queue.add(POISON);
138+
throw Utility.fixStackTrace(_shutdown);
139+
}
140+
return delivery;
141+
}
142+
113143
/**
114144
* Main application-side API: wait for the next message delivery and return it.
115145
* @return the next message
@@ -119,7 +149,7 @@ public byte[] getBody() {
119149
public Delivery nextDelivery()
120150
throws InterruptedException, ShutdownSignalException
121151
{
122-
return _queue.take().getValue();
152+
return handle(_queue.take());
123153
}
124154

125155
/**
@@ -132,16 +162,7 @@ public Delivery nextDelivery()
132162
public Delivery nextDelivery(long timeout)
133163
throws InterruptedException, ShutdownSignalException
134164
{
135-
ValueOrException<Delivery, ShutdownSignalException> r =
136-
_queue.poll(timeout, TimeUnit.MILLISECONDS);
137-
return r == null ? null : r.getValue();
138-
}
139-
140-
/**
141-
* Retrieve the underlying blocking queue.
142-
* @return the queue where incoming messages are stored
143-
*/
144-
public BlockingQueue<ValueOrException<Delivery, ShutdownSignalException>> getQueue() {
145-
return _queue;
165+
checkShutdown();
166+
return handle(_queue.poll(timeout, TimeUnit.MILLISECONDS));
146167
}
147168
}

test/src/com/rabbitmq/client/test/ClientTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public static TestSuite suite() {
4747
suite.addTestSuite(Bug20004Test.class);
4848
suite.addTestSuite(CloseInMainLoop.class);
4949
suite.addTestSuite(ChannelNumberAllocationTests.class);
50+
suite.addTestSuite(QueueingConsumerShutdownTests.class);
5051
return suite;
5152
}
5253
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2009 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2009 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
package com.rabbitmq.client.test;
32+
33+
import com.rabbitmq.client.*;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.concurrent.CountDownLatch;
36+
import java.util.concurrent.TimeUnit;
37+
38+
public class QueueingConsumerShutdownTests extends BrokerTestCase{
39+
static final String QUEUE = "some-queue";
40+
static final int THREADS = 5;
41+
42+
public void testNThreadShutdown() throws Exception{
43+
Channel channel = connection.createChannel();
44+
final QueueingConsumer c = new QueueingConsumer(channel);
45+
channel.queueDeclare(QUEUE);
46+
channel.basicConsume(QUEUE, c);
47+
final AtomicInteger count = new AtomicInteger(THREADS);
48+
final CountDownLatch latch = new CountDownLatch(THREADS);
49+
50+
for(int i = 0; i < THREADS; i++){
51+
new Thread(){
52+
@Override public void run(){
53+
try {
54+
while(true){
55+
c.nextDelivery();
56+
}
57+
} catch (ShutdownSignalException sig) {
58+
count.decrementAndGet();
59+
} catch (Exception e) {
60+
throw new RuntimeException(e);
61+
} finally {
62+
latch.countDown();
63+
}
64+
}
65+
}.start();
66+
}
67+
68+
connection.close();
69+
70+
// Far longer than this could reasonably take
71+
assertTrue(latch.await(5, TimeUnit.SECONDS));
72+
assertEquals(0, count.get());
73+
}
74+
75+
76+
}

0 commit comments

Comments
 (0)