Skip to content

Commit f28ee55

Browse files
authored
GH-3012: Class level KL Non-blocking retry
Fixes: #3012 * Non-blocking retries support for `@KafkaListener` on class level, which includes `@RetryableTopic,` `@DltHandler,` `RetryTopicConfiguration,` and `RetryTopicConfigurationSupport.` * Doc and test for Non-blocking retries support for @KafkaListener on class level. * Addressing PR review. See these earlier related PRs: #3112 #3107
1 parent b0fb756 commit f28ee55

File tree

12 files changed

+2141
-78
lines changed

12 files changed

+2141
-78
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ Version 2.9 changed the mechanism to bootstrap infrastructure beans; see xref:re
77
Achieving non-blocking retry / dlt functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners.
88
Since 2.7 Spring for Apache Kafka offers support for that via the `@RetryableTopic` annotation and `RetryTopicConfiguration` class to simplify that bootstrapping.
99

10+
Since 3.2, Spring for Apache Kafka supports non-blocking retries with xref:kafka/receiving-messages/class-level-kafkalistener.adoc[@KafkaListener on a Class].
11+
1012
IMPORTANT: Non-blocking retries are not supported with xref:kafka/receiving-messages/listener-annotation.adoc#batch-listeners[Batch Listeners].
1113

1214
IMPORTANT: Non-Blocking Retries cannot combine with xref:kafka/transactions.adoc#container-transaction-manager[Container Transactions].

spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-config.adoc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,21 @@ public void processMessage(MyPojo message) {
2828
}
2929
----
3030

31+
Since 3.2, `@RetryableTopic` support for @KafkaListener on a class would be:
32+
[source,java]
33+
----
34+
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
35+
@KafkaListener(topics = "my-annotated-topic")
36+
public class ClassLevelRetryListener {
37+
38+
@KafkaHandler
39+
public void processMessage(MyPojo message) {
40+
// ... message processing
41+
}
42+
43+
}
44+
----
45+
3146
You can specify a method in the same class to process the dlt messages by annotating it with the `@DltHandler` annotation.
3247
If no DltHandler method is provided a default consumer is created which only logs the consumption.
3348

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Proc
5555
Change `@RetryableTopic` property `SameIntervalTopicReuseStrategy` default value to `SINGLE_TOPIC`.
5656
See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topic for maxInterval Exponential Delay].
5757

58+
=== Non-blocking retries support class level @KafkaListener
59+
Non-blocking retries support xref:kafka/receiving-messages/class-level-kafkalistener.adoc[@KafkaListener on a Class].
60+
See xref:retrytopic.adoc[Non-Blocking Retries].
61+
5862
=== Support process @RetryableTopic on a class in RetryTopicConfigurationProvider.
5963
Provides a new public API to find `RetryTopicConfiguration`.
6064
See xref:retrytopic/retry-config.adoc#find-retry-topic-config[Find RetryTopicConfiguration]

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 66 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
185185

186186
private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;
187187

188+
@Nullable
188189
private ApplicationContext applicationContext;
189190

190191
private BeanFactory beanFactory;
@@ -197,6 +198,7 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
197198

198199
private AnnotationEnhancer enhancer;
199200

201+
@Nullable
200202
private RetryTopicConfigurer retryTopicConfigurer;
201203

202204
@Override
@@ -273,9 +275,11 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
273275
public synchronized void setBeanFactory(BeanFactory beanFactory) {
274276
this.beanFactory = beanFactory;
275277
if (beanFactory instanceof ConfigurableListableBeanFactory clbf) {
276-
this.resolver = clbf.getBeanExpressionResolver();
277-
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
278-
this.listenerScope);
278+
BeanExpressionResolver beanExpressionResolver = clbf.getBeanExpressionResolver();
279+
if (beanExpressionResolver != null) {
280+
this.resolver = beanExpressionResolver;
281+
}
282+
this.expressionContext = new BeanExpressionContext(clbf, this.listenerScope);
279283
}
280284
}
281285

