Skip to content

Commit 538aaf9

Browse files
committed
spring-projectsGH-1882 DLPR: Add Exception Cause Header
Resolves spring-projects#1882
1 parent 6e5d4b9 commit 538aaf9

File tree

5 files changed

+121
-30
lines changed

5 files changed

+121
-30
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5321,7 +5321,8 @@ ErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(
53215321

53225322
The record sent to the dead-letter topic is enhanced with the following headers:
53235323

5324-
* `KafkaHeaders.DLT_EXCEPTION_FQCN`: The Exception class name.
5324+
* `KafkaHeaders.DLT_EXCEPTION_FQCN`: The Exception class name (generally a `ListenerExecutionFailedException`, but can be others).
5325+
* `KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN`: The Exception cause class name, if present (since version 2.8).
53255326
* `KafkaHeaders.DLT_EXCEPTION_STACKTRACE`: The Exception stack trace.
53265327
* `KafkaHeaders.DLT_EXCEPTION_MESSAGE`: The Exception message.
53275328
* `KafkaHeaders.DLT_KEY_EXCEPTION_FQCN`: The Exception class name (key deserialization errors only).
@@ -5332,6 +5333,9 @@ The record sent to the dead-letter topic is enhanced with the following headers:
53325333
* `KafkaHeaders.DLT_ORIGINAL_OFFSET`: The original offset.
53335334
* `KafkaHeaders.DLT_ORIGINAL_TIMESTAMP`: The original timestamp.
53345335
* `KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE`: The original timestamp type.
5336+
* `KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP`: The original consumer group that failed to process the record (since version 2.8).
5337+
5338+
Key exceptions are only caused by `DeserializationException` s so there is no `DLT_KEY_EXCEPTION_CAUSE_FQCN`.
53355339

53365340
There are two mechanisms to add more headers.
53375341

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 85 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -514,14 +514,14 @@ private Duration determineSendTimeout(KafkaOperations<?, ?> template) {
514514
}
515515

516516
private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, Exception exception) {
517-
maybeAddOriginalHeaders(kafkaHeaders, record);
517+
maybeAddOriginalHeaders(kafkaHeaders, record, exception);
518518
Headers headers = this.headersFunction.apply(record, exception);
519519
if (headers != null) {
520520
headers.forEach(kafkaHeaders::add);
521521
}
522522
}
523523

524-
private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record) {
524+
private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, Exception ex) {
525525
maybeAddHeader(kafkaHeaders, this.headerNames.original.topicHeader,
526526
record.topic().getBytes(StandardCharsets.UTF_8));
527527
maybeAddHeader(kafkaHeaders, this.headerNames.original.partitionHeader,
@@ -532,6 +532,11 @@ private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?>
532532
ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array());
533533
maybeAddHeader(kafkaHeaders, this.headerNames.original.timestampTypeHeader,
534534
record.timestampType().toString().getBytes(StandardCharsets.UTF_8));
535+
if (ex instanceof ListenerExecutionFailedException) {
536+
String consumerGroup = ((ListenerExecutionFailedException) ex).getGroupId();
537+
maybeAddHeader(kafkaHeaders, this.headerNames.original.consumerGroup,
538+
consumerGroup.getBytes(StandardCharsets.UTF_8));
539+
}
535540
}
536541

537542
private void maybeAddHeader(Headers kafkaHeaders, String header, byte[] value) {
@@ -540,11 +545,14 @@ private void maybeAddHeader(Headers kafkaHeaders, String header, byte[] value) {
540545
}
541546
}
542547

