Skip to content

Commit c7d0e82

Browse files
author
Emile Joubert
committed
Merged stable into default
2 parents 3f28409 + 4257436 commit c7d0e82

File tree

12 files changed

+459
-42
lines changed

12 files changed

+459
-42
lines changed

src/com/rabbitmq/client/Channel.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -745,34 +745,40 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
745745
/**
746746
* Wait until all messages published since the last call have been
747747
* either ack'd or nack'd by the broker. Note, when called on a
748-
* non-Confirm channel, waitForConfirms returns true immediately.
748+
* non-Confirm channel, waitForConfirms throws an IllegalStateException.
749749
* @return whether all the messages were ack'd (and none were nack'd)
750+
* @throws java.lang.IllegalStateException
750751
*/
751752
boolean waitForConfirms() throws InterruptedException;
752753

753754
/**
754755
* Wait until all messages published since the last call have been
755756
* either ack'd or nack'd by the broker; or until timeout elapses.
756757
* If the timeout expires a TimeoutException is thrown. When
757-
* called on a non-Confirm channel, waitForConfirms returns true
758-
* immediately.
758+
* called on a non-Confirm channel, waitForConfirms throws an
759+
* IllegalStateException.
759760
* @return whether all the messages were ack'd (and none were nack'd)
761+
* @throws java.lang.IllegalStateException
760762
*/
761763
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;
762764

763765
/** Wait until all messages published since the last call have
764766
* been either ack'd or nack'd by the broker. If any of the
765767
* messages were nack'd, waitForConfirmsOrDie will throw an
766768
* IOException. When called on a non-Confirm channel, it will
767-
* return immediately. */
768-
void waitForConfirmsOrDie() throws IOException, InterruptedException;
769+
* throw an IllegalStateException.
770+
* @throws java.lang.IllegalStateException
771+
*/
772+
void waitForConfirmsOrDie() throws IOException, InterruptedException;
769773

770774
/** Wait until all messages published since the last call have
771775
* been either ack'd or nack'd by the broker; or until timeout elapses.
772776
* If the timeout expires a TimeoutException is thrown. If any of the
773777
* messages were nack'd, waitForConfirmsOrDie will throw an
774778
* IOException. When called on a non-Confirm channel, it will
775-
* return immediately. */
779+
* throw an IllegalStateException.
780+
* @throws java.lang.IllegalStateException
781+
*/
776782
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;
777783

