Skip to content

Commit 23d0715

Browse files
committed
merge bug19376 into default
2 parents 5812569 + 7356321 commit 23d0715

File tree

6 files changed

+329
-193
lines changed

6 files changed

+329
-193
lines changed

test/src/com/rabbitmq/client/test/BrokerTestCase.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,12 @@ protected void basicPublishVolatile(String x, String routingKey) throws IOExcept
200200
}
201201

202202
protected void basicPublishVolatile(byte[] msg, String x, String routingKey) throws IOException {
203-
channel.basicPublish(x, routingKey, MessageProperties.TEXT_PLAIN, msg);
203+
basicPublishVolatile(msg, x, routingKey, MessageProperties.TEXT_PLAIN);
204+
}
205+
206+
public void basicPublishVolatile(byte[] msg, String x, String routingKey,
207+
AMQP.BasicProperties properties) throws IOException {
208+
channel.basicPublish(x, routingKey, properties, msg);
204209
}
205210

206211
protected void declareAndBindDurableQueue(String q, String x, String r) throws IOException {

test/src/com/rabbitmq/client/test/functional/FunctionalTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public static void add(TestSuite suite) {
6363
suite.addTestSuite(ConsumerCancelNotificiation.class);
6464
suite.addTestSuite(UnexpectedFrames.class);
6565
suite.addTestSuite(PerQueueTTL.class);
66+
suite.addTestSuite(PerMessageTTL.class);
67+
suite.addTestSuite(PerQueueVsPerMessageTTL.class);
6668
suite.addTestSuite(DeadLetterExchange.class);
6769
suite.addTestSuite(SaslMechanisms.class);
6870
suite.addTestSuite(UserIDHeader.class);
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is VMware, Inc.
14+
// Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
15+
//
16+
17+
18+
package com.rabbitmq.client.test.functional;
19+
20+
import com.rabbitmq.client.AMQP;
21+
import com.rabbitmq.client.MessageProperties;
22+
23+
import java.io.IOException;
24+
25+
public class PerMessageTTL extends TTLHandling {
26+
27+
protected Object sessionTTL;
28+
29+
@Override
30+
protected void publish(String msg) throws IOException {
31+
basicPublishVolatile(msg.getBytes(), TTL_EXCHANGE, TTL_QUEUE_NAME,
32+
MessageProperties.TEXT_PLAIN
33+
.builder()
34+
.expiration(String.valueOf(sessionTTL))
35+
.build());
36+
}
37+
38+
@Override
39+
protected AMQP.Queue.DeclareOk declareQueue(String name, Object ttlValue) throws IOException {
40+
this.sessionTTL = ttlValue;
41+
return this.channel.queueDeclare(name, false, true, false, null);
42+
}
43+
44+
}

test/src/com/rabbitmq/client/test/functional/PerQueueTTL.java

Lines changed: 19 additions & 192 deletions
Original file line numberDiff line numberDiff line change
@@ -18,89 +18,36 @@
1818
package com.rabbitmq.client.test.functional;
1919

2020
import com.rabbitmq.client.AMQP;
21-
import com.rabbitmq.client.GetResponse;
22-
import com.rabbitmq.client.QueueingConsumer;
23-
import com.rabbitmq.client.QueueingConsumer.Delivery;
24-
import com.rabbitmq.client.test.BrokerTestCase;
21+
import com.rabbitmq.client.MessageProperties;
2522

2623
import java.io.IOException;
2724
import java.util.Collections;
2825
import java.util.Map;
2926

30-
/**
31-
*
32-
*/
33-
public class PerQueueTTL extends BrokerTestCase {
27+
public class PerQueueTTL extends TTLHandling {
3428

35-
private static final String TTL_EXCHANGE = "ttl.exchange";
36-
private static final String TTL_ARG = "x-message-ttl";
37-
private static final String TTL_QUEUE_NAME = "queue.ttl";
38-
private static final String TTL_INVALID_QUEUE_NAME = "invalid.queue.ttl";
39-
40-
private static final String[] MSG = {"one", "two", "three"};
41-
42-
@Override
43-
protected void createResources() throws IOException {
44-
this.channel.exchangeDeclare(TTL_EXCHANGE, "direct");
45-
}
29+
protected static final String TTL_ARG = "x-message-ttl";
4630

4731
@Override
48-
protected void releaseResources() throws IOException {
49-
this.channel.exchangeDelete(TTL_EXCHANGE);
50-
}
51-
52-
public void testCreateQueueTTLTypes() throws IOException {
53-
Object[] args = {(byte) 200, (short) 200, 200, 200L};
54-
for (Object ttl : args) {
55-
try {
56-
declareQueue(ttl);
57-
} catch (IOException ex) {
58-
fail("Should be able to use " + ttl.getClass().getName() +
59-
" for x-message-ttl");
60-
}
61-
}
62-
}
63-
64-
public void testTTLAllowZero() throws Exception {
65-
try {
66-
declareQueue(0);
67-
} catch (IOException e) {
68-
fail("Should be able to declare a queue with zero for x-message-ttl");
69-
}
70-
}
71-
72-
public void testCreateQueueWithInvalidTTL() throws Exception {
73-
try {
74-
declareQueue(TTL_INVALID_QUEUE_NAME, "foobar");
75-
fail("Should not be able to declare a queue with a non-long value for x-message-ttl");
76-
} catch (IOException e) {
77-
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
78-
}
79-
}
80-
81-
public void testTTLMustBePositive() throws Exception {
82-
try {
83-
declareQueue(TTL_INVALID_QUEUE_NAME, -10);
84-
fail("Should not be able to declare a queue with negative value for x-message-ttl");
85-
} catch (IOException e) {
86-
checkShutdownSignal(AMQP.PRECONDITION_FAILED, e);
87-
}
32+
protected AMQP.Queue.DeclareOk declareQueue(String name, Object ttlValue) throws IOException {
33+
Map<String, Object> argMap = Collections.singletonMap(TTL_ARG, ttlValue);
34+
return this.channel.queueDeclare(name, false, true, false, argMap);
8835
}
8936

90-
public void testQueueRedeclareEquivalence() throws Exception {
37+
public void testQueueReDeclareEquivalence() throws Exception {
9138
declareQueue(10);
9239
try {
9340
declareQueue(20);
9441
fail("Should not be able to redeclare with different x-message-ttl");
95-
} catch (IOException ex) {
42+
} catch(IOException ex) {
9643
checkShutdownSignal(AMQP.PRECONDITION_FAILED, ex);
9744
}
9845
}
9946

100-
public void testQueueRedeclareSemanticEquivalence() throws Exception {
101-
declareQueue((byte) 10);
47+
public void testQueueReDeclareSemanticEquivalence() throws Exception {
48+
declareQueue((byte)10);
10249
declareQueue(10);
103-
declareQueue((short) 10);
50+
declareQueue((short)10);
10451
declareQueue(10L);
10552
}
10653

@@ -109,136 +56,16 @@ public void testQueueReDeclareSemanticNonEquivalence() throws Exception {
10956
try {
11057
declareQueue(10.0);
11158
fail("Should not be able to redeclare with x-message-ttl argument of different type");
112-
} catch (IOException ex) {
59+
} catch(IOException ex) {
11360
checkShutdownSignal(AMQP.PRECONDITION_FAILED, ex);
11461
}
11562
}
11663

117-
/*
118-
* Test messages expire when using basic get.
119-
*/
120-
public void testPublishAndGetWithExpiry() throws Exception {
121-
declareAndBindQueue(200);
122-
123-
publish(MSG[0]);
124-
Thread.sleep(150);
125-
126-
publish(MSG[1]);
127-
Thread.sleep(100);
128-
129-
publish(MSG[2]);
130-
131-
assertEquals(MSG[1], get());
132-
assertEquals(MSG[2], get());
133-
}
134-
135-
/*
136-
* Test get expiry for messages sent under a transaction
137-
*/
138-
public void testTransactionalPublishWithGet() throws Exception {
139-
declareAndBindQueue(100);
140-
141-
this.channel.txSelect();
142-
143-
publish(MSG[0]);
144-
Thread.sleep(150);
145-
146-
publish(MSG[1]);
147-
this.channel.txCommit();
148-
Thread.sleep(50);
149-
150-
assertEquals(MSG[0], get());
151-
Thread.sleep(80);
152-
153-
assertNull(get());
64+
protected void publishWithExpiration(String msg, Object sessionTTL) throws IOException {
65+
basicPublishVolatile(msg.getBytes(), TTL_EXCHANGE, TTL_QUEUE_NAME,
66+
MessageProperties.TEXT_PLAIN
67+
.builder()
68+
.expiration(String.valueOf(sessionTTL))
69+
.build());
15470
}
155-
156-
/*
157-
* Test expiry of re-queued messages
158-
*/
159-
public void testExpiryWithRequeue() throws Exception {
160-
declareAndBindQueue(200);
161-
162-
publish(MSG[0]);
163-
Thread.sleep(100);
164-
publish(MSG[1]);
165-
publish(MSG[2]);
166-
167-
expectBodyAndRemainingMessages(MSG[0], 2);
168-
expectBodyAndRemainingMessages(MSG[1], 1);
169-
170-
closeChannel();
171-
openChannel();
172-
173-
Thread.sleep(150);
174-
expectBodyAndRemainingMessages(MSG[1], 1);
175-
expectBodyAndRemainingMessages(MSG[2], 0);
176-
}
177-
178-
/*
179-
* Test expiry of re-queued messages after being consumed instantly
180-
*/
181-
public void testExpiryWithRequeueAfterConsume() throws Exception {
182-
declareAndBindQueue(100);
183-
QueueingConsumer c = new QueueingConsumer(channel);
184-
channel.basicConsume(TTL_QUEUE_NAME, c);
185-
186-
publish(MSG[0]);
187-
assertNotNull(c.nextDelivery(100));
188-
189-
closeChannel();
190-
Thread.sleep(150);
191-
openChannel();
192-
193-
assertNull("Requeued message not expired", get());
194-
}
195-
196-
public void testZeroTTLDelivery() throws Exception {
197-
declareAndBindQueue(0);
198-
199-
// when there is no consumer, message should expire
200-
publish(MSG[0]);
201-
assertNull(get());
202-
203-
// when there is a consumer, message should be delivered
204-
QueueingConsumer c = new QueueingConsumer(channel);
205-
channel.basicConsume(TTL_QUEUE_NAME, c);
206-
publish(MSG[0]);
207-
Delivery d = c.nextDelivery(100);
208-
assertNotNull(d);
209-
210-
// re-queued messages should expire
211-
channel.basicReject(d.getEnvelope().getDeliveryTag(), true);
212-
assertNull(c.nextDelivery(100));
213-
}
214-
215-
private String get() throws IOException {
216-
GetResponse response = basicGet(TTL_QUEUE_NAME);
217-
return response == null ? null : new String(response.getBody());
218-
}
219-
220-
private void publish(String msg) throws IOException {
221-
basicPublishVolatile(msg.getBytes(), TTL_EXCHANGE, TTL_QUEUE_NAME);
222-
}
223-
224-
private void declareAndBindQueue(Object ttlValue) throws IOException {
225-
declareQueue(ttlValue);
226-
this.channel.queueBind(TTL_QUEUE_NAME, TTL_EXCHANGE, TTL_QUEUE_NAME);
227-
}
228-
229-
private AMQP.Queue.DeclareOk declareQueue(Object ttlValue) throws IOException {
230-
return declareQueue(TTL_QUEUE_NAME, ttlValue);
231-
}
232-
233-
private AMQP.Queue.DeclareOk declareQueue(String name, Object ttlValue) throws IOException {
234-
Map<String, Object> argMap = Collections.singletonMap(TTL_ARG, ttlValue);
235-
return this.channel.queueDeclare(name, false, true, false, argMap);
236-
}
237-
238-
private void expectBodyAndRemainingMessages(String body, int messagesLeft) throws IOException {
239-
GetResponse response = channel.basicGet(TTL_QUEUE_NAME, false);
240-
assertEquals(body, new String(response.getBody()));
241-
assertEquals(messagesLeft, response.getMessageCount());
242-
}
243-
244-
}
71+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.rabbitmq.client.test.functional;
2+
3+
import com.rabbitmq.client.AMQP;
4+
5+
import java.io.IOException;
6+
import java.util.Collections;
7+
import java.util.Map;
8+
9+
public class PerQueueVsPerMessageTTL extends PerMessageTTL {
10+
11+
public void testSmallerPerQueueExpiryWins() throws IOException, InterruptedException {
12+
declareAndBindQueue(10);
13+
this.sessionTTL = 1000;
14+
15+
publish("message1");
16+
17+
Thread.sleep(100);
18+
19+
assertNull("per-queue ttl should have removed message after 10ms!", get());
20+
}
21+
22+
@Override
23+
protected AMQP.Queue.DeclareOk declareQueue(String name, Object ttlValue) throws IOException {
24+
final Object mappedTTL = (ttlValue instanceof String &&
25+
((String) ttlValue).contains("foobar")) ?
26+
ttlValue : longValue(ttlValue) * 2;
27+
this.sessionTTL = ttlValue;
28+
Map<String, Object> argMap = Collections.singletonMap(PerQueueTTL.TTL_ARG, mappedTTL);
29+
return this.channel.queueDeclare(name, false, true, false, argMap);
30+
}
31+
32+
private Long longValue(final Object ttl) {
33+
if (ttl instanceof Short) {
34+
return ((Short)ttl).longValue();
35+
} else if (ttl instanceof Integer) {
36+
return ((Integer)ttl).longValue();
37+
} else if (ttl instanceof Long) {
38+
return (Long) ttl;
39+
} else {
40+
throw new IllegalArgumentException("ttl not of expected type");
41+
}
42+
}
43+
44+
}

0 commit comments

Comments
 (0)