@@ -333,9 +337,11 @@ public void afterSingletonsInstantiated() {
333337

334338
// Actually register all listeners
335339
this.registrar.afterPropertiesSet();
336-
Map<String, ContainerGroupSequencer> sequencers =
337-
this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
338-
sequencers.values().forEach(ContainerGroupSequencer::initialize);
340+
if (this.applicationContext != null) {
341+
Map<String, ContainerGroupSequencer> sequencers =
342+
this.applicationContext.getBeansOfType(ContainerGroupSequencer.class, false, false);
343+
sequencers.values().forEach(ContainerGroupSequencer::initialize);
344+
}
339345
}
340346

341347
private void buildEnhancer() {
@@ -368,36 +374,36 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
368374
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
369375
Class<?> targetClass = AopUtils.getTargetClass(bean);
370376
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
371-
final boolean hasClassLevelListeners = !classLevelListeners.isEmpty();
372-
final List<Method> multiMethods = new ArrayList<>();
373377
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
374378
(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
375379
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
376380
return (!listenerMethods.isEmpty() ? listenerMethods : null);
377381
});
378-
if (hasClassLevelListeners) {
379-
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
380-
(ReflectionUtils.MethodFilter) method ->
381-
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
382-
multiMethods.addAll(methodsWithHandler);
383-
}
384-
if (annotatedMethods.isEmpty() && !hasClassLevelListeners) {
382+
boolean hasClassLevelListeners = !classLevelListeners.isEmpty();
383+
boolean hasMethodLevelListeners = !annotatedMethods.isEmpty();
384+
if (!hasMethodLevelListeners && !hasClassLevelListeners) {
385385
this.nonAnnotatedClasses.add(bean.getClass());
386386
this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
387387
}
388388
else {
389-
// Non-empty set of methods
390-
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
391-
Method method = entry.getKey();
392-
for (KafkaListener listener : entry.getValue()) {
393-
processKafkaListener(listener, method, bean, beanName);
389+
if (hasMethodLevelListeners) {
390+
// Non-empty set of methods
391+
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
392+
Method method = entry.getKey();
393+
for (KafkaListener listener : entry.getValue()) {
394+
processKafkaListener(listener, method, bean, beanName);
395+
}
394396
}
397+
this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
398+
+ beanName + "': " + annotatedMethods);
399+
}
400+
if (hasClassLevelListeners) {
401+
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
402+
(ReflectionUtils.MethodFilter) method ->
403+
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
404+
List<Method> multiMethods = new ArrayList<>(methodsWithHandler);
405+
processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName);
395406
}
396-
this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
397-
+ beanName + "': " + annotatedMethods);
398-
}
399-
if (hasClassLevelListeners) {
400-
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
401407
}
402408
}
403409
return bean;
@@ -444,30 +450,25 @@ private KafkaListener enhance(AnnotatedElement element, KafkaListener ann) {
444450
}
445451

446452
private synchronized void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners,
447-
List<Method> multiMethods, Object bean, String beanName) {
453+
List<Method> multiMethods, Class<?> clazz, Object bean, String beanName) {
448454

449455
List<Method> checkedMethods = new ArrayList<>();
450456
Method defaultMethod = null;
451457
for (Method method : multiMethods) {
452458
Method checked = checkProxy(method, bean);
453459
KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);
454460
if (annotation != null && annotation.isDefault()) {
455-
final Method toAssert = defaultMethod;
461+
Method toAssert = defaultMethod;
456462
Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
457-
+ toAssert.toString() + " and " + method.toString());
463+
+ toAssert.toString() + " and " + method);
458464
defaultMethod = checked;
459465
}
460466
checkedMethods.add(checked);
461467
}
462468
for (KafkaListener classLevelListener : classLevelListeners) {
463469
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
464470
new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
465-
String beanRef = classLevelListener.beanRef();
466-
this.listenerScope.addListener(beanRef, bean);
467-
endpoint.setId(getEndpointId(classLevelListener));
468-
processListener(endpoint, classLevelListener, bean, beanName, resolveTopics(classLevelListener),
469-
resolveTopicPartitions(classLevelListener));
470-
this.listenerScope.removeListener(beanRef);
471+
processMainAndRetryListeners(classLevelListener, bean, beanName, endpoint, null, clazz);
471472
}
472473
}
473474