778784
/**

src/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ public boolean waitForConfirms()
175175
/** {@inheritDoc} */
176176
public boolean waitForConfirms(long timeout)
177177
throws InterruptedException, TimeoutException {
178+
if (nextPublishSeqNo == 0L)
179+
throw new IllegalStateException("Confirms not selected");
178180
long startTime = System.currentTimeMillis();
179181
synchronized (unconfirmedSet) {
180182
while (true) {

test/src/com/rabbitmq/client/test/functional/Confirm.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,13 +229,16 @@ public void handleNack(long seqNo, boolean multiple) {
229229
}
230230
}
231231

232-
public void testWaitForConfirmsNoOp()
232+
public void testWaitForConfirmsWithoutConfirmSelected()
233233
throws IOException, InterruptedException
234234
{
235235
channel = connection.createChannel();
236236
// Don't enable Confirm mode
237237
publish("", "confirm-test", true, false);
238-
channel.waitForConfirmsOrDie(); // Nop
238+
try {
239+
channel.waitForConfirms();
240+
fail("waitForConfirms without confirms selected succeeded");
241+
} catch (IllegalStateException _) {}
239242
}
240243

241244
public void testWaitForConfirmsException()

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public static void add(TestSuite suite) {
5555
suite.addTestSuite(QueueLifecycle.class);
5656
suite.addTestSuite(QueueLease.class);
5757
suite.addTestSuite(QueueExclusivity.class);
58+
suite.addTestSuite(QueueSizeLimit.class);
5859
suite.addTestSuite(InvalidAcks.class);
5960
suite.addTestSuite(InvalidAcksTx.class);
6061
suite.addTestSuite(DefaultExchange.class);
@@ -71,5 +72,6 @@ public static void add(TestSuite suite) {
7172
suite.addTestSuite(InternalExchange.class);
7273
suite.addTestSuite(CcRoutes.class);
7374
suite.addTestSuite(WorkPoolTests.class);
75+
suite.addTestSuite(HeadersExchangeValidation.class);
7476
}
7577
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.test.BrokerTestCase;
6+
7+
import java.io.IOException;
8+
import java.util.HashMap;
9+
10+
public class HeadersExchangeValidation extends BrokerTestCase {
11+
12+
public void testHeadersValidation() throws IOException
13+
{
14+
AMQP.Queue.DeclareOk ok = channel.queueDeclare();
15+
String queue = ok.getQueue();
16+
17+
HashMap<String, Object> arguments = new HashMap<String, Object>();
18+
failBind(queue, arguments);
19+
20+
arguments.put("x-match", 23);
21+
failBind(queue, arguments);
22+
23+
arguments.put("x-match", "all or any I don't mind");
24+
failBind(queue, arguments);
25+
26+
arguments.put("x-match", "all");
27+
succeedBind(queue, arguments);
28+
29+
arguments.put("x-match", "any");
30+
succeedBind(queue, arguments);
31+
}
32+
33+
private void failBind(String queue, HashMap<String, Object> arguments) {
34+
try {
35+
Channel ch = connection.createChannel();
36+
ch.queueBind(queue, "amq.headers", "", arguments);
37+
fail("Expected failure");
38+
} catch (IOException e) {
39+
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
40+
}
41+
}
42+
43+
private void succeedBind(String queue, HashMap<String, Object> arguments) throws IOException {
44+
Channel ch = connection.createChannel();
45+
ch.queueBind(queue, "amq.headers", "", arguments);
46+
ch.close();
47+
}
48+
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is VMware, Inc.
14+
// Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
15+
//
16+
17+
18+
package com.rabbitmq.client.test.functional;
19+
20+
import com.rabbitmq.client.GetResponse;
21+
import com.rabbitmq.client.test.BrokerTestCase;
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
/**
29+
* Test queue max length limit.
30+
*/
31+
public class QueueSizeLimit extends BrokerTestCase {
32+
33+
private final int MAXMAXLENGTH = 3;
34+
private final String q = "queue-maxlength";
35+
36+
public void testQueueSize() throws IOException, InterruptedException {
37+
for (int maxLen = 0; maxLen <= MAXMAXLENGTH; maxLen ++){
38+
setupNonDlxTest(maxLen, false);
39+
assertHead(maxLen, "msg2", q);
40+
deleteQueue(q);
41+
}
42+
}
43+
44+
public void testQueueSizeUnacked() throws IOException, InterruptedException {
45+
for (int maxLen = 0; maxLen <= MAXMAXLENGTH; maxLen ++){
46+
setupNonDlxTest(maxLen, true);
47+
assertHead(maxLen > 0 ? 1 : 0, "msg" + (maxLen + 1), q);
48+
deleteQueue(q);
49+
}
50+
}
51+
52+
public void testQueueSizeDlx() throws IOException, InterruptedException {
53+
for (int maxLen = 0; maxLen <= MAXMAXLENGTH; maxLen ++){
54+
setupDlxTest(maxLen, false);
55+
assertHead(1, "msg1", "DLQ");
56+
deleteQueue(q);
57+
deleteQueue("DLQ");
58+
}
59+
}
60+
61+
public void testQueueSizeUnackedDlx() throws IOException, InterruptedException {
62+
for (int maxLen = 0; maxLen <= MAXMAXLENGTH; maxLen ++){
63+
setupDlxTest(maxLen, true);
64+
assertHead(maxLen > 0 ? 0 : 1, "msg1", "DLQ");
65+
deleteQueue(q);
66+
deleteQueue("DLQ");
67+
}
68+
}
69+
70+
public void testRequeue() throws IOException, InterruptedException {
71+
for (int maxLen = 1; maxLen <= MAXMAXLENGTH; maxLen ++) {
72+
declareQueue(maxLen, false);
73+
setupRequeueTest(maxLen);
74+
assertHead(maxLen, "msg1", q);
75+
deleteQueue(q);
76+
}
77+
}
78+
79+
public void testRequeueWithDlx() throws IOException, InterruptedException {
80+
for (int maxLen = 1; maxLen <= MAXMAXLENGTH; maxLen ++) {
81+
declareQueue(maxLen, true);
82+
setupRequeueTest(maxLen);
83+
assertHead(maxLen, "msg1", q);
84+
assertHead(maxLen, "msg1", "DLQ");
85+
deleteQueue(q);
86+
deleteQueue("DLQ");
87+
}
88+
}
89+
90+
private void setupNonDlxTest(int maxLen, boolean unAcked) throws IOException, InterruptedException {
91+
declareQueue(maxLen, false);
92+
fill(maxLen);
93+
if (unAcked) getUnacked(maxLen);
94+
publish("msg" + (maxLen + 1));
95+
}
96+
97+
private void setupDlxTest(int maxLen, boolean unAcked) throws IOException, InterruptedException {
98+
declareQueue(maxLen, true);
99+
fill(maxLen);
100+
if (unAcked) getUnacked(maxLen);
101+
publish("msg" + (maxLen + 1));
102+
try {
103+
Thread.sleep(100);
104+
} catch (InterruptedException _) { }
105+
}
106+
107+
private void setupRequeueTest(int maxLen) throws IOException, InterruptedException {
108+
fill(maxLen);
109+
List<Long> tags = getUnacked(maxLen);
110+
fill(maxLen);
111+
channel.basicNack(tags.get(0), false, true);
112+
if (maxLen > 1)
113+
channel.basicNack(tags.get(maxLen - 1), true, true);
114+
}
115+
116+
private void declareQueue(int maxLen, boolean dlx) throws IOException {
117+
Map<String, Object> args = new HashMap<String, Object>();
118+
args.put("x-max-length", maxLen);
119+
if (dlx) {
120+
args.put("x-dead-letter-exchange", "amq.fanout");
121+
channel.queueDeclare("DLQ", false, true, false, null);
122+
channel.queueBind("DLQ", "amq.fanout", "");
123+
}
124+
channel.queueDeclare(q, false, true, true, args);
125+
}
126+
127+
private void fill(int count) throws IOException, InterruptedException {
128+
for (int i=1; i <= count; i++){
129+
publish("msg" + i);
130+
}
131+
}
132+
133+
private void publish(String payload) throws IOException, InterruptedException {
134+
basicPublishVolatile(payload.getBytes(), q);
135+
}
136+
137+
private void assertHead(int expectedLength, String expectedHeadPayload, String queueName) throws IOException {
138+
GetResponse head = channel.basicGet(queueName, true);
139+
if (expectedLength > 0) {
140+
assertNotNull(head);
141+
assertEquals(expectedHeadPayload, new String(head.getBody()));
142+
assertEquals(expectedLength, head.getMessageCount() + 1);
143+
} else {
144+
assertNull(head);
145+
}
146+
}
147+
148+
private List<Long> getUnacked(int howMany) throws IOException {
149+
List<Long> tags = new ArrayList<Long>(howMany);
150+
for (;howMany > 0; howMany --) {
151+
GetResponse response = channel.basicGet(q, false);
152+
tags.add(response.getEnvelope().getDeliveryTag());
153+
}
154+
return tags;
155+
}
156+
}

0 commit comments

Comments
 (0)