Skip to content

Commit fb754cc

Browse files
spring-projectsGH-3879: Add cache to optimize header match performance.
Fixes: spring-projects#3879 Issue link: spring-projects#3879 What? Add an LRU cache for the pattern matching performed by KafkaHeaderMapper. Why? To reduce the CPU time spent on pattern matching in KafkaHeaderMapper. Commonly, many Kafka records in the same topic have the same header names. Currently, each pattern match has O(M*N) time complexity, where M is the pattern length and N is the string length. If the results of pattern matching are cached and reused by KafkaHeaderMapper, an improvement in CPU usage can be expected. Signed-off-by: Sanghyeok An <[email protected]>
1 parent 17fedf6 commit fb754cc

File tree

2 files changed

+142
-0
lines changed

2 files changed

+142
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.core.log.LogAccessor;
3737
import org.springframework.messaging.MessageHeaders;
3838
import org.springframework.util.Assert;
39+
import org.springframework.util.ConcurrentLruCache;
3940
import org.springframework.util.ObjectUtils;
4041
import org.springframework.util.PatternMatchUtils;
4142

@@ -67,6 +68,8 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {
6768

6869
private final List<HeaderMatcher> multiValueHeaderMatchers = new ArrayList<>();
6970

71+
private final HeaderPatternMatchCache headerMatchedCache = new HeaderPatternMatchCache();
72+
7073
private final Map<String, Boolean> rawMappedHeaders = new HashMap<>();
7174

7275
{
@@ -272,11 +275,22 @@ protected Object headerValueToAddOut(String key, Object value) {
272275
* @since 4.0
273276
*/
274277
protected boolean doesMatchMultiValueHeader(String headerName) {
278+
if (this.headerMatchedCache.isMultiValuePattern(headerName)) {
279+
return true;
280+
}
281+
282+
if (this.headerMatchedCache.isSingleValuePattern(headerName)) {
283+
return false;
284+
}
285+
275286
for (HeaderMatcher headerMatcher : this.multiValueHeaderMatchers) {
276287
if (headerMatcher.matchHeader(headerName)) {
288+
this.headerMatchedCache.cacheAsMultiValueHeader(headerName);
277289
return true;
278290
}
279291
}
292+
293+
this.headerMatchedCache.cacheAsSingleValueHeader(headerName);
280294
return false;
281295
}
282296

@@ -427,4 +441,33 @@ public boolean isNegated() {
427441

428442
}
429443

444+
/**
445+
* A Cache that remembers whether a header name matches the multi-value pattern.
446+
*/
447+
class HeaderPatternMatchCache {
448+
449+
private static final int MAX_SIZE = 1000;
450+
451+
private final ConcurrentLruCache<String, Boolean> multiValueHeaderPatternMatchCache = new ConcurrentLruCache<>(MAX_SIZE, key -> Boolean.TRUE);
452+
453+
private final ConcurrentLruCache<String, Boolean> singleValueHeaderPatternMatchCache = new ConcurrentLruCache<>(MAX_SIZE, key -> Boolean.TRUE);
454+
455+
public boolean isMultiValuePattern(String headerName) {
456+
return this.multiValueHeaderPatternMatchCache.contains(headerName);
457+
}
458+
459+
public boolean isSingleValuePattern(String headerName) {
460+
return this.singleValueHeaderPatternMatchCache.contains(headerName);
461+
}
462+
463+
public void cacheAsSingleValueHeader(String headerName) {
464+
this.singleValueHeaderPatternMatchCache.get(headerName);
465+
}
466+
467+
public void cacheAsMultiValueHeader(String headerName) {
468+
this.multiValueHeaderPatternMatchCache.get(headerName);
469+
}
470+
471+
}
472+
430473
}

spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.kafka.common.header.internals.RecordHeaders;
3232
import org.assertj.core.api.InstanceOfAssertFactories;
3333
import org.junit.jupiter.api.Test;
34+
import org.junit.jupiter.params.ParameterizedTest;
35+
import org.junit.jupiter.params.provider.ValueSource;
3436

3537
import org.springframework.core.log.LogAccessor;
3638
import org.springframework.kafka.retrytopic.RetryTopicHeaders;
@@ -413,6 +415,103 @@ void multiValueHeaderToTest() {
413415
.containsExactly(multiValueWildCardHeader2Value1, multiValueWildCardHeader2Value2);
414416
}
415417

418+
@ParameterizedTest
419+
@ValueSource(ints = {2000})
420+
// @ValueSource(ints = {500, 1000, 2000})
421+
void hugeNumberOfSingleValueHeaderToTest(int numberOfSingleValueHeaderCount) {
422+
// GIVEN
423+
Headers rawHeaders = new RecordHeaders();
424+
425+
String multiValueHeader1 = "test-multi-value1";
426+
byte[] multiValueHeader1Value1 = { 0, 0, 0, 0 };
427+
byte[] multiValueHeader1Value2 = { 0, 0, 0, 1 };
428+
429+
rawHeaders.add(multiValueHeader1, multiValueHeader1Value1);
430+
rawHeaders.add(multiValueHeader1, multiValueHeader1Value2);
431+
432+
byte[] deliveryAttemptHeaderValue = { 0, 0, 0, 1 };
433+
byte[] originalOffsetHeaderValue = { 0, 0, 0, 2 };
434+
byte[] defaultHeaderAttemptsValues = { 0, 0, 0, 5 };
435+
436+
rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, deliveryAttemptHeaderValue);
437+
rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, originalOffsetHeaderValue);
438+
rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, defaultHeaderAttemptsValues);
439+
440+
byte[] singleValueHeaderValue = { 0, 0, 0, 6 };
441+
for (int i = 0; i < numberOfSingleValueHeaderCount; i++) {
442+
String singleValueHeader = "test-single-value" + i;
443+
rawHeaders.add(singleValueHeader, singleValueHeaderValue);
444+
}
445+
446+
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
447+
mapper.setMultiValueHeaderPatterns(multiValueHeader1);
448+
449+
// WHEN
450+
Map<String, Object> mappedHeaders = new HashMap<>();
451+
mapper.toHeaders(rawHeaders, mappedHeaders);
452+
453+
// THEN
454+
assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1);
455+
assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(originalOffsetHeaderValue);
456+
assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(defaultHeaderAttemptsValues);
457+
458+
for (int i = 0; i < numberOfSingleValueHeaderCount; i++) {
459+
String singleValueHeader = "test-single-value" + i;
460+
assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(singleValueHeaderValue);
461+
}
462+
463+
assertThat(mappedHeaders)
464+
.extractingByKey(multiValueHeader1, InstanceOfAssertFactories.list(byte[].class))
465+
.containsExactly(multiValueHeader1Value1, multiValueHeader1Value2);
466+
}
467+
468+
@ParameterizedTest
469+
@ValueSource(ints = {500, 1000, 2000})
470+
void hugeNumberOfMultiValueHeaderToTest(int numberOfMultiValueHeaderCount) {
471+
// GIVEN
472+
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
473+
Headers rawHeaders = new RecordHeaders();
474+
475+
byte[] multiValueHeader1Value1 = { 0, 0, 0, 0 };
476+
byte[] multiValueHeader1Value2 = { 0, 0, 0, 1 };
477+
478+
for (int i = 0; i < numberOfMultiValueHeaderCount; i++) {
479+
String multiValueHeader = "test-multi-value" + i;
480+
mapper.setMultiValueHeaderPatterns(multiValueHeader);
481+
rawHeaders.add(multiValueHeader, multiValueHeader1Value1);
482+
rawHeaders.add(multiValueHeader, multiValueHeader1Value2);
483+
}
484+
485+
byte[] deliveryAttemptHeaderValue = { 0, 0, 0, 1 };
486+
byte[] originalOffsetHeaderValue = { 0, 0, 0, 2 };
487+
byte[] defaultHeaderAttemptsValues = { 0, 0, 0, 5 };
488+
489+
rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, deliveryAttemptHeaderValue);
490+
rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, originalOffsetHeaderValue);
491+
rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, defaultHeaderAttemptsValues);
492+
493+
String singleValueHeader = "test-single-value";
494+
byte[] singleValueHeaderValue = { 0, 0, 0, 6 };
495+
rawHeaders.add(singleValueHeader, singleValueHeaderValue);
496+
497+
// WHEN
498+
Map<String, Object> mappedHeaders = new HashMap<>();
499+
mapper.toHeaders(rawHeaders, mappedHeaders);
500+
501+
// THEN
502+
assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1);
503+
assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(originalOffsetHeaderValue);
504+
assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(defaultHeaderAttemptsValues);
505+
assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(singleValueHeaderValue);
506+
507+
for (int i = 0; i < numberOfMultiValueHeaderCount; i++) {
508+
String multiValueHeader = "test-multi-value" + i;
509+
assertThat(mappedHeaders)
510+
.extractingByKey(multiValueHeader, InstanceOfAssertFactories.list(byte[].class))
511+
.containsExactly(multiValueHeader1Value1, multiValueHeader1Value2);
512+
}
513+
}
514+
416515
@Test
417516
void multiValueHeaderFromTest() {
418517
// GIVEN

0 commit comments

Comments
 (0)