Skip to content

Commit 87a577e

Browse files
sobychackospring-builds
authored andcommitted
GH-3114: DeserializationException propagation
Fixes: #3114 * Since DeserializationExceptionHeader is currently porpagated as a `byte[]`, it encounters some issues when processing the header especially in batch listeners. Fixing this by providing the deserialization header without `byte[]` conversion * Adding test to verify * Refactoring in SerializationTestUtils (cherry picked from commit 7cc7fc8)
1 parent e4d9994 commit 87a577e

File tree

6 files changed

+105
-40
lines changed

6 files changed

+105
-40
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
*
5656
* @author Gary Russell
5757
* @author Artem Bilan
58+
* @author Soby Chacko
5859
*
5960
* @since 1.3
6061
*
@@ -323,6 +324,10 @@ public void toHeaders(Headers source, final Map<String, Object> headers) {
323324
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) {
324325
headers.put(headerName, new String(header.value(), getCharset()));
325326
}
327+
else if (headerName.equals(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER) ||
328+
headerName.equals(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)) {
329+
headers.put(headerName, header);
330+
}
326331
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
327332
if (jsonTypes.containsKey(headerName)) {
328333
String requestedType = jsonTypes.get(headerName);

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,12 +38,32 @@
3838
*
3939
* @author Gary Russell
4040
* @author Wang ZhiYang
41+
* @author Soby Chacko
4142
*
4243
* @since 2.2
4344
*
4445
*/
4546
public final class KafkaUtils {
4647

48+
/**
49+
* Header name for deserialization exceptions.
50+
* @since 3.0.15
51+
*/
52+
public static final String DESERIALIZER_EXCEPTION_HEADER_PREFIX = "springDeserializerException";
53+
54+
/**
55+
* Header name for deserialization exceptions.
56+
* @since 3.0.15
57+
*/
58+
public static final String KEY_DESERIALIZER_EXCEPTION_HEADER = DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Key";
59+
60+
/**
61+
* Header name for deserialization exceptions.
62+
* @since 3.0.15
63+
*/
64+
public static final String VALUE_DESERIALIZER_EXCEPTION_HEADER = DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Value";
65+
66+
4767
private static Function<ProducerRecord<?, ?>, String> prFormatter = ProducerRecord::toString;
4868

4969
private static Function<ConsumerRecord<?, ?>, String> crFormatter =

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/SerializationUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,19 +50,19 @@ public final class SerializationUtils {
5050
* Header name for deserialization exceptions.
5151
* @since 2.8
5252
*/
53-
public static final String DESERIALIZER_EXCEPTION_HEADER_PREFIX = "springDeserializerException";
53+
public static final String DESERIALIZER_EXCEPTION_HEADER_PREFIX = KafkaUtils.DESERIALIZER_EXCEPTION_HEADER_PREFIX;
5454

5555
/**
5656
* Header name for deserialization exceptions.
5757
* @since 2.8
5858
*/
59-
public static final String KEY_DESERIALIZER_EXCEPTION_HEADER = DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Key";
59+
public static final String KEY_DESERIALIZER_EXCEPTION_HEADER = KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER;
6060

6161
/**
6262
* Header name for deserialization exceptions.
6363
* @since 2.8
6464
*/
65-
public static final String VALUE_DESERIALIZER_EXCEPTION_HEADER = DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Value";
65+
public static final String VALUE_DESERIALIZER_EXCEPTION_HEADER = KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER;
6666

6767
private SerializationUtils() {
6868
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,10 +33,6 @@
3333
import static org.mockito.Mockito.times;
3434
import static org.mockito.Mockito.verify;
3535

36-
import java.io.ByteArrayOutputStream;
37-
import java.io.IOException;
38-
import java.io.ObjectOutputStream;
39-
import java.io.UncheckedIOException;
4036
import java.time.Duration;
4137
import java.util.Collections;
4238
import java.util.HashMap;
@@ -80,6 +76,7 @@
8076
/**
8177
* @author Gary Russell
8278
* @author Tomaz Fernandes
79+
* @author Soby Chacko
8380
* @since 2.4.3
8481
*
8582
*/
@@ -174,9 +171,9 @@ void valueHeaderStripped() {
174171
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
175172
Headers headers = new RecordHeaders();
176173
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
177-
header(false)));
174+
SerializationTestUtils.header(false)));
178175
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
179-
header(true)));
176+
SerializationTestUtils.header(true)));
180177
Headers custom = new RecordHeaders();
181178
custom.add(new RecordHeader("foo", "bar".getBytes()));
182179
recoverer.setHeadersFunction((rec, ex) -> custom);
@@ -206,7 +203,7 @@ void keyHeaderStripped() {
206203
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
207204
Headers headers = new RecordHeaders();
208205
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
209-
header(true)));
206+
SerializationTestUtils.header(true)));
210207
CompletableFuture future = new CompletableFuture();
211208
future.complete(new Object());
212209
willReturn(future).given(template).send(any(ProducerRecord.class));
@@ -225,9 +222,9 @@ void keyDeserOnly() {
225222
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
226223
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
227224
Headers headers = new RecordHeaders();
228-
DeserializationException deserEx = createDeserEx(true);
225+
DeserializationException deserEx = SerializationTestUtils.createDeserEx(true);
229226
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
230-
header(true, deserEx)));
227+
SerializationTestUtils.header(deserEx)));
231228
CompletableFuture future = new CompletableFuture();
232229
future.complete(new Object());
233230
willReturn(future).given(template).send(any(ProducerRecord.class));
@@ -250,9 +247,9 @@ void headersNotStripped() {
250247
recoverer.setRetainExceptionHeader(true);
251248
Headers headers = new RecordHeaders();
252249
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
253-
header(false)));
250+
SerializationTestUtils.header(false)));
254251
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
255-
header(true)));
252+
SerializationTestUtils.header(true)));
256253
CompletableFuture future = new CompletableFuture();
257254
future.complete(new Object());
258255
willReturn(future).given(template).send(any(ProducerRecord.class));
@@ -302,27 +299,6 @@ void tombstoneWithMultiTemplatesExplicit() {
302299
verify(template2).send(any(ProducerRecord.class));
303300
}
304301

