Skip to content

Commit 4e9c099

Browse files
authored
@RetryableTopic support for KL on class part 2
* @RetryableTopic support for KafkaListener annotation on class part 2 * `EndpointCustomizerFactory` adaptor `MultiMethodKafkaListenerEndpoint`. * `RetryTopicConfigurer.processAndRegisterEndpoint` support `@KafkaListener` on a class. * Add new class `EndpointHandlerMultiMethod` to handler multi method for retrying endpoints. * Deprecated `EndpointCustomizerFactory.addSuffixesAndMethod`. * Document public API changes in `whats-new.adoc`(or javadoc). part2 of #3105 and this contributes to fixing #3105 eventually
1 parent ab5f0a1 commit 4e9c099

File tree

9 files changed

+381
-138
lines changed

9 files changed

+381
-138
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topi
5656
Provides a new public API to find `RetryTopicConfiguration`.
5757
See xref:retrytopic/retry-config.adoc#find-retry-topic-config[Find RetryTopicConfiguration]
5858

59+
=== RetryTopicConfigurer support process MultiMethodKafkaListenerEndpoint.
60+
The `RetryTopicConfigurer` support process and register `MultiMethodKafkaListenerEndpoint`.
61+
The `MultiMethodKafkaListenerEndpoint` provides `getter/setter` for properties `defaultMethod` and `methods`.
62+
Modify the `EndpointCustomizer` that strictly for `MethodKafkaListenerEndpoint` types.
63+
The `EndpointHandlerMethod` add new constructors construct an instance for the provided bean.
64+
Provides new class `EndpointHandlerMultiMethod` to handler multi method for retrying endpoints.
65+
5966
[[x32-seek-offset-compute-fn]]
6067
=== New API method to seek to an offset based on a user provided function
6168
`ConsumerCallback` provides a new API to seek to an offset based on a user-defined function, which takes the current offset in the consumer as an argument.

spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,15 +35,16 @@
3535
* @param <V> the value type.
3636
*
3737
* @author Gary Russell
38+
* @author Wang Zhiyang
3839
*
3940
* @see org.springframework.kafka.annotation.KafkaHandler
4041
* @see DelegatingInvocableHandler
4142
*/
4243
public class MultiMethodKafkaListenerEndpoint<K, V> extends MethodKafkaListenerEndpoint<K, V> {
4344

44-
private final List<Method> methods;
45+
private List<Method> methods;
4546

46-
private final Method defaultMethod;
47+
private Method defaultMethod;
4748

4849
private Validator validator;
4950

@@ -60,6 +61,43 @@ public MultiMethodKafkaListenerEndpoint(List<Method> methods, @Nullable Method d
6061
setBean(bean);
6162
}
6263

64+
65+
/**
66+
* Get a method list.
67+
* @return the method list.
68+
* @since 3.2
69+
*/
70+
public List<Method> getMethods() {
71+
return this.methods;
72+
}
73+
74+
/**
75+
* Set a method list.
76+
* @param methods the methods.
77+
* @since 3.2
78+
*/
79+
public void setMethods(List<Method> methods) {
80+
this.methods = methods;
81+
}
82+
83+
/**
84+
* Get a default method.
85+
* @return the default method.
86+
* @since 3.2
87+
*/
88+
public Method getDefaultMethod() {
89+
return this.defaultMethod;
90+
}
91+
92+
/**
93+
* Set a default method.
94+
* @param defaultMethod the default method.
95+
* @since 3.2
96+
*/
97+
public void setDefaultMethod(Method defaultMethod) {
98+
this.defaultMethod = defaultMethod;
99+
}
100+
63101
/**
64102
* Set a payload validator.
65103
* @param validator the validator.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizer.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,21 +24,25 @@
2424
* Customizes main, retry and DLT endpoints in the Retry Topic functionality
2525
* and returns the resulting topic names.
2626
*
27+
* @param <T> the listener endpoint type.
28+
*
2729
* @author Tomaz Fernandes
30+
* @author Wang Zhiyang
31+
*
2832
* @since 2.7.2
2933
*
3034
* @see EndpointCustomizerFactory
3135
*
3236
*/
3337
@FunctionalInterface
34-
public interface EndpointCustomizer {
38+
public interface EndpointCustomizer<T extends MethodKafkaListenerEndpoint<?, ?>> {
3539

3640
/**
3741
* Customize the endpoint and return the topic names generated for this endpoint.
3842
* @param listenerEndpoint The main, retry or DLT endpoint to be customized.
3943
* @return A collection containing the topic names generated for this endpoint.
4044
*/
41-
Collection<TopicNamesHolder> customizeEndpointAndCollectTopics(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint);
45+
Collection<TopicNamesHolder> customizeEndpointAndCollectTopics(T listenerEndpoint);
4246

4347
class TopicNamesHolder {
4448

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323

2424
import org.springframework.beans.factory.BeanFactory;
2525
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
26+
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
2627
import org.springframework.kafka.support.EndpointHandlerMethod;
28+
import org.springframework.kafka.support.EndpointHandlerMultiMethod;
2729
import org.springframework.kafka.support.TopicPartitionOffset;
2830

2931
/**
@@ -63,41 +65,88 @@ public EndpointCustomizerFactory(DestinationTopic.Properties destinationProperti
6365
this.retryTopicNamesProviderFactory = retryTopicNamesProviderFactory;
6466
}
6567

66-
public final EndpointCustomizer createEndpointCustomizer() {
67-
return addSuffixesAndMethod(this.destinationProperties, this.beanMethod.resolveBean(this.beanFactory),
68-
this.beanMethod.getMethod());
68+
public final EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> createEndpointCustomizer() {
69+
return addSuffixesAndMethod(this.destinationProperties);
6970
}
7071

71-
protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties properties, Object bean, Method method) {
72+
/**
73+
* Create MethodKafkaListenerEndpoint's EndpointCustomizer, but not support MultiMethodKafkaListenerEndpoint.
74+
* Replace by {@link #addSuffixesAndMethod(DestinationTopic.Properties)}
75+
* @param properties the destination-topic's properties.
76+
* @param bean the bean.
77+
* @param method the method.
78+
* @return the endpoint customizer.
79+
*/
80+
@Deprecated(since = "3.2", forRemoval = true)
81+
@SuppressWarnings("rawtypes")
82+
protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties properties, Object bean,
83+
Method method) {
84+
85+
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider =
86+
this.retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties);
87+
return endpoint -> {
88+
Collection<EndpointCustomizer.TopicNamesHolder> topics =
89+
customizeAndRegisterTopics(namesProvider, endpoint);
90+
configurationEndpoint(endpoint, namesProvider, properties, bean);
91+
endpoint.setMethod(method);
92+
return topics;
93+
};
94+
}
95+
96+
/**
97+
* Create MethodKafkaListenerEndpoint's EndpointCustomizer and support MultiMethodKafkaListenerEndpoint.
98+
* @param properties the destination-topic's properties.
99+
* @return the endpoint customizer.
100+
* @since 3.2
101+
*/
102+
protected EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> addSuffixesAndMethod(
103+
DestinationTopic.Properties properties) {
104+
72105
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider =
73106
this.retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties);
74107
return endpoint -> {
75-
Collection<EndpointCustomizer.TopicNamesHolder> topics = customizeAndRegisterTopics(namesProvider, endpoint);
76-
endpoint.setId(namesProvider.getEndpointId(endpoint));
77-
endpoint.setGroupId(namesProvider.getGroupId(endpoint));
78-
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
79-
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider,
80-
endpoint.getTopicPartitionsToAssign()));
108+
Collection<EndpointCustomizer.TopicNamesHolder> topics =
109+
customizeAndRegisterTopics(namesProvider, endpoint);
110+
configurationEndpoint(endpoint, namesProvider, properties, this.beanMethod.resolveBean(this.beanFactory));
111+
if (endpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> multiMethodEndpoint
112+
&& this.beanMethod instanceof EndpointHandlerMultiMethod beanMultiMethod) {
113+
multiMethodEndpoint.setDefaultMethod(beanMultiMethod.getDefaultMethod());
114+
multiMethodEndpoint.setMethods(beanMultiMethod.getMethods());
81115
}
82116
else {
83-
endpoint.setTopics(endpoint.getTopics().stream()
84-
.map(namesProvider::getTopicName).toArray(String[]::new));
85-
}
86-
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint));
87-
endpoint.setGroup(namesProvider.getGroup(endpoint));
88-
endpoint.setBean(bean);
89-
endpoint.setMethod(method);
90-
Boolean autoStartDltHandler = properties.autoStartDltHandler();
91-
if (autoStartDltHandler != null && properties.isDltTopic()) {
92-
endpoint.setAutoStartup(autoStartDltHandler);
117+
endpoint.setMethod(this.beanMethod.getMethod());
93118
}
94119
return topics;
95120
};
96121
}
97122

