Skip to content

Commit f3f3eac

Browse files
garyrussellartembilan
authored andcommitted
GH-2155: DeadLetterPublishingRecoverer Improvement
Resolves #2155 Co-authored-by: Viacheslav Chernyshev <[email protected]> - Add a `BitSet` property to suppress individual standard headers - Support multiple `headersFunction` - Allow complete customization of exception headers - Add `setHeadersFunction` to the DLPR factory for retryable topics * Fix typo in doc. * Rework enum usage; other refactoring; add getters to `HeaderNames`. * Fix javadocs. * Doc polishing. * Change BitSet to EnumSet. * Fix bogus import. * Defend against immutable headers; only create new when detected. * Fix NOSONAR typo. **Cherry-pick to `2.8.x`** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java
1 parent a1ca194 commit f3f3eac

File tree

7 files changed

+689
-36
lines changed

7 files changed

+689
-36
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5578,7 +5578,7 @@ Key exceptions are only caused by `DeserializationException` s so there is no `D
55785578
There are two mechanisms to add more headers.
55795579

55805580
1. Subclass the recoverer and override `createProducerRecord()` - call `super.createProducerRecord()` and add more headers.
5581-
2. Provide a `BiFunction` to receive the consumer record and exception, returning a `Headers` object; headers from there will be copied to the final producer record.
5581+
2. Provide a `BiFunction` to receive the consumer record and exception, returning a `Headers` object; headers from there will be copied to the final producer record; also see <<dlpr-headers>>.
55825582
Use `setHeadersFunction()` to set the `BiFunction`.
55835583

55845584
The second is simpler to implement but the first has more information available, including the already assembled standard headers.
@@ -5673,6 +5673,34 @@ The reason for the two properties is because, while you might want to retain onl
56735673

56745674
`appendOriginalHeaders` is applied to all headers named `*ORIGINAL*` while `stripPreviousExceptionHeaders` is applied to all headers named `*EXCEPTION*`.
56755675

5676+
Starting with version 2.8.4, you now can control which of the standard headers will be added to the output record.
5677+
See the `enum HeadersToAdd` for the generic names of the (currently) 10 standard headers that are added by default (these are not the actual header names, just an abstraction; the actual header names are set up by the `getHeaderNames()` method which subclasses can override.
5678+
5679+
To exclude headers, use the `excludeHeaders()` method; for example, to suppress adding the exception stack trace in a header, use:
5680+
5681+
====
5682+
[source, java]
5683+
----
5684+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
5685+
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
5686+
----
5687+
====
5688+
5689+
In addition, you can completely customize the addition of exception headers by adding an `ExceptionHeadersCreator`; this also disables all standard exception headers.
5690+
5691+
====
5692+
[source, java]
5693+
----
5694+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
5695+
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
5696+
kafkaHeaders.add(new RecordHeader(..., ...));
5697+
});
5698+
----
5699+
====
5700+
5701+
Also starting with version 2.8.4, you can now provide multiple headers functions, via the `addHeadersFunction` method.
5702+
This allows additional functions to apply, even if another function has already been registered, for example, when using <<retry-topic>>.
5703+
56765704
Also see <<retry-headers>> with <<retry-topic>>.
56775705

56785706
[[exp-backoff]]

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,8 @@ DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver)
433433
----
434434
====
435435

436+
Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory, you can add a `headersFunction` to the factory - `factory.setHeadersFunction((rec, ex) -> { ... })`
437+
436438
[[retry-topic-combine-blocking]]
437439
==== Combining blocking and non-blocking retries
438440

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ See <<delegating-serialization>> for more information.
8383

8484
The property `stripPreviousExceptionHeaders` is now `true` by default.
8585

86+
There are now several techniques to customize which headers are added to the output record.
87+
8688
See <<dlpr-headers>> for more information.
8789

8890
[[x28-retryable-topics-changes]]

0 commit comments

Comments
 (0)