543-
void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception,
544-
boolean isKey) {
548+
void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception, boolean isKey) {
545549
kafkaHeaders.add(new RecordHeader(isKey ? this.headerNames.exceptionInfo.keyExceptionFqcn
546550
: this.headerNames.exceptionInfo.exceptionFqcn,
547551
exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
552+
if (!isKey && exception.getCause() != null) {
553+
kafkaHeaders.add(new RecordHeader(this.headerNames.exceptionInfo.exceptionCauseFqcn,
554+
exception.getCause().getClass().getName().getBytes(StandardCharsets.UTF_8)));
555+
}
548556
String message = exception.getMessage();
549557
if (message != null) {
550558
kafkaHeaders.add(new RecordHeader(isKey
@@ -579,9 +587,11 @@ protected HeaderNames getHeaderNames() {
579587
.timestampTypeHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)
580588
.topicHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)
581589
.partitionHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)
590+
.consumerGroupHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)
582591
.exception()
583592
.keyExceptionFqcn(KafkaHeaders.DLT_KEY_EXCEPTION_FQCN)
584593
.exceptionFqcn(KafkaHeaders.DLT_EXCEPTION_FQCN)
594+
.exceptionCauseFqcn(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)
585595
.keyExceptionMessage(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE)
586596
.exceptionMessage(KafkaHeaders.DLT_EXCEPTION_MESSAGE)
587597
.keyExceptionStacktrace(KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE)
@@ -605,50 +615,67 @@ public static class HeaderNames {
605615
}
606616

607617
static class Original {
608-
private final String offsetHeader;
609-
private final String timestampHeader;
610-
private final String timestampTypeHeader;
611-
private final String topicHeader;
612-
private final String partitionHeader;
618+
619+
final String offsetHeader;
620+
621+
final String timestampHeader;
622+
623+
final String timestampTypeHeader;
624+
625+
final String topicHeader;
626+
627+
final String partitionHeader;
628+
629+
final String consumerGroup;
613630

614631
Original(String offsetHeader,
615632
String timestampHeader,
616633
String timestampTypeHeader,
617634
String topicHeader,
618-
String partitionHeader) {
635+
String partitionHeader,
636+
String consumerGroup) {
619637
this.offsetHeader = offsetHeader;
620638
this.timestampHeader = timestampHeader;
621639
this.timestampTypeHeader = timestampTypeHeader;
622640
this.topicHeader = topicHeader;
623641
this.partitionHeader = partitionHeader;
642+
this.consumerGroup = consumerGroup;
624643
}
625644
}
626645

627646
static class ExceptionInfo {
628647

629-
private final String keyExceptionFqcn;
630-
private final String exceptionFqcn;
631-
private final String keyExceptionMessage;
632-
private final String exceptionMessage;
633-
private final String keyExceptionStacktrace;
634-
private final String exceptionStacktrace;
648+
final String keyExceptionFqcn;
649+
650+
final String exceptionFqcn;
651+
652+
final String exceptionCauseFqcn;
653+
654+
final String keyExceptionMessage;
655+
656+
final String exceptionMessage;
657+
658+
final String keyExceptionStacktrace;
659+
660+
final String exceptionStacktrace;
635661

636662
ExceptionInfo(String keyExceptionFqcn,
637-
String exceptionFqcn,
638-
String keyExceptionMessage,
639-
String exceptionMessage,
640-
String keyExceptionStacktrace,
641-
String exceptionStacktrace) {
663+
String exceptionFqcn,
664+
String exceptionCauseFqcn,
665+
String keyExceptionMessage,
666+
String exceptionMessage,
667+
String keyExceptionStacktrace,
668+
String exceptionStacktrace) {
642669
this.keyExceptionFqcn = keyExceptionFqcn;
643670
this.exceptionFqcn = exceptionFqcn;
671+
this.exceptionCauseFqcn = exceptionCauseFqcn;
644672
this.keyExceptionMessage = keyExceptionMessage;
645673
this.exceptionMessage = exceptionMessage;
646674
this.keyExceptionStacktrace = keyExceptionStacktrace;
647675
this.exceptionStacktrace = exceptionStacktrace;
648676
}
649677
}
650678

651-
652679
/**
653680
* Provides a convenient API for creating
654681
* {@link DeadLetterPublishingRecoverer.HeaderNames}.
@@ -685,6 +712,8 @@ public class Original {
685712

686713
private String partitionHeader;
687714

715+
private String consumerGroupHeader;
716+
688717
/**
689718
* Sets the name of the header that will be used to store the offset
690719
* of the original record.
@@ -745,6 +774,18 @@ public Builder.Original partitionHeader(String partitionHeader) {
745774
return this;
746775
}
747776

777+
/**
778+
* Sets the name of the header that will be used to store the consumer
779+
* group that failed to consume the original record.
780+
* @param consumerGroup the consumer group header name.
781+
* @return the Original builder instance
782+
* @since 2.8
783+
*/
784+
public Builder.Original consumerGroupHeader(String consumerGroupHeader) {
785+
this.consumerGroupHeader = consumerGroupHeader;
786+
return this;
787+
}
788+
748789
/**
749790
* Returns the exception builder.
750791
* @return the exception builder.
@@ -760,16 +801,18 @@ public ExceptionInfo exception() {
760801
* @since 2.7
761802
*/
762803
private DeadLetterPublishingRecoverer.HeaderNames.Original build() {
763-
Assert.notNull(this.offsetHeader, "offsetHeader header cannot be null");
764-
Assert.notNull(this.timestampHeader, "timestampHeader header cannot be null");
765-
Assert.notNull(this.timestampTypeHeader, "timestampTypeHeader header cannot be null");
766-
Assert.notNull(this.topicHeader, "topicHeader header cannot be null");
767-
Assert.notNull(this.partitionHeader, "partitionHeader header cannot be null");
804+
Assert.notNull(this.offsetHeader, "offsetHeader cannot be null");
805+
Assert.notNull(this.timestampHeader, "timestampHeader cannot be null");
806+
Assert.notNull(this.timestampTypeHeader, "timestampTypeHeader cannot be null");
807+
Assert.notNull(this.topicHeader, "topicHeader cannot be null");
808+
Assert.notNull(this.partitionHeader, "partitionHeader cannot be null");
809+
Assert.notNull(this.consumerGroupHeader, "consumerGroupHeader cannot be null");
768810
return new DeadLetterPublishingRecoverer.HeaderNames.Original(this.offsetHeader,
769811
this.timestampHeader,
770812
this.timestampTypeHeader,
771813
this.topicHeader,
772-
this.partitionHeader);
814+
this.partitionHeader,
815+
this.consumerGroupHeader);
773816
}
774817
}
775818

@@ -785,6 +828,8 @@ public class ExceptionInfo {
785828

786829
private String exceptionFqcn;
787830

831+
private String exceptionCauseFqcn;
832+
788833
private String keyExceptionMessage;
789834

790835
private String exceptionMessage;
@@ -817,6 +862,17 @@ public ExceptionInfo exceptionFqcn(String exceptionFqcn) {
817862
return this;
818863
}
819864

865+
/**
866+
* Sets the name of the header that will be used to store the exceptionCauseFqcn
867+
* of the original record.
868+
* @param exceptionFqcn the exceptionFqcn header name.
869+
* @return the Exception builder instance
870+
* @since 2.8
871+
*/
872+
public ExceptionInfo exceptionCauseFqcn(String exceptionCauseFqcn) {
873+
this.exceptionCauseFqcn = exceptionCauseFqcn;
874+
return this;
875+
}
820876
/**
821877
* Sets the name of the header that will be used to store the keyExceptionMessage
822878
* of the original record.
@@ -880,6 +936,7 @@ public DeadLetterPublishingRecoverer.HeaderNames build() {
880936
return new DeadLetterPublishingRecoverer.HeaderNames(Builder.this.original.build(),
881937
new HeaderNames.ExceptionInfo(this.keyExceptionFqcn,
882938
this.exceptionFqcn,
939+
this.exceptionCauseFqcn,
883940
this.keyExceptionMessage,
884941
this.exceptionMessage,
885942
this.keyExceptionStacktrace,

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,11 @@ protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
7272
.timestampTypeHeader(KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE)
7373
.topicHeader(KafkaHeaders.ORIGINAL_TOPIC)
7474
.partitionHeader(KafkaHeaders.ORIGINAL_PARTITION)
75+
.consumerGroupHeader(KafkaHeaders.ORIGINAL_CONSUMER_GROUP)
7576
.exception()
7677
.keyExceptionFqcn(KafkaHeaders.KEY_EXCEPTION_FQCN)
7778
.exceptionFqcn(KafkaHeaders.EXCEPTION_FQCN)
79+
.exceptionCauseFqcn(KafkaHeaders.EXCEPTION_CAUSE_FQCN)
7880
.keyExceptionMessage(KafkaHeaders.KEY_EXCEPTION_MESSAGE)
7981
.exceptionMessage(KafkaHeaders.EXCEPTION_MESSAGE)
8082
.keyExceptionStacktrace(KafkaHeaders.KEY_EXCEPTION_STACKTRACE)

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,12 @@ public abstract class KafkaHeaders {
147147
*/
148148
public static final String DLT_EXCEPTION_FQCN = PREFIX + "dlt-exception-fqcn";
149149

150+
/**
151+
* Exception cause class name for a record published sent to a dead-letter topic.
152+
* @since 2.8
153+
*/
154+
public static final String DLT_EXCEPTION_CAUSE_FQCN = PREFIX + "dlt-exception-cause-fqcn";
155+
150156
/**
151157
* Exception stack trace for a record published to a dead-letter topic.
152158
* @since 2.2
@@ -198,6 +204,12 @@ public abstract class KafkaHeaders {
198204
*/
199205
public static final String DLT_ORIGINAL_OFFSET = PREFIX + "dlt-original-offset";
200206

207+
/**
208+
* Consumer group that failed to consumer a record published to a dead-letter topic.
209+
* @since 2.8
210+
*/
211+
public static final String DLT_ORIGINAL_CONSUMER_GROUP = PREFIX + "dlt-original-consumer-group";
212+
201213
/**
202214
* Original timestamp for a record published to a dead-letter topic.
203215
* @since 2.2
@@ -228,6 +240,12 @@ public abstract class KafkaHeaders {
228240
*/
229241
public static final String EXCEPTION_FQCN = PREFIX + "exception-fqcn";
230242

243+
/**
244+
* Exception class name for a record published sent to another topic.
245+
* @since 2.8
246+
*/
247+
public static final String EXCEPTION_CAUSE_FQCN = PREFIX + "exception-fqcn";
248+
231249
/**
232250
* Exception stack trace for a record published to another topic.
233251
* @since 2.2
@@ -279,6 +297,12 @@ public abstract class KafkaHeaders {
279297
*/
280298
public static final String ORIGINAL_OFFSET = PREFIX + "original-offset";
281299

300+
/**
301+
* Consumer group that failed to consumer a record published to another topic.
302+
* @since 2.8
303+
*/
304+
public static final String ORIGINAL_CONSUMER_GROUP = PREFIX + "dlt-original-consumer-group";
305+
282306
/**
283307
* Original timestamp for a record published to another topic.
284308
* @since 2.2

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -694,7 +694,8 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti
694694
public void testMaxFailures() throws Exception {
695695
logger.info("Start testMaxFailures");
696696
Map<String, Object> props = KafkaTestUtils.consumerProps("txTestMaxFailures", "false", embeddedKafka);
697-
props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupInARBP");
697+
String group = "groupInARBP";
698+
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
698699
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
699700
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
700701
ContainerProperties containerProps = new ContainerProperties(topic3);
@@ -769,6 +770,8 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
769770
MessageHeaders headers = new MessageHeaders(map);
770771
assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_FQCN, byte[].class)))
771772
.contains("ListenerExecutionFailedException");
773+
assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN, byte[].class)))
774+
.isEqualTo("java.lang.RuntimeException");
772775
assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class))
773776
.contains("fail for max failures".getBytes());
774777
assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull();
@@ -777,6 +780,7 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
777780
assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP, byte[].class)).isNotNull();
778781
assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE, byte[].class)).isNotNull();
779782
assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)).isEqualTo(topic3.getBytes());
783+
assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isEqualTo(group.getBytes());
780784
assertThat(headers.get("baz")).isEqualTo("qux".getBytes());
781785
pf.destroy();
782786
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();

0 commit comments

Comments
 (0)