Skip to content

Commit 88ac574

Browse files
author
Simon MacMullen
committed
Merge bug25260
2 parents 3c08f59 + f13cca4 commit 88ac574

File tree

3 files changed

+90
-24
lines changed

3 files changed

+90
-24
lines changed

test/src/com/rabbitmq/client/test/functional/DeadLetterExchange.java

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.rabbitmq.client.test.functional;
22

33
import com.rabbitmq.client.AMQP;
4+
import com.rabbitmq.client.Channel;
45
import com.rabbitmq.client.GetResponse;
56
import com.rabbitmq.client.QueueingConsumer;
67
import com.rabbitmq.client.QueueingConsumer.Delivery;
@@ -16,15 +17,15 @@
1617
import java.util.concurrent.Callable;
1718

1819
public class DeadLetterExchange extends BrokerTestCase {
19-
private static final String DLX = "dead.letter.exchange";
20-
private static final String DLX_ARG = "x-dead-letter-exchange";
21-
private static final String DLX_RK_ARG = "x-dead-letter-routing-key";
22-
private static final String TEST_QUEUE_NAME = "test.queue.dead.letter";
23-
private static final String DLQ = "queue.dlq";
24-
private static final String DLQ2 = "queue.dlq2";
25-
private static final int MSG_COUNT = 10;
26-
private static final int MSG_COUNT_MANY = 1000;
27-
private static final int TTL = 1000;
20+
public static final String DLX = "dead.letter.exchange";
21+
public static final String DLX_ARG = "x-dead-letter-exchange";
22+
public static final String DLX_RK_ARG = "x-dead-letter-routing-key";
23+
public static final String TEST_QUEUE_NAME = "test.queue.dead.letter";
24+
public static final String DLQ = "queue.dlq";
25+
public static final String DLQ2 = "queue.dlq2";
26+
public static final int MSG_COUNT = 10;
27+
public static final int MSG_COUNT_MANY = 1000;
28+
public static final int TTL = 1000;
2829

2930
@Override
3031
protected void createResources() throws IOException {
@@ -348,19 +349,23 @@ private void deadLetterTest(Callable<?> deathTrigger,
348349

349350
deathTrigger.call();
350351

351-
consumeN(DLQ, MSG_COUNT, new WithResponse() {
352-
@SuppressWarnings("unchecked")
353-
public void process(GetResponse getResponse) {
354-
Map<String, Object> headers = getResponse.getProps().getHeaders();
355-
assertNotNull(headers);
356-
ArrayList<Object> death = (ArrayList<Object>)headers.get("x-death");
357-
assertNotNull(death);
358-
assertEquals(1, death.size());
359-
assertDeathReason(death, 0, TEST_QUEUE_NAME, reason,
360-
"amq.direct",
361-
Arrays.asList(new String[]{"test"}));
362-
}
363-
});
352+
consume(channel, reason);
353+
}
354+
355+
public static void consume(final Channel channel, final String reason) throws IOException {
356+
consumeN(channel, DLQ, MSG_COUNT, new WithResponse() {
357+
@SuppressWarnings("unchecked")
358+
public void process(GetResponse getResponse) {
359+
Map<String, Object> headers = getResponse.getProps().getHeaders();
360+
assertNotNull(headers);
361+
ArrayList<Object> death = (ArrayList<Object>) headers.get("x-death");
362+
assertNotNull(death);
363+
assertEquals(1, death.size());
364+
assertDeathReason(death, 0, TEST_QUEUE_NAME, reason,
365+
"amq.direct",
366+
Arrays.asList(new String[]{"test"}));
367+
}
368+
});
364369
}
365370

366371
private void ttlTest(final long ttl) throws Exception {
@@ -455,6 +460,12 @@ private void waitUntil(long when) throws Exception {
455460

456461
private void consumeN(String queue, int n, WithResponse withResponse)
457462
throws IOException
463+
{
464+
consumeN(channel, queue, n, withResponse);
465+
}
466+
467+
private static void consumeN(Channel channel, String queue, int n, WithResponse withResponse)
468+
throws IOException
458469
{
459470
for(int x = 0; x < n; x++) {
460471
GetResponse getResponse =
@@ -469,7 +480,7 @@ private void consumeN(String queue, int n, WithResponse withResponse)
469480
}
470481

471482
@SuppressWarnings("unchecked")
472-
private void assertDeathReason(List<Object> death, int num,
483+
private static void assertDeathReason(List<Object> death, int num,
473484
String queue, String reason,
474485
String exchange, List<String> routingKeys)
475486
{
@@ -489,7 +500,7 @@ private void assertDeathReason(List<Object> death, int num,
489500
}
490501

491502
@SuppressWarnings("unchecked")
492-
private void assertDeathReason(List<Object> death, int num,
503+
private static void assertDeathReason(List<Object> death, int num,
493504
String queue, String reason) {
494505
Map<String, Object> deathHeader =
495506
(Map<String, Object>)death.get(num);
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.rabbitmq.client.test.server;
2+
3+
import com.rabbitmq.client.MessageProperties;
4+
import com.rabbitmq.client.test.BrokerTestCase;
5+
import com.rabbitmq.client.test.functional.DeadLetterExchange;
6+
import com.rabbitmq.tools.Host;
7+
8+
import java.io.IOException;
9+
import java.util.HashMap;
10+
import java.util.Map;
11+
12+
public class DeadLetterExchangeDurable extends BrokerTestCase {
13+
@Override
14+
protected void createResources() throws IOException {
15+
Map<String, Object> args = new HashMap<String, Object>();
16+
args.put("x-message-ttl", 5000);
17+
args.put("x-dead-letter-exchange", DeadLetterExchange.DLX);
18+
19+
channel.exchangeDeclare(DeadLetterExchange.DLX, "direct", true);
20+
channel.queueDeclare(DeadLetterExchange.DLQ, true, false, false, null);
21+
channel.queueDeclare(DeadLetterExchange.TEST_QUEUE_NAME, true, false, false, args);
22+
channel.queueBind(DeadLetterExchange.TEST_QUEUE_NAME, "amq.direct", "test");
23+
channel.queueBind(DeadLetterExchange.DLQ, DeadLetterExchange.DLX, "test");
24+
}
25+
26+
@Override
27+
protected void releaseResources() throws IOException {
28+
channel.exchangeDelete(DeadLetterExchange.DLX);
29+
channel.queueDelete(DeadLetterExchange.DLQ);
30+
channel.queueDelete(DeadLetterExchange.TEST_QUEUE_NAME);
31+
}
32+
33+
public void testDeadLetterQueueTTLExpiredWhileDown() throws Exception {
34+
for(int x = 0; x < DeadLetterExchange.MSG_COUNT; x++) {
35+
channel.basicPublish("amq.direct", "test", MessageProperties.MINIMAL_PERSISTENT_BASIC, "test message".getBytes());
36+
}
37+
38+
closeConnection();
39+
Host.executeCommand("cd ../rabbitmq-test; make stop-app");
40+
Thread.sleep(5000);
41+
Host.executeCommand("cd ../rabbitmq-test; make start-app");
42+
openConnection();
43+
openChannel();
44+
45+
//This has the effect of waiting for the queue to complete the
46+
//dead lettering. Some raciness remains though since the
47+
//dead-letter publication is async so the 'consume' below may
48+
//reach the dlq before all dead-lettered messages have arrived
49+
//there.
50+
assertNull(basicGet(DeadLetterExchange.TEST_QUEUE_NAME));
51+
52+
DeadLetterExchange.consume(channel, "expired");
53+
}
54+
}

test/src/com/rabbitmq/client/test/server/ServerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public static TestSuite suite() {
3030
public static void add(TestSuite suite) {
3131
suite.addTestSuite(Permissions.class);
3232
suite.addTestSuite(DurableBindingLifecycle.class);
33+
suite.addTestSuite(DeadLetterExchangeDurable.class);
3334
suite.addTestSuite(EffectVisibilityCrossNodeTest.class);
3435
suite.addTestSuite(ExclusiveQueueDurability.class);
3536
suite.addTestSuite(AbsentQueue.class);

0 commit comments

Comments
 (0)