@@ -477,39 +478,34 @@ protected synchronized void processKafkaListener(KafkaListener kafkaListener, Me
477478
Method methodToUse = checkProxy(method, bean);
478479
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
479480
endpoint.setMethod(methodToUse);
481+
processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, methodToUse, null);
482+
}
483+
484+
private void processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName,
485+
MethodKafkaListenerEndpoint<K, V> endpoint, @Nullable Method methodToUse, @Nullable Class<?> clazz) {
480486

481487
String beanRef = kafkaListener.beanRef();
482488
this.listenerScope.addListener(beanRef, bean);
483489
endpoint.setId(getEndpointId(kafkaListener));
484490
String[] topics = resolveTopics(kafkaListener);
485491
TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
486-
if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
492+
if (!processMainAndRetryListeners(kafkaListener, bean, beanName, endpoint, topics, tps, methodToUse, clazz)) {
487493
processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
488494
}
489495
this.listenerScope.removeListener(beanRef);
490496
}
491497

492498
private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object bean, String beanName,
493-
Method methodToUse, MethodKafkaListenerEndpoint<K, V> endpoint, String[] topics,
494-
TopicPartitionOffset[] tps) {
495-
496-
String[] retryableCandidates = topics;
497-
if (retryableCandidates.length == 0 && tps.length > 0) {
498-
retryableCandidates = Arrays.stream(tps)
499-
.map(tp -> tp.getTopic())
500-
.distinct()
501-
.toList()
502-
.toArray(new String[0]);
503-
}
499+
MethodKafkaListenerEndpoint<K, V> endpoint, String[] topics, TopicPartitionOffset[] tps,
500+
@Nullable Method methodToUse, @Nullable Class<?> clazz) {
504501

502+
String[] retryableCandidates = getTopicsFromTopicPartitionOffset(topics, tps);
505503
RetryTopicConfiguration retryTopicConfiguration = new RetryTopicConfigurationProvider(this.beanFactory,
506504
this.resolver, this.expressionContext)
507-
.findRetryConfigurationFor(retryableCandidates, methodToUse, bean);
508-
505+
.findRetryConfigurationFor(retryableCandidates, methodToUse, clazz, bean);
509506
if (retryTopicConfiguration == null) {
510-
String[] candidates = retryableCandidates;
511507
this.logger.debug(() ->
512-
"No retry topic configuration found for topics " + Arrays.toString(candidates));
508+
"No retry topic configuration found for topics " + Arrays.toString(retryableCandidates));
513509
return false;
514510
}
515511

@@ -525,6 +521,18 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
525521
return true;
526522
}
527523

