GH-3067: Spring Kafka support multiple headers with same key. #3874
Changes: new file MultiValueKafkaHeaderMapper.java
@@ -0,0 +1,129 @@
/*
 * Copyright 2017-2025 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.support;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.header.Header;

import org.springframework.kafka.retrytopic.RetryTopicHeaders;

/**
 * Extended header mapper based on {@link DefaultKafkaHeaderMapper}.
 * This header mapper manages header values as a list,
 * except for certain reserved headers.
 * Other behaviors are identical to {@link DefaultKafkaHeaderMapper}.
 *
 * @author Sanghyeok An
 *
 * @since 4.0.0
 */
public class MultiValueKafkaHeaderMapper extends DefaultKafkaHeaderMapper {
Review thread on this class:

- I don't think that I agree about extra abstraction on the matter. Make sense?
- @artembilan How can … To handle such cases, I think it would be necessary to add a public method that allows users to configure the pattern for iterable custom headers as a workaround. If the headers that should be treated as iterable are managed with a whitelist approach, I believe that by moving part of the logic from the … Overall, it makes sense to me 👍
- I believe the … Yes, I would expect some configuration property on the … On the mapping from …
- Nice. I have one question.
- Because they are related to racism, and our team follows ethics rules to use only neutral, official language. We do have the capacity to review your PR.
- Oh, I see, my bad. Anyway, I will take a crack at it ASAP.
	private final List<String> defaultSingleValueHeaderList = List.of(
			KafkaHeaders.PREFIX,
			KafkaHeaders.RECEIVED,
			KafkaHeaders.TOPIC,
			KafkaHeaders.KEY,
			KafkaHeaders.PARTITION,
			KafkaHeaders.OFFSET,
			KafkaHeaders.RAW_DATA,
			KafkaHeaders.RECORD_METADATA,
			KafkaHeaders.ACKNOWLEDGMENT,
			KafkaHeaders.CONSUMER,
			KafkaHeaders.RECEIVED_TOPIC,
			KafkaHeaders.RECEIVED_KEY,
			KafkaHeaders.RECEIVED_PARTITION,
			KafkaHeaders.TIMESTAMP_TYPE,
			KafkaHeaders.TIMESTAMP,
			KafkaHeaders.RECEIVED_TIMESTAMP,
			KafkaHeaders.NATIVE_HEADERS,
			KafkaHeaders.BATCH_CONVERTED_HEADERS,
			KafkaHeaders.CORRELATION_ID,
			KafkaHeaders.REPLY_TOPIC,
			KafkaHeaders.REPLY_PARTITION,
			KafkaHeaders.DLT_EXCEPTION_FQCN,
			KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN,
			KafkaHeaders.DLT_EXCEPTION_STACKTRACE,
			KafkaHeaders.DLT_EXCEPTION_MESSAGE,
			KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE,
			KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE,
			KafkaHeaders.DLT_KEY_EXCEPTION_FQCN,
			KafkaHeaders.DLT_ORIGINAL_TOPIC,
			KafkaHeaders.DLT_ORIGINAL_PARTITION,
			KafkaHeaders.DLT_ORIGINAL_OFFSET,
			KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP,
			KafkaHeaders.DLT_ORIGINAL_TIMESTAMP,
			KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE,
			KafkaHeaders.GROUP_ID,
			KafkaHeaders.DELIVERY_ATTEMPT,
			KafkaHeaders.EXCEPTION_FQCN,
			KafkaHeaders.EXCEPTION_CAUSE_FQCN,
			KafkaHeaders.EXCEPTION_STACKTRACE,
			KafkaHeaders.EXCEPTION_MESSAGE,
			KafkaHeaders.KEY_EXCEPTION_STACKTRACE,
			KafkaHeaders.KEY_EXCEPTION_MESSAGE,
			KafkaHeaders.KEY_EXCEPTION_FQCN,
			KafkaHeaders.ORIGINAL_TOPIC,
			KafkaHeaders.ORIGINAL_PARTITION,
			KafkaHeaders.ORIGINAL_OFFSET,
			KafkaHeaders.ORIGINAL_TIMESTAMP,
			KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE,
			KafkaHeaders.CONVERSION_FAILURES,
			KafkaHeaders.LISTENER_INFO,
			RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS,
			RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP,
			RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP);

	private final Set<String> singleValueHeaders = new HashSet<>(this.defaultSingleValueHeaderList);

	/**
	 * Adds headers that the {@link MultiValueKafkaHeaderMapper} should handle as single values.
	 * @param headerName the header name.
	 */
	public void addSingleValueHeader(String headerName) {
		this.singleValueHeaders.add(headerName);
	}

	@Override
	protected void handleHeader(String headerName, Header header, Map<String, Object> headers) {
		if (this.singleValueHeaders.contains(headerName)) {
			headers.put(headerName, headerValueToAddIn(header));
		}
		else {
			Object values = headers.getOrDefault(headerName, new ArrayList<>());

			if (values instanceof List) {
				@SuppressWarnings("unchecked")
				List<Object> castedValues = (List<Object>) values;
				castedValues.add(headerValueToAddIn(header));
				headers.put(headerName, castedValues);
			}
			else {
				headers.put(headerName, headerValueToAddIn(header));
			}
		}
	}

}
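
Below is a minimal usage sketch, not part of this PR, showing how such a mapper might be wired into record conversion. The configuration class name and the "x-single-value" header are illustrative assumptions; addSingleValueHeader comes from the class above, and MessagingMessageConverter.setHeaderMapper is the existing Spring for Apache Kafka hook for plugging in a header mapper.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.MultiValueKafkaHeaderMapper;
import org.springframework.kafka.support.converter.MessagingMessageConverter;

@Configuration
public class MultiValueHeaderMapperConfig {

	@Bean
	MessagingMessageConverter multiValueHeaderConverter() {
		MultiValueKafkaHeaderMapper mapper = new MultiValueKafkaHeaderMapper();
		// Hypothetical custom header that should keep single-value ("last wins") semantics.
		mapper.addSingleValueHeader("x-single-value");

		// Inbound record headers mapped through this converter collect repeated keys
		// into a List, except for the reserved/single-value headers registered above.
		MessagingMessageConverter converter = new MessagingMessageConverter();
		converter.setHeaderMapper(mapper);
		return converter;
	}

}

A converter configured this way can then be set as the record converter on a listener container factory, so that inbound records carrying the same header key more than once expose all values instead of only the last one.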
Review comment: This is mapping from Kafka headers, so I believe this name is better in this context: fromUserHeader, since the standard headers have already been mapped before.
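
For context, a hedged consumer-side sketch of what "managed as a list" means in practice. The topic, the "x-trace" header name, and the List<byte[]> element type are assumptions, not something specified by this PR (plain, non-JSON header values typically remain byte[] with the default mapper).

import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

public class MultiValueHeaderListener {

	// Assumed topic and header name. With the multi-value mapper in place, every
	// occurrence of "x-trace" on the inbound record becomes one element of the list.
	@KafkaListener(topics = "example-topic")
	void listen(@Payload String payload,
			@Header(name = "x-trace", required = false) List<byte[]> traceValues) {
		// traceValues is null when the header is absent, otherwise one entry per occurrence.
	}

}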