Skip to content

Commit a7690ab

Browse files
committed
spring-projectsGH-3114: DeserializationException propagation
Fixes: spring-projects#3114 * Since DeserializationExceptionHeader is currently porpagated as a `byte[]`, it encounters some issues when processing the header especially in batch listeners. Fixing this by providing the deserialization header without `byte[]` conversion * Adding test to verify * Refactoring in SerializationTestUtils
1 parent ab5f0a1 commit a7690ab

File tree

4 files changed

+82
-35
lines changed

4 files changed

+82
-35
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.kafka.common.header.Headers;
3232
import org.apache.kafka.common.header.internals.RecordHeader;
3333

34+
import org.springframework.kafka.support.serializer.SerializationUtils;
3435
import org.springframework.messaging.MessageHeaders;
3536
import org.springframework.util.Assert;
3637
import org.springframework.util.ClassUtils;
@@ -48,6 +49,7 @@
4849
*
4950
* @author Gary Russell
5051
* @author Artem Bilan
52+
* @author Soby Chacko
5153
*
5254
* @since 1.3
5355
*
@@ -314,6 +316,11 @@ public void toHeaders(Headers source, final Map<String, Object> headers) {
314316
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) {
315317
headers.put(headerName, new String(header.value(), getCharset()));
316318
}
319+
else if ((headerName.equals(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER) ||
320+
headerName.equals(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER))
321+
&& matchesForInbound(headerName)) {
322+
headers.put(headerName, header);
323+
}
317324
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
318325
if (jsonTypes.containsKey(headerName)) {
319326
String requestedType = jsonTypes.get(headerName);

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,10 +33,6 @@
3333
import static org.mockito.Mockito.times;
3434
import static org.mockito.Mockito.verify;
3535

36-
import java.io.ByteArrayOutputStream;
37-
import java.io.IOException;
38-
import java.io.ObjectOutputStream;
39-
import java.io.UncheckedIOException;
4036
import java.time.Duration;
4137
import java.util.Collections;
4238
import java.util.HashMap;
@@ -80,6 +76,7 @@
8076
/**
8177
* @author Gary Russell
8278
* @author Tomaz Fernandes
79+
* @author Soby Chacko
8380
* @since 2.4.3
8481
*
8582
*/
@@ -174,9 +171,9 @@ void valueHeaderStripped() {
174171
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
175172
Headers headers = new RecordHeaders();
176173
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
177-
header(false)));
174+
SerializationTestUtils.header(false)));
178175
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
179-
header(true)));
176+
SerializationTestUtils.header(true)));
180177
Headers custom = new RecordHeaders();
181178
custom.add(new RecordHeader("foo", "bar".getBytes()));
182179
recoverer.setHeadersFunction((rec, ex) -> custom);
@@ -206,7 +203,7 @@ void keyHeaderStripped() {
206203
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
207204
Headers headers = new RecordHeaders();
208205
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
209-
header(true)));
206+
SerializationTestUtils.header(true)));
210207
CompletableFuture future = new CompletableFuture();
211208
future.complete(new Object());
212209
willReturn(future).given(template).send(any(ProducerRecord.class));
@@ -225,9 +222,9 @@ void keyDeserOnly() {
225222
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
226223
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
227224
Headers headers = new RecordHeaders();
228-
DeserializationException deserEx = createDeserEx(true);
225+
DeserializationException deserEx = SerializationTestUtils.createDeserEx(true);
229226
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
230-
header(true, deserEx)));
227+
SerializationTestUtils.header(deserEx)));
231228
CompletableFuture future = new CompletableFuture();
232229
future.complete(new Object());
233230
willReturn(future).given(template).send(any(ProducerRecord.class));
@@ -250,9 +247,9 @@ void headersNotStripped() {
250247
recoverer.setRetainExceptionHeader(true);
251248
Headers headers = new RecordHeaders();
252249
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
253-
header(false)));
250+
SerializationTestUtils.header(false)));
254251
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
255-
header(true)));
252+
SerializationTestUtils.header(true)));
256253
CompletableFuture future = new CompletableFuture();
257254
future.complete(new Object());
258255
willReturn(future).given(template).send(any(ProducerRecord.class));
@@ -302,27 +299,6 @@ void tombstoneWithMultiTemplatesExplicit() {
302299
verify(template2).send(any(ProducerRecord.class));
303300
}
304301

