Skip to content

Commit 64ccaf9

Browse files
committed
Disable dead-lettering republishing test for 3.13+
A test that publishes a message with "x-death" header already set and expects to see those appended in case of another dead lettering. After message container (3.13+), we avoid to mutate messages, so dead letter events are appended to the header if the message stays on the broker (e.g. is nacked, then expires). (cherry picked from commit 19f78c1)
1 parent 90cf4c5 commit 64ccaf9

File tree

1 file changed

+84
-82
lines changed

1 file changed

+84
-82
lines changed

src/test/java/com/rabbitmq/client/test/functional/DeadLetterExchange.java

Lines changed: 84 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -410,95 +410,97 @@ public void handleDelivery(String consumerTag, Envelope envelope,
410410

411411
@SuppressWarnings("unchecked")
412412
@Test public void republish() throws Exception {
413-
Map<String, Object> args = new HashMap<>();
414-
args.put("x-message-ttl", 100);
415-
declareQueue(TEST_QUEUE_NAME, DLX, null, args);
416-
channel.queueBind(TEST_QUEUE_NAME, "amq.direct", "test");
417-
channel.queueBind(DLQ, DLX, "test");
418-
publishN(1);
419-
420-
AtomicReference<GetResponse> responseRefeference = new AtomicReference<>();
421-
waitAtMost(
422-
ofSeconds(1),
423-
() -> {
424-
GetResponse response = channel.basicGet(DLQ, true);
425-
responseRefeference.set(response);
426-
return responseRefeference.get() != null;
427-
});
428-
GetResponse getResponse = responseRefeference.get();
429-
assertNotNull(getResponse, "Message not dead-lettered");
430-
assertEquals("test message", new String(getResponse.getBody()));
431-
BasicProperties props = getResponse.getProps();
432-
Map<String, Object> headers = props.getHeaders();
433-
assertNotNull(headers);
434-
ArrayList<Object> death = (ArrayList<Object>) headers.get("x-death");
435-
assertNotNull(death);
436-
assertEquals(1, death.size());
437-
assertDeathReason(death, 0, TEST_QUEUE_NAME, "expired", "amq.direct",
413+
if (TestUtils.atMost312(connection)) {
414+
Map<String, Object> args = new HashMap<>();
415+
args.put("x-message-ttl", 100);
416+
declareQueue(TEST_QUEUE_NAME, DLX, null, args);
417+
channel.queueBind(TEST_QUEUE_NAME, "amq.direct", "test");
418+
channel.queueBind(DLQ, DLX, "test");
419+
publishN(1);
420+
421+
AtomicReference<GetResponse> responseRefeference = new AtomicReference<>();
422+
waitAtMost(
423+
ofSeconds(1),
424+
() -> {
425+
GetResponse response = channel.basicGet(DLQ, true);
426+
responseRefeference.set(response);
427+
return responseRefeference.get() != null;
428+
});
429+
GetResponse getResponse = responseRefeference.get();
430+
assertNotNull(getResponse, "Message not dead-lettered");
431+
assertEquals("test message", new String(getResponse.getBody()));
432+
BasicProperties props = getResponse.getProps();
433+
Map<String, Object> headers = props.getHeaders();
434+
assertNotNull(headers);
435+
ArrayList<Object> death = (ArrayList<Object>) headers.get("x-death");
436+
assertNotNull(death);
437+
assertEquals(1, death.size());
438+
assertDeathReason(death, 0, TEST_QUEUE_NAME, "expired", "amq.direct",
438439
Collections.singletonList("test"));
439440

440-
// Make queue zero length
441-
args = new HashMap<>();
442-
args.put("x-max-length", 0);
443-
channel.queueDelete(TEST_QUEUE_NAME);
444-
declareQueue(TEST_QUEUE_NAME, DLX, null, args);
445-
channel.queueBind(TEST_QUEUE_NAME, "amq.direct", "test");
446-
447-
sleep(100);
448-
//Queueing second time with same props
449-
channel.basicPublish("amq.direct", "test",
450-
new AMQP.BasicProperties.Builder()
451-
.headers(headers)
452-
.build(), "test message".getBytes());
453-
454-
responseRefeference.set(null);
455-
waitAtMost(
456-
ofSeconds(1),
457-
() -> {
458-
GetResponse response = channel.basicGet(DLQ, true);
459-
responseRefeference.set(response);
460-
return responseRefeference.get() != null;
461-
});
462-
getResponse = responseRefeference.get();
463-
assertNotNull(getResponse, "Message not dead-lettered");
464-
assertEquals("test message", new String(getResponse.getBody()));
465-
headers = getResponse.getProps().getHeaders();
466-
assertNotNull(headers);
467-
death = (ArrayList<Object>) headers.get("x-death");
468-
assertNotNull(death);
469-
assertEquals(2, death.size());
470-
assertDeathReason(death, 0, TEST_QUEUE_NAME, "maxlen", "amq.direct",
441+
// Make queue zero length
442+
args = new HashMap<>();
443+
args.put("x-max-length", 0);
444+
channel.queueDelete(TEST_QUEUE_NAME);
445+
declareQueue(TEST_QUEUE_NAME, DLX, null, args);
446+
channel.queueBind(TEST_QUEUE_NAME, "amq.direct", "test");
447+
448+
sleep(100);
449+
//Queueing second time with same props
450+
channel.basicPublish("amq.direct", "test",
451+
new AMQP.BasicProperties.Builder()
452+
.headers(headers)
453+
.build(), "test message".getBytes());
454+
455+
responseRefeference.set(null);
456+
waitAtMost(
457+
ofSeconds(1),
458+
() -> {
459+
GetResponse response = channel.basicGet(DLQ, true);
460+
responseRefeference.set(response);
461+
return responseRefeference.get() != null;
462+
});
463+
getResponse = responseRefeference.get();
464+
assertNotNull(getResponse, "Message not dead-lettered");
465+
assertEquals("test message", new String(getResponse.getBody()));
466+
headers = getResponse.getProps().getHeaders();
467+
assertNotNull(headers);
468+
death = (ArrayList<Object>) headers.get("x-death");
469+
assertNotNull(death);
470+
assertEquals(2, death.size());
471+
assertDeathReason(death, 0, TEST_QUEUE_NAME, "maxlen", "amq.direct",
471472
Collections.singletonList("test"));
472-
assertDeathReason(death, 1, TEST_QUEUE_NAME, "expired", "amq.direct",
473+
assertDeathReason(death, 1, TEST_QUEUE_NAME, "expired", "amq.direct",
473474
Collections.singletonList("test"));
474475

475-
//Set invalid headers
476-
headers.put("x-death", "[I, am, not, array]");
477-
channel.basicPublish("amq.direct", "test",
478-
new AMQP.BasicProperties.Builder()
479-
.headers(headers)
480-
.build(), "test message".getBytes());
481-
482-
responseRefeference.set(null);
483-
waitAtMost(
484-
ofSeconds(1),
485-
() -> {
486-
GetResponse response = channel.basicGet(DLQ, true);
487-
responseRefeference.set(response);
488-
return responseRefeference.get() != null;
489-
});
490-
getResponse = responseRefeference.get();
491-
492-
assertNotNull(getResponse, "Message not dead-lettered");
493-
assertEquals("test message", new String(getResponse.getBody()));
494-
headers = getResponse.getProps().getHeaders();
495-
assertNotNull(headers);
496-
death = (ArrayList<Object>) headers.get("x-death");
497-
assertNotNull(death);
498-
assertEquals(1, death.size());
499-
assertDeathReason(death, 0, TEST_QUEUE_NAME, "maxlen", "amq.direct",
476+
//Set invalid headers
477+
headers.put("x-death", "[I, am, not, array]");
478+
channel.basicPublish("amq.direct", "test",
479+
new AMQP.BasicProperties.Builder()
480+
.headers(headers)
481+
.build(), "test message".getBytes());
482+
483+
responseRefeference.set(null);
484+
waitAtMost(
485+
ofSeconds(1),
486+
() -> {
487+
GetResponse response = channel.basicGet(DLQ, true);
488+
responseRefeference.set(response);
489+
return responseRefeference.get() != null;
490+
});
491+
getResponse = responseRefeference.get();
492+
493+
assertNotNull(getResponse, "Message not dead-lettered");
494+
assertEquals("test message", new String(getResponse.getBody()));
495+
headers = getResponse.getProps().getHeaders();
496+
assertNotNull(headers);
497+
death = (ArrayList<Object>) headers.get("x-death");
498+
assertNotNull(death);
499+
assertEquals(1, death.size());
500+
assertDeathReason(death, 0, TEST_QUEUE_NAME, "maxlen", "amq.direct",
500501
Collections.singletonList("test"));
501502

503+
}
502504
}
503505

504506
private void rejectionTest(final boolean useNack) throws Exception {

0 commit comments

Comments
 (0)