Skip to content

Commit 2d29a2b

Browse files
spring-projectsGH-2226: Add RetryTopicConfigurationSupport
Resolves spring-projects#2226 Add RetryTopicConfigurationSupport Add @EnableRetryTopic Add KafkaBackOffManagerConfigurationSupport
1 parent feb3563 commit 2d29a2b

16 files changed

+839
-73
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.annotation;
18+
19+
import java.lang.annotation.Documented;
20+
import java.lang.annotation.ElementType;
21+
import java.lang.annotation.Retention;
22+
import java.lang.annotation.RetentionPolicy;
23+
import java.lang.annotation.Target;
24+
25+
import org.springframework.context.annotation.Configuration;
26+
import org.springframework.context.annotation.Import;
27+
import org.springframework.kafka.config.KafkaBackOffManagerConfigurationSupport;
28+
import org.springframework.kafka.config.RetryTopicConfigurationSupport;
29+
30+
/**
31+
* Enables the non-blocking topic-based delayed retries feature. To be used in
32+
* {@link Configuration Configuration} classes as follows:
33+
* <pre class="code">
34+
*
35+
* &#064;EnableRetryTopic
36+
* &#064;Configuration
37+
* public class AppConfig {
38+
*
39+
* &#064;Bean
40+
* public RetryTopicConfiguration myRetryTopicConfiguration(KafkaTemplate kafkaTemplate) {
41+
* return RetryTopicConfigurationBuilder
42+
* .newInstance()
43+
* .maxAttempts(4)
44+
* .create(kafkaTemplate);
45+
* }
46+
* // other &#064;Bean definitions
47+
* }
48+
* </pre>
49+
*
50+
* To configure the feature's components, extend the {@link RetryTopicConfigurationSupport}
51+
* class and override the appropriate methods. Then import the subclass using the
52+
* {@link Import @Import} annotation on a {@link Configuration @Configuration} class,
53+
* such as:
54+
*
55+
* <pre class="code">
56+
*
57+
* &#064;Configuration
58+
* &#064;EnableKafka
59+
* &#064;Import(MyRetryTopicConfigurationSupport.class)
60+
* public class AppConfig {
61+
* }
62+
*
63+
* public static class MyRetryTopicConfigurationSupport extends RetryTopicConfigurationSupport {
64+
*
65+
* &#064;Override
66+
* protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
67+
* blockingRetries
68+
* .retryOn(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)
69+
* .backOff(new FixedBackOff(50, 3));
70+
* }
71+
*
72+
* &#064;Override
73+
* protected void configureNonBlockingRetries(NonBlockingRetriesConfigurer nonBlockingRetries) {
74+
* nonBlockingRetries
75+
* .addToFatalExceptions(ShouldSkipBothRetriesException.class);
76+
* }
77+
* </pre>
78+
*
79+
* @author Tomaz Fernandes
80+
* @since 2.9
81+
*/
82+
@Retention(RetentionPolicy.RUNTIME)
83+
@Target(ElementType.TYPE)
84+
@Documented
85+
@Import({RetryTopicConfigurationSupport.class, KafkaBackOffManagerConfigurationSupport.class})
86+
public @interface EnableRetryTopic {
87+
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import org.springframework.kafka.listener.ContainerGroupSequencer;
8787
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
8888
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
89+
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
8990
import org.springframework.kafka.retrytopic.RetryTopicBootstrapper;
9091
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
9192
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
@@ -509,11 +510,15 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
509510
return true;
510511
}
511512

513+
@SuppressWarnings("deprecation")
512514
private RetryTopicConfigurer getRetryTopicConfigurer() {
513515
bootstrapRetryTopicIfNecessary();
514-
return this.beanFactory.getBean(RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER, RetryTopicConfigurer.class);
516+
return this.beanFactory.containsBean(RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER)
517+
? this.beanFactory.getBean(RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER, RetryTopicConfigurer.class)
518+
: this.beanFactory.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
515519
}
516520

