Skip to content

Commit f515145

Browse files
committed
merge bug19375 into default
2 parents c75fd1a + 0fd8911 commit f515145

File tree

2 files changed

+157
-0
lines changed

2 files changed

+157
-0
lines changed

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public static void add(TestSuite suite) {
5555
suite.addTestSuite(QueueLifecycle.class);
5656
suite.addTestSuite(QueueLease.class);
5757
suite.addTestSuite(QueueExclusivity.class);
58+
suite.addTestSuite(QueueSizeLimit.class);
5859
suite.addTestSuite(InvalidAcks.class);
5960
suite.addTestSuite(InvalidAcksTx.class);
6061
suite.addTestSuite(DefaultExchange.class);
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is VMware, Inc.
14+
// Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
15+
//
16+
17+
18+
package com.rabbitmq.client.test.functional;
19+
20+
import com.rabbitmq.client.GetResponse;
21+
import com.rabbitmq.client.test.BrokerTestCase;
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
/**
29+
* Test queue max length limit.
30+
*/
31+
public class QueueSizeLimit extends BrokerTestCase {
32+
33+
private final int MAXMAXLENGTH = 3;
34+
private final String q = "queue-maxlength";
35+
36+
public void testQueueSize() throws IOException, InterruptedException {
37+
for (int maxLen = 0; maxLen <= MAXMAXLENGTH; maxLen ++){
38+
setupNonDlxTest(maxLen, false);
39+
assertHead(maxLen, "msg2", q);
40+
deleteQueue(q);
41+
}
42+
}
43+
44+
public void testQueueSizeUnacked() throws IOException, InterruptedException {
45+
for (int maxLen = 0; maxLen <= MAXMAXLENGTH; maxLen ++){
46+
setupNonDlxTest(maxLen, true);
47+
assertHead(maxLen > 0 ? 1 : 0, "msg" + (maxLen + 1), q);
48+
deleteQueue(q);
49+
}
50+
}
51+
52+
public void testQueueSizeDlx() throws IOException, InterruptedException {
53+
for (int maxLen = 0; maxLen <= MAXMAXLENGTH; maxLen ++){
54+
setupDlxTest(maxLen, false);
55+
assertHead(1, "msg1", "DLQ");
56+
deleteQueue(q);
57+
deleteQueue("DLQ");
58+
}
59+
}
60+
61+
public void testQueueSizeUnackedDlx() throws IOException, InterruptedException {
62+
for (int maxLen = 0; maxLen <= MAXMAXLENGTH; maxLen ++){
63+
setupDlxTest(maxLen, true);
64+
assertHead(maxLen > 0 ? 0 : 1, "msg1", "DLQ");
65+
deleteQueue(q);
66+
deleteQueue("DLQ");
67+
}
68+
}
69+
70+
public void testRequeue() throws IOException, InterruptedException {
71+
for (int maxLen = 1; maxLen <= MAXMAXLENGTH; maxLen ++) {
72+
declareQueue(maxLen, false);
73+
setupRequeueTest(maxLen);
74+
assertHead(maxLen, "msg1", q);
75+
deleteQueue(q);
76+
}
77+
}
78+
79+
public void testRequeueWithDlx() throws IOException, InterruptedException {
80+
for (int maxLen = 1; maxLen <= MAXMAXLENGTH; maxLen ++) {
81+
declareQueue(maxLen, true);
82+
setupRequeueTest(maxLen);
83+
assertHead(maxLen, "msg1", q);
84+
assertHead(maxLen, "msg1", "DLQ");
85+
deleteQueue(q);
86+
deleteQueue("DLQ");
87+
}
88+
}
89+
90+
private void setupNonDlxTest(int maxLen, boolean unAcked) throws IOException, InterruptedException {
91+
declareQueue(maxLen, false);
92+
fill(maxLen);
93+
if (unAcked) getUnacked(maxLen);
94+
publish("msg" + (maxLen + 1));
95+
}
96+
97+
private void setupDlxTest(int maxLen, boolean unAcked) throws IOException, InterruptedException {
98+
declareQueue(maxLen, true);
99+
fill(maxLen);
100+
if (unAcked) getUnacked(maxLen);
101+
publish("msg" + (maxLen + 1));
102+
try {
103+
Thread.sleep(100);
104+
} catch (InterruptedException _) { }
105+
}
106+
107+
private void setupRequeueTest(int maxLen) throws IOException, InterruptedException {
108+
fill(maxLen);
109+
List<Long> tags = getUnacked(maxLen);
110+
fill(maxLen);
111+
channel.basicNack(tags.get(0), false, true);
112+
if (maxLen > 1)
113+
channel.basicNack(tags.get(maxLen - 1), true, true);
114+
}
115+
116+
private void declareQueue(int maxLen, boolean dlx) throws IOException {
117+
Map<String, Object> args = new HashMap<String, Object>();
118+
args.put("x-max-length", maxLen);
119+
if (dlx) {
120+
args.put("x-dead-letter-exchange", "amq.fanout");
121+
channel.queueDeclare("DLQ", false, true, false, null);
122+
channel.queueBind("DLQ", "amq.fanout", "");
123+
}
124+
channel.queueDeclare(q, false, true, true, args);
125+
}
126+
127+
private void fill(int count) throws IOException, InterruptedException {
128+
for (int i=1; i <= count; i++){
129+
publish("msg" + i);
130+
}
131+
}
132+
133+
private void publish(String payload) throws IOException, InterruptedException {
134+
basicPublishVolatile(payload.getBytes(), q);
135+
}
136+
137+
private void assertHead(int expectedLength, String expectedHeadPayload, String queueName) throws IOException {
138+
GetResponse head = channel.basicGet(queueName, true);
139+
if (expectedLength > 0) {
140+
assertNotNull(head);
141+
assertEquals(expectedHeadPayload, new String(head.getBody()));
142+
assertEquals(expectedLength, head.getMessageCount() + 1);
143+
} else {
144+
assertNull(head);
145+
}
146+
}
147+
148+
private List<Long> getUnacked(int howMany) throws IOException {
149+
List<Long> tags = new ArrayList<Long>(howMany);
150+
for (;howMany > 0; howMany --) {
151+
GetResponse response = channel.basicGet(q, false);
152+
tags.add(response.getEnvelope().getDeliveryTag());
153+
}
154+
return tags;
155+
}
156+
}

0 commit comments

Comments
 (0)