305-
private byte[] header(boolean isKey) {
306-
return header(isKey, createDeserEx(isKey));
307-
}
308-
309-
private DeserializationException createDeserEx(boolean isKey) {
310-
return new DeserializationException(
311-
isKey ? "testK" : "testV",
312-
isKey ? "key".getBytes() : "value".getBytes(), isKey, null);
313-
}
314-
315-
private byte[] header(boolean isKey, DeserializationException deserEx) {
316-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
317-
try {
318-
new ObjectOutputStream(baos).writeObject(deserEx);
319-
}
320-
catch (IOException e) {
321-
throw new UncheckedIOException(e);
322-
}
323-
return baos.toByteArray();
324-
}
325-
326302
@SuppressWarnings({"unchecked", "rawtypes"})
327303
@Test
328304
void allOriginalHeaders() {

spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,7 +34,11 @@
3434
import org.apache.kafka.common.header.internals.RecordHeaders;
3535
import org.junit.jupiter.api.Test;
3636

37+
import org.springframework.core.log.LogAccessor;
3738
import org.springframework.kafka.support.DefaultKafkaHeaderMapper.NonTrustedHeaderType;
39+
import org.springframework.kafka.support.serializer.DeserializationException;
40+
import org.springframework.kafka.support.serializer.SerializationTestUtils;
41+
import org.springframework.kafka.support.serializer.SerializationUtils;
3842
import org.springframework.messaging.Message;
3943
import org.springframework.messaging.MessageHeaders;
4044
import org.springframework.messaging.support.ExecutorSubscribableChannel;
@@ -46,6 +50,7 @@
4650
/**
4751
* @author Gary Russell
4852
* @author Artem Bilan
53+
* @author Soby Chacko
4954
*
5055
* @since 1.3
5156
*
@@ -321,6 +326,38 @@ void inboundJson() {
321326
.containsKey("baz");
322327
}
323328

329+
@Test
330+
void deserializationExceptionHeadersAreMappedAsNonByteArray() {
331+
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
332+
333+
byte[] keyDeserExceptionBytes = SerializationTestUtils.header(true);
334+
Header keyHeader = SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
335+
keyDeserExceptionBytes);
336+
byte[] valueDeserExceptionBytes = SerializationTestUtils.header(false);
337+
Header valueHeader = SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
338+
valueDeserExceptionBytes);
339+
Headers headers = new RecordHeaders(
340+
new Header[] { keyHeader, valueHeader });
341+
Map<String, Object> springHeaders = new HashMap<>();
342+
mapper.toHeaders(headers, springHeaders);
343+
assertThat(springHeaders.get(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isEqualTo(keyHeader);
344+
assertThat(springHeaders.get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isEqualTo(valueHeader);
345+
346+
LogAccessor logger = new LogAccessor(this.getClass());
347+
348+
DeserializationException keyDeserializationException = SerializationUtils.byteArrayToDeserializationException(logger, keyHeader);
349+
assertThat(keyDeserExceptionBytes).containsExactly(SerializationTestUtils.header(keyDeserializationException));
350+
351+
DeserializationException valueDeserializationException =
352+
SerializationUtils.byteArrayToDeserializationException(logger, valueHeader);
353+
assertThat(valueDeserExceptionBytes).containsExactly(SerializationTestUtils.header(valueDeserializationException));
354+
355+
headers = new RecordHeaders();
356+
mapper.fromHeaders(new MessageHeaders(springHeaders), headers);
357+
assertThat(headers.lastHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNull();
358+
assertThat(headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNull();
359+
}
360+
324361
public static final class Foo {
325362

326363
private String bar = "bar";

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationTestUtils.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,10 +16,16 @@
1616

1717
package org.springframework.kafka.support.serializer;
1818

19+
import java.io.ByteArrayOutputStream;
20+
import java.io.IOException;
21+
import java.io.ObjectOutputStream;
22+
import java.io.UncheckedIOException;
23+
1924
import org.apache.kafka.common.header.Header;
2025

2126
/**
2227
* @author Gary Russell
28+
* @author Soby Chacko
2329
* @since 2.9.11
2430
*
2531
*/
@@ -32,4 +38,25 @@ public static Header deserializationHeader(String key, byte[] value) {
3238
return new DeserializationExceptionHeader(key, value);
3339
}
3440

41+
public static byte[] header(boolean isKey) {
42+
return header(createDeserEx(isKey));
43+
}
44+
45+
public static DeserializationException createDeserEx(boolean isKey) {
46+
return new DeserializationException(
47+
isKey ? "testK" : "testV",
48+
isKey ? "key".getBytes() : "value".getBytes(), isKey, null);
49+
}
50+
51+
public static byte[] header(DeserializationException deserEx) {
52+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
53+
try {
54+
new ObjectOutputStream(baos).writeObject(deserEx);
55+
}
56+
catch (IOException e) {
57+
throw new UncheckedIOException(e);
58+
}
59+
return baos.toByteArray();
60+
}
61+
3562
}

0 commit comments

Comments
 (0)