Skip to content

GH-3114: DeserializationException propagation #3115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 14, 2024

Conversation

sobychacko
Copy link
Contributor

Fixes: #3114

  • Since DeserializationExceptionHeader is currently porpagated as a byte[], it encounters some issues when processing the header especially in batch listeners. Fixing this by providing the deserialization header without byte[] conversion
  • Adding test to verify
  • Refactoring in SerializationTestUtils

Fixes: spring-projects#3114

* Since DeserializationExceptionHeader is currently porpagated as a `byte[]`,
  it encounters some issues when processing the header especially in batch
  listeners. Fixing this by providing the deserialization header without `byte[]` conversion
* Adding test to verify
* Refactoring in SerializationTestUtils
@sobychacko sobychacko requested a review from artembilan March 12, 2024 20:19
@@ -31,6 +31,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.kafka.support.serializer.SerializationUtils;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to be sure that there is no package tangle . Usually sub-packages uses their parents.

@@ -314,6 +316,11 @@ public void toHeaders(Headers source, final Map<String, Object> headers) {
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) {
headers.put(headerName, new String(header.value(), getCharset()));
}
else if ((headerName.equals(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER) ||
headerName.equals(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER))
&& matchesForInbound(headerName)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this header has to be mapped unconditionally. It is not some what end-user provides.

*
* @since 2.2
*
*/
public final class KafkaUtils {

/**
* Header name for deserialization exceptions.
* @since 3.2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the GH issue mandate this fix is supposed to be back-ported down to 3.0.x

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.
Will merge and cherry-pick when build is green.

@artembilan artembilan merged commit 7cc7fc8 into spring-projects:main Mar 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Failing to handle deserialization exceptions in batch listener
2 participants