Skip to content

Commit d65a9f0

Browse files
Viacheslav Chernyshevgaryrussell
Viacheslav Chernyshev
authored andcommitted
DLPR - Generate Header Values Lazily
The `this.whichHeaders.contains` check happens after the byte array (in case of `maybeAddHeader`) or the `RecordHeader` (in `appendOrReplace`) has already been created. It may be quite an expensive operation, so in this PR I propose to pass a supplier into these two functions instead. Change the signature of appendOrReplace according to the review.
1 parent 1d13671 commit d65a9f0

File tree

1 file changed

+32
-25
lines changed

1 file changed

+32
-25
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 32 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.TimeoutException;
3232
import java.util.function.BiFunction;
3333
import java.util.function.Function;
34+
import java.util.function.Supplier;
3435

3536
import org.apache.commons.logging.LogFactory;
3637
import org.apache.kafka.clients.consumer.Consumer;
@@ -39,7 +40,6 @@
3940
import org.apache.kafka.common.PartitionInfo;
4041
import org.apache.kafka.common.TopicPartition;
4142
import org.apache.kafka.common.header.Headers;
42-
import org.apache.kafka.common.header.internals.RecordHeader;
4343
import org.apache.kafka.common.header.internals.RecordHeaders;
4444

4545
import org.springframework.core.log.LogAccessor;
@@ -659,63 +659,70 @@ private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, E
659659

660660
private void maybeAddOriginalHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, Exception ex) {
661661
maybeAddHeader(kafkaHeaders, this.headerNames.original.topicHeader,
662-
record.topic().getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.TOPIC);
662+
() -> record.topic().getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.TOPIC);
663663
maybeAddHeader(kafkaHeaders, this.headerNames.original.partitionHeader,
664-
ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array(),
664+
() -> ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array(),
665665
HeaderNames.HeadersToAdd.PARTITION);
666666
maybeAddHeader(kafkaHeaders, this.headerNames.original.offsetHeader,
667-
ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array(), HeaderNames.HeadersToAdd.OFFSET);
667+
() -> ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array(),
668+
HeaderNames.HeadersToAdd.OFFSET);
668669
maybeAddHeader(kafkaHeaders, this.headerNames.original.timestampHeader,
669-
ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array(), HeaderNames.HeadersToAdd.TS);
670+
() -> ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array(), HeaderNames.HeadersToAdd.TS);
670671
maybeAddHeader(kafkaHeaders, this.headerNames.original.timestampTypeHeader,
671-
record.timestampType().toString().getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.TS_TYPE);
672+
() -> record.timestampType().toString().getBytes(StandardCharsets.UTF_8),
673+
HeaderNames.HeadersToAdd.TS_TYPE);
672674
if (ex instanceof ListenerExecutionFailedException) {
673675
String consumerGroup = ((ListenerExecutionFailedException) ex).getGroupId();
674676
if (consumerGroup != null) {
675677
maybeAddHeader(kafkaHeaders, this.headerNames.original.consumerGroup,
676-
consumerGroup.getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.GROUP);
678+
() -> consumerGroup.getBytes(StandardCharsets.UTF_8), HeaderNames.HeadersToAdd.GROUP);
677679
}
678680
}
679681
}
680682

681-
private void maybeAddHeader(Headers kafkaHeaders, String header, byte[] value, HeaderNames.HeadersToAdd hta) {
683+
private void maybeAddHeader(Headers kafkaHeaders, String header, Supplier<byte[]> valueSupplier,
684+
HeaderNames.HeadersToAdd hta) {
685+
682686
if (this.whichHeaders.contains(hta)
683687
&& (this.appendOriginalHeaders || kafkaHeaders.lastHeader(header) == null)) {
684-
kafkaHeaders.add(header, value);
688+
kafkaHeaders.add(header, valueSupplier.get());
685689
}
686690
}
687691

688692
private void addExceptionInfoHeaders(Headers kafkaHeaders, Exception exception, boolean isKey,
689693
HeaderNames names) {
690694

691-
appendOrReplace(kafkaHeaders, new RecordHeader(isKey ? names.exceptionInfo.keyExceptionFqcn
692-
: names.exceptionInfo.exceptionFqcn,
693-
exception.getClass().getName().getBytes(StandardCharsets.UTF_8)), HeaderNames.HeadersToAdd.EXCEPTION);
695+
appendOrReplace(kafkaHeaders,
696+
isKey ? names.exceptionInfo.keyExceptionFqcn : names.exceptionInfo.exceptionFqcn,
697+
() -> exception.getClass().getName().getBytes(StandardCharsets.UTF_8),
698+
HeaderNames.HeadersToAdd.EXCEPTION);
694699
if (exception.getCause() != null) {
695-
appendOrReplace(kafkaHeaders, new RecordHeader(names.exceptionInfo.exceptionCauseFqcn,
696-
exception.getCause().getClass().getName().getBytes(StandardCharsets.UTF_8)),
700+
appendOrReplace(kafkaHeaders,
701+
names.exceptionInfo.exceptionCauseFqcn,
702+
() -> exception.getCause().getClass().getName().getBytes(StandardCharsets.UTF_8),
697703
HeaderNames.HeadersToAdd.EX_CAUSE);
698704
}
699705
String message = exception.getMessage();
700706
if (message != null) {
701-
appendOrReplace(kafkaHeaders, new RecordHeader(isKey
702-
? names.exceptionInfo.keyExceptionMessage
703-
: names.exceptionInfo.exceptionMessage,
704-
exception.getMessage().getBytes(StandardCharsets.UTF_8)), HeaderNames.HeadersToAdd.EX_MSG);
707+
appendOrReplace(kafkaHeaders,
708+
isKey ? names.exceptionInfo.keyExceptionMessage : names.exceptionInfo.exceptionMessage,
709+
() -> exception.getMessage().getBytes(StandardCharsets.UTF_8),
710+
HeaderNames.HeadersToAdd.EX_MSG);
705711
}
706-
appendOrReplace(kafkaHeaders, new RecordHeader(isKey
707-
? names.exceptionInfo.keyExceptionStacktrace
708-
: names.exceptionInfo.exceptionStacktrace,
709-
getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)),
712+
appendOrReplace(kafkaHeaders,
713+
isKey ? names.exceptionInfo.keyExceptionStacktrace : names.exceptionInfo.exceptionStacktrace,
714+
() -> getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8),
710715
HeaderNames.HeadersToAdd.EX_STACKTRACE);
711716
}
712717

713-
private void appendOrReplace(Headers headers, RecordHeader header, HeaderNames.HeadersToAdd hta) {
718+
private void appendOrReplace(Headers headers, String header, Supplier<byte[]> valueSupplier,
719+
HeaderNames.HeadersToAdd hta) {
720+
714721
if (this.whichHeaders.contains(hta)) {
715722
if (this.stripPreviousExceptionHeaders) {
716-
headers.remove(header.key());
723+
headers.remove(header);
717724
}
718-
headers.add(header);
725+
headers.add(header, valueSupplier.get());
719726
}
720727
}
721728

0 commit comments

Comments
 (0)