-
Notifications
You must be signed in to change notification settings - Fork 1.6k
GH-3114: DeserializationException propagation #3115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Fixes: spring-projects#3114 * Since DeserializationExceptionHeader is currently porpagated as a `byte[]`, it encounters some issues when processing the header especially in batch listeners. Fixing this by providing the deserialization header without `byte[]` conversion * Adding test to verify * Refactoring in SerializationTestUtils
@@ -31,6 +31,7 @@ | |||
import org.apache.kafka.common.header.Headers; | |||
import org.apache.kafka.common.header.internals.RecordHeader; | |||
|
|||
import org.springframework.kafka.support.serializer.SerializationUtils; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to be sure that there is no package tangle . Usually sub-packages uses their parents.
@@ -314,6 +316,11 @@ public void toHeaders(Headers source, final Map<String, Object> headers) { | |||
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) { | |||
headers.put(headerName, new String(header.value(), getCharset())); | |||
} | |||
else if ((headerName.equals(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER) || | |||
headerName.equals(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)) | |||
&& matchesForInbound(headerName)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this header has to be mapped unconditionally. It is not some what end-user provides.
* | ||
* @since 2.2 | ||
* | ||
*/ | ||
public final class KafkaUtils { | ||
|
||
/** | ||
* Header name for deserialization exceptions. | ||
* @since 3.2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the GH issue mandate this fix is supposed to be back-ported down to 3.0.x
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Will merge and cherry-pick when build is green.
Fixes: #3114
byte[]
, it encounters some issues when processing the header especially in batch listeners. Fixing this by providing the deserialization header withoutbyte[]
conversion