Skip to content

Commit 122c215

Browse files
committed
spring-projectsGH-2671: Improve DLQ Exception Message Header
Resolves spring-projects#2671 With Spring Framework 6, `NestedRuntimeException.getMessage()` no longer includes the messages for nested exceptions via the cause chain. See spring-projects/spring-framework#25162 This means that the DLQ exception message header always contains the same `Failed listener` message. Find the root cause exception and include its message in the header. Ignore any nested `TimestampedException` and `LEFE` between the top `LEFE` and the root cause; these will still appear in the stack trace.
1 parent c900185 commit 122c215

File tree

5 files changed

+57
-17
lines changed

5 files changed

+57
-17
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -762,17 +762,18 @@ private void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception,
762762
isKey ? names.exceptionInfo.keyExceptionFqcn : names.exceptionInfo.exceptionFqcn,
763763
() -> exception.getClass().getName().getBytes(StandardCharsets.UTF_8),
764764
HeaderNames.HeadersToAdd.EXCEPTION);
765-
if (exception.getCause() != null) {
765+
Exception cause = ErrorHandlingUtils.findRootCause(exception);
766+
if (cause != null) {
766767
appendOrReplace(kafkaHeaders,
767768
names.exceptionInfo.exceptionCauseFqcn,
768-
() -> exception.getCause().getClass().getName().getBytes(StandardCharsets.UTF_8),
769+
() -> cause.getClass().getName().getBytes(StandardCharsets.UTF_8),
769770
HeaderNames.HeadersToAdd.EX_CAUSE);
770771
}
771-
String message = exception.getMessage();
772+
String message = buildMessage(exception, cause);
772773
if (message != null) {
773774
appendOrReplace(kafkaHeaders,
774775
isKey ? names.exceptionInfo.keyExceptionMessage : names.exceptionInfo.exceptionMessage,
775-
() -> exception.getMessage().getBytes(StandardCharsets.UTF_8),
776+
() -> message.getBytes(StandardCharsets.UTF_8),
776777
HeaderNames.HeadersToAdd.EX_MSG);
777778
}
778779
appendOrReplace(kafkaHeaders,
@@ -781,6 +782,26 @@ private void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception,
781782
HeaderNames.HeadersToAdd.EX_STACKTRACE);
782783
}
783784

