Skip to content

GH-3067: Spring Kafka support multiple headers with same key. #3874

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,27 @@ MessagingMessageConverter converter() {

If using Spring Boot, it will auto configure this converter bean into the auto-configured `KafkaTemplate`; otherwise you should add this converter to the template.

[[multi-value-header]]
== Support multi-value header

Spring for Apache Kafka 4.0 supports multi-value header.
To use multi-value header, the patterns for multi-value should be configured to `HeaderMapper`.
If no patterns are configured to `HeaderMapper`, iterable header values under same header name will be handled like single value.

If there are use cases in which multi-value headers are required, use this:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use this is a command for specific person.
That is not a technical documentation sentence.

Either way, I don't find the text here as useful as it could be.
I would expect an explanation about multi-value headers in Kafka and then how their are handled by default and so on.
Then other other hand, there should be an explanation how iterable message headers are processed now and what we do if there is respective pattern matching.


[source, java]
----
@Bean
public void KafkaListenerContainerFactory<Integer, String> containerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use header mapper not only in the listener container.
And such a factory is not only one way to configure that.
Therefore I would expect to not have such a misleading bean definition in this docs section.
More over, I would say there is no need in the code snippet just for a setMultiValueHeaderPatterns() at all.
Better to explain what are those patterns and give examples of possible values.
Now, you provide only whole header names.
What if I would like to have both mentioned headers mapped with one pattern?
I guess something like test-multi-* would be enough, but I'm not sure.
WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree.
I also added note section following java doc of PatternMatchUtils.simpleMatch(...).

Regular expressions are *not* supported; only the `*` wildcard is allowed in simple patterns—supporting direct equality and forms such as:

- `xxx*`
- `*xxx`
- `*xxx*`
- `xxx*yyy`

new ConcurrentKafkaListenerContainerFactory<>();

DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
headerMapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2");

MessagingMessageConverter converter = new MessagingMessageConverter(headerMapper);
factory.setRecordMessageConverter(converter);
return factory;
}
----
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,9 @@ Several deprecated items have been removed:

Spring for Apache Kafka 4.0 supports Kafka 4.0’s new consumer rebalance protocol - https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol[KIP-848].
For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-rebalalcne-protocol[New Consumer Rebalace Protocol docs].

[[x40-multi-value-header]]
=== Support multi-value header

Spring for Apache Kafka 4.0 supports multi-value header for Kafka Record.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don’t need to repeat version since the whole chapter is about 4.0 news. Better to say what class(es) got a new option and then link to the target chapter.