123+
private void configurationEndpoint(MethodKafkaListenerEndpoint<?, ?> endpoint,
124+
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
125+
DestinationTopic.Properties properties, Object bean) {
126+
127+
endpoint.setId(namesProvider.getEndpointId(endpoint));
128+
endpoint.setGroupId(namesProvider.getGroupId(endpoint));
129+
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
130+
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider,
131+
endpoint.getTopicPartitionsToAssign()));
132+
}
133+
else {
134+
endpoint.setTopics(endpoint.getTopics().stream()
135+
.map(namesProvider::getTopicName).toArray(String[]::new));
136+
}
137+
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint));
138+
endpoint.setGroup(namesProvider.getGroup(endpoint));
139+
endpoint.setBean(bean);
140+
Boolean autoStartDltHandler = properties.autoStartDltHandler();
141+
if (autoStartDltHandler != null && properties.isDltTopic()) {
142+
endpoint.setAutoStartup(autoStartDltHandler);
143+
}
144+
}
145+
98146
private static TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties,
99147
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
100148
TopicPartitionOffset[] topicPartitionOffsets) {
149+
101150
return Stream.of(topicPartitionOffsets)
102151
.map(tpo -> properties.isMainEndpoint()
103152
? getTPOForMainTopic(namesProvider, tpo)

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.lang.reflect.Method;
2020
import java.util.Collection;
21+
import java.util.List;
2122
import java.util.function.Consumer;
2223

2324
import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,7 @@
3637
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
3738
import org.springframework.kafka.support.Acknowledgment;
3839
import org.springframework.kafka.support.EndpointHandlerMethod;
40+
import org.springframework.kafka.support.EndpointHandlerMultiMethod;
3941
import org.springframework.kafka.support.KafkaUtils;
4042
import org.springframework.kafka.support.TopicForRetryable;
4143
import org.springframework.lang.NonNull;
@@ -150,6 +152,19 @@
150152
* // ... message processing
151153
* }</code>
152154
*</pre>
155+
* <p> Since 3.2 , {@link org.springframework.kafka.annotation.RetryableTopic} annotation support
156+
* {@link org.springframework.kafka.annotation.KafkaListener} annotated class, such as:
157+
* <pre>
158+
* <code>@RetryableTopic(attempts = 3,
159+
* backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))</code>
160+
* <code>@KafkaListener(topics = "my-annotated-topic")
161+
* static class ListenerBean {
162+
* <code> @KafkaHandler
163+
* public void processMessage(MyPojo message) {
164+
* // ... message processing
165+
* }</code>
166+
* }</code>
167+
*</pre>
153168
* <p> Or through meta-annotations, such as:
154169
* <pre>
155170
* <code>@RetryableTopic(backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))</code>
@@ -281,7 +296,7 @@ public void processMainAndRetryListeners(EndpointProcessor endpointProcessor,
281296
KafkaListenerEndpointRegistrar registrar,
282297
@Nullable KafkaListenerContainerFactory<?> factory,
283298
String defaultContainerFactoryBeanName) {
284-
throwIfMultiMethodEndpoint(mainEndpoint);
299+
285300
String id = mainEndpoint.getId();
286301
if (id == null) {
287302
id = "no.id.provided";
@@ -300,6 +315,7 @@ private void configureEndpoints(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
300315
RetryTopicConfiguration configuration,
301316
DestinationTopicProcessor.Context context,
302317
String defaultContainerFactoryBeanName) {
318+
303319
this.destinationTopicProcessor
304320
.processDestinationTopicProperties(destinationTopicProperties ->
305321
processAndRegisterEndpoint(mainEndpoint,
@@ -330,7 +346,13 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
330346
endpoint = mainEndpoint;
331347
}
332348
else {
333-
endpoint = new MethodKafkaListenerEndpoint<>();
349+
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> multi) {
350+
endpoint = new MultiMethodKafkaListenerEndpoint<>(multi.getMethods(), multi.getDefaultMethod(),
351+
multi.getBean());
352+
}
353+
else {
354+
endpoint = new MethodKafkaListenerEndpoint<>();
355+
}
334356
endpoint.setId(mainEndpoint.getId());
335357
endpoint.setMainListenerId(mainEndpoint.getId());
336358
}
@@ -345,12 +367,12 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
345367
getEndpointHandlerMethod(mainEndpoint, configuration, destinationTopicProperties);
346368

347369
createEndpointCustomizer(endpointBeanMethod, destinationTopicProperties)
348-
.customizeEndpointAndCollectTopics(endpoint)
349-
.forEach(topicNamesHolder ->
350-
this.destinationTopicProcessor
351-
.registerDestinationTopic(topicNamesHolder.getMainTopic(),
352-
topicNamesHolder.getCustomizedTopic(),
353-
destinationTopicProperties, context));
370+
.customizeEndpointAndCollectTopics(endpoint)
371+
.forEach(topicNamesHolder ->
372+
this.destinationTopicProcessor
373+
.registerDestinationTopic(topicNamesHolder.getMainTopic(),
374+
topicNamesHolder.getCustomizedTopic(),
375+
destinationTopicProperties, context));
354376

355377
registrar.registerEndpoint(endpoint, resolvedFactory);
356378
endpoint.setBeanFactory(this.beanFactory);
@@ -359,9 +381,10 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
359381
protected EndpointHandlerMethod getEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
360382
RetryTopicConfiguration configuration,
361383
DestinationTopic.Properties props) {
384+
362385
EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod();
363-
EndpointHandlerMethod retryBeanMethod = new EndpointHandlerMethod(mainEndpoint.getBean(), mainEndpoint.getMethod());
364-
return props.isDltTopic() ? getDltEndpointHandlerMethodOrDefault(dltHandlerMethod) : retryBeanMethod;
386+
return props.isDltTopic() ? getDltEndpointHandlerMethodOrDefault(mainEndpoint, dltHandlerMethod)
387+
: getRetryEndpointHandlerMethod(mainEndpoint);
365388
}
366389

367390
private Consumer<Collection<String>> getTopicCreationFunction(RetryTopicConfiguration config) {
@@ -383,7 +406,7 @@ protected void createNewTopicBeans(Collection<String> topics, RetryTopicConfigur
383406
);
384407
}
385408

386-
protected EndpointCustomizer createEndpointCustomizer(
409+
protected EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> createEndpointCustomizer(
387410
EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties) {
388411

389412
return new EndpointCustomizerFactory(destinationTopicProperties,
@@ -393,8 +416,28 @@ protected EndpointCustomizer createEndpointCustomizer(
393416
.createEndpointCustomizer();
394417
}
395418

396-
private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(EndpointHandlerMethod dltEndpointHandlerMethod) {
397-
return dltEndpointHandlerMethod != null ? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
419+
private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
420+
@Nullable EndpointHandlerMethod dltEndpointHandlerMethod) {
421+
422+
EndpointHandlerMethod dltHandlerMethod = dltEndpointHandlerMethod != null
423+
? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
424+
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
425+
dltHandlerMethod = new EndpointHandlerMultiMethod(dltHandlerMethod.resolveBean(this.beanFactory),
426+
dltHandlerMethod.getMethod(), List.of(dltHandlerMethod.getMethod()));
427+
}
428+
return dltHandlerMethod;
429+
}
430+
431+
private EndpointHandlerMethod getRetryEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint) {
432+
EndpointHandlerMethod retryBeanMethod;
433+
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> multi) {
434+
retryBeanMethod = new EndpointHandlerMultiMethod(multi.getBean(), multi.getDefaultMethod(),
435+
multi.getMethods());
436+
}
437+
else {
438+
retryBeanMethod = new EndpointHandlerMethod(mainEndpoint.getBean(), mainEndpoint.getMethod());
439+
}
440+
return retryBeanMethod;
398441
}
399442

400443
private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForMainEndpoint(
@@ -419,12 +462,6 @@ private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForRetryEndpo
419462
return this.listenerContainerFactoryConfigurer.decorateFactory(resolvedFactory);
420463
}
421464

422-
private void throwIfMultiMethodEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEndpoint) {
423-
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
424-
throw new IllegalArgumentException("Retry Topic is not compatible with " + MultiMethodKafkaListenerEndpoint.class);
425-
}
426-
}
427-
428465
public static EndpointHandlerMethod createHandlerMethodWith(Object beanOrClass, String methodName) {
429466
return new EndpointHandlerMethod(beanOrClass, methodName);
430467
}

0 commit comments

Comments
 (0)