Skip to content

Commit 74baab7

Browse files
author
Matthew Sackman
committed
Merging bug 22587 into default
2 parents 59ed27f + 5751564 commit 74baab7

File tree

4 files changed

+107
-2
lines changed

4 files changed

+107
-2
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,40 @@ public interface Channel extends ShutdownNotifier {
147147
*/
148148
void setFlowListener(FlowListener listener);
149149

150+
/**
151+
* Get the current default consumer. @see setDefaultConsumer for rationale.
152+
* @return an interface to the current default consumer.
153+
*/
154+
Consumer getDefaultConsumer();
155+
156+
/**
157+
* Set the current default consumer.
158+
*
159+
* Under certain circumstances it is possible for a channel to receive a
160+
* message delivery which does not match any consumer which is currently
161+
* set up via basicConsume(). This will occur after the following sequence
162+
* of events:
163+
*
164+
* ctag = basicConsume(queue, consumer); // i.e. with explicit acks
165+
* // some deliveries take place but are not acked
166+
* basicCancel(ctag);
167+
* basicRecover(false);
168+
*
169+
* Since requeue is specified to be false in the basicRecover, the spec
170+
* states that the message must be redelivered to "the original recipient"
171+
* - i.e. the same channel / consumer-tag. But the consumer is no longer
172+
* active.
173+
*
174+
* In these circumstances, you can register a default consumer to handle
175+
* such deliveries. If no default consumer is registered an
176+
* IllegalStateException will be thrown when such a delivery arrives.
177+
*
178+
* Most people will not need to use this.
179+
*
180+
* @param consumer the consumer to use, or null indicating "don't use one".
181+
*/
182+
void setDefaultConsumer(Consumer consumer);
183+
150184
/**
151185
* Request specific "quality of service" settings.
152186
*

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ public class ChannelN extends AMQChannel implements com.rabbitmq.client.Channel
9898
*/
9999
public volatile FlowListener flowListener = null;
100100

101+
/** Reference to the currently-active default consumer, or null if there is
102+
* none.
103+
*/
104+
public volatile Consumer defaultConsumer = null;
105+
101106
/**
102107
* Construct a new channel on the given connection with the given
103108
* channel number. Usually not called directly - call
@@ -148,6 +153,19 @@ public void setFlowListener(FlowListener listener) {
148153
flowListener = listener;
149154
}
150155

156+
/** Returns the current default consumer. */
157+
public Consumer getDefaultConsumer() {
158+
return defaultConsumer;
159+
}
160+
161+
/**
162+
* Sets the current default consumer.
163+
* A null argument is interpreted to mean "do not use a default consumer".
164+
*/
165+
public void setDefaultConsumer(Consumer consumer) {
166+
defaultConsumer = consumer;
167+
}
168+
151169
/**
152170
* Protected API - sends a ShutdownSignal to all active consumers.
153171
* @param signal an exception signalling channel shutdown
@@ -228,8 +246,17 @@ public void releaseChannelNumber() {
228246

229247
Consumer callback = _consumers.get(m.consumerTag);
230248
if (callback == null) {
231-
// FIXME: what to do when we get such an unsolicited delivery?
232-
throw new UnsupportedOperationException("FIXME unsolicited delivery");
249+
if (defaultConsumer == null) {
250+
// No handler set. We should blow up as this message
251+
// needs acking, just dropping it is not enough. See bug
252+
// 22587 for discussion.
253+
throw new IllegalStateException("Unsolicited delivery -" +
254+
" see Channel.setDefaultConsumer to handle this" +
255+
" case.");
256+
}
257+
else {
258+
callback = defaultConsumer;
259+
}
233260
}
234261

235262
Envelope envelope = new Envelope(m.deliveryTag,

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public static TestSuite suite() {
6262
suite.addTestSuite(InvalidAcksTx.class);
6363
suite.addTestSuite(BindToDefaultExchange.class);
6464
suite.addTestSuite(UnbindAutoDeleteExchange.class);
65+
suite.addTestSuite(RecoverAfterCancel.class);
6566
return suite;
6667
}
6768
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.QueueingConsumer;
5+
import com.rabbitmq.client.test.BrokerTestCase;
6+
7+
import java.io.IOException;
8+
9+
/**
10+
* After a basic.cancel you can invoke basic.recover{requeue=false} and
11+
* get back delivery(s) with now-obsolete ctags.
12+
*
13+
* See bug 22587.
14+
*/
15+
public class RecoverAfterCancel extends BrokerTestCase {
16+
String queue;
17+
18+
public void createResources() throws IOException {
19+
AMQP.Queue.DeclareOk ok = channel.queueDeclare();
20+
queue = ok.getQueue();
21+
}
22+
23+
public void testRecoverAfterCancel() throws IOException, InterruptedException {
24+
basicPublishVolatile(queue);
25+
QueueingConsumer consumer = new QueueingConsumer(channel);
26+
QueueingConsumer defaultConsumer = new QueueingConsumer(channel);
27+
channel.setDefaultConsumer(defaultConsumer);
28+
String ctag = channel.basicConsume(queue, consumer);
29+
QueueingConsumer.Delivery del = consumer.nextDelivery();
30+
channel.basicCancel(ctag);
31+
channel.basicRecover(false);
32+
33+
// The server will now redeliver us the first message again, with the
34+
// same ctag, but we're not set up to handle it with a standard
35+
// consumer - it should end up with the default one.
36+
37+
QueueingConsumer.Delivery del2 = defaultConsumer.nextDelivery();
38+
39+
assertEquals(new String(del.getBody()), new String(del2.getBody()));
40+
assertFalse(del.getEnvelope().isRedeliver());
41+
assertTrue(del2.getEnvelope().isRedeliver());
42+
}
43+
}

0 commit comments

Comments
 (0)