
Commit 6f63a57

tomazfernandes authored and garyrussell committed
GH-2118: Make DLPR skip same topic fatal ex optional
Fixes GH-2118. The logic introduced to stop endless loops in dead-letter routing for fatal exceptions interferes with the retry topic feature's single-topic strategy. Make it optional, and have DeadLetterPublishingRecovererFactory default it to false for retry topics. The loop itself is now addressed in DefaultDestinationTopicResolver, as in GH-2113.
1 parent d052f15 commit 6f63a57
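
For orientation, here is a minimal sketch of how an application could opt out of the new skip once this change is available. Only setSkipSameTopicFatalExceptions comes from this commit; the DefaultErrorHandler wiring, the FixedBackOff values, and the same-topic destination resolver are illustrative assumptions, not part of the diff.

import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class SameTopicRecovererConfig {

    // Build an error handler whose recoverer republishes the failed record to the
    // record's own topic/partition; with the new flag set to false this happens
    // even when the exception is classified as fatal.
    public DefaultErrorHandler sameTopicErrorHandler(KafkaOperations<Object, Object> template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
                (rec, ex) -> new TopicPartition(rec.topic(), rec.partition()));
        recoverer.setSkipSameTopicFatalExceptions(false); // introduced by this commit
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2));
    }

}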

File tree

4 files changed: +46 -8 lines changed


spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 15 additions & 1 deletion
@@ -103,6 +103,8 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement

     private boolean stripPreviousExceptionHeaders = true;

+    private boolean skipSameTopicFatalExceptions = true;
+
     /**
      * Create an instance with the provided template and a default destination resolving
      * function that returns a TopicPartition based on the original topic (appended with ".DLT")
@@ -314,6 +316,16 @@ public void setStripPreviousExceptionHeaders(boolean stripPreviousExceptionHeade
         this.stripPreviousExceptionHeaders = stripPreviousExceptionHeaders;
     }

+    /**
+     * Set to false if you want to forward the record to the same topic even though
+     * the exception is fatal by this class' classification, e.g. to handle this scenario
+     * in a different layer.
+     * @param skipSameTopicFatalExceptions false to forward in this scenario.
+     */
+    public void setSkipSameTopicFatalExceptions(boolean skipSameTopicFatalExceptions) {
+        this.skipSameTopicFatalExceptions = skipSameTopicFatalExceptions;
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consumer, Exception exception) {
@@ -324,7 +336,9 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
                 + " skipped because destination resolver returned null");
             return;
         }
-        if (tp.topic().equals(record.topic()) && !getClassifier().classify(exception)) {
+        if (this.skipSameTopicFatalExceptions
+                && tp.topic().equals(record.topic())
+                && !getClassifier().classify(exception)) {
             this.logger.error("Recovery of " + ListenerUtils.recordToString(record, true)
                 + " skipped because not retryable exception " + exception.toString()
                 + " and the destination resolver routed back to the same topic");

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 4 additions & 3 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2021 the original author or authors.
+ * Copyright 2018-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -140,9 +140,10 @@ protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
         recoverer.setFailIfSendResultIsError(true);
         recoverer.setAppendOriginalHeaders(false);
         recoverer.setThrowIfNoDestinationReturned(false);
+        recoverer.setSkipSameTopicFatalExceptions(false);
         this.recovererCustomizer.accept(recoverer);
-        this.fatalExceptions.forEach(ex -> recoverer.addNotRetryableExceptions(ex));
-        this.nonFatalExceptions.forEach(ex -> recoverer.removeNotRetryableException(ex));
+        this.fatalExceptions.forEach(recoverer::addNotRetryableExceptions);
+        this.nonFatalExceptions.forEach(recoverer::removeNotRetryableException);
         return recoverer;
     }

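
Note that the factory disables the skip before invoking recovererCustomizer, so an application customizer can still restore the old behaviour for retry topics. A sketch under the assumption that the factory's customizer hook is setDeadLetterPublishingRecovererCustomizer and that its constructor takes a DestinationTopicResolver; neither signature is shown in this diff.

import org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory;
import org.springframework.kafka.retrytopic.DestinationTopicResolver;

public class RetryTopicRecovererCustomization {

    // The factory sets skipSameTopicFatalExceptions to false before applying the
    // customizer, so a customizer can flip it back on (or adjust anything else on
    // the recoverer) for retry-topic use.
    public DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
        DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver);
        factory.setDeadLetterPublishingRecovererCustomizer(
                recoverer -> recoverer.setSkipSameTopicFatalExceptions(true));
        return factory;
    }

}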

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 22 additions & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020-2021 the original author or authors.
+ * Copyright 2020-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -565,4 +565,25 @@ void noCircularRoutingIfFatal() {
         verify(template).send(any(ProducerRecord.class));
     }

+    @SuppressWarnings("unchecked")
+    @Test
+    void doNotSkipCircularFatalIfSet() {
+        KafkaOperations<?, ?> template = mock(KafkaOperations.class);
+        ListenableFuture<Object> future = mock(ListenableFuture.class);
+        given(template.send(any(ProducerRecord.class))).willReturn(future);
+        ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
+        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
+                (cr, e) -> new TopicPartition("foo", 0));
+        recoverer.setSkipSameTopicFatalExceptions(false);
+        recoverer.accept(record, new ClassCastException());
+        verify(template).send(any(ProducerRecord.class));
+        recoverer.addNotRetryableExceptions(IllegalStateException.class);
+        recoverer.accept(record, new IllegalStateException());
+        verify(template, times(2)).send(any(ProducerRecord.class));
+        recoverer.removeNotRetryableException(IllegalStateException.class);
+        recoverer.setFailIfSendResultIsError(false);
+        recoverer.accept(record, new IllegalStateException());
+        verify(template, times(3)).send(any(ProducerRecord.class));
+    }
+
 }

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java

Lines changed: 5 additions & 3 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2021 the original author or authors.
+ * Copyright 2018-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -306,14 +306,16 @@ void shouldCallDLPRCustomizer() {
     }

     @Test
-    void shouldNotSendMessageIfCircularFatal() {
+    void shouldSendMessageEvenIfCircularFatal() {
         // setup
         TimestampedException e = new TimestampedException(new IllegalStateException(), this.clock);
         long failureTimestamp = e.getTimestamp();
         given(destinationTopicResolver.resolveDestinationTopic(testTopic, 1, e, failureTimestamp))
                 .willReturn(destinationTopic);
         given(destinationTopic.isNoOpsTopic()).willReturn(false);
         given(destinationTopic.getDestinationName()).willReturn(testTopic);
+        given(this.destinationTopicResolver.getDestinationTopicByName(testTopic)).willReturn(destinationTopic);
+        willReturn(kafkaOperations).given(destinationTopic).getKafkaOperations();
         this.consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, originalTimestampBytes);

         DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(
@@ -326,7 +328,7 @@ void shouldNotSendMessageIfCircularFatal() {
         deadLetterPublishingRecoverer.accept(this.consumerRecord, e);

         // then
-        then(kafkaOperations).should(times(0)).send(any(ProducerRecord.class));
+        then(kafkaOperations).should(times(1)).send(any(ProducerRecord.class));
     }

 }
