30
30
*/
31
31
public class PerQueueTTL extends BrokerTestCase {
32
32
33
- private static final String TTL_EXCHANGE = "ttl.exchange" ;
34
-
35
- private static final String TTL_ARG = "x-message-ttl" ;
36
-
37
- private static final String TTL_QUEUE_NAME = "queue.ttl" ;
38
-
33
+ private static final String TTL_EXCHANGE = "ttl.exchange" ;
34
+ private static final String TTL_ARG = "x-message-ttl" ;
35
+ private static final String TTL_QUEUE_NAME = "queue.ttl" ;
39
36
private static final String TTL_INVALID_QUEUE_NAME = "invalid.queue.ttl" ;
40
37
38
+ private static final String [] MSG = {"one" , "two" , "three" };
39
+
41
40
@ Override
42
41
protected void createResources () throws IOException {
43
42
this .channel .exchangeDeclare (TTL_EXCHANGE , "direct" );
@@ -48,33 +47,15 @@ protected void releaseResources() throws IOException {
48
47
this .channel .exchangeDelete (TTL_EXCHANGE );
49
48
}
50
49
51
- public void testCreateQueueWithByteTTL () throws IOException {
52
- try {
53
- declareQueue (TTL_QUEUE_NAME , (byte )200 );
54
- } catch (IOException ex ) {
55
- fail ("Should be able to use byte for queue TTL" );
56
- }
57
- }
58
- public void testCreateQueueWithShortTTL () throws IOException {
59
- try {
60
- declareQueue (TTL_QUEUE_NAME , (short )200 );
61
- } catch (IOException ex ) {
62
- fail ("Should be able to use short for queue TTL" );
63
- }
64
- }
65
- public void testCreateQueueWithIntTTL () throws IOException {
66
- try {
67
- declareQueue (TTL_QUEUE_NAME , 200 );
68
- } catch (IOException ex ) {
69
- fail ("Should be able to use int for queue TTL" );
70
- }
71
- }
72
-
73
- public void testCreateQueueWithLongTTL () throws IOException {
74
- try {
75
- declareQueue (TTL_QUEUE_NAME , 200L );
76
- } catch (IOException ex ) {
77
- fail ("Should be able to use long for queue TTL" );
50
+ public void testCreateQueueTTLTypes () throws IOException {
51
+ Object [] args = { (byte )200 , (short )200 , 200 , 200L };
52
+ for (Object ttl : args ) {
53
+ try {
54
+ declareQueue (ttl );
55
+ } catch (IOException ex ) {
56
+ fail ("Should be able to use " + ttl .getClass ().getName () +
57
+ " for x-message-ttl" );
58
+ }
78
59
}
79
60
}
80
61
@@ -99,34 +80,34 @@ public void testTTLMustBeGtZero() throws Exception {
99
80
public void testTTLMustBePositive () throws Exception {
100
81
try {
101
82
declareQueue (TTL_INVALID_QUEUE_NAME , -10 );
102
- fail ("Should not be able to declare a queue with zero for x-message-ttl" );
83
+ fail ("Should not be able to declare a queue with negative value for x-message-ttl" );
103
84
} catch (IOException e ) {
104
85
checkShutdownSignal (AMQP .PRECONDITION_FAILED , e );
105
86
}
106
87
}
107
88
108
89
public void testQueueRedeclareEquivalence () throws Exception {
109
- declareQueue (TTL_QUEUE_NAME , 10 );
90
+ declareQueue (10 );
110
91
try {
111
- declareQueue (TTL_QUEUE_NAME , 20 );
112
- fail ("Should not be able to redeclare with different TTL " );
92
+ declareQueue (20 );
93
+ fail ("Should not be able to redeclare with different x-message-ttl " );
113
94
} catch (IOException ex ) {
114
95
checkShutdownSignal (AMQP .PRECONDITION_FAILED , ex );
115
96
}
116
97
}
117
98
118
99
public void testQueueRedeclareSemanticEquivalence () throws Exception {
119
- declareQueue (TTL_QUEUE_NAME , (byte )10 );
120
- declareQueue (TTL_QUEUE_NAME , 10 );
121
- declareQueue (TTL_QUEUE_NAME , (short )10 );
122
- declareQueue (TTL_QUEUE_NAME , 10L );
100
+ declareQueue ((byte )10 );
101
+ declareQueue (10 );
102
+ declareQueue ((short )10 );
103
+ declareQueue (10L );
123
104
}
124
105
125
106
public void testQueueRedeclareSemanticNonEquivalence () throws Exception {
126
- declareQueue (TTL_QUEUE_NAME , 10 );
107
+ declareQueue (10 );
127
108
try {
128
- declareQueue (TTL_QUEUE_NAME , 10.0 );
129
- fail ("Should not be able to redeclare with argument of different type" );
109
+ declareQueue (10.0 );
110
+ fail ("Should not be able to redeclare with x-message-ttl argument of different type" );
130
111
} catch (IOException ex ) {
131
112
checkShutdownSignal (AMQP .PRECONDITION_FAILED , ex );
132
113
}
@@ -136,48 +117,37 @@ public void testQueueRedeclareSemanticNonEquivalence() throws Exception {
136
117
* Test messages expire when using basic get.
137
118
*/
138
119
public void testPublishAndGetWithExpiry () throws Exception {
139
- long ttl = 2000 ;
140
- declareQueue (TTL_QUEUE_NAME , ttl );
141
- this .channel .queueBind (TTL_QUEUE_NAME , TTL_EXCHANGE , TTL_QUEUE_NAME );
120
+ declareAndBindQueue (2000 );
142
121
143
- byte [] msg1 = "one" .getBytes ();
144
- byte [] msg2 = "two" .getBytes ();
145
- byte [] msg3 = "three" .getBytes ();
146
-
147
- basicPublishVolatile (msg1 , TTL_EXCHANGE , TTL_QUEUE_NAME );
122
+ publish (MSG [0 ]);
148
123
Thread .sleep (1500 );
149
124
150
- basicPublishVolatile ( msg2 , TTL_EXCHANGE , TTL_QUEUE_NAME );
125
+ publish ( MSG [ 1 ] );
151
126
Thread .sleep (1000 );
152
127
153
- basicPublishVolatile ( msg3 , TTL_EXCHANGE , TTL_QUEUE_NAME );
128
+ publish ( MSG [ 2 ] );
154
129
155
- assertEquals ("two" , new String (get ()));
156
- assertEquals ("three" , new String (get ()));
130
+ assertEquals (MSG [ 1 ] , new String (get ()));
131
+ assertEquals (MSG [ 2 ] , new String (get ()));
157
132
158
133
}
159
-
134
+
160
135
/*
161
136
* Test get expiry for messages sent under a transaction
162
137
*/
163
138
public void testTransactionalPublishWithGet () throws Exception {
164
- long ttl = 1000 ;
165
- declareQueue (TTL_QUEUE_NAME , ttl );
166
- this .channel .queueBind (TTL_QUEUE_NAME , TTL_EXCHANGE , TTL_QUEUE_NAME );
167
-
168
- byte [] msg1 = "one" .getBytes ();
169
- byte [] msg2 = "two" .getBytes ();
139
+ declareAndBindQueue (1000 );
170
140
171
141
this .channel .txSelect ();
172
142
173
- basicPublishVolatile ( msg1 , TTL_EXCHANGE , TTL_QUEUE_NAME );
143
+ publish ( MSG [ 0 ] );
174
144
Thread .sleep (1500 );
175
145
176
- basicPublishVolatile ( msg2 , TTL_EXCHANGE , TTL_QUEUE_NAME );
146
+ publish ( MSG [ 1 ] );
177
147
this .channel .txCommit ();
178
148
Thread .sleep (500 );
179
149
180
- assertEquals ("one" , new String (get ()));
150
+ assertEquals (MSG [ 0 ] , new String (get ()));
181
151
Thread .sleep (800 );
182
152
183
153
assertNull (get ());
@@ -187,28 +157,22 @@ public void testTransactionalPublishWithGet() throws Exception {
187
157
* Test expiry of requeued messages
188
158
*/
189
159
public void testExpiryWithRequeue () throws Exception {
190
- long ttl = 1000 ;
191
- declareQueue (TTL_QUEUE_NAME , ttl );
192
- this .channel .queueBind (TTL_QUEUE_NAME , TTL_EXCHANGE , TTL_QUEUE_NAME );
193
-
194
- byte [] msg1 = "one" .getBytes ();
195
- byte [] msg2 = "two" .getBytes ();
196
- byte [] msg3 = "three" .getBytes ();
160
+ declareAndBindQueue (1000 );
197
161
198
- basicPublishVolatile ( msg1 , TTL_EXCHANGE , TTL_QUEUE_NAME );
162
+ publish ( MSG [ 0 ] );
199
163
Thread .sleep (500 );
200
- basicPublishVolatile ( msg2 , TTL_EXCHANGE , TTL_QUEUE_NAME );
201
- basicPublishVolatile ( msg3 , TTL_EXCHANGE , TTL_QUEUE_NAME );
164
+ publish ( MSG [ 1 ] );
165
+ publish ( MSG [ 2 ] );
202
166
203
- expectBodyAndRemainingMessages ("one" , 2 );
204
- expectBodyAndRemainingMessages ("two" , 1 );
167
+ expectBodyAndRemainingMessages (MSG [ 0 ] , 2 );
168
+ expectBodyAndRemainingMessages (MSG [ 1 ] , 1 );
205
169
206
170
closeChannel ();
207
171
openChannel ();
208
172
209
173
Thread .sleep (600 );
210
- expectBodyAndRemainingMessages ("two" , 1 );
211
- expectBodyAndRemainingMessages ("three" , 0 );
174
+ expectBodyAndRemainingMessages (MSG [ 1 ] , 1 );
175
+ expectBodyAndRemainingMessages (MSG [ 2 ] , 0 );
212
176
}
213
177
214
178
@@ -220,6 +184,19 @@ private byte[] get() throws IOException {
220
184
return response .getBody ();
221
185
}
222
186
187
+ private void publish (String msg ) throws IOException {
188
+ basicPublishVolatile (msg .getBytes (), TTL_EXCHANGE , TTL_QUEUE_NAME );
189
+ }
190
+
191
+ private void declareAndBindQueue (Object ttlValue ) throws IOException {
192
+ declareQueue (ttlValue );
193
+ this .channel .queueBind (TTL_QUEUE_NAME , TTL_EXCHANGE , TTL_QUEUE_NAME );
194
+ }
195
+
196
+ private AMQP .Queue .DeclareOk declareQueue (Object ttlValue ) throws IOException {
197
+ return declareQueue (TTL_QUEUE_NAME , ttlValue );
198
+ }
199
+
223
200
private AMQP .Queue .DeclareOk declareQueue (String name , Object ttlValue ) throws IOException {
224
201
Map <String , Object > argMap = Collections .singletonMap (TTL_ARG , ttlValue );
225
202
return this .channel .queueDeclare (name , false , true , false , argMap );
0 commit comments