Skip to content

Commit b2beec3

Browse files
tomazfernandesgaryrussell
authored andcommitted
GH-2226: Add RetryTopicConfigurationSupport
Resolves #2226 Add RetryTopicConfigurationSupport Add @EnableRetryTopic Add KafkaBackOffManagerConfigurationSupport Address code review comments * Change deprecated constants for strings * Remove deprecation suppressions * Remove ApplicationContext and BeanFactory from constructors * Change TaskExecutor shutdown logic to DisposableBean Remove KafkaBackOffManagerConfigurationSupport Move Task Executor logic to Timing Adjuster Add unit tests Enable managing fatal exceptions with a List Create defaultFatalExceptionsList method in `ExceptionClassifier` Change RetryTopicConfigurationSupport logic to provide a list instead of a configurer for non-blocking retries Make provided TaskExecutor a bean Add KafkaBackOffManagerConfigurer to RetryTopicConfigurationSupport Add documentation Review javadocs Few other adjustments Use JavaUtils instead of if statements Add acceptIfInstanceOf to JavaUtils Address PR review comments Restore `ThreadPoolTaskExecutor` bean Polishing per review comments.
1 parent 44e3f52 commit b2beec3

30 files changed

+1501
-261
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 89 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ NOTE: You can have separate `ListenerContainerFactory` instances for the main an
4848

4949
==== Configuration
5050

51+
Starting with version 2.9, the `@EnableKafkaRetryTopic` annotation should be used in a `@Configuration` annotated class.
52+
This enables the feature to bootstrap properly and gives access to injecting some of the feature's components to be looked up at runtime.
53+
Also, to configure the feature's components and global features, the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the appropriate methods overridden.
54+
For more details refer to <<retry-topic-global-settings>>.
55+
56+
NOTE: It is not necessary to also add `@EnableKafka`, if you add this annotation, because `@EnableKafkaRetryTopic` is meta-annotated with `@EnableKafka`.
57+
5158
===== Using the `@RetryableTopic` annotation
5259

5360
To configure the retry topic and dlt for a `@KafkaListener` annotated method, you just have to add the `@RetryableTopic` annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.
@@ -76,7 +83,7 @@ public void processMessage(MyPojo message) {
7683
----
7784
====
7885

79-
NOTE: If you don't specify a kafkaTemplate name a bean with name `retryTopicDefaultKafkaTemplate` will be looked up.
86+
NOTE: If you don't specify a kafkaTemplate name a bean with name `defaultRetryTopicKafkaTemplate` will be looked up.
8087
If no bean is found an exception is thrown.
8188

8289
===== Using `RetryTopicConfiguration` beans
@@ -151,6 +158,40 @@ public KafkaTemplate<String, Object> kafkaTemplate() {
151158
IMPORTANT: Multiple `@KafkaListener` annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic.
152159
It's best to use a single `RetryTopicConfiguration` bean for configuration of such topics; if multiple `@RetryableTopic` annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic's listeners and the other annotations' values will be ignored.
153160

161+
[[retry-topic-global-settings]]
162+
===== Configuring Global Settings and Features
163+
164+
Since 2.9, the previous bean overriding approach for configuring components has been deprecated.
165+
This does not change the `RetryTopicConfiguration` beans approach - only components' configurations.
166+
Now the `RetryTopicConfigurationSupport` class should be extended in a `@Configuration` class, and the proper methods overridden.
167+
An example follows:
168+
169+
====
170+
[source, java]
171+
----
172+
173+
@EnableKafka
174+
@Configuration
175+
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
176+
177+
@Override
178+
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
179+
blockingRetries
180+
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
181+
.backOff(new FixedBackOff(3000, 3));
182+
}
183+
184+
@Override
185+
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
186+
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
187+
}
188+
}
189+
----
190+
====
191+
192+
IMPORTANT: When using this configuration approach, the `@EnableKafkaRetryTopic` annotation should not be used to prevent context failing to start due to duplicated beans.
193+
Use the simple `@EnableKafka` annotation instead.
194+
154195
==== Features
155196

156197
Most of the features are available both for the `@RetryableTopic` annotation and the `RetryTopicConfiguration` beans.
@@ -315,24 +356,22 @@ NOTE: The default behavior is retrying on all exceptions and not traversing caus
315356

