From fb754ccf104bff139206aab148d75b6bda56d489 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Fri, 30 May 2025 22:49:54 +0900 Subject: [PATCH 1/5] spring-projectsGH-3879: Add cache to optimize header match performance. Fixes: #3879 Issue link: #3879 What Add a LRU cache for pattern match of KafkaHeaderMapper. Why? To improve CPU usage used by pattern match of KafkaHeaderMapper. Commonly, many Kafka records in the same topic will have the same header name. Currently, Pattern Match has O(M*N) time complexity, where M is pattern length, N is String length. If results of patterns match are cached and KafkaHeaderMapper uses it, KafkaHeaderMapper can expect improvement in terms of CPU usage. Signed-off-by: Sanghyeok An --- .../support/AbstractKafkaHeaderMapper.java | 43 ++++++++ .../DefaultKafkaHeaderMapperTests.java | 99 +++++++++++++++++++ 2 files changed, 142 insertions(+) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index 6404e29f83..da7eb347c2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -36,6 +36,7 @@ import org.springframework.core.log.LogAccessor; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; +import org.springframework.util.ConcurrentLruCache; import org.springframework.util.ObjectUtils; import org.springframework.util.PatternMatchUtils; @@ -67,6 +68,8 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { private final List multiValueHeaderMatchers = new ArrayList<>(); + private final HeaderPatternMatchCache headerMatchedCache = new HeaderPatternMatchCache(); + private final Map rawMappedHeaders = new HashMap<>(); { @@ -272,11 +275,22 @@ protected Object headerValueToAddOut(String key, Object value) { * @since 4.0 */ protected boolean doesMatchMultiValueHeader(String headerName) { + if (this.headerMatchedCache.isMultiValuePattern(headerName)) { + return true; + } + + if (this.headerMatchedCache.isSingleValuePattern(headerName)) { + return false; + } + for (HeaderMatcher headerMatcher : this.multiValueHeaderMatchers) { if (headerMatcher.matchHeader(headerName)) { + this.headerMatchedCache.cacheAsMultiValueHeader(headerName); return true; } } + + this.headerMatchedCache.cacheAsSingleValueHeader(headerName); return false; } @@ -427,4 +441,33 @@ public boolean isNegated() { } + /** + * A Cache that remembers whether a header name matches the multi-value pattern. + */ + class HeaderPatternMatchCache { + + private static final int MAX_SIZE = 1000; + + private final ConcurrentLruCache multiValueHeaderPatternMatchCache = new ConcurrentLruCache<>(MAX_SIZE, key -> Boolean.TRUE); + + private final ConcurrentLruCache singleValueHeaderPatternMatchCache = new ConcurrentLruCache<>(MAX_SIZE, key -> Boolean.TRUE); + + public boolean isMultiValuePattern(String headerName) { + return this.multiValueHeaderPatternMatchCache.contains(headerName); + } + + public boolean isSingleValuePattern(String headerName) { + return this.singleValueHeaderPatternMatchCache.contains(headerName); + } + + public void cacheAsSingleValueHeader(String headerName) { + this.singleValueHeaderPatternMatchCache.get(headerName); + } + + public void cacheAsMultiValueHeader(String headerName) { + this.multiValueHeaderPatternMatchCache.get(headerName); + } + + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java index 1151e6cd3d..40070537e9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java @@ -31,6 +31,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.retrytopic.RetryTopicHeaders; @@ -413,6 +415,103 @@ void multiValueHeaderToTest() { .containsExactly(multiValueWildCardHeader2Value1, multiValueWildCardHeader2Value2); } + @ParameterizedTest + @ValueSource(ints = {2000}) +// @ValueSource(ints = {500, 1000, 2000}) + void hugeNumberOfSingleValueHeaderToTest(int numberOfSingleValueHeaderCount) { + // GIVEN + Headers rawHeaders = new RecordHeaders(); + + String multiValueHeader1 = "test-multi-value1"; + byte[] multiValueHeader1Value1 = { 0, 0, 0, 0 }; + byte[] multiValueHeader1Value2 = { 0, 0, 0, 1 }; + + rawHeaders.add(multiValueHeader1, multiValueHeader1Value1); + rawHeaders.add(multiValueHeader1, multiValueHeader1Value2); + + byte[] deliveryAttemptHeaderValue = { 0, 0, 0, 1 }; + byte[] originalOffsetHeaderValue = { 0, 0, 0, 2 }; + byte[] defaultHeaderAttemptsValues = { 0, 0, 0, 5 }; + + rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, deliveryAttemptHeaderValue); + rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, originalOffsetHeaderValue); + rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, defaultHeaderAttemptsValues); + + byte[] singleValueHeaderValue = { 0, 0, 0, 6 }; + for (int i = 0; i < numberOfSingleValueHeaderCount; i++) { + String singleValueHeader = "test-single-value" + i; + rawHeaders.add(singleValueHeader, singleValueHeaderValue); + } + + DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + mapper.setMultiValueHeaderPatterns(multiValueHeader1); + + // WHEN + Map mappedHeaders = new HashMap<>(); + mapper.toHeaders(rawHeaders, mappedHeaders); + + // THEN + assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1); + assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(originalOffsetHeaderValue); + assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(defaultHeaderAttemptsValues); + + for (int i = 0; i < numberOfSingleValueHeaderCount; i++) { + String singleValueHeader = "test-single-value" + i; + assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(singleValueHeaderValue); + } + + assertThat(mappedHeaders) + .extractingByKey(multiValueHeader1, InstanceOfAssertFactories.list(byte[].class)) + .containsExactly(multiValueHeader1Value1, multiValueHeader1Value2); + } + + @ParameterizedTest + @ValueSource(ints = {500, 1000, 2000}) + void hugeNumberOfMultiValueHeaderToTest(int numberOfMultiValueHeaderCount) { + // GIVEN + DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + Headers rawHeaders = new RecordHeaders(); + + byte[] multiValueHeader1Value1 = { 0, 0, 0, 0 }; + byte[] multiValueHeader1Value2 = { 0, 0, 0, 1 }; + + for (int i = 0; i < numberOfMultiValueHeaderCount; i++) { + String multiValueHeader = "test-multi-value" + i; + mapper.setMultiValueHeaderPatterns(multiValueHeader); + rawHeaders.add(multiValueHeader, multiValueHeader1Value1); + rawHeaders.add(multiValueHeader, multiValueHeader1Value2); + } + + byte[] deliveryAttemptHeaderValue = { 0, 0, 0, 1 }; + byte[] originalOffsetHeaderValue = { 0, 0, 0, 2 }; + byte[] defaultHeaderAttemptsValues = { 0, 0, 0, 5 }; + + rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, deliveryAttemptHeaderValue); + rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, originalOffsetHeaderValue); + rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, defaultHeaderAttemptsValues); + + String singleValueHeader = "test-single-value"; + byte[] singleValueHeaderValue = { 0, 0, 0, 6 }; + rawHeaders.add(singleValueHeader, singleValueHeaderValue); + + // WHEN + Map mappedHeaders = new HashMap<>(); + mapper.toHeaders(rawHeaders, mappedHeaders); + + // THEN + assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1); + assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(originalOffsetHeaderValue); + assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(defaultHeaderAttemptsValues); + assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(singleValueHeaderValue); + + for (int i = 0; i < numberOfMultiValueHeaderCount; i++) { + String multiValueHeader = "test-multi-value" + i; + assertThat(mappedHeaders) + .extractingByKey(multiValueHeader, InstanceOfAssertFactories.list(byte[].class)) + .containsExactly(multiValueHeader1Value1, multiValueHeader1Value2); + } + } + @Test void multiValueHeaderFromTest() { // GIVEN From e55bd5ea02bf7ee47b3c073ff3b3c00a2824c511 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Fri, 30 May 2025 23:06:41 +0900 Subject: [PATCH 2/5] Remove useless comment. Signed-off-by: Sanghyeok An --- .../kafka/support/DefaultKafkaHeaderMapperTests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java index 40070537e9..5765a9be4a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java @@ -416,8 +416,7 @@ void multiValueHeaderToTest() { } @ParameterizedTest - @ValueSource(ints = {2000}) -// @ValueSource(ints = {500, 1000, 2000}) + @ValueSource(ints = {500, 1000, 2000}) void hugeNumberOfSingleValueHeaderToTest(int numberOfSingleValueHeaderCount) { // GIVEN Headers rawHeaders = new RecordHeaders(); From 390f4bf5a99c5ded24384f53af77e8db413bd899 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Sat, 31 May 2025 07:20:07 +0900 Subject: [PATCH 3/5] Addressing PR review Signed-off-by: Sanghyeok An --- .../kafka/support/AbstractKafkaHeaderMapper.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index da7eb347c2..1d943fa528 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -444,7 +444,7 @@ public boolean isNegated() { /** * A Cache that remembers whether a header name matches the multi-value pattern. */ - class HeaderPatternMatchCache { + private static final class HeaderPatternMatchCache { private static final int MAX_SIZE = 1000; @@ -452,19 +452,19 @@ class HeaderPatternMatchCache { private final ConcurrentLruCache singleValueHeaderPatternMatchCache = new ConcurrentLruCache<>(MAX_SIZE, key -> Boolean.TRUE); - public boolean isMultiValuePattern(String headerName) { + boolean isMultiValuePattern(String headerName) { return this.multiValueHeaderPatternMatchCache.contains(headerName); } - public boolean isSingleValuePattern(String headerName) { + boolean isSingleValuePattern(String headerName) { return this.singleValueHeaderPatternMatchCache.contains(headerName); } - public void cacheAsSingleValueHeader(String headerName) { + void cacheAsSingleValueHeader(String headerName) { this.singleValueHeaderPatternMatchCache.get(headerName); } - public void cacheAsMultiValueHeader(String headerName) { + void cacheAsMultiValueHeader(String headerName) { this.multiValueHeaderPatternMatchCache.get(headerName); } From f5e3b1f2c5745a50884ef7cfe496e449b36846c6 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Fri, 6 Jun 2025 23:16:41 +0900 Subject: [PATCH 4/5] Addressing PR review -s Signed-off-by: Sanghyeok An --- .../support/AbstractKafkaHeaderMapper.java | 58 +++++-------------- 1 file changed, 16 insertions(+), 42 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index 1d943fa528..7f8a32eac6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -66,9 +66,11 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { private final List matchers = new ArrayList<>(); + private final ConcurrentLruCache matcherResultCache = new ConcurrentLruCache<>(1000, this::doesMatchInternal); + private final List multiValueHeaderMatchers = new ArrayList<>(); - private final HeaderPatternMatchCache headerMatchedCache = new HeaderPatternMatchCache(); + private final ConcurrentLruCache multiValueMatcherResultCache = new ConcurrentLruCache<>(1000, this::doesMatchMultiValueHeaderInternal); private final Map rawMappedHeaders = new HashMap<>(); @@ -243,13 +245,17 @@ protected boolean matchesForInbound(String header) { } private boolean doesMatch(String header) { + return this.matcherResultCache.get(header); + } + + private boolean doesMatchInternal(String header) { for (HeaderMatcher matcher : this.matchers) { if (matcher.matchHeader(header)) { return !matcher.isNegated(); } } this.logger.debug(() -> MessageFormat.format("headerName=[{0}] WILL NOT be mapped; matched no patterns", - header)); + header)); return false; } @@ -269,28 +275,25 @@ protected Object headerValueToAddOut(String key, Object value) { } /** - * Check whether the header value should be mapped to multiple values. + * Determine whether the given header name should be mapped to multiple values. + * This method first checks if the mapping result is already cached. + * If a cached result exists, it is returned immediately. + * If not, {@code doesMatchInternal(headerName)} is called to compute the result, + * which is then cached and returned. * @param headerName the header name. * @return True for multiple values at the same key. * @since 4.0 */ protected boolean doesMatchMultiValueHeader(String headerName) { - if (this.headerMatchedCache.isMultiValuePattern(headerName)) { - return true; - } - - if (this.headerMatchedCache.isSingleValuePattern(headerName)) { - return false; - } + return this.multiValueMatcherResultCache.get(headerName); + } + private boolean doesMatchMultiValueHeaderInternal(String headerName) { for (HeaderMatcher headerMatcher : this.multiValueHeaderMatchers) { if (headerMatcher.matchHeader(headerName)) { - this.headerMatchedCache.cacheAsMultiValueHeader(headerName); return true; } } - - this.headerMatchedCache.cacheAsSingleValueHeader(headerName); return false; } @@ -441,33 +444,4 @@ public boolean isNegated() { } - /** - * A Cache that remembers whether a header name matches the multi-value pattern. - */ - private static final class HeaderPatternMatchCache { - - private static final int MAX_SIZE = 1000; - - private final ConcurrentLruCache multiValueHeaderPatternMatchCache = new ConcurrentLruCache<>(MAX_SIZE, key -> Boolean.TRUE); - - private final ConcurrentLruCache singleValueHeaderPatternMatchCache = new ConcurrentLruCache<>(MAX_SIZE, key -> Boolean.TRUE); - - boolean isMultiValuePattern(String headerName) { - return this.multiValueHeaderPatternMatchCache.contains(headerName); - } - - boolean isSingleValuePattern(String headerName) { - return this.singleValueHeaderPatternMatchCache.contains(headerName); - } - - void cacheAsSingleValueHeader(String headerName) { - this.singleValueHeaderPatternMatchCache.get(headerName); - } - - void cacheAsMultiValueHeader(String headerName) { - this.multiValueHeaderPatternMatchCache.get(headerName); - } - - } - } From 70ff8935b7a3b0c5a26d6fa5a6a7c3fce95aead8 Mon Sep 17 00:00:00 2001 From: Sanghyeok An Date: Sat, 7 Jun 2025 07:56:45 +0900 Subject: [PATCH 5/5] Addressing PR review. Signed-off-by: Sanghyeok An --- .../kafka/support/AbstractKafkaHeaderMapper.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index 7f8a32eac6..7e028bf8cf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -66,11 +66,13 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { private final List matchers = new ArrayList<>(); - private final ConcurrentLruCache matcherResultCache = new ConcurrentLruCache<>(1000, this::doesMatchInternal); + private final ConcurrentLruCache matcherResultCache = + new ConcurrentLruCache<>(1000, this::doesMatchInternal); private final List multiValueHeaderMatchers = new ArrayList<>(); - private final ConcurrentLruCache multiValueMatcherResultCache = new ConcurrentLruCache<>(1000, this::doesMatchMultiValueHeaderInternal); + private final ConcurrentLruCache multiValueMatcherResultCache = + new ConcurrentLruCache<>(1000, this::doesMatchMultiValueHeaderInternal); private final Map rawMappedHeaders = new HashMap<>();