|
1 |
| -// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. |
| 1 | +// Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. |
2 | 2 | //
|
3 | 3 | // This software, the RabbitMQ Java client library, is triple-licensed under the
|
4 | 4 | // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
|
|
26 | 26 | import java.util.*;
|
27 | 27 | import java.util.concurrent.*;
|
28 | 28 |
|
| 29 | +import static com.rabbitmq.client.test.TestUtils.safeDelete; |
29 | 30 | import static com.rabbitmq.client.test.TestUtils.waitAtMost;
|
30 | 31 | import static java.time.Duration.ofSeconds;
|
31 | 32 | import static org.junit.Assert.*;
|
32 | 33 |
|
33 | 34 | public class DeadLetterExchange extends BrokerTestCase {
|
| 35 | + |
34 | 36 | public static final String DLX = "dead.letter.exchange";
|
35 | 37 | private static final String DLX_ARG = "x-dead-letter-exchange";
|
36 | 38 | private static final String DLX_RK_ARG = "x-dead-letter-routing-key";
|
@@ -344,21 +346,28 @@ protected void releaseResources() throws IOException {
|
344 | 346 | // messages in pure-expiry cycles. So we just need to test that
|
345 | 347 | // non-pure-expiry cycles do not drop messages.
|
346 | 348 |
|
347 |
| - declareQueue("queue1", "", "queue2", null, 1); |
348 |
| - declareQueue("queue2", "", "queue1", null, 0); |
349 |
| - |
350 |
| - channel.basicPublish("", "queue1", MessageProperties.BASIC, "".getBytes()); |
351 |
| - final CountDownLatch latch = new CountDownLatch(10); |
352 |
| - channel.basicConsume("queue2", false, |
353 |
| - new DefaultConsumer(channel) { |
354 |
| - @Override |
355 |
| - public void handleDelivery(String consumerTag, Envelope envelope, |
356 |
| - AMQP.BasicProperties properties, byte[] body) throws IOException { |
357 |
| - channel.basicReject(envelope.getDeliveryTag(), false); |
358 |
| - latch.countDown(); |
359 |
| - } |
360 |
| - }); |
361 |
| - assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| 349 | + String queue1 = generateQueueName(); |
| 350 | + String queue2 = generateQueueName(); |
| 351 | + try { |
| 352 | + declareQueue(queue1, "", queue2, null, 1); |
| 353 | + declareQueue(queue2, "", queue1, null, 0); |
| 354 | + |
| 355 | + channel.basicPublish("", queue1, MessageProperties.BASIC, "".getBytes()); |
| 356 | + final CountDownLatch latch = new CountDownLatch(10); |
| 357 | + channel.basicConsume(queue2, false, |
| 358 | + new DefaultConsumer(channel) { |
| 359 | + @Override |
| 360 | + public void handleDelivery(String consumerTag, Envelope envelope, |
| 361 | + AMQP.BasicProperties properties, byte[] body) throws IOException { |
| 362 | + channel.basicReject(envelope.getDeliveryTag(), false); |
| 363 | + latch.countDown(); |
| 364 | + } |
| 365 | + }); |
| 366 | + assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| 367 | + } finally { |
| 368 | + safeDelete(connection, queue1); |
| 369 | + safeDelete(connection, queue2); |
| 370 | + } |
362 | 371 | }
|
363 | 372 |
|
364 | 373 | @SuppressWarnings("unchecked")
|
|
0 commit comments