11
11
// The Original Code is RabbitMQ.
12
12
//
13
13
// The Initial Developer of the Original Code is VMware, Inc.
14
- // Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
14
+ // Copyright (c) 2007-2013 VMware, Inc. All rights reserved.
15
15
//
16
16
17
17
18
18
package com .rabbitmq .client .test .functional ;
19
19
20
- import com .rabbitmq .client .AMQP ;
21
20
import com .rabbitmq .client .GetResponse ;
22
- import com .rabbitmq .client .MessageProperties ;
23
21
import com .rabbitmq .client .test .BrokerTestCase ;
24
22
import java .io .IOException ;
25
23
import java .util .ArrayList ;
26
24
import java .util .HashMap ;
25
+ import java .util .List ;
27
26
import java .util .Map ;
28
27
29
28
/**
30
29
* Test queue max length limit.
31
30
*/
32
31
public class QueueSizeLimit extends BrokerTestCase {
33
32
34
- private final int MAXLENGTH = 5 ;
35
- private final int MAXLENGTH1 = MAXLENGTH + 1 ;
36
- private final int EXPIRY_TIMEOUT = 100 ;
33
+ private final int MAXMAXLENGTH = 3 ;
37
34
private final String q = "queue-maxlength" ;
38
35
39
- @ Override
40
- protected void setUp () throws IOException {
41
- super .setUp ();
42
- channel .confirmSelect ();
43
- }
44
-
45
- AMQP .BasicProperties setupDlx (boolean persistent ) throws IOException {
46
- channel .queueDeclare ("DLQ" , false , true , false , null );
47
- channel .queueBind ("DLQ" , "amq.fanout" , "" );
48
- declareQueue (persistent , true );
49
- AMQP .BasicProperties props = null ;
50
- if (persistent ) {
51
- props = MessageProperties .MINIMAL_PERSISTENT_BASIC ;
36
+ public void testQueueSize () throws IOException , InterruptedException {
37
+ for (int maxLen = 0 ; maxLen <= MAXMAXLENGTH ; maxLen ++){
38
+ setupNonDlxTest (maxLen , false );
39
+ assertHead (maxLen , "msg2" , q );
40
+ deleteQueue (q );
52
41
}
53
- return props ;
54
42
}
55
43
56
- public void testQueueSize () throws IOException , InterruptedException {
57
- declareQueue (false , false );
58
- fill (false , false , false );
59
- syncPublish (null , "msg" + MAXLENGTH1 );
60
- assertHead (MAXLENGTH , "msg2" , q );
44
+ public void testQueueSizeUnacked () throws IOException , InterruptedException {
45
+ for (int maxLen = 0 ; maxLen <= MAXMAXLENGTH ; maxLen ++){
46
+ setupNonDlxTest (maxLen , true );
47
+ assertHead (maxLen > 0 ? 1 : 0 , "msg" + (maxLen + 1 ), q );
48
+ deleteQueue (q );
49
+ }
61
50
}
62
51
63
- public void testQueueUnacked () throws IOException , InterruptedException {
64
- declareQueue (false , false );
65
- fill (false , true , false );
66
- syncPublish (null , "msg" + MAXLENGTH1 );
67
- assertHead (1 , "msg" + MAXLENGTH1 , q );
52
+ public void testQueueSizeDlx () throws IOException , InterruptedException {
53
+ for (int maxLen = 0 ; maxLen <= MAXMAXLENGTH ; maxLen ++){
54
+ setupDlxTest (maxLen , false );
55
+ assertHead (1 , "msg1" , "DLQ" );
56
+ deleteQueue (q );
57
+ deleteQueue ("DLQ" );
58
+ }
68
59
}
69
60
70
- public void testPersistent () throws IOException , InterruptedException {
71
- declareQueue (true , false );
72
- fill (true , true , false );
73
- syncPublish (MessageProperties .MINIMAL_PERSISTENT_BASIC , "msg" + MAXLENGTH1 );
74
- assertHead (1 , "msg" + MAXLENGTH1 , q );
61
+ public void testQueueSizeUnackedDlx () throws IOException , InterruptedException {
62
+ for (int maxLen = 0 ; maxLen <= MAXMAXLENGTH ; maxLen ++){
63
+ setupDlxTest (maxLen , true );
64
+ assertHead (maxLen > 0 ? 0 : 1 , "msg1" , "DLQ" );
65
+ deleteQueue (q );
66
+ deleteQueue ("DLQ" );
67
+ }
75
68
}
76
69
77
- public void testDlxHeadTransient () throws IOException , InterruptedException {
78
- dlxHead (false );
70
+ public void testRequeue () throws IOException , InterruptedException {
71
+ for (int maxLen = 1 ; maxLen <= MAXMAXLENGTH ; maxLen ++) {
72
+ declareQueue (maxLen , false );
73
+ setupRequeueTest (maxLen );
74
+ assertHead (maxLen , "msg1" , q );
75
+ deleteQueue (q );
76
+ }
79
77
}
80
78
81
- public void testDlxTailTransient () throws IOException , InterruptedException {
82
- dlxTail (false );
79
+ public void testRequeueWithDlx () throws IOException , InterruptedException {
80
+ for (int maxLen = 1 ; maxLen <= MAXMAXLENGTH ; maxLen ++) {
81
+ declareQueue (maxLen , true );
82
+ setupRequeueTest (maxLen );
83
+ assertHead (maxLen , "msg1" , q );
84
+ assertHead (maxLen , "msg1" , "DLQ" );
85
+ deleteQueue (q );
86
+ deleteQueue ("DLQ" );
87
+ }
83
88
}
84
89
85
- public void testDlxHeadDurable () throws IOException , InterruptedException {
86
- dlxHead (true );
90
+ private void setupNonDlxTest (int maxLen , boolean unAcked ) throws IOException , InterruptedException {
91
+ declareQueue (maxLen , false );
92
+ fill (maxLen );
93
+ if (unAcked )
94
+ getUnacked (maxLen , null );
95
+ publish ("msg" + (maxLen + 1 ));
87
96
}
88
97
89
- public void testDlxTailDurable () throws IOException , InterruptedException {
90
- dlxTail (true );
98
+ private void setupDlxTest (int maxLen , boolean unAcked ) throws IOException , InterruptedException {
99
+ declareQueue (maxLen , true );
100
+ fill (maxLen );
101
+ if (unAcked )
102
+ getUnacked (maxLen , null );
103
+ publish ("msg" + (maxLen + 1 ));
104
+ try {
105
+ Thread .sleep (100 );
106
+ } catch (InterruptedException _) { }
91
107
}
92
108
93
- public void testMaxlenZero () throws IOException , InterruptedException {
94
- Map <String , Object > args = new HashMap <String , Object >();
95
- args .put ("x-max-length" , 0 );
96
- channel .queueDeclare (q , false , true , true , args );
97
- syncPublish (null , "msg" );
98
- assertNull (channel .basicGet (q , true ));
109
+ private void setupRequeueTest (int maxLen ) throws IOException , InterruptedException {
110
+ List <Long > tags = new ArrayList <Long >(maxLen );
111
+ fill (maxLen );
112
+ getUnacked (maxLen , tags );
113
+ fill (maxLen );
114
+ channel .basicNack (tags .get (0 ), false , true );
115
+ if (maxLen > 1 )
116
+ channel .basicNack (tags .get (maxLen - 1 ), true , true );
99
117
}
100
118
101
- public void testMaxlenOne ( ) throws IOException , InterruptedException {
119
+ private void declareQueue ( int maxLen , boolean dlx ) throws IOException {
102
120
Map <String , Object > args = new HashMap <String , Object >();
103
- args .put ("x-max-length" , 1 );
121
+ args .put ("x-max-length" , maxLen );
122
+ if (dlx ) {
123
+ args .put ("x-dead-letter-exchange" , "amq.fanout" );
124
+ channel .queueDeclare ("DLQ" , false , true , false , null );
125
+ channel .queueBind ("DLQ" , "amq.fanout" , "" );
126
+ }
104
127
channel .queueDeclare (q , false , true , true , args );
105
-
106
- AMQP .BasicProperties props = new AMQP .BasicProperties ();
107
- props .setExpiration ((new Integer (EXPIRY_TIMEOUT )).toString ());
108
- channel .basicPublish ("" , q , props , "msg1" .getBytes ());
109
- channel .basicPublish ("" , q , null , "msg2" .getBytes ());
110
-
111
- channel .waitForConfirms ();
112
- Thread .sleep (EXPIRY_TIMEOUT * 2 );
113
- assertHead (1 , "msg2" , q );
114
128
}
115
129
116
- public void testRequeue () throws IOException , InterruptedException {
117
- declareQueue (false , false );
118
- ArrayList <Long > tags = new ArrayList <Long >(MAXLENGTH );;
119
- fill (false , false , false );
120
- getUnacked (MAXLENGTH , tags );
121
- fill (false , false , false );
122
- channel .basicNack (tags .get (0 ), false , true );
123
- channel .basicNack (tags .get (MAXLENGTH - 1 ), true , true );
124
- assertHead (MAXLENGTH , "msg1" , q );
125
- }
126
-
127
- public void testRequeueWithDlx () throws IOException , InterruptedException {
128
- setupDlx (false );
129
- ArrayList <Long > tags = new ArrayList <Long >(MAXLENGTH );;
130
- fill (false , false , true );
131
- getUnacked (MAXLENGTH , tags );
132
- fill (false , false , true );
133
- channel .basicNack (tags .get (0 ), false , true );
134
- channel .basicNack (tags .get (MAXLENGTH - 1 ), true , true );
135
- assertHead (MAXLENGTH , "msg1" , q );
136
- assertHead (MAXLENGTH , "msg1" , "DLQ" );
137
- }
138
-
139
- public void dlxHead (boolean persistent ) throws IOException , InterruptedException {
140
- AMQP .BasicProperties props = setupDlx (persistent );
141
- fill (persistent , false , true );
142
- syncPublish (props , "msg" + MAXLENGTH1 );
143
- assertEquals (MAXLENGTH , declareQueue (persistent , true ));
144
- assertHead (1 , "msg1" , "DLQ" );
145
- }
146
-
147
- public void dlxTail (boolean persistent ) throws IOException , InterruptedException {
148
- AMQP .BasicProperties props = setupDlx (persistent );
149
- fill (persistent , true , true );
150
- syncPublish (props , "msg" + MAXLENGTH1 );
151
- assertNull (channel .basicGet ("DLQ" , true ));
152
- assertHead (1 , "msg" + MAXLENGTH1 , q );
153
- }
154
-
155
- private void fill (boolean persistent , boolean unAcked , boolean dlx ) throws IOException , InterruptedException {
156
- for (int i =1 ; i <= MAXLENGTH ; i ++){
157
- syncPublish (null , "msg" + i );
158
- if (unAcked ) {
159
- assertNotNull (channel .basicGet (q , false ));
160
- }
161
- }
162
- if (unAcked ) {
163
- assertEquals (0 , declareQueue (persistent , dlx ));
164
- } else {
165
- assertEquals (MAXLENGTH , declareQueue (persistent , dlx ));
130
+ private void fill (int count ) throws IOException , InterruptedException {
131
+ for (int i =1 ; i <= count ; i ++){
132
+ publish ("msg" + i );
166
133
}
167
134
}
168
135
169
- private void syncPublish (AMQP .BasicProperties props , String payload ) throws IOException , InterruptedException {
170
- channel .basicPublish ("" , q , props , payload .getBytes ());
171
- channel .waitForConfirmsOrDie ();
136
+ private void publish (String payload ) throws IOException , InterruptedException {
137
+ basicPublishVolatile (payload .getBytes (), q );
172
138
}
173
139
174
- private int declareQueue (boolean persistent , boolean dlx ) throws IOException {
175
- Map <String , Object > args = new HashMap <String , Object >();
176
- args .put ("x-max-length" , MAXLENGTH );
177
- if (dlx ) {
178
- args .put ("x-dead-letter-exchange" , "amq.fanout" );
179
- }
180
- AMQP .Queue .DeclareOk ok = channel .queueDeclare (q , persistent , true , true , args );
181
- return ok .getMessageCount ();
182
- }
183
-
184
- private void assertHead (int expectedLength , String expectedPayload , String queueName ) throws IOException {
140
+ private void assertHead (int expectedLength , String expectedHeadPayload , String queueName ) throws IOException {
185
141
GetResponse head = channel .basicGet (queueName , true );
186
- assertNotNull (head );
187
- assertEquals (expectedPayload , new String (head .getBody ()));
188
- assertEquals (expectedLength , head .getMessageCount () + 1 );
142
+ if (expectedLength > 0 ) {
143
+ assertNotNull (head );
144
+ assertEquals (expectedHeadPayload , new String (head .getBody ()));
145
+ assertEquals (expectedLength , head .getMessageCount () + 1 );
146
+ } else {
147
+ assertNull (head );
148
+ }
189
149
}
190
150
191
- private void getUnacked (int howMany , ArrayList <Long > acks ) throws IOException {
151
+ private void getUnacked (int howMany , List <Long > acks ) throws IOException {
192
152
for (;howMany > 0 ; howMany --){
193
- acks .add (channel .basicGet (q , false ).getEnvelope ().getDeliveryTag ());
153
+ GetResponse response = channel .basicGet (q , false );
154
+ if (acks != null )
155
+ acks .add (response .getEnvelope ().getDeliveryTag ());
194
156
}
195
157
}
196
- }
158
+ }
0 commit comments