524+
private String[] getTopicsFromTopicPartitionOffset(String[] topics, TopicPartitionOffset[] tps) {
525+
String[] retryableCandidates = topics;
526+
if (retryableCandidates.length == 0 && tps.length > 0) {
527+
retryableCandidates = Arrays.stream(tps)
528+
.map(TopicPartitionOffset::getTopic)
529+
.distinct()
530+
.toList()
531+
.toArray(new String[0]);
532+
}
533+
return retryableCandidates;
534+
}
535+
528536
private RetryTopicConfigurer getRetryTopicConfigurer() {
529537
if (this.retryTopicConfigurer == null) {
530538
try {
@@ -737,7 +745,7 @@ private KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListener k
737745
private void resolveContainerPostProcessor(MethodKafkaListenerEndpoint<?, ?> endpoint,
738746
KafkaListener kafkaListener) {
739747

740-
final String containerPostProcessor = kafkaListener.containerPostProcessor();
748+
String containerPostProcessor = kafkaListener.containerPostProcessor();
741749
if (StringUtils.hasText(containerPostProcessor)) {
742750
endpoint.setContainerPostProcessor(this.beanFactory.getBean(containerPostProcessor,
743751
ContainerPostProcessor.class));
@@ -804,7 +812,8 @@ private String getEndpointId(KafkaListener kafkaListener) {
804812
}
805813
}
806814

807-
private String getEndpointGroupId(KafkaListener kafkaListener, String id) {
815+
@Nullable
816+
private String getEndpointGroupId(KafkaListener kafkaListener, @Nullable String id) {
808817
String groupId = null;
809818
if (StringUtils.hasText(kafkaListener.groupId())) {
810819
groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
@@ -1086,8 +1095,7 @@ private void addFormatters(FormatterRegistry registry) {
10861095

10871096
private <T> Collection<T> getBeansOfType(Class<T> type) {
10881097
if (KafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory lbf) {
1089-
return lbf.getBeansOfType(type)
1090-
.values();
1098+
return lbf.getBeansOfType(type).values();
10911099
}
10921100
else {
10931101
return Collections.emptySet();
@@ -1241,7 +1249,7 @@ public interface AnnotationEnhancer extends BiFunction<Map<String, Object>, Anno
12411249

12421250
}
12431251

1244-
private final class BytesToNumberConverter implements ConditionalGenericConverter {
1252+
private static final class BytesToNumberConverter implements ConditionalGenericConverter {
12451253

12461254
BytesToNumberConverter() {
12471255
}
@@ -1265,6 +1273,9 @@ public Set<ConvertiblePair> getConvertibleTypes() {
12651273
@Nullable
12661274
public Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
12671275
byte[] bytes = (byte[]) source;
1276+
if (bytes == null) {
1277+
return null;
1278+
}
12681279
if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) {
12691280
Assert.state(bytes.length >= 8, "At least 8 bytes needed to convert a byte[] to a long"); // NOSONAR
12701281
return ByteBuffer.wrap(bytes).getLong();

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.springframework.kafka.listener.ListenerContainerRegistry;
4545
import org.springframework.kafka.listener.MessageListenerContainer;
4646
import org.springframework.kafka.support.EndpointHandlerMethod;
47+
import org.springframework.kafka.support.EndpointHandlerMultiMethod;
4748
import org.springframework.lang.Nullable;
4849
import org.springframework.util.Assert;
4950
import org.springframework.util.StringUtils;
@@ -274,7 +275,17 @@ public MessageListenerContainer unregisterListenerContainer(String id) {
274275
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
275276
KafkaListenerContainerFactory<?> factory) {
276277

277-
if (endpoint instanceof MethodKafkaListenerEndpoint<?, ?> mkle) {
278+
if (endpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> mmkle) {
279+
Object bean = mmkle.getBean();
280+
if (bean instanceof EndpointHandlerMultiMethod ehmm) {
281+
ehmm = new EndpointHandlerMultiMethod(ehmm.resolveBean(this.applicationContext),
282+
ehmm.getDefaultMethod(), ehmm.getMethods());
283+
mmkle.setBean(ehmm.resolveBean(this.applicationContext));
284+
mmkle.setDefaultMethod(ehmm.getDefaultMethod());
285+
mmkle.setMethods(ehmm.getMethods());
286+
}
287+
}
288+
else if (endpoint instanceof MethodKafkaListenerEndpoint<?, ?> mkle) {
278289
Object bean = mkle.getBean();
279290
if (bean instanceof EndpointHandlerMethod ehm) {
280291
ehm = new EndpointHandlerMethod(ehm.resolveBean(this.applicationContext), ehm.getMethodName());

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,19 @@
165165
* }</code>
166166
* }</code>
167167
*</pre>
168+
* <p> Since 3.2, {@link org.springframework.kafka.annotation.RetryableTopic} annotation supports
169+
* {@link org.springframework.kafka.annotation.KafkaListener} annotated class, such as:
170+
* <pre>
171+
* <code>@RetryableTopic(attempts = 3,
172+
* backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))</code>
173+
* <code>@KafkaListener(topics = "my-annotated-topic")
174+
* static class ListenerBean {
175+
* <code> @KafkaHandler
176+
* public void processMessage(MyPojo message) {
177+
* // ... message processing
178+
* }</code>
179+
* }</code>
180+
*</pre>
168181
* <p> Or through meta-annotations, such as:
169182
* <pre>
170183
* <code>@RetryableTopic(backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))</code>

0 commit comments

Comments
 (0)