Skip to content

Commit 5346183

Browse files
author
Michael Bridgen
committed
Merge bug14647
2 parents fe6e87f + 64c7cdf commit 5346183

File tree

4 files changed

+133
-0
lines changed

4 files changed

+133
-0
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,17 @@ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, b
457457
*/
458458
void basicAck(long deliveryTag, boolean multiple) throws IOException;
459459

460+
/**
461+
* Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
462+
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
463+
* containing the received message being rejected.
464+
* @see com.rabbitmq.client.AMQP.Basic.Reject
465+
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
466+
* @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
467+
* @throws java.io.IOException if an error is encountered
468+
*/
469+
void basicReject(long deliveryTag, boolean requeue) throws IOException;
470+
460471
/**
461472
* Start a non-nolocal, non-exclusive consumer, with
462473
* explicit acknowledgements required and a server-generated consumerTag.

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,13 @@ public void basicAck(long deliveryTag, boolean multiple)
646646
transmit(new Basic.Ack(deliveryTag, multiple));
647647
}
648648

649+
/** Public API - {@inheritDoc} */
650+
public void basicReject(long deliveryTag, boolean requeue)
651+
throws IOException
652+
{
653+
transmit(new Basic.Reject(deliveryTag, requeue));
654+
}
655+
649656
/** Public API - {@inheritDoc} */
650657
public String basicConsume(String queue, Consumer callback)
651658
throws IOException

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public static TestSuite suite() {
4444
suite.addTestSuite(Routing.class);
4545
suite.addTestSuite(BindingLifecycle.class);
4646
suite.addTestSuite(Recover.class);
47+
suite.addTestSuite(Reject.class);
4748
suite.addTestSuite(TransactionalRecover.class);
4849
suite.addTestSuite(Transactions.class);
4950
suite.addTestSuite(PersistentTransactions.class);
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License at
4+
// http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
8+
// License for the specific language governing rights and limitations
9+
// under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developers of the Original Code are LShift Ltd,
14+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
15+
//
16+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
17+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
18+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
19+
// Technologies LLC, and Rabbit Technologies Ltd.
20+
//
21+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
22+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
23+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
24+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
25+
// (C) 2007-2010 Rabbit Technologies Ltd.
26+
//
27+
// All Rights Reserved.
28+
//
29+
// Contributor(s): ______________________________________.
30+
//
31+
32+
package com.rabbitmq.client.test.functional;
33+
34+
import com.rabbitmq.client.test.BrokerTestCase;
35+
36+
import com.rabbitmq.client.AMQP;
37+
import com.rabbitmq.client.Channel;
38+
import com.rabbitmq.client.Envelope;
39+
import com.rabbitmq.client.GetResponse;
40+
import com.rabbitmq.client.QueueingConsumer;
41+
import com.rabbitmq.client.QueueingConsumer.Delivery;
42+
43+
import java.io.IOException;
44+
import java.util.Arrays;
45+
46+
public class Reject extends BrokerTestCase
47+
{
48+
49+
protected Channel secondaryChannel;
50+
51+
protected void setUp()
52+
throws IOException
53+
{
54+
super.setUp();
55+
secondaryChannel = connection.createChannel();
56+
57+
}
58+
59+
protected void tearDown()
60+
throws IOException
61+
{
62+
if (secondaryChannel != null) {
63+
secondaryChannel.abort();
64+
secondaryChannel = null;
65+
}
66+
super.tearDown();
67+
}
68+
69+
protected long checkDelivery(Delivery d, byte[] msg, boolean redelivered)
70+
{
71+
assertNotNull(d);
72+
return checkDelivery(d.getEnvelope(), d.getBody(), msg, redelivered);
73+
}
74+
75+
protected long checkDelivery(GetResponse r, byte[] msg, boolean redelivered)
76+
{
77+
assertNotNull(r);
78+
return checkDelivery(r.getEnvelope(), r.getBody(), msg, redelivered);
79+
}
80+
81+
protected long checkDelivery(Envelope e, byte[] m,
82+
byte[] msg, boolean redelivered)
83+
{
84+
assertNotNull(e);
85+
assertTrue(Arrays.equals(m, msg));
86+
assertEquals(e.isRedeliver(), redelivered);
87+
return e.getDeliveryTag();
88+
}
89+
90+
public void testReject()
91+
throws IOException, InterruptedException
92+
{
93+
String q = channel.queueDeclare("", false, true, false, null).getQueue();
94+
95+
byte[] m1 = "1".getBytes();
96+
byte[] m2 = "2".getBytes();
97+
98+
basicPublishVolatile(m1, q);
99+
basicPublishVolatile(m2, q);
100+
101+
long tag1 = checkDelivery(channel.basicGet(q, false), m1, false);
102+
long tag2 = checkDelivery(channel.basicGet(q, false), m2, false);
103+
QueueingConsumer c = new QueueingConsumer(secondaryChannel);
104+
String consumerTag = secondaryChannel.basicConsume(q, false, c);
105+
channel.basicReject(tag2, true);
106+
long tag3 = checkDelivery(c.nextDelivery(), m2, true);
107+
secondaryChannel.basicCancel(consumerTag);
108+
secondaryChannel.basicReject(tag3, false);
109+
assertNull(channel.basicGet(q, false));
110+
channel.basicAck(tag1, false);
111+
channel.basicReject(tag3, false);
112+
expectChannelError(AMQP.PRECONDITION_FAILED);
113+
}
114+
}

0 commit comments

Comments
 (0)