Skip to content

Commit d052f15

Browse files
tomazfernandesgaryrussell
authored andcommitted
GH-2113: Add ex classification to DDTopicResolver
Resolves GH-2113 Adds exception classification to DefaultDestinationTopicResolver so that fatal exceptions makes records go straight to the DLT. Also stop DLT processing if such an exception is found (see GH-2118).
1 parent f51ddd4 commit d052f15

File tree

4 files changed

+74
-26
lines changed

4 files changed

+74
-26
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,28 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> t
306306

307307
NOTE: The default behavior is retrying on all exceptions and not traversing causes.
308308

309+
Since 2.8.3 there's a global list of fatal exceptions which will cause the record to be sent to the DLT without any retries.
310+
See <<default-eh, DefaultErrorHandler>> for the default list of fatal exceptions.
311+
You can add or remove exceptions with:
312+
313+
====
314+
[source, java]
315+
----
316+
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
317+
public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicationContext,
318+
@Qualifier(RetryTopicInternalBeanNames
319+
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
320+
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
321+
ddtr.addNotRetryableExceptions(MyFatalException.class);
322+
ddtr.removeNotRetryableException(ConversionException.class);
323+
return ddtr;
324+
}
325+
----
326+
====
327+
328+
NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`.
329+
330+
309331
===== Include and Exclude Topics
310332

311333
You can decide which topics will and will not be handled by a `RetryTopicConfiguration` bean via the .includeTopic(String topic), .includeTopics(Collection<String> topics) .excludeTopic(String topic) and .excludeTopics(Collection<String> topics) methods.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
import org.springframework.context.ApplicationContext;
3030
import org.springframework.context.ApplicationListener;
3131
import org.springframework.context.event.ContextRefreshedEvent;
32+
import org.springframework.kafka.listener.ExceptionClassifier;
3233
import org.springframework.kafka.listener.ListenerExecutionFailedException;
3334
import org.springframework.kafka.listener.TimestampedException;
3435

@@ -45,7 +46,8 @@
4546
* @since 2.7
4647
*
4748
*/
48-
public class DefaultDestinationTopicResolver implements DestinationTopicResolver, ApplicationListener<ContextRefreshedEvent> {
49+
public class DefaultDestinationTopicResolver extends ExceptionClassifier
50+
implements DestinationTopicResolver, ApplicationListener<ContextRefreshedEvent> {
4951

5052
private static final String NO_OPS_SUFFIX = "-noOps";
5153

@@ -75,13 +77,18 @@ public DestinationTopic resolveDestinationTopic(String topic, Integer attempt, E
7577
long originalTimestamp) {
7678
DestinationTopicHolder destinationTopicHolder = getDestinationHolderFor(topic);
7779
return destinationTopicHolder.getSourceDestination().isDltTopic()
78-
? handleDltProcessingFailure(destinationTopicHolder)
80+
? handleDltProcessingFailure(destinationTopicHolder, e)
7981
: destinationTopicHolder.getSourceDestination().shouldRetryOn(attempt, maybeUnwrapException(e))
82+
&& isNotFatalException(e)
8083
&& !isPastTimout(originalTimestamp, destinationTopicHolder)
8184
? resolveRetryDestination(destinationTopicHolder)
8285
: resolveDltOrNoOpsDestination(topic);
8386
}
8487

88+
private Boolean isNotFatalException(Exception e) {
89+
return getClassifier().classify(e);
90+
}
91+
8592
private Throwable maybeUnwrapException(Throwable e) {
8693
return FRAMEWORK_EXCEPTIONS
8794
.stream()
@@ -97,10 +104,11 @@ private boolean isPastTimout(long originalTimestamp, DestinationTopicHolder dest
97104
Instant.now(this.clock).toEpochMilli() > originalTimestamp + timeout;
98105
}
99106

100-
private DestinationTopic handleDltProcessingFailure(DestinationTopicHolder destinationTopicHolder) {
107+
private DestinationTopic handleDltProcessingFailure(DestinationTopicHolder destinationTopicHolder, Exception e) {
101108
return destinationTopicHolder.getSourceDestination().isAlwaysRetryOnDltFailure()
102-
? destinationTopicHolder.getSourceDestination()
103-
: destinationTopicHolder.getNextDestination();
109+
&& isNotFatalException(e)
110+
? destinationTopicHolder.getSourceDestination()
111+
: destinationTopicHolder.getNextDestination();
104112
}
105113

106114
private DestinationTopic resolveRetryDestination(DestinationTopicHolder destinationTopicHolder) {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolverTests.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,11 +20,9 @@
2020
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2121
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
2222

23-
import java.math.BigInteger;
2423
import java.time.Clock;
2524
import java.time.Instant;
2625
import java.util.Collections;
27-
import java.util.Map;
2826

2927
import org.junit.jupiter.api.BeforeEach;
3028
import org.junit.jupiter.api.Test;
@@ -36,6 +34,7 @@
3634
import org.springframework.context.event.ContextRefreshedEvent;
3735
import org.springframework.kafka.listener.ListenerExecutionFailedException;
3836
import org.springframework.kafka.listener.TimestampedException;
37+
import org.springframework.kafka.support.converter.ConversionException;
3938

4039
/**
4140
* @author Tomaz Fernandes
@@ -45,8 +44,6 @@
4544
@ExtendWith(MockitoExtension.class)
4645
class DefaultDestinationTopicResolverTests extends DestinationTopicTests {
4746

48-
private Map<String, DefaultDestinationTopicResolver.DestinationTopicHolder> destinationTopicMap;
49-
5047
@Mock
5148
private ApplicationContext applicationContext;
5249

@@ -61,8 +58,6 @@ class DefaultDestinationTopicResolverTests extends DestinationTopicTests {
6158

6259
private final long failureTimestamp = Instant.now(this.clock).plusMillis(500).toEpochMilli();
6360

64-
private final byte[] originalTimestampBytes = BigInteger.valueOf(originalTimestamp).toByteArray();
65-
6661
@BeforeEach
6762
public void setup() {
6863

@@ -77,16 +72,16 @@ public void setup() {
7772
void shouldResolveRetryDestination() {
7873
assertThat(defaultDestinationTopicContainer
7974
.resolveDestinationTopic(mainDestinationTopic.getDestinationName(), 1,
80-
new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(firstRetryDestinationTopic);
75+
new RuntimeException(), this.originalTimestamp)).isEqualTo(firstRetryDestinationTopic);
8176
assertThat(defaultDestinationTopicContainer
8277
.resolveDestinationTopic(firstRetryDestinationTopic.getDestinationName(), 1,
83-
new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(secondRetryDestinationTopic);
78+
new RuntimeException(), this.originalTimestamp)).isEqualTo(secondRetryDestinationTopic);
8479
assertThat(defaultDestinationTopicContainer
8580
.resolveDestinationTopic(secondRetryDestinationTopic.getDestinationName(), 1,
86-
new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic);
81+
new RuntimeException(), this.originalTimestamp)).isEqualTo(dltDestinationTopic);
8782
assertThat(defaultDestinationTopicContainer
8883
.resolveDestinationTopic(dltDestinationTopic.getDestinationName(), 1,
89-
new IllegalArgumentException(), this.originalTimestamp)).isEqualTo(noOpsDestinationTopic);
84+
new RuntimeException(), this.originalTimestamp)).isEqualTo(noOpsDestinationTopic);
9085

9186
assertThat(defaultDestinationTopicContainer
9287
.resolveDestinationTopic(mainDestinationTopic2.getDestinationName(), 1,
@@ -106,22 +101,38 @@ void shouldResolveRetryDestination() {
106101
void shouldResolveDltDestinationForNonRetryableException() {
107102
assertThat(defaultDestinationTopicContainer
108103
.resolveDestinationTopic(mainDestinationTopic.getDestinationName(),
109-
1, new RuntimeException(), originalTimestamp)).isEqualTo(dltDestinationTopic);
104+
1, new IllegalArgumentException(), originalTimestamp)).isEqualTo(dltDestinationTopic);
105+
}
106+
107+
@Test
108+
void shouldResolveDltDestinationForFatalDefaultException() {
109+
assertThat(defaultDestinationTopicContainer
110+
.resolveDestinationTopic(mainDestinationTopic.getDestinationName(),
111+
1, new ConversionException("Test exception", new RuntimeException()), originalTimestamp))
112+
.isEqualTo(dltDestinationTopic);
113+
}
114+
115+
@Test
116+
void shouldResolveNoOpsForFatalDefaultExceptionInDlt() {
117+
assertThat(defaultDestinationTopicContainer
118+
.resolveDestinationTopic(dltDestinationTopic.getDestinationName(),
119+
1, new ConversionException("Test exception", new RuntimeException()), originalTimestamp))
120+
.isEqualTo(noOpsDestinationTopic);
110121
}
111122

112123
@Test
113124
void shouldResolveRetryDestinationForWrappedListenerExecutionFailedException() {
114125
assertThat(defaultDestinationTopicContainer
115126
.resolveDestinationTopic(mainDestinationTopic.getDestinationName(),
116127
1, new ListenerExecutionFailedException("Test exception!",
117-
new IllegalArgumentException()), originalTimestamp)).isEqualTo(firstRetryDestinationTopic);
128+
new RuntimeException()), originalTimestamp)).isEqualTo(firstRetryDestinationTopic);
118129
}
119130

120131
@Test
121132
void shouldResolveRetryDestinationForWrappedTimestampedException() {
122133
assertThat(defaultDestinationTopicContainer
123134
.resolveDestinationTopic(mainDestinationTopic.getDestinationName(),
124-
1, new TimestampedException(new IllegalArgumentException()), originalTimestamp))
135+
1, new TimestampedException(new RuntimeException()), originalTimestamp))
125136
.isEqualTo(firstRetryDestinationTopic);
126137
}
127138

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DestinationTopicTests.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,15 +62,15 @@ public class DestinationTopicTests {
6262

6363
protected DestinationTopic.Properties mainTopicProps =
6464
new DestinationTopic.Properties(0, "", DestinationTopic.Type.RETRY, 4, 1,
65-
DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOn(), noTimeout);
65+
DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout);
6666

6767
protected DestinationTopic.Properties firstRetryProps =
6868
new DestinationTopic.Properties(1000, retrySuffix + "-1000", DestinationTopic.Type.RETRY, 4, 1,
69-
DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOn(), noTimeout);
69+
DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout);
7070

7171
protected DestinationTopic.Properties secondRetryProps =
7272
new DestinationTopic.Properties(2000, retrySuffix + "-2000", DestinationTopic.Type.RETRY, 4, 1,
73-
DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOn(), noTimeout);
73+
DltStrategy.FAIL_ON_ERROR, kafkaOperations1, getShouldRetryOnDenyList(), noTimeout);
7474

7575
protected DestinationTopic.Properties dltTopicProps =
7676
new DestinationTopic.Properties(0, dltSuffix, DestinationTopic.Type.DLT, 4, 1,
@@ -218,11 +218,18 @@ public class DestinationTopicTests {
218218

219219
// Classifiers
220220

221-
private final BinaryExceptionClassifier classifier = new BinaryExceptionClassifierBuilder()
221+
private final BinaryExceptionClassifier allowListClassifier = new BinaryExceptionClassifierBuilder()
222222
.retryOn(IllegalArgumentException.class).build();
223223

224+
private final BinaryExceptionClassifier denyListClassifier = new BinaryExceptionClassifierBuilder()
225+
.notRetryOn(IllegalArgumentException.class).build();
226+
224227
private BiPredicate<Integer, Throwable> getShouldRetryOn() {
225-
return (a, e) -> a < maxAttempts && classifier.classify(e);
228+
return (a, e) -> a < maxAttempts && allowListClassifier.classify(e);
229+
}
230+
231+
private BiPredicate<Integer, Throwable> getShouldRetryOnDenyList() {
232+
return (a, e) -> a < maxAttempts && denyListClassifier.classify(e);
226233
}
227234

228235
class PropsHolder {

0 commit comments

Comments
 (0)