
Commit 59ef66f

GH-1882 DLPR: Add Exception Cause Header
Resolves spring-projects#1882
1 parent 6e5d4b9 commit 59ef66f

5 files changed: +124 -30 lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 5 additions & 1 deletion
@@ -5321,7 +5321,8 @@ ErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(
 
 The record sent to the dead-letter topic is enhanced with the following headers:
 
-* `KafkaHeaders.DLT_EXCEPTION_FQCN`: The Exception class name.
+* `KafkaHeaders.DLT_EXCEPTION_FQCN`: The Exception class name (generally a `ListenerExecutionFailedException`, but can be others).
+* `KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN`: The Exception cause class name, if present (since version 2.8).
 * `KafkaHeaders.DLT_EXCEPTION_STACKTRACE`: The Exception stack trace.
 * `KafkaHeaders.DLT_EXCEPTION_MESSAGE`: The Exception message.
 * `KafkaHeaders.DLT_KEY_EXCEPTION_FQCN`: The Exception class name (key deserialization errors only).
@@ -5332,6 +5333,9 @@ The record sent to the dead-letter topic is enhanced with the following headers:
 * `KafkaHeaders.DLT_ORIGINAL_OFFSET`: The original offset.
 * `KafkaHeaders.DLT_ORIGINAL_TIMESTAMP`: The original timestamp.
 * `KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE`: The original timestamp type.
+* `KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP`: The original consumer group that failed to process the record (since version 2.8).
+
+Key exceptions are only caused by `DeserializationException` s so there is no `DLT_KEY_EXCEPTION_CAUSE_FQCN`.
 
 There are two mechanisms to add more headers.
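
For reference, a dead-letter consumer can read the two new headers like any other DLT_* header; a minimal sketch, assuming a plain @KafkaListener on the dead-letter topic (the listener id, topic name and required = false guards are illustrative, not part of this commit):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class DltLoggingListener {

    // Header values arrive as raw bytes; the recoverer writes UTF-8 strings.
    @KafkaListener(id = "ordersDlt", topics = "orders.DLT")
    void listenDlt(String value,
            @Header(KafkaHeaders.DLT_EXCEPTION_FQCN) byte[] exceptionFqcn,
            @Header(name = KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN, required = false) byte[] causeFqcn,
            @Header(name = KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP, required = false) byte[] group) {

        System.out.println("DLT record: " + value
                + ", exception=" + new String(exceptionFqcn)
                + ", cause=" + (causeFqcn == null ? "n/a" : new String(causeFqcn))
                + ", group=" + (group == null ? "n/a" : new String(group)));
    }

}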

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 88 additions & 28 deletions
@@ -514,14 +514,14 @@ private Duration determineSendTimeout(KafkaOperations<?, ?> template) {
     }
 
     private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, Exception exception) {
-        maybeAddOriginalHeaders(kafkaHeaders, record);
+        maybeAddOriginalHeaders(kafkaHeaders, record, exception);
         Headers headers = this.headersFunction.apply(record, exception);
         if (headers != null) {
             headers.forEach(kafkaHeaders::add);
         }
     }
 
-    private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record) {
+    private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, Exception ex) {
         maybeAddHeader(kafkaHeaders, this.headerNames.original.topicHeader,
                 record.topic().getBytes(StandardCharsets.UTF_8));
         maybeAddHeader(kafkaHeaders, this.headerNames.original.partitionHeader,
@@ -532,6 +532,13 @@ private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?>
                 ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array());
         maybeAddHeader(kafkaHeaders, this.headerNames.original.timestampTypeHeader,
                 record.timestampType().toString().getBytes(StandardCharsets.UTF_8));
+        if (ex instanceof ListenerExecutionFailedException) {
+            String consumerGroup = ((ListenerExecutionFailedException) ex).getGroupId();
+            if (consumerGroup != null) {
+                maybeAddHeader(kafkaHeaders, this.headerNames.original.consumerGroup,
+                        consumerGroup.getBytes(StandardCharsets.UTF_8));
+            }
+        }
     }
 
     private void maybeAddHeader(Headers kafkaHeaders, String header, byte[] value) {
@@ -540,11 +547,14 @@ private void maybeAddHeader(Headers kafkaHeaders, String header, byte[] value) {
         }
     }
 
-    void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception,
-            boolean isKey) {
+    void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception, boolean isKey) {
         kafkaHeaders.add(new RecordHeader(isKey ? this.headerNames.exceptionInfo.keyExceptionFqcn
                 : this.headerNames.exceptionInfo.exceptionFqcn,
                 exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
+        if (!isKey && exception.getCause() != null) {
+            kafkaHeaders.add(new RecordHeader(this.headerNames.exceptionInfo.exceptionCauseFqcn,
+                    exception.getCause().getClass().getName().getBytes(StandardCharsets.UTF_8)));
+        }
         String message = exception.getMessage();
         if (message != null) {
             kafkaHeaders.add(new RecordHeader(isKey
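
Taken together, the hunks above mean that for a listener failure the FQCN header names the wrapper (usually ListenerExecutionFailedException), the new cause header names the actual listener exception, and the group id travels in the new original-consumer-group header. A minimal sketch of exercising the recoverer directly, assuming a suitable KafkaTemplate is at hand (topic, group and exception values are illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.ListenerExecutionFailedException;

public final class DlprCauseHeaderSketch {

    static void publish(KafkaTemplate<Object, Object> template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        ConsumerRecord<String, String> failed = new ConsumerRecord<>("orders", 0, 0L, "key", "value");
        // Listener containers wrap the thrown exception and carry the consumer group id.
        Exception wrapped = new ListenerExecutionFailedException("Listener failed", "orders-group",
                new IllegalStateException("boom"));
        recoverer.accept(failed, wrapped);
        // Published headers (per this commit):
        //   kafka_dlt-exception-fqcn          -> ...ListenerExecutionFailedException
        //   kafka_dlt-exception-cause-fqcn    -> java.lang.IllegalStateException
        //   kafka_dlt-original-consumer-group -> orders-group
    }

}
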
@@ -579,9 +589,11 @@ protected HeaderNames getHeaderNames() {
                 .timestampTypeHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)
                 .topicHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)
                 .partitionHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)
+                .consumerGroupHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)
             .exception()
                 .keyExceptionFqcn(KafkaHeaders.DLT_KEY_EXCEPTION_FQCN)
                 .exceptionFqcn(KafkaHeaders.DLT_EXCEPTION_FQCN)
+                .exceptionCauseFqcn(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)
                 .keyExceptionMessage(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE)
                 .exceptionMessage(KafkaHeaders.DLT_EXCEPTION_MESSAGE)
                 .keyExceptionStacktrace(KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE)
@@ -605,50 +617,67 @@ public static class HeaderNames {
         }
 
         static class Original {
-            private final String offsetHeader;
-            private final String timestampHeader;
-            private final String timestampTypeHeader;
-            private final String topicHeader;
-            private final String partitionHeader;
+
+            final String offsetHeader;
+
+            final String timestampHeader;
+
+            final String timestampTypeHeader;
+
+            final String topicHeader;
+
+            final String partitionHeader;
+
+            final String consumerGroup;
 
             Original(String offsetHeader,
                     String timestampHeader,
                     String timestampTypeHeader,
                     String topicHeader,
-                    String partitionHeader) {
+                    String partitionHeader,
+                    String consumerGroup) {
                 this.offsetHeader = offsetHeader;
                 this.timestampHeader = timestampHeader;
                 this.timestampTypeHeader = timestampTypeHeader;
                 this.topicHeader = topicHeader;
                 this.partitionHeader = partitionHeader;
+                this.consumerGroup = consumerGroup;
             }
         }
 
         static class ExceptionInfo {
 
-            private final String keyExceptionFqcn;
-            private final String exceptionFqcn;
-            private final String keyExceptionMessage;
-            private final String exceptionMessage;
-            private final String keyExceptionStacktrace;
-            private final String exceptionStacktrace;
+            final String keyExceptionFqcn;
+
+            final String exceptionFqcn;
+
+            final String exceptionCauseFqcn;
+
+            final String keyExceptionMessage;
+
+            final String exceptionMessage;
+
+            final String keyExceptionStacktrace;
+
+            final String exceptionStacktrace;
 
             ExceptionInfo(String keyExceptionFqcn,
-                    String exceptionFqcn,
-                    String keyExceptionMessage,
-                    String exceptionMessage,
-                    String keyExceptionStacktrace,
-                    String exceptionStacktrace) {
+                    String exceptionFqcn,
+                    String exceptionCauseFqcn,
+                    String keyExceptionMessage,
+                    String exceptionMessage,
+                    String keyExceptionStacktrace,
+                    String exceptionStacktrace) {
                 this.keyExceptionFqcn = keyExceptionFqcn;
                 this.exceptionFqcn = exceptionFqcn;
+                this.exceptionCauseFqcn = exceptionCauseFqcn;
                 this.keyExceptionMessage = keyExceptionMessage;
                 this.exceptionMessage = exceptionMessage;
                 this.keyExceptionStacktrace = keyExceptionStacktrace;
                 this.exceptionStacktrace = exceptionStacktrace;
             }
         }
 
-
         /**
          * Provides a convenient API for creating
         * {@link DeadLetterPublishingRecoverer.HeaderNames}.
@@ -685,6 +714,8 @@ public class Original {
 
             private String partitionHeader;
 
+            private String consumerGroupHeader;
+
             /**
              * Sets the name of the header that will be used to store the offset
              * of the original record.
@@ -745,6 +776,18 @@ public Builder.Original partitionHeader(String partitionHeader) {
                 return this;
             }
 
+            /**
+             * Sets the name of the header that will be used to store the consumer
+             * group that failed to consume the original record.
+             * @param consumerGroupHeader the consumer group header name.
+             * @return the Original builder instance
+             * @since 2.8
+             */
+            public Builder.Original consumerGroupHeader(String consumerGroupHeader) {
+                this.consumerGroupHeader = consumerGroupHeader;
+                return this;
+            }
+
             /**
              * Returns the exception builder.
              * @return the exception builder.
@@ -760,16 +803,18 @@ public ExceptionInfo exception() {
              * @since 2.7
              */
             private DeadLetterPublishingRecoverer.HeaderNames.Original build() {
-                Assert.notNull(this.offsetHeader, "offsetHeader header cannot be null");
-                Assert.notNull(this.timestampHeader, "timestampHeader header cannot be null");
-                Assert.notNull(this.timestampTypeHeader, "timestampTypeHeader header cannot be null");
-                Assert.notNull(this.topicHeader, "topicHeader header cannot be null");
-                Assert.notNull(this.partitionHeader, "partitionHeader header cannot be null");
+                Assert.notNull(this.offsetHeader, "offsetHeader cannot be null");
+                Assert.notNull(this.timestampHeader, "timestampHeader cannot be null");
+                Assert.notNull(this.timestampTypeHeader, "timestampTypeHeader cannot be null");
+                Assert.notNull(this.topicHeader, "topicHeader cannot be null");
+                Assert.notNull(this.partitionHeader, "partitionHeader cannot be null");
+                Assert.notNull(this.consumerGroupHeader, "consumerGroupHeader cannot be null");
                 return new DeadLetterPublishingRecoverer.HeaderNames.Original(this.offsetHeader,
                         this.timestampHeader,
                         this.timestampTypeHeader,
                         this.topicHeader,
-                        this.partitionHeader);
+                        this.partitionHeader,
+                        this.consumerGroupHeader);
             }
         }
 
@@ -785,6 +830,8 @@ public class ExceptionInfo {
 
             private String exceptionFqcn;
 
+            private String exceptionCauseFqcn;
+
             private String keyExceptionMessage;
 
             private String exceptionMessage;
@@ -817,6 +864,17 @@ public ExceptionInfo exceptionFqcn(String exceptionFqcn) {
                 return this;
             }
 
+            /**
+             * Sets the name of the header that will be used to store the exception cause
+             * class name for the original record.
+             * @param exceptionCauseFqcn the exceptionCauseFqcn header name.
+             * @return the Exception builder instance
+             * @since 2.8
+             */
+            public ExceptionInfo exceptionCauseFqcn(String exceptionCauseFqcn) {
+                this.exceptionCauseFqcn = exceptionCauseFqcn;
+                return this;
+            }
             /**
              * Sets the name of the header that will be used to store the keyExceptionMessage
              * of the original record.
@@ -873,13 +931,15 @@ public ExceptionInfo exceptionStacktrace(String exceptionStacktrace) {
             public DeadLetterPublishingRecoverer.HeaderNames build() {
                 Assert.notNull(this.keyExceptionFqcn, "keyExceptionFqcn header cannot be null");
                 Assert.notNull(this.exceptionFqcn, "exceptionFqcn header cannot be null");
+                Assert.notNull(this.exceptionCauseFqcn, "exceptionCauseFqcn header cannot be null");
                 Assert.notNull(this.keyExceptionMessage, "keyExceptionMessage header cannot be null");
                 Assert.notNull(this.exceptionMessage, "exceptionMessage header cannot be null");
                 Assert.notNull(this.keyExceptionStacktrace, "keyExceptionStacktrace header cannot be null");
                 Assert.notNull(this.exceptionStacktrace, "exceptionStacktrace header cannot be null");
                 return new DeadLetterPublishingRecoverer.HeaderNames(Builder.this.original.build(),
                         new HeaderNames.ExceptionInfo(this.keyExceptionFqcn,
                                 this.exceptionFqcn,
+                                this.exceptionCauseFqcn,
                                 this.keyExceptionMessage,
                                 this.exceptionMessage,
                                 this.keyExceptionStacktrace,
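
The headersFunction applied in enhanceHeaders(..) above is the hook behind the "two mechanisms to add more headers" mentioned in the documentation change; a minimal sketch of supplying one, assuming the existing setHeadersFunction setter (the baz/qux header mirrors the test below and is purely illustrative):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

public final class DlprExtraHeadersSketch {

    static DeadLetterPublishingRecoverer recovererWithExtraHeader(KafkaTemplate<Object, Object> template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        // Headers returned here are appended after the standard DLT_* headers.
        recoverer.setHeadersFunction((record, exception) -> {
            Headers extra = new RecordHeaders();
            extra.add(new RecordHeader("baz", "qux".getBytes(StandardCharsets.UTF_8)));
            return extra;
        });
        return recoverer;
    }

}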

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 2 additions & 0 deletions
@@ -72,9 +72,11 @@ protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
                 .timestampTypeHeader(KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE)
                 .topicHeader(KafkaHeaders.ORIGINAL_TOPIC)
                 .partitionHeader(KafkaHeaders.ORIGINAL_PARTITION)
+                .consumerGroupHeader(KafkaHeaders.ORIGINAL_CONSUMER_GROUP)
             .exception()
                 .keyExceptionFqcn(KafkaHeaders.KEY_EXCEPTION_FQCN)
                 .exceptionFqcn(KafkaHeaders.EXCEPTION_FQCN)
+                .exceptionCauseFqcn(KafkaHeaders.EXCEPTION_CAUSE_FQCN)
                 .keyExceptionMessage(KafkaHeaders.KEY_EXCEPTION_MESSAGE)
                 .exceptionMessage(KafkaHeaders.EXCEPTION_MESSAGE)
                 .keyExceptionStacktrace(KafkaHeaders.KEY_EXCEPTION_STACKTRACE)
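
Recoverers built by this factory back the retry topic feature, so the same information is published under the non-DLT constant names; a minimal sketch of a retryable listener whose DLT handler reads the new cause header (the topic, listener id and forced failure are illustrative):

import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class RetryableOrderListener {

    @RetryableTopic
    @KafkaListener(id = "orders", topics = "orders")
    void listen(String in) {
        throw new IllegalStateException("boom"); // exhaust retries, then dead-letter
    }

    @DltHandler
    void handleDlt(String in,
            @Header(name = KafkaHeaders.EXCEPTION_CAUSE_FQCN, required = false) byte[] causeFqcn) {
        System.out.println("DLT: " + in + ", cause="
                + (causeFqcn == null ? "n/a" : new String(causeFqcn)));
    }

}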

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java

Lines changed: 24 additions & 0 deletions
@@ -147,6 +147,12 @@ public abstract class KafkaHeaders {
      */
     public static final String DLT_EXCEPTION_FQCN = PREFIX + "dlt-exception-fqcn";
 
+    /**
+     * Exception cause class name for a record published to a dead-letter topic.
+     * @since 2.8
+     */
+    public static final String DLT_EXCEPTION_CAUSE_FQCN = PREFIX + "dlt-exception-cause-fqcn";
+
     /**
      * Exception stack trace for a record published to a dead-letter topic.
      * @since 2.2
@@ -198,6 +204,12 @@ public abstract class KafkaHeaders {
      */
     public static final String DLT_ORIGINAL_OFFSET = PREFIX + "dlt-original-offset";
 
+    /**
+     * Consumer group that failed to consume a record published to a dead-letter topic.
+     * @since 2.8
+     */
+    public static final String DLT_ORIGINAL_CONSUMER_GROUP = PREFIX + "dlt-original-consumer-group";
+
     /**
      * Original timestamp for a record published to a dead-letter topic.
      * @since 2.2
@@ -228,6 +240,12 @@ public abstract class KafkaHeaders {
      */
     public static final String EXCEPTION_FQCN = PREFIX + "exception-fqcn";
 
+    /**
+     * Exception cause class name for a record published to another topic.
+     * @since 2.8
+     */
+    public static final String EXCEPTION_CAUSE_FQCN = PREFIX + "exception-cause-fqcn";
+
     /**
      * Exception stack trace for a record published to another topic.
      * @since 2.2
@@ -279,6 +297,12 @@ public abstract class KafkaHeaders {
      */
     public static final String ORIGINAL_OFFSET = PREFIX + "original-offset";
 
+    /**
+     * Consumer group that failed to consume a record published to another topic.
+     * @since 2.8
+     */
+    public static final String ORIGINAL_CONSUMER_GROUP = PREFIX + "original-consumer-group";
+
     /**
      * Original timestamp for a record published to another topic.
      * @since 2.2
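
Outside annotated listeners, the same constants can be used against the raw Kafka Headers API; a minimal sketch (the null check matters because the cause header is only written when a cause is present):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.support.KafkaHeaders;

public final class DltHeaderReader {

    // Returns null when the record carries no cause header.
    static String exceptionCauseFqcn(ConsumerRecord<?, ?> record) {
        Header cause = record.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN);
        return cause == null ? null : new String(cause.value());
    }

}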

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 5 additions & 1 deletion
@@ -694,7 +694,8 @@ private void testFixLagGuts(String topic, int whichTm) throws InterruptedExcepti
     public void testMaxFailures() throws Exception {
         logger.info("Start testMaxFailures");
         Map<String, Object> props = KafkaTestUtils.consumerProps("txTestMaxFailures", "false", embeddedKafka);
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupInARBP");
+        String group = "groupInARBP";
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
         DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
         ContainerProperties containerProps = new ContainerProperties(topic3);
@@ -769,6 +770,8 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
         MessageHeaders headers = new MessageHeaders(map);
         assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_FQCN, byte[].class)))
                 .contains("ListenerExecutionFailedException");
+        assertThat(new String(headers.get(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN, byte[].class)))
+                .isEqualTo("java.lang.RuntimeException");
         assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class))
                 .contains("fail for max failures".getBytes());
         assertThat(headers.get(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNotNull();
@@ -777,6 +780,7 @@ public void accept(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer, Excepti
         assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP, byte[].class)).isNotNull();
         assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE, byte[].class)).isNotNull();
         assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)).isEqualTo(topic3.getBytes());
+        assertThat(headers.get(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)).isEqualTo(group.getBytes());
         assertThat(headers.get("baz")).isEqualTo("qux".getBytes());
         pf.destroy();
         assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
