diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index 6404e29f83..7e028bf8cf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -36,6 +36,7 @@ import org.springframework.core.log.LogAccessor; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; +import org.springframework.util.ConcurrentLruCache; import org.springframework.util.ObjectUtils; import org.springframework.util.PatternMatchUtils; @@ -65,8 +66,14 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { private final List matchers = new ArrayList<>(); + private final ConcurrentLruCache matcherResultCache = + new ConcurrentLruCache<>(1000, this::doesMatchInternal); + private final List multiValueHeaderMatchers = new ArrayList<>(); + private final ConcurrentLruCache multiValueMatcherResultCache = + new ConcurrentLruCache<>(1000, this::doesMatchMultiValueHeaderInternal); + private final Map rawMappedHeaders = new HashMap<>(); { @@ -240,13 +247,17 @@ protected boolean matchesForInbound(String header) { } private boolean doesMatch(String header) { + return this.matcherResultCache.get(header); + } + + private boolean doesMatchInternal(String header) { for (HeaderMatcher matcher : this.matchers) { if (matcher.matchHeader(header)) { return !matcher.isNegated(); } } this.logger.debug(() -> MessageFormat.format("headerName=[{0}] WILL NOT be mapped; matched no patterns", - header)); + header)); return false; } @@ -266,12 +277,20 @@ protected Object headerValueToAddOut(String key, Object value) { } /** - * Check whether the header value should be mapped to multiple values. + * Determine whether the given header name should be mapped to multiple values. + * This method first checks if the mapping result is already cached. + * If a cached result exists, it is returned immediately. + * If not, {@code doesMatchInternal(headerName)} is called to compute the result, + * which is then cached and returned. * @param headerName the header name. * @return True for multiple values at the same key. * @since 4.0 */ protected boolean doesMatchMultiValueHeader(String headerName) { + return this.multiValueMatcherResultCache.get(headerName); + } + + private boolean doesMatchMultiValueHeaderInternal(String headerName) { for (HeaderMatcher headerMatcher : this.multiValueHeaderMatchers) { if (headerMatcher.matchHeader(headerName)) { return true; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java index 1151e6cd3d..5765a9be4a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java @@ -31,6 +31,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.retrytopic.RetryTopicHeaders; @@ -413,6 +415,102 @@ void multiValueHeaderToTest() { .containsExactly(multiValueWildCardHeader2Value1, multiValueWildCardHeader2Value2); } + @ParameterizedTest + @ValueSource(ints = {500, 1000, 2000}) + void hugeNumberOfSingleValueHeaderToTest(int numberOfSingleValueHeaderCount) { + // GIVEN + Headers rawHeaders = new RecordHeaders(); + + String multiValueHeader1 = "test-multi-value1"; + byte[] multiValueHeader1Value1 = { 0, 0, 0, 0 }; + byte[] multiValueHeader1Value2 = { 0, 0, 0, 1 }; + + rawHeaders.add(multiValueHeader1, multiValueHeader1Value1); + rawHeaders.add(multiValueHeader1, multiValueHeader1Value2); + + byte[] deliveryAttemptHeaderValue = { 0, 0, 0, 1 }; + byte[] originalOffsetHeaderValue = { 0, 0, 0, 2 }; + byte[] defaultHeaderAttemptsValues = { 0, 0, 0, 5 }; + + rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, deliveryAttemptHeaderValue); + rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, originalOffsetHeaderValue); + rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, defaultHeaderAttemptsValues); + + byte[] singleValueHeaderValue = { 0, 0, 0, 6 }; + for (int i = 0; i < numberOfSingleValueHeaderCount; i++) { + String singleValueHeader = "test-single-value" + i; + rawHeaders.add(singleValueHeader, singleValueHeaderValue); + } + + DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + mapper.setMultiValueHeaderPatterns(multiValueHeader1); + + // WHEN + Map mappedHeaders = new HashMap<>(); + mapper.toHeaders(rawHeaders, mappedHeaders); + + // THEN + assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1); + assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(originalOffsetHeaderValue); + assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(defaultHeaderAttemptsValues); + + for (int i = 0; i < numberOfSingleValueHeaderCount; i++) { + String singleValueHeader = "test-single-value" + i; + assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(singleValueHeaderValue); + } + + assertThat(mappedHeaders) + .extractingByKey(multiValueHeader1, InstanceOfAssertFactories.list(byte[].class)) + .containsExactly(multiValueHeader1Value1, multiValueHeader1Value2); + } + + @ParameterizedTest + @ValueSource(ints = {500, 1000, 2000}) + void hugeNumberOfMultiValueHeaderToTest(int numberOfMultiValueHeaderCount) { + // GIVEN + DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + Headers rawHeaders = new RecordHeaders(); + + byte[] multiValueHeader1Value1 = { 0, 0, 0, 0 }; + byte[] multiValueHeader1Value2 = { 0, 0, 0, 1 }; + + for (int i = 0; i < numberOfMultiValueHeaderCount; i++) { + String multiValueHeader = "test-multi-value" + i; + mapper.setMultiValueHeaderPatterns(multiValueHeader); + rawHeaders.add(multiValueHeader, multiValueHeader1Value1); + rawHeaders.add(multiValueHeader, multiValueHeader1Value2); + } + + byte[] deliveryAttemptHeaderValue = { 0, 0, 0, 1 }; + byte[] originalOffsetHeaderValue = { 0, 0, 0, 2 }; + byte[] defaultHeaderAttemptsValues = { 0, 0, 0, 5 }; + + rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, deliveryAttemptHeaderValue); + rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, originalOffsetHeaderValue); + rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, defaultHeaderAttemptsValues); + + String singleValueHeader = "test-single-value"; + byte[] singleValueHeaderValue = { 0, 0, 0, 6 }; + rawHeaders.add(singleValueHeader, singleValueHeaderValue); + + // WHEN + Map mappedHeaders = new HashMap<>(); + mapper.toHeaders(rawHeaders, mappedHeaders); + + // THEN + assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1); + assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(originalOffsetHeaderValue); + assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(defaultHeaderAttemptsValues); + assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(singleValueHeaderValue); + + for (int i = 0; i < numberOfMultiValueHeaderCount; i++) { + String multiValueHeader = "test-multi-value" + i; + assertThat(mappedHeaders) + .extractingByKey(multiValueHeader, InstanceOfAssertFactories.list(byte[].class)) + .containsExactly(multiValueHeader1Value1, multiValueHeader1Value2); + } + } + @Test void multiValueHeaderFromTest() { // GIVEN