Skip to content

Commit a6616ba

Browse files
garyrussellartembilan
authored andcommitted
GH-1894: DLPR: Fix Headers with Only Key Exception
Resolves #1894 When there was only a key deserialization exception, the exception info was added twice, under the key keys and value keys. **cherry-pick to 2.7.x**
1 parent 9ad308d commit a6616ba

File tree

2 files changed

+58
-12
lines changed

2 files changed

+58
-12
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -315,21 +315,35 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
315315
DeserializationException kDeserEx = ListenerUtils.getExceptionFromHeader(record,
316316
ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, this.logger);
317317
Headers headers = new RecordHeaders(record.headers().toArray());
318-
if (kDeserEx != null && !this.retainExceptionHeader) {
319-
headers.remove(ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER);
320-
addExceptionInfoHeaders(headers, kDeserEx, true);
321-
}
322-
if (vDeserEx != null && !this.retainExceptionHeader) {
323-
headers.remove(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
324-
}
325-
enhanceHeaders(headers, record, exception); // NOSONAR headers are never null
318+
addAndEnhanceHeaders(record, exception, vDeserEx, kDeserEx, headers);
326319
ProducerRecord<Object, Object> outRecord = createProducerRecord(record, tp, headers,
327320
kDeserEx == null ? null : kDeserEx.getData(), vDeserEx == null ? null : vDeserEx.getData());
328321
KafkaOperations<Object, Object> kafkaTemplate =
329322
(KafkaOperations<Object, Object>) this.templateResolver.apply(outRecord);
330323
sendOrThrow(outRecord, kafkaTemplate);
331324
}
332325

326+
private void addAndEnhanceHeaders(ConsumerRecord<?, ?> record, Exception exception,
327+
@Nullable DeserializationException vDeserEx, @Nullable DeserializationException kDeserEx, Headers headers) {
328+
329+
if (kDeserEx != null) {
330+
if (!this.retainExceptionHeader) {
331+
headers.remove(ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER);
332+
}
333+
addExceptionInfoHeaders(headers, kDeserEx, true);
334+
}
335+
if (vDeserEx != null) {
336+
if (!this.retainExceptionHeader) {
337+
headers.remove(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
338+
}
339+
addExceptionInfoHeaders(headers, vDeserEx, false);
340+
}
341+
if (kDeserEx == null && vDeserEx == null) {
342+
addExceptionInfoHeaders(headers, exception, false);
343+
}
344+
enhanceHeaders(headers, record, exception); // NOSONAR headers are never null
345+
}
346+
333347
private void sendOrThrow(ProducerRecord<Object, Object> outRecord,
334348
@Nullable KafkaOperations<Object, Object> kafkaTemplate) {
335349

@@ -501,7 +515,6 @@ private Duration determineSendTimeout(KafkaOperations<?, ?> template) {
501515

502516
private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, Exception exception) {
503517
maybeAddOriginalHeaders(kafkaHeaders, record);
504-
addExceptionInfoHeaders(kafkaHeaders, exception, false);
505518
Headers headers = this.headersFunction.apply(record, exception);
506519
if (headers != null) {
507520
headers.forEach(kafkaHeaders::add);

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,29 @@ void keyHeaderStripped() {
205205
assertThat(headers.lastHeader(ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNull();
206206
}
207207

208+
@SuppressWarnings({ "rawtypes", "unchecked" })
209+
@Test
210+
void keyDeserOnly() {
211+
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
212+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
213+
Headers headers = new RecordHeaders();
214+
DeserializationException deserEx = createDeserEx(true);
215+
headers.add(
216+
new RecordHeader(ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER, header(true, deserEx)));
217+
SettableListenableFuture future = new SettableListenableFuture();
218+
future.set(new Object());
219+
willReturn(future).given(template).send(any(ProducerRecord.class));
220+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, 0L, TimestampType.CREATE_TIME,
221+
0L, 0, 0, "bar", "baz", headers);
222+
recoverer.accept(record, deserEx);
223+
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
224+
verify(template).send(captor.capture());
225+
headers = captor.getValue().headers();
226+
assertThat(headers.lastHeader(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE).value()).isEqualTo("testK".getBytes());
227+
assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_FQCN)).isNull();
228+
assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)).isNull();
229+
}
230+
208231
@SuppressWarnings({ "rawtypes", "unchecked" })
209232
@Test
210233
void headersNotStripped() {
@@ -225,6 +248,8 @@ void headersNotStripped() {
225248
headers = captor.getValue().headers();
226249
assertThat(headers.lastHeader(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
227250
assertThat(headers.lastHeader(ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
251+
assertThat(headers.lastHeader(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE).value()).isEqualTo("testK".getBytes());
252+
assertThat(headers.lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE).value()).isEqualTo("testV".getBytes());
228253
}
229254

230255
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -262,11 +287,19 @@ void tombstoneWithMultiTemplatesExplicit() {
262287
}
263288

264289
private byte[] header(boolean isKey) {
290+
return header(isKey, createDeserEx(isKey));
291+
}
292+
293+
private DeserializationException createDeserEx(boolean isKey) {
294+
return new DeserializationException(
295+
isKey ? "testK" : "testV",
296+
isKey ? "key".getBytes() : "value".getBytes(), isKey, null);
297+
}
298+
299+
private byte[] header(boolean isKey, DeserializationException deserEx) {
265300
ByteArrayOutputStream baos = new ByteArrayOutputStream();
266301
try {
267-
new ObjectOutputStream(baos).writeObject(new DeserializationException(
268-
isKey ? "testK" : "testV",
269-
isKey ? "key".getBytes() : "value".getBytes(), isKey, null));
302+
new ObjectOutputStream(baos).writeObject(deserEx);
270303
}
271304
catch (IOException e) {
272305
throw new UncheckedIOException(e);

0 commit comments

Comments
 (0)