GH-3067: Spring Kafka support multiple headers with same key. #3874
@@ -179,3 +179,27 @@ MessagingMessageConverter converter() {

If using Spring Boot, it will auto-configure this converter bean into the auto-configured `KafkaTemplate`; otherwise you should add this converter to the template.

[[multi-value-header]]
== Support for multi-value headers

Spring for Apache Kafka 4.0 supports multi-value headers.
To use multi-value headers, the patterns for the multi-value header names must be configured on the `HeaderMapper`.
If no patterns are configured on the `HeaderMapper`, iterable header values under the same header name are handled as a single value.

If there are use cases in which multi-value headers are required, use this configuration:

[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

    DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
    headerMapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2");

    MessagingMessageConverter converter = new MessagingMessageConverter(headerMapper);
    factory.setRecordMessageConverter(converter);
    return factory;
}
----

Review comment (on the container factory example): We use header mapper not only in the listener container.
Reply: I agree.
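As the review note above points out, the header mapper is not used only by the listener container; the same mapper can also be given to the converter used for sending. The following is a minimal sketch of the producing side, not part of this change; it assumes a `ProducerFactory<Integer, String>` bean is available, and the pattern and header names are illustrative:

[source, java]
----
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
    DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
    headerMapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2");

    // same converter type as in the listener example; on send, an iterable header
    // value matching a multi-value pattern becomes one Kafka header per element
    MessagingMessageConverter converter = new MessagingMessageConverter(headerMapper);

    KafkaTemplate<Integer, String> template = new KafkaTemplate<>(producerFactory);
    template.setMessageConverter(converter);
    return template;
}
----

The converter configured this way is applied when sending spring-messaging `Message<?>` instances through the template.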
@@ -70,3 +70,9 @@ Several deprecated items have been removed:

Spring for Apache Kafka 4.0 supports Kafka 4.0’s new consumer rebalance protocol - https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848].
For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-rebalalcne-protocol[New Consumer Rebalance Protocol docs].

[[x40-multi-value-header]]
=== Support for multi-value headers

Spring for Apache Kafka 4.0 supports multi-value headers for Kafka records.
For details, see xref:kafka/headers.adoc#multi-value-header[Support for multi-value headers].

Review comment (on this section): We don’t need to repeat the version, since the whole chapter is about 4.0 news. Better to say what class(es) got a new option and then link to the target chapter.
Reply: Right, I know that I've just said …
Reply: A journey of a thousand miles begins with a single step. 😃
@@ -65,6 +65,8 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {

    private final List<HeaderMatcher> matchers = new ArrayList<>();

    private final List<HeaderMatcher> multiValueHeaderMatchers = new ArrayList<>();

    private final Map<String, Boolean> rawMappedHeaders = new HashMap<>();

    {

@@ -191,6 +193,18 @@ public void addRawMappedHeader(String name, boolean toString) {
        this.rawMappedHeaders.put(name, toString);
    }

    /**
     * Add patterns for matching multi-value headers under the same key.
     * @param patterns the patterns for the headers.
     * @since 4.0
     */
    public void setMultiValueHeaderPatterns(String... patterns) {
        this.multiValueHeaderMatchers.addAll(Arrays
                .stream(patterns)
                .map(SimplePatternBasedHeaderMatcher::new)
                .toList());
    }

    protected boolean matches(String header, Object value) {
        if (matches(header)) {
            if ((header.equals(MessageHeaders.REPLY_CHANNEL) || header.equals(MessageHeaders.ERROR_CHANNEL))

@@ -251,6 +265,41 @@ protected Object headerValueToAddOut(String key, Object value) {
        return valueToAdd;
    }

    /**
     * Check whether the header value should be mapped to multiple values.
     * @param headerName the header name.
     * @return true for multiple values at the same key.
     * @since 4.0
     */
    protected boolean doesMatchMultiValueHeader(String headerName) {
        for (HeaderMatcher headerMatcher : this.multiValueHeaderMatchers) {
            if (headerMatcher.matchHeader(headerName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Handle non-reserved headers in {@link DefaultKafkaHeaderMapper}.
     * @param headerName the header name.
     * @param header the header instance.
     * @param headers the target headers.
     * @since 4.0
     */
    protected void fromUserHeader(String headerName, Header header, final Map<String, Object> headers) {
        if (!doesMatchMultiValueHeader(headerName)) {
            headers.put(headerName, headerValueToAddIn(header));
        }
        else {
            @SuppressWarnings("unchecked")
            List<Object> headerValues = (List<Object>)
                    headers.computeIfAbsent(headerName, key -> new ArrayList<>());
            headerValues.add(headerValueToAddIn(header));
            // headers.put(headerName, headerValues);
        }
    }

    @SuppressWarnings("NullAway") // Dataflow analysis limitation
    @Nullable
    private byte[] mapRawOut(String header, Object value) {

Review comment (on the commented-out `headers.put(headerName, headerValues);` line): Why comment out this line?
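For illustration of the inbound behavior: `fromUserHeader(...)` accumulates every occurrence of a header name that matches a multi-value pattern into a single `List`, so a listener can bind the whole collection. A minimal sketch, assuming the pattern configuration shown in the documentation above; the topic and header names, and the `List<byte[]>` element type, are illustrative and not part of this change:

[source, java]
----
@KafkaListener(topics = "some-topic")
public void listen(String payload,
        @Header(name = "test-multi-value1", required = false) List<byte[]> values) {
    // each Kafka header occurrence named "test-multi-value1" arrives as one list element
}
----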
@@ -48,6 +48,7 @@
 * @author Gary Russell
 * @author Artem Bilan
 * @author Soby Chacko
 * @author Sanghyeok An
 *
 * @since 1.3
 *

@@ -266,31 +267,17 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
         final ObjectMapper headerObjectMapper = getObjectMapper();
         headers.forEach((key, rawValue) -> {
             if (matches(key, rawValue)) {
-                Object valueToAdd = headerValueToAddOut(key, rawValue);
-                if (valueToAdd instanceof byte[]) {
-                    target.add(new RecordHeader(key, (byte[]) valueToAdd));
-                }
-                else {
-                    try {
-                        String className = valueToAdd.getClass().getName();
-                        boolean encodeToJson = this.encodeStrings;
-                        if (this.toStringClasses.contains(className)) {
-                            valueToAdd = valueToAdd.toString();
-                            className = JAVA_LANG_STRING;
-                            encodeToJson = true;
-                        }
-                        if (!encodeToJson && valueToAdd instanceof String) {
-                            target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset())));
-                        }
-                        else {
-                            target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd)));
-                        }
-                        jsonHeaders.put(key, className);
+                if (doesMatchMultiValueHeader(key)) {
+                    if (rawValue instanceof Iterable<?> valuesToMap) {
+                        valuesToMap.forEach(o -> fromHeader(key, o, jsonHeaders, headerObjectMapper, target));
                     }
-                    catch (Exception e) {
-                        logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
+                    else {
+                        fromHeader(key, rawValue, jsonHeaders, headerObjectMapper, target);
                     }
                 }
+                else {
+                    fromHeader(key, rawValue, jsonHeaders, headerObjectMapper, target);
+                }
             }
         });
         if (!jsonHeaders.isEmpty()) {

@@ -324,12 +311,42 @@ else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
                     populateJsonValueHeader(header, requestedType, headers);
                 }
                 else {
-                    headers.put(headerName, headerValueToAddIn(header));
+                    fromUserHeader(headerName, header, headers);
                 }
             }
         });
     }

+    private void fromHeader(String key, Object rawValue, Map<String, String> jsonHeaders,
+            ObjectMapper headerObjectMapper, Headers target) {
+
+        Object valueToAdd = headerValueToAddOut(key, rawValue);
+        if (valueToAdd instanceof byte[]) {
+            target.add(new RecordHeader(key, (byte[]) valueToAdd));
+        }
+        else {
+            try {
+                String className = valueToAdd.getClass().getName();
+                boolean encodeToJson = this.encodeStrings;
+                if (this.toStringClasses.contains(className)) {
+                    valueToAdd = valueToAdd.toString();
+                    className = JAVA_LANG_STRING;
+                    encodeToJson = true;
+                }
+                if (!encodeToJson && valueToAdd instanceof String) {
+                    target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset())));
+                }
+                else {
+                    target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd)));
+                }
+                jsonHeaders.putIfAbsent(key, className);
+            }
+            catch (Exception e) {
+                logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
+            }
+        }
+    }
+
     private void populateJsonValueHeader(Header header, String requestedType, Map<String, Object> headers) {
         Class<?> type = Object.class;
         boolean trusted = false;

Review comment (inside the new `fromHeader` method, on the `writeValueAsBytes` call): Looks like we can move …
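To make the outbound behavior concrete, here is a small sketch (not part of the change) that calls `fromHeaders(...)` directly: when a header name matches a configured multi-value pattern and its value is an `Iterable`, each element becomes its own Kafka header, while non-matching names keep the previous single-value handling. The header names and values are illustrative; imports are omitted as in the other snippets:

[source, java]
----
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setMultiValueHeaderPatterns("tracing-id");

Map<String, Object> springHeaders = new HashMap<>();
springHeaders.put("tracing-id", List.of("id-1", "id-2"));

Headers recordHeaders = new RecordHeaders();
mapper.fromHeaders(new MessageHeaders(springHeaders), recordHeaders);
// recordHeaders now carries two "tracing-id" entries, one per list element
----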
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2022 the original author or authors.
+ * Copyright 2018-2025 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.

@@ -35,6 +35,7 @@
 * The exceptions are correlation and reply headers for request/reply
 *
 * @author Gary Russell
 * @author Sanghyeok An
 * @since 2.1.3
 *
 */

@@ -94,27 +95,41 @@ public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns)
     public void fromHeaders(MessageHeaders headers, Headers target) {
         headers.forEach((key, value) -> {
             if (!NEVER.contains(key)) {
-                Object valueToAdd = headerValueToAddOut(key, value);
-                if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) {
-                    target.add(new RecordHeader(key, (byte[]) valueToAdd));
+                if (doesMatchMultiValueHeader(key)) {
+                    if (value instanceof Iterable<?> valuesToMap) {
+                        valuesToMap.forEach(o -> fromHeader(key, o, target));
                     }
+                    else {
+                        fromHeader(key, value, target);
+                    }
+                }
+                else {
+                    fromHeader(key, value, target);
+                }
             }
         });
     }

     @Override
-    public void toHeaders(Headers source, Map<String, Object> target) {
+    public void toHeaders(Headers source, Map<String, Object> headers) {
         source.forEach(header -> {
             String headerName = header.key();
             if (matchesForInbound(headerName)) {
                 if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) {
-                    target.put(headerName, ByteBuffer.wrap(header.value()).getInt());
+                    headers.put(headerName, ByteBuffer.wrap(header.value()).getInt());
                 }
                 else {
-                    target.put(headerName, headerValueToAddIn(header));
+                    fromUserHeader(headerName, header, headers);
                 }
             }
         });
     }

+    private void fromHeader(String key, Object value, Headers target) {
+        Object valueToAdd = headerValueToAddOut(key, value);
+        if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) {
+            target.add(new RecordHeader(key, (byte[]) valueToAdd));
+        }
+    }

 }

Review comment (on the `instanceof` check in the new `fromHeader` method): We probably can use pattern matching for `instanceof`.
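Because `setMultiValueHeaderPatterns(...)` is defined on `AbstractKafkaHeaderMapper`, the option is available on `SimpleKafkaHeaderMapper` as well. A minimal sketch; the header names are illustrative, and the trailing `*` assumes the same simple wildcard matching used for the mapper's other header patterns:

[source, java]
----
SimpleKafkaHeaderMapper mapper = new SimpleKafkaHeaderMapper();
// headers whose names match these patterns are mapped to/from multiple Kafka headers
mapper.setMultiValueHeaderPatterns("test-multi-value1", "custom-multi-*");
----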
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2024 the original author or authors.
+ * Copyright 2017-2025 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.

@@ -33,6 +33,7 @@
import org.junit.jupiter.api.Test;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.retrytopic.RetryTopicHeaders;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper.NonTrustedHeaderType;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationTestUtils;

@@ -56,6 +57,7 @@
 * @author Gary Russell
 * @author Artem Bilan
 * @author Soby Chacko
 * @author Sanghyeok An
 *
 * @since 1.3
 *

@@ -332,6 +334,86 @@ void inboundJson() {
            .containsKey("baz");
    }

    @Test
    void multiValueHeaderToTest() {
        // GIVEN
        String multiValueHeader1 = "test-multi-value1";
        String multiValueHeader2 = "test-multi-value2";
        String singleValueHeader = "test-single-value1";

        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        mapper.setMultiValueHeaderPatterns(multiValueHeader1, multiValueHeader2);

        Headers rawHeaders = new RecordHeaders();
        rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, new byte[] { 0, 0, 0, 1 });
        rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, new byte[] { 0, 0, 0, 1 });
        rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, new byte[] { 0, 0, 0, 5 });
        rawHeaders.add(singleValueHeader, new byte[] { 0, 0, 0, 6 });

        rawHeaders.add(multiValueHeader1, new byte[] { 0, 0, 0, 0 });
        rawHeaders.add(multiValueHeader1, new byte[] { 0, 0, 0, 1 });
        rawHeaders.add(multiValueHeader1, new byte[] { 0, 0, 0, 2 });
        rawHeaders.add(multiValueHeader1, new byte[] { 0, 0, 0, 3 });

        rawHeaders.add(multiValueHeader2, new byte[] { 0, 0, 0, 4 });
        rawHeaders.add(multiValueHeader2, new byte[] { 0, 0, 0, 5 });

        // WHEN
        Map<String, Object> mappedHeaders = new HashMap<>();
        mapper.toHeaders(rawHeaders, mappedHeaders);

        // THEN
        assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1);
        assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(new byte[] { 0, 0, 0, 1 });
        assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(new byte[] { 0, 0, 0, 5 });
        assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(new byte[] { 0, 0, 0, 6 });

        @SuppressWarnings("unchecked")
        List<byte[]> multiHeader1Values = (List<byte[]>) mappedHeaders.get(multiValueHeader1);
        assertThat(multiHeader1Values).contains(new byte[] { 0, 0, 0, 0 },
                new byte[] { 0, 0, 0, 1 },
                new byte[] { 0, 0, 0, 2 },
                new byte[] { 0, 0, 0, 3 });

        @SuppressWarnings("unchecked")
        List<byte[]> multiHeader2Values = (List<byte[]>) mappedHeaders.get(multiValueHeader2);
        assertThat(multiHeader2Values).contains(new byte[] { 0, 0, 0, 4 },
                new byte[] { 0, 0, 0, 5 });
    }

    @Test
    void multiValueHeaderFromTest() {
        // GIVEN
        String multiValueHeader1 = "test-multi-value1";
        String multiValueHeader2 = "test-multi-value2";
        String singleValueHeader = "test-single-value1";

        Message<String> message = MessageBuilder
                .withPayload("test-multi-value-header")
                .setHeader(multiValueHeader1, List.of(new byte[] { 0, 0, 0, 1 },
                        new byte[] { 0, 0, 0, 2 }))
                .setHeader(multiValueHeader2, List.of(new byte[] { 0, 0, 0, 3 },
                        new byte[] { 0, 0, 0, 4 }))
                .setHeader(singleValueHeader, new byte[] { 0, 0, 0, 5 })
                .build();

        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        mapper.setMultiValueHeaderPatterns(multiValueHeader1, multiValueHeader2);

        // WHEN
        Headers results = new RecordHeaders();
        mapper.fromHeaders(message.getHeaders(), results);

        // THEN
        assertThat(results).contains(
                new RecordHeader(multiValueHeader1, new byte[] { 0, 0, 0, 1 }),
                new RecordHeader(multiValueHeader1, new byte[] { 0, 0, 0, 2 }),
                new RecordHeader(multiValueHeader2, new byte[] { 0, 0, 0, 3 }),
                new RecordHeader(multiValueHeader2, new byte[] { 0, 0, 0, 4 }),
                new RecordHeader(singleValueHeader, new byte[] { 0, 0, 0, 5 })
        );
    }

    @Test
    void deserializationExceptionHeadersAreMappedAsNonByteArray() {
        DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();

Review comment (on the `assertThat(results).contains(...)` assertion): I think we can utilize …
Review comment (on the new documentation section): The "use this" phrasing is a command to a specific person; that is not a technical documentation sentence. Either way, I don't find the text here as useful as it could be. I would expect an explanation about multi-value headers in Kafka and then how they are handled by default, and so on. Then, on the other hand, there should be an explanation of how iterable message headers are processed now and what we do if there is respective pattern matching.