785+
@Nullable
786+
private String buildMessage(Exception exception, Throwable cause) {
787+
String message = exception.getMessage();
788+
if (!exception.equals(cause)) {
789+
if (message != null) {
790+
message = message + "; ";
791+
}
792+
String causeMsg = cause.getMessage();
793+
if (causeMsg != null) {
794+
if (message != null) {
795+
message = message + causeMsg;
796+
}
797+
else {
798+
message = causeMsg;
799+
}
800+
}
801+
}
802+
return message;
803+
}
804+
784805
private void appendOrReplace(Headers headers, String header, Supplier<byte[]> valueSupplier,
785806
HeaderNames.HeadersToAdd hta) {
786807

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,4 +212,22 @@ public static Exception unwrapIfNeeded(Exception exception) {
212212
return theEx;
213213
}
214214

215+
/**
216+
* Find the root cause, ignoring any {@link ListenerExecutionFailedException} and
217+
* {@link TimestampedException}.
218+
* @param exception the exception to examine.
219+
* @return the root cause.
220+
* @since 3.0.7
221+
*/
222+
public static Exception findRootCause(Exception exception) {
223+
Exception realException = exception;
224+
while ((realException instanceof ListenerExecutionFailedException
225+
|| realException instanceof TimestampedException)
226+
&& realException.getCause() instanceof Exception cause) {
227+
228+
realException = cause;
229+
}
230+
return realException;
231+
}
232+
215233
}

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -199,13 +199,7 @@ public boolean recovered(ConsumerRecord<?, ?> record, Exception exception,
199199
private FailedRecord getFailedRecordInstance(ConsumerRecord<?, ?> record, Exception exception,
200200
Map<TopicPartition, FailedRecord> map, TopicPartition topicPartition) {
201201

202-
Exception realException = exception;
203-
while ((realException instanceof ListenerExecutionFailedException
204-
|| realException instanceof TimestampedException)
205-
&& realException.getCause() instanceof Exception) {
206-
207-
realException = (Exception) realException.getCause();
208-
}
202+
Exception realException = ErrorHandlingUtils.findRootCause(exception);
209203
FailedRecord failedRecord = map.get(topicPartition);
210204
if (failedRecord == null || failedRecord.getOffset() != record.offset()
211205
|| (this.resetStateOnExceptionChange

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ void headersNotStripped() {
258258
headers = captor.getValue().headers();
259259
assertThat(headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
260260
assertThat(headers.lastHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
261-
assertThat(headers.lastHeader(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE).value()).isEqualTo("testK".getBytes());
261+
assertThat(new String(headers.lastHeader(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE).value())).isEqualTo("testK");
262262
assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE).value()).isEqualTo("testV".getBytes());
263263
}
264264

@@ -399,7 +399,8 @@ void appendOriginalHeaders() {
399399
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
400400
recoverer.setAppendOriginalHeaders(true);
401401
recoverer.setStripPreviousExceptionHeaders(false);
402-
recoverer.accept(record, new RuntimeException(new IllegalStateException()));
402+
recoverer.accept(record, new ListenerExecutionFailedException("Listener failed",
403+
new TimestampedException(new RuntimeException("ex1 msg", new IllegalStateException()))));
403404
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
404405
then(template).should(times(1)).send(producerRecordCaptor.capture());
405406
Headers headers = producerRecordCaptor.getValue().headers();
@@ -412,11 +413,15 @@ void appendOriginalHeaders() {
412413
Header firstExceptionCauseType = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN);
413414
Header firstExceptionMessage = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
414415
Header firstExceptionStackTrace = headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE);
416+
assertThat(new String(firstExceptionMessage.value())).isEqualTo("Listener failed; ex1 msg");
417+
assertThat(new String(firstExceptionType.value())).isEqualTo(ListenerExecutionFailedException.class.getName());
418+
assertThat(new String(firstExceptionCauseType.value())).isEqualTo(RuntimeException.class.getName());
415419

416420
ConsumerRecord<String, String> anotherRecord = new ConsumerRecord<>("bar", 1, 12L, 4321L,
417421
TimestampType.LOG_APPEND_TIME, 321, 321, "bar", null, new RecordHeaders(), Optional.empty());
418422
headers.forEach(header -> anotherRecord.headers().add(header));
419-
recoverer.accept(anotherRecord, new RuntimeException(new IllegalStateException()));
423+
recoverer.accept(anotherRecord, new ListenerExecutionFailedException("Listener failed",
424+
new TimestampedException(new RuntimeException("ex2 msg", new IllegalStateException()))));
420425
ArgumentCaptor<ProducerRecord> anotherProducerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
421426
then(template).should(times(2)).send(anotherProducerRecordCaptor.capture());
422427
Headers anotherHeaders = anotherProducerRecordCaptor.getAllValues().get(1).headers();
@@ -436,6 +441,8 @@ void appendOriginalHeaders() {
436441
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN))
437442
.isNotSameAs(firstExceptionCauseType);
438443
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE)).isNotSameAs(firstExceptionMessage);
444+
assertThat(new String(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE).value()))
445+
.isEqualTo("Listener failed; ex2 msg");
439446
assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE))
440447
.isNotSameAs(firstExceptionStackTrace);
441448
Iterator<Header> exceptionHeaders = anotherHeaders.headers(KafkaHeaders.DLT_EXCEPTION_FQCN).iterator();

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -751,8 +751,8 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
751751
.contains("ListenerExecutionFailedException");
752752
assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN, byte[].class)))
753753
.isEqualTo("java.lang.RuntimeException");
754-
assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class))
755-
.contains("Listener failed".getBytes());
754+
assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)))
755+
.contains("Listener failed; fail for max failures");
756756
assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull();
757757
assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE, byte[].class))
758758
.contains("fail for max failures".getBytes());

0 commit comments

Comments
 (0)