305-
private byte[] header(boolean isKey) {
306-
return header(isKey, createDeserEx(isKey));
307-
}
308-
309-
private DeserializationException createDeserEx(boolean isKey) {
310-
return new DeserializationException(
311-
isKey ? "testK" : "testV",
312-
isKey ? "key".getBytes() : "value".getBytes(), isKey, null);
313-
}
314-
315-
private byte[] header(boolean isKey, DeserializationException deserEx) {
316-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
317-
try {
318-
new ObjectOutputStream(baos).writeObject(deserEx);
319-
}
320-
catch (IOException e) {
321-
throw new UncheckedIOException(e);
322-
}
323-
return baos.toByteArray();
324-
}
325-
326302
@SuppressWarnings({"unchecked", "rawtypes"})
327303
@Test
328304
void allOriginalHeaders() {

spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,7 +34,11 @@
3434
import org.apache.kafka.common.header.internals.RecordHeaders;
3535
import org.junit.jupiter.api.Test;
3636

37+
import org.springframework.core.log.LogAccessor;
3738
import org.springframework.kafka.support.DefaultKafkaHeaderMapper.NonTrustedHeaderType;
39+
import org.springframework.kafka.support.serializer.DeserializationException;
40+
import org.springframework.kafka.support.serializer.SerializationTestUtils;
41+
import org.springframework.kafka.support.serializer.SerializationUtils;
3842
import org.springframework.messaging.Message;
3943
import org.springframework.messaging.MessageHeaders;
4044
import org.springframework.messaging.support.ExecutorSubscribableChannel;
@@ -46,6 +50,7 @@
4650
/**
4751
* @author Gary Russell
4852
* @author Artem Bilan
53+
* @author Soby Chacko
4954
*
5055
* @since 1.3
5156
*
@@ -321,6 +326,38 @@ void inboundJson() {
321326
.containsKey("baz");
322327
}
323328

329+
@Test
330+
void deserializationExceptionHeadersAreMappedAsNonByteArray() {
331+
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
332+
333+
byte[] keyDeserExceptionBytes = SerializationTestUtils.header(true);
334+
Header keyHeader = SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
335+
keyDeserExceptionBytes);
336+
byte[] valueDeserExceptionBytes = SerializationTestUtils.header(false);
337+
Header valueHeader = SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
338+
valueDeserExceptionBytes);
339+
Headers headers = new RecordHeaders(
340+
new Header[] { keyHeader, valueHeader });
341+
Map<String, Object> springHeaders = new HashMap<>();
342+
mapper.toHeaders(headers, springHeaders);
343+
assertThat(springHeaders.get(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isEqualTo(keyHeader);
344+
assertThat(springHeaders.get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isEqualTo(valueHeader);
345+
346+
LogAccessor logger = new LogAccessor(this.getClass());
347+
348+
DeserializationException keyDeserializationException = SerializationUtils.byteArrayToDeserializationException(logger, keyHeader);
349+
assertThat(keyDeserExceptionBytes).containsExactly(SerializationTestUtils.header(keyDeserializationException));
350+
351+
DeserializationException valueDeserializationException =
352+
SerializationUtils.byteArrayToDeserializationException(logger, valueHeader);
353+
assertThat(valueDeserExceptionBytes).containsExactly(SerializationTestUtils.header(valueDeserializationException));
354+
355+
headers = new RecordHeaders();
356+
mapper.fromHeaders(new MessageHeaders(springHeaders), headers);
357+
assertThat(headers.lastHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNull();
358+
assertThat(headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNull();
359+
}
360+
324361
public static final class Foo {
325362

326363
private String bar = "bar";

spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationTestUtils.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,10 +16,16 @@
1616

1717
package org.springframework.kafka.support.serializer;
1818

19+
import java.io.ByteArrayOutputStream;
20+
import java.io.IOException;
21+
import java.io.ObjectOutputStream;
22+
import java.io.UncheckedIOException;
23+
1924
import org.apache.kafka.common.header.Header;
2025

2126
/**
2227
* @author Gary Russell
28+
* @author Soby Chacko
2329
* @since 2.9.11
2430
*
2531
*/
@@ -32,4 +38,25 @@ public static Header deserializationHeader(String key, byte[] value) {
3238
return new DeserializationExceptionHeader(key, value);
3339
}
3440

41+
public static byte[] header(boolean isKey) {
42+
return header(createDeserEx(isKey));
43+
}
44+
45+
public static DeserializationException createDeserEx(boolean isKey) {
46+
return new DeserializationException(
47+
isKey ? "testK" : "testV",
48+
isKey ? "key".getBytes() : "value".getBytes(), isKey, null);
49+
}
50+
51+
public static byte[] header(DeserializationException deserEx) {
52+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
53+
try {
54+
new ObjectOutputStream(baos).writeObject(deserEx);
55+
}
56+
catch (IOException e) {
57+
throw new UncheckedIOException(e);
58+
}
59+
return baos.toByteArray();
60+
}
61+
3562
}

0 commit comments

Comments
 (0)