316357
Since 2.8.3 there's a global list of fatal exceptions which will cause the record to be sent to the DLT without any retries.
317358
See <<default-eh>> for the default list of fatal exceptions.
318-
You can add or remove exceptions to and from this list with:
359+
You can add or remove exceptions to and from this list by overriding the `configureNonBlockingRetries` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport`.
360+
See <<retry-topic-global-settings>> for more information.
319361

320362
====
321363
[source, java]
322364
----
323-
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
324-
public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicationContext,
325-
@Qualifier(RetryTopicInternalBeanNames
326-
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
327-
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
328-
ddtr.addNotRetryableExceptions(MyFatalException.class);
329-
ddtr.removeNotRetryableException(ConversionException.class);
330-
return ddtr;
365+
366+
@Override
367+
protected void manageNonBlockingRetriesFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
368+
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
331369
}
370+
332371
----
333372
====
334373

335-
NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`.
374+
NOTE: To disable fatal exceptions' classification, just clear the provided list.
336375

337376

338377
===== Include and Exclude Topics
@@ -419,19 +458,18 @@ This is to avoid creation of excessively large messages (due to the stack trace
419458

420459
See <<dlpr-headers>> for more information.
421460

422-
To reconfigure the framework to use different settings for these properties, replace the standard `DeadLetterPublishingRecovererFactory` bean by adding a `recovererCustomizer`:
461+
To reconfigure the framework to use different settings for these properties, configure a `DeadLetterPublishingRecoverer` customizer by overriding the `configureCustomizers` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport`.
462+
See <<retry-topic-global-settings>> for more details.
423463

424464
====
425465
[source, java]
426466
----
427-
@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
428-
DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
429-
DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver);
430-
factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
431-
dlpr.appendOriginalHeaders(true);
467+
@Override
468+
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
469+
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
470+
dlpr.setAppendOriginalHeaders(true);
432471
dlpr.setStripPreviousExceptionHeaders(false);
433472
});
434-
return factory;
435473
}
436474
----
437475
====
@@ -444,32 +482,21 @@ Starting with version 2.8.4, if you wish to add custom headers (in addition to t
444482
Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction.
445483
For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.
446484

447-
To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method in the `ListenerContainerFactoryConfigurer` bean as follows.
448-
The default policy is `FixedBackOff`, with nine retries and no delay between them.
449-
Optionally, you can provide your own back off policy.
485+
To configure blocking retries, override the `configureBlockingRetries` method in a `@Configuration` class that extends `RetryTopicConfigurationSupport` and add the exceptions you want to retry, along with the `BackOff` to be used.
486+
The default `BackOff` is a `FixedBackOff` with no delay and 9 attempts.
487+
See <<retry-topic-global-settings>> for more information.
450488

451489
====
452490
[source, java]
453491
----
454-
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
455-
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
456-
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
457-
@Qualifier(RetryTopicInternalBeanNames
458-
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
459-
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
460-
lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
461-
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
462-
return lcfc;
463-
}
464-
----
465-
====
466492
467-
If you need to further tune the exception classification, you can set your own `Map` of classifications through the `ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()` method, such as:
493+
@Override
494+
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
495+
blockingRetries
496+
.retryOn(MyBlockingRetryException.class, MyOtherBlockingRetryException.class)
497+
.backOff(new FixedBackOff(3000, 5));
498+
}
468499
469-
====
470-
[source, java]
471-
----
472-
lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
473500
----
474501
====
475502

@@ -480,23 +507,16 @@ Here's an example with both configurations working together:
480507
====
481508
[source, java]
482509
----
483-
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
484-
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
485-
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
486-
@Qualifier(RetryTopicInternalBeanNames
487-
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
488-
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
489-
lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
490-
return lcfc;
510+
@Override
511+
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
512+
blockingRetries
513+
.retryOn(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)
514+
.backOff(new FixedBackOff(50, 3));
491515
}
492516
493-
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
494-
public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
495-
@Qualifier(RetryTopicInternalBeanNames
496-
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
497-
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
498-
ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
499-
return ddtr;
517+
@Override
518+
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
519+
nonBlockingFatalExceptions.add(ShouldSkipBothRetriesException.class);
500520
}
501521
502522
----
@@ -590,9 +610,14 @@ More complex naming strategies can be accomplished by registering a bean that im
590610
====
591611
[source, java]
592612
----
593-
@Bean
594-
public RetryTopicNamesProviderFactory myRetryNamingProviderFactory() {
595-
return new CustomRetryTopicNamesProviderFactory();
613+
@Override
614+
protected RetryTopicComponentFactory createComponentFactory() {
615+
return new RetryTopicComponentFactory() {
616+
@Override
617+
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
618+
return new CustomRetryTopicNamesProviderFactory();
619+
}
620+
};
596621
}
597622
----
598623
====
@@ -811,21 +836,14 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>
811836

812837
IMPORTANT: Since 2.8.3 you can use the same factory for retryable and non-retryable topics.
813838

814-
If you need to revert the factory configuration behavior to prior 2.8.3, you can replace the standard `RetryTopicConfigurer` bean and set `useLegacyFactoryConfigurer` to `true`, such as:
839+
If you need to revert the factory configuration behavior to prior 2.8.3, you can override the `configureRetryTopicConfigurer` method of a `@Configuration` class that extends `RetryTopicConfigurationSupport` as explained in <<retry-topic-global-settings>> and set `useLegacyFactoryConfigurer` to `true`, such as:
815840

816841
====
817842
[source, java]
818843
----
819-
820-
@Bean(name = RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER)
821-
public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
822-
ListenerContainerFactoryResolver containerFactoryResolver,
823-
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
824-
BeanFactory beanFactory,
825-
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
826-
RetryTopicConfigurer retryTopicConfigurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, retryTopicNamesProviderFactory);
827-
retryTopicConfigurer.useLegacyFactoryConfigurer(true);
828-
return retryTopicConfigurer;
844+
@Override
845+
protected Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer() {
846+
return rtc -> rtc.useLegacyFactoryConfigurer(true);
829847
}
830848
----
831849
====
@@ -840,14 +858,10 @@ For example, to change the logging level to WARN you might add:
840858
====
841859
[source, java]
842860
----
843-
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
844-
public ListenerContainerFactoryConfigurer listenerContainer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
845-
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
846-
@Qualifier(RetryTopicInternalBeanNames
847-
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
848-
ListenerContainerFactoryConfigurer configurer = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
849-
configurer.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN));
850-
return configurer;
861+
@Override
862+
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
863+
customizersConfigurer.customizeErrorHandler(commonErrorHandler ->
864+
((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN))
851865
}
852866
----
853867
====
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.annotation;
18+
19+
import java.lang.annotation.Documented;
20+
import java.lang.annotation.ElementType;
21+
import java.lang.annotation.Retention;
22+
import java.lang.annotation.RetentionPolicy;
23+
import java.lang.annotation.Target;
24+
25+
import org.springframework.context.annotation.Configuration;
26+
import org.springframework.context.annotation.Import;
27+
import org.springframework.kafka.config.RetryTopicConfigurationSupport;
28+
29+
/**
30+
* Enables the non-blocking topic-based delayed retries feature. To be used in
31+
* {@link Configuration Configuration} classes as follows:
32+
* <pre class="code">
33+
*
34+
* &#064;EnableKafkaRetryTopic
35+
* &#064;Configuration
36+
* public class AppConfig {
37+
* }
38+
*
39+
* &#064;Component
40+
* public class MyListener {
41+
*
42+
* &#064;RetryableTopic(fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC, backoff = @Backoff(4000))
43+
* &#064;KafkaListener(topics = "myTopic")
44+
* public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
45+
* logger.info("Message {} received in topic {} ", message, receivedTopic);
46+
* }
47+
*
48+
* &#064;DltHandler
49+
* public void dltHandler(Object message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
50+
* logger.info("Message {} received in dlt handler at topic {} ", message, receivedTopic);
51+
* }
52+
* </pre>
53+
*
54+
* Using this annotation configures the default {@link RetryTopicConfigurationSupport}
55+
* bean. This annotation is meta-annotated with {@code @EnableKafka} so it is not
56+
* necessary to specify both.
57+
*
58+
* To configure the feature's components, extend the
59+
* {@link RetryTopicConfigurationSupport} class and override the appropriate methods on a
60+
* {@link Configuration @Configuration} class, such as:
61+
*
62+
* <pre class="code">
63+
*
64+
* &#064;Configuration
65+
* &#064;EnableKafka
66+
* public class AppConfig extends RetryTopicConfigurationSupport {
67+
* &#064;Override
68+
* protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
69+
* blockingRetries
70+
* .retryOn(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)
71+
* .backOff(new FixedBackOff(50, 3));
72+
* }
73+
*
74+
* &#064;Override
75+
* protected void configureNonBlockingRetries(NonBlockingRetriesConfigurer nonBlockingRetries) {
76+
* nonBlockingRetries
77+
* .addToFatalExceptions(ShouldSkipBothRetriesException.class);
78+
* }
79+
* </pre>
80+
* In this case, you should not use this annotation, use {@code @EnableKafka} instead.
81+
*
82+
* @author Tomaz Fernandes
83+
* @since 2.9
84+
*/
85+
@Retention(RetentionPolicy.RUNTIME)
86+
@Target(ElementType.TYPE)
87+
@Documented
88+
@Import(RetryTopicConfigurationSupport.class)
89+
@EnableKafka
90+
public @interface EnableKafkaRetryTopic {
91+
}

0 commit comments

Comments
 (0)