Skip to content

Commit 2e627f3

Browse files
Merge pull request #300 from rabbitmq/rabbitmq-server-1332
Tests for x-first-death-{queue,reason,exchange}
2 parents 6f849cc + c1e2f80 commit 2e627f3

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

src/test/java/com/rabbitmq/client/test/functional/DeadLetterExchange.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ protected void releaseResources() throws IOException {
170170
channel.queueBind(DLQ, DLX, "test");
171171

172172
//measure round-trip latency
173-
QueueMessageConsumer c = new QueueMessageConsumer(channel);
173+
AccumulatingMessageConsumer c = new AccumulatingMessageConsumer(channel);
174174
String cTag = channel.basicConsume(TEST_QUEUE_NAME, true, c);
175175
long start = System.currentTimeMillis();
176176
publish(null, "test");
@@ -536,6 +536,9 @@ public void process(GetResponse getResponse) {
536536
assertNotNull(headers);
537537
ArrayList<Object> death = (ArrayList<Object>) headers.get("x-death");
538538
assertNotNull(death);
539+
assertNotNull(headers.get("x-first-death-queue"));
540+
assertNotNull(headers.get("x-first-death-reason"));
541+
assertNotNull(headers.get("x-first-death-exchange"));
539542
assertEquals(1, death.size());
540543
assertDeathReason(death, 0, TEST_QUEUE_NAME, reason,
541544
"amq.direct",
@@ -562,7 +565,7 @@ private void sleep(long millis) {
562565

563566
/* check that each message arrives within epsilon of the
564567
publication time + TTL + latency */
565-
private void checkPromptArrival(QueueMessageConsumer c,
568+
private void checkPromptArrival(AccumulatingMessageConsumer c,
566569
int count, long latency) throws Exception {
567570
long epsilon = TTL / 10;
568571
for (int i = 0; i < count; i++) {
@@ -697,11 +700,11 @@ private static String randomQueueName() {
697700
return DeadLetterExchange.class.getSimpleName() + "-" + UUID.randomUUID().toString();
698701
}
699702

700-
class QueueMessageConsumer extends DefaultConsumer {
703+
class AccumulatingMessageConsumer extends DefaultConsumer {
701704

702705
BlockingQueue<byte[]> messages = new LinkedBlockingQueue<byte[]>();
703706

704-
public QueueMessageConsumer(Channel channel) {
707+
public AccumulatingMessageConsumer(Channel channel) {
705708
super(channel);
706709
}
707710

0 commit comments

Comments
 (0)