For details, see xref:kafka/headers.adoc#multi-value-header[Support multi-value header].
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I know that I've just said use this is not OK, and then we have that see al around What's New section.
We probably can revise it eventually in favor of something like More details in.
But that is my personal opinion.
I guess no hard requirements and no legal accidents yet.
So, we are good so far 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A journey of a thousand miles begins with a single step. 😃
I modified 'see' to 'More details are available in ...'.

Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper {

private final List<HeaderMatcher> matchers = new ArrayList<>();

private final List<HeaderMatcher> multiValueHeaderMatchers = new ArrayList<>();

private final Map<String, Boolean> rawMappedHeaders = new HashMap<>();

{
Expand Down Expand Up @@ -191,6 +193,18 @@ public void addRawMappedHeader(String name, boolean toString) {
this.rawMappedHeaders.put(name, toString);
}

/**
* Add patterns for matching multi-value headers under the same key.
* @param patterns the patterns for header.
* @since 4.0
*/
public void setMultiValueHeaderPatterns(String ... patterns) {
this.multiValueHeaderMatchers.addAll(Arrays
.stream(patterns)
.map(SimplePatternBasedHeaderMatcher::new)
.toList());
}

protected boolean matches(String header, Object value) {
if (matches(header)) {
if ((header.equals(MessageHeaders.REPLY_CHANNEL) || header.equals(MessageHeaders.ERROR_CHANNEL))
Expand Down Expand Up @@ -251,6 +265,41 @@ protected Object headerValueToAddOut(String key, Object value) {
return valueToAdd;
}

/**
* Check whether the header value should be mapped to multiple values.
* @param headerName the header name.
* @return True for multiple values at the same key.
* @since 4.0
*/
protected boolean doesMatchMultiValueHeader(String headerName) {
for (HeaderMatcher headerMatcher : this.multiValueHeaderMatchers) {
if (headerMatcher.matchHeader(headerName)) {
return true;
}
}
return false;
}

/**
* Handle non-reserved headers in {@link DefaultKafkaHeaderMapper}.
* @param headerName the header name.
* @param header the header instance.
* @param headers the target headers.
* @since 4.0
*/
protected void fromUserHeader(String headerName, Header header, final Map<String, Object> headers) {
if (!doesMatchMultiValueHeader(headerName)) {
headers.put(headerName, headerValueToAddIn(header));
}
else {
@SuppressWarnings("unchecked")
List<Object> headerValues = (List<Object>)
headers.computeIfAbsent(headerName, key -> new ArrayList<>());
headerValues.add(headerValueToAddIn(header));
// headers.put(headerName, headerValues);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why comment out this line?
Why just not remove?

}
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
@Nullable
private byte[] mapRawOut(String header, Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Soby Chacko
* @author Sanghyoek An
*
* @since 1.3
*
Expand Down Expand Up @@ -266,31 +267,17 @@ public void fromHeaders(MessageHeaders headers, Headers target) {
final ObjectMapper headerObjectMapper = getObjectMapper();
headers.forEach((key, rawValue) -> {
if (matches(key, rawValue)) {
Object valueToAdd = headerValueToAddOut(key, rawValue);
if (valueToAdd instanceof byte[]) {
target.add(new RecordHeader(key, (byte[]) valueToAdd));
}
else {
try {
String className = valueToAdd.getClass().getName();
boolean encodeToJson = this.encodeStrings;
if (this.toStringClasses.contains(className)) {
valueToAdd = valueToAdd.toString();
className = JAVA_LANG_STRING;
encodeToJson = true;
}
if (!encodeToJson && valueToAdd instanceof String) {
target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset())));
}
else {
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd)));
}
jsonHeaders.put(key, className);
if (doesMatchMultiValueHeader(key)) {
if (rawValue instanceof Iterable<?> valuesToMap) {
valuesToMap.forEach(o -> fromHeader(key, o, jsonHeaders, headerObjectMapper, target));
}
catch (Exception e) {
logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
else {
fromHeader(key, rawValue, jsonHeaders, headerObjectMapper, target);
}
}
else {
fromHeader(key, rawValue, jsonHeaders, headerObjectMapper, target);
}
}
});
if (!jsonHeaders.isEmpty()) {
Expand Down Expand Up @@ -324,12 +311,42 @@ else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
populateJsonValueHeader(header, requestedType, headers);
}
else {
headers.put(headerName, headerValueToAddIn(header));
fromUserHeader(headerName, header, headers);
}
}
});
}

private void fromHeader(String key, Object rawValue, Map<String, String> jsonHeaders,
ObjectMapper headerObjectMapper, Headers target) {

Object valueToAdd = headerValueToAddOut(key, rawValue);
if (valueToAdd instanceof byte[]) {
target.add(new RecordHeader(key, (byte[]) valueToAdd));
}
else {
try {
String className = valueToAdd.getClass().getName();
boolean encodeToJson = this.encodeStrings;
if (this.toStringClasses.contains(className)) {
valueToAdd = valueToAdd.toString();
className = JAVA_LANG_STRING;
encodeToJson = true;
}
if (!encodeToJson && valueToAdd instanceof String) {
target.add(new RecordHeader(key, ((String) valueToAdd).getBytes(getCharset())));
}
else {
target.add(new RecordHeader(key, headerObjectMapper.writeValueAsBytes(valueToAdd)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we can move target.add(new RecordHeader(key, out of if..else.
And have there only value calculated.

}
jsonHeaders.putIfAbsent(key, className);
}
catch (Exception e) {
logger.error(e, () -> "Could not map " + key + " with type " + rawValue.getClass().getName());
}
}
}

private void populateJsonValueHeader(Header header, String requestedType, Map<String, Object> headers) {
Class<?> type = Object.class;
boolean trusted = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,7 @@
* The exceptions are correlation and reply headers for request/reply
*
* @author Gary Russell
* @author Sanghyeok An
* @since 2.1.3
*
*/
Expand Down Expand Up @@ -94,27 +95,41 @@ public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patte
public void fromHeaders(MessageHeaders headers, Headers target) {
headers.forEach((key, value) -> {
if (!NEVER.contains(key)) {
Object valueToAdd = headerValueToAddOut(key, value);
if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) {
target.add(new RecordHeader(key, (byte[]) valueToAdd));
if (doesMatchMultiValueHeader(key)) {
if (value instanceof Iterable<?> valuesToMap) {
valuesToMap.forEach(o -> fromHeader(key, o, target));
}
else {
fromHeader(key, value, target);
}
}
else {
fromHeader(key, value, target);
}
}
});
}

@Override
public void toHeaders(Headers source, Map<String, Object> target) {
public void toHeaders(Headers source, Map<String, Object> headers) {
source.forEach(header -> {
String headerName = header.key();
if (matchesForInbound(headerName)) {
if (headerName.equals(KafkaHeaders.DELIVERY_ATTEMPT)) {
target.put(headerName, ByteBuffer.wrap(header.value()).getInt());
headers.put(headerName, ByteBuffer.wrap(header.value()).getInt());
}
else {
target.put(headerName, headerValueToAddIn(header));
fromUserHeader(headerName, header, headers);
}
}
});
}

private void fromHeader(String key, Object value, Headers target) {
Object valueToAdd = headerValueToAddOut(key, value);
if (valueToAdd instanceof byte[] && matches(key, valueToAdd)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably can use pattern matching for instanceof:

if (valueToAdd instanceof byte[] bytes && matches(key, bytes)) {
    target.add(new RecordHeader(key, bytes));

target.add(new RecordHeader(key, (byte[]) valueToAdd));
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2024 the original author or authors.
* Copyright 2017-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,7 @@
import org.junit.jupiter.api.Test;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.retrytopic.RetryTopicHeaders;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper.NonTrustedHeaderType;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationTestUtils;
Expand All @@ -56,6 +57,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Soby Chacko
* @author Sanghyeok An
*
* @since 1.3
*
Expand Down Expand Up @@ -332,6 +334,86 @@ void inboundJson() {
.containsKey("baz");
}

@Test
void multiValueHeaderToTest() {
// GIVEN
String multiValueHeader1 = "test-multi-value1";
String multiValueHeader2 = "test-multi-value2";
String singleValueHeader = "test-single-value1";

DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setMultiValueHeaderPatterns(multiValueHeader1, multiValueHeader2);

Headers rawHeaders = new RecordHeaders();
rawHeaders.add(KafkaHeaders.DELIVERY_ATTEMPT, new byte[] { 0, 0, 0, 1 });
rawHeaders.add(KafkaHeaders.ORIGINAL_OFFSET, new byte[] { 0, 0, 0, 1 });
rawHeaders.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, new byte[] { 0, 0, 0, 5 });
rawHeaders.add(singleValueHeader, new byte[] { 0, 0, 0, 6 });

rawHeaders.add(multiValueHeader1, new byte[] { 0, 0, 0, 0 });
rawHeaders.add(multiValueHeader1, new byte[] { 0, 0, 0, 1 });
rawHeaders.add(multiValueHeader1, new byte[] { 0, 0, 0, 2 });
rawHeaders.add(multiValueHeader1, new byte[] { 0, 0, 0, 3 });

rawHeaders.add(multiValueHeader2, new byte[] { 0, 0, 0, 4 });
rawHeaders.add(multiValueHeader2, new byte[] { 0, 0, 0, 5 });

// WHEN
Map<String, Object> mappedHeaders = new HashMap<>();
mapper.toHeaders(rawHeaders, mappedHeaders);

// THEN
assertThat(mappedHeaders.get(KafkaHeaders.DELIVERY_ATTEMPT)).isEqualTo(1);
assertThat(mappedHeaders.get(KafkaHeaders.ORIGINAL_OFFSET)).isEqualTo(new byte[] { 0, 0, 0, 1 });
assertThat(mappedHeaders.get(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS)).isEqualTo(new byte[] { 0, 0, 0, 5 });
assertThat(mappedHeaders.get(singleValueHeader)).isEqualTo(new byte[] { 0, 0, 0, 6 });

@SuppressWarnings("unchecked")
List<byte[]> multiHeader1Values = (List<byte[]>) mappedHeaders.get(multiValueHeader1);
assertThat(multiHeader1Values).contains(new byte[] { 0, 0, 0, 0 },
new byte[] { 0, 0, 0, 1 },
new byte[] { 0, 0, 0, 2 },
new byte[] { 0, 0, 0, 3 });

@SuppressWarnings("unchecked")
List<byte[]> multiHeader2Values = (List<byte[]>) mappedHeaders.get(multiValueHeader2);
assertThat(multiHeader2Values).contains(new byte[] { 0, 0, 0, 4 },
new byte[] { 0, 0, 0, 5 });
}

@Test
void multiValueHeaderFromTest() {
// GIVEN
String multiValueHeader1 = "test-multi-value1";
String multiValueHeader2 = "test-multi-value2";
String singleValueHeader = "test-single-value1";

Message<String> message = MessageBuilder
.withPayload("test-multi-value-header")
.setHeader(multiValueHeader1, List.of(new byte[] { 0, 0, 0, 1 },
new byte[] { 0, 0, 0, 2 }))
.setHeader(multiValueHeader2, List.of(new byte[] { 0, 0, 0, 3 },
new byte[] { 0, 0, 0, 4 }))
.setHeader(singleValueHeader, new byte[] { 0, 0, 0, 5 })
.build();

DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setMultiValueHeaderPatterns(multiValueHeader1, multiValueHeader2);

// WHEN
Headers results = new RecordHeaders();
mapper.fromHeaders(message.getHeaders(), results);

// THEN
assertThat(results).contains(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can utilize RecordHeaders API for much cleaner assertion. This contains is just not readable.

new RecordHeader(multiValueHeader1, new byte[] { 0, 0, 0, 1 }),
new RecordHeader(multiValueHeader1, new byte[] { 0, 0, 0, 2 }),
new RecordHeader(multiValueHeader2, new byte[] { 0, 0, 0, 3 }),
new RecordHeader(multiValueHeader2, new byte[] { 0, 0, 0, 4 }),
new RecordHeader(singleValueHeader, new byte[] { 0, 0, 0, 5 })
);
}

@Test
void deserializationExceptionHeadersAreMappedAsNonByteArray() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Expand Down
Loading