521+
@SuppressWarnings("deprecation")
517522
private void bootstrapRetryTopicIfNecessary() {
518523
if (!(this.beanFactory instanceof BeanDefinitionRegistry)) {
519524
throw new IllegalStateException("BeanFactory must be an instance of "

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@
3232
import org.springframework.core.annotation.AnnotationUtils;
3333
import org.springframework.expression.spel.support.StandardEvaluationContext;
3434
import org.springframework.kafka.core.KafkaOperations;
35+
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
3536
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
3637
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
3738
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
@@ -200,6 +201,7 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
200201
.orElse(RetryTopicConfigurer.DEFAULT_DLT_HANDLER);
201202
}
202203

204+
@SuppressWarnings("deprecation")
203205
private KafkaOperations<?, ?> getKafkaTemplate(String kafkaTemplateName, String[] topics) {
204206
if (StringUtils.hasText(kafkaTemplateName)) {
205207
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain kafka template by bean name");
@@ -218,12 +220,19 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
218220
}
219221
catch (NoSuchBeanDefinitionException ex) {
220222
try {
221-
return this.beanFactory.getBean(DEFAULT_SPRING_BOOT_KAFKA_TEMPLATE_NAME, KafkaOperations.class);
223+
return this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
224+
KafkaOperations.class);
222225
}
223-
catch (NoSuchBeanDefinitionException exc) {
224-
exc.addSuppressed(ex);
225-
throw new BeanInitializationException("Could not find a KafkaTemplate to configure the retry topics.", // NOSONAR (lost stack trace)
226-
exc);
226+
catch (NoSuchBeanDefinitionException ex2) {
227+
try {
228+
return this.beanFactory.getBean(DEFAULT_SPRING_BOOT_KAFKA_TEMPLATE_NAME, KafkaOperations.class);
229+
}
230+
catch (NoSuchBeanDefinitionException exc) {
231+
exc.addSuppressed(ex);
232+
exc.addSuppressed(ex2);
233+
throw new BeanInitializationException("Could not find a KafkaTemplate to configure the retry topics.", // NOSONAR (lost stack trace)
234+
exc);
235+
}
227236
}
228237
}
229238
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.config;
18+
19+
import java.time.Clock;
20+
21+
import org.springframework.beans.BeansException;
22+
import org.springframework.beans.factory.annotation.Qualifier;
23+
import org.springframework.context.ApplicationContext;
24+
import org.springframework.context.ApplicationContextAware;
25+
import org.springframework.context.ApplicationListener;
26+
import org.springframework.context.ConfigurableApplicationContext;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.event.ContextClosedEvent;
29+
import org.springframework.core.task.TaskExecutor;
30+
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
31+
import org.springframework.kafka.listener.KafkaConsumerTimingAdjuster;
32+
import org.springframework.kafka.listener.ListenerContainerRegistry;
33+
import org.springframework.kafka.listener.PartitionPausingBackoffManager;
34+
import org.springframework.kafka.listener.WakingKafkaConsumerTimingAdjuster;
35+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
36+
import org.springframework.util.Assert;
37+
38+
/**
39+
* This class provides configuration support for a global
40+
* {@link KafkaConsumerBackoffManager} instance. Consider overriding any of the
41+
* protected methods for providing different components or configuration.
42+
* This class is automatically imported by the
43+
* {@link org.springframework.kafka.annotation.EnableRetryTopic @EnableRetryTopic}
44+
* annotation.
45+
*
46+
* @author Tomaz Fernandes
47+
* @since 2.9
48+
*/
49+
public class KafkaBackOffManagerConfigurationSupport implements ApplicationContextAware {
50+
51+
private ConfigurableApplicationContext applicationContext;
52+
53+
/**
54+
* Provides the {@link KafkaConsumerBackoffManager} instance.
55+
* To customize it or any of the components, consider overriding
56+
* one of the more fine graned methods:
57+
* <ul>
58+
* <li>{@link #backOffManagerClock}</li>
59+
* <li>{@link #timingAdjuster}</li>
60+
* <li>{@link #timingAdjusterTaskExecutor}</li>
61+
* </ul>
62+
* @param registry the global {@link ListenerContainerRegistry} instance.
63+
* @return the instance.
64+
*/
65+
@Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME)
66+
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
67+
ListenerContainerRegistry registry) {
68+
return new PartitionPausingBackoffManager(registry, timingAdjuster(timingAdjusterTaskExecutor()), backOffManagerClock());
69+
}
70+
71+
/**
72+
* Override this method to provide a different {@link Clock}
73+
* instance to be used with the {@link KafkaConsumerBackoffManager}.
74+
* @return the instance.
75+
*/
76+
protected Clock backOffManagerClock() {
77+
return Clock.systemUTC();
78+
}
79+
80+
/**
81+
* Override this method to provide a different {@link KafkaConsumerTimingAdjuster}
82+
* to be used with the {@link KafkaConsumerBackoffManager}.
83+
* @param taskExecutor the task executor.
84+
* @return the instance.
85+
*/
86+
protected KafkaConsumerTimingAdjuster timingAdjuster(TaskExecutor taskExecutor) {
87+
return new WakingKafkaConsumerTimingAdjuster(taskExecutor);
88+
}
89+
90+
/**
91+
* Override this method to provide a different {@link TaskExecutor}
92+
* to be used with the {@link KafkaConsumerTimingAdjuster}.
93+
* @return the instance.
94+
*/
95+
protected TaskExecutor timingAdjusterTaskExecutor() {
96+
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
97+
executor.initialize();
98+
this.applicationContext
99+
.addApplicationListener((ApplicationListener<ContextClosedEvent>) event -> executor.shutdown());
100+
return executor;
101+
}
102+
103+
@Override
104+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
105+
Assert.isInstanceOf(ConfigurableApplicationContext.class, applicationContext);
106+
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
107+
}
108+
}

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerConfigUtils.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
*
2222
* @author Juergen Hoeller
2323
* @author Gary Russell
24+
* @author Tomaz Fernandes
2425
*/
2526
public abstract class KafkaListenerConfigUtils {
2627

@@ -36,4 +37,10 @@ public abstract class KafkaListenerConfigUtils {
3637
public static final String KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME =
3738
"org.springframework.kafka.config.internalKafkaListenerEndpointRegistry";
3839

40+
/**
41+
* The bean name of the internally managed Kafka consumer back off manager.
42+
*/
43+
public static final String KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME =
44+
"org.springframework.kafka.config.internalKafkaConsumerBackOffManager";
45+
3946
}

0 commit comments

Comments
 (0)