Skip to content

@RetryableTopic support KL annotated on class part 2 #3112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topi
Provides a new public API to find `RetryTopicConfiguration`.
See xref:retrytopic/retry-config.adoc#find-retry-topic-config[Find RetryTopicConfiguration]

=== RetryTopicConfigurer support process MultiMethodKafkaListenerEndpoint.
The `RetryTopicConfigurer` support process and register `MultiMethodKafkaListenerEndpoint`.
The `MultiMethodKafkaListenerEndpoint` provides `getter/setter` for properties `defaultMethod` and `methods`.
Modify the `EndpointCustomizer` that strictly for `MethodKafkaListenerEndpoint` types.
The `EndpointHandlerMethod` add new constructors construct an instance for the provided bean.
Provides new class `EndpointHandlerMultiMethod` to handler multi method for retrying endpoints.

[[x32-seek-offset-compute-fn]]
=== New API method to seek to an offset based on a user provided function
`ConsumerCallback` provides a new API to seek to an offset based on a user-defined function, which takes the current offset in the consumer as an argument.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,15 +35,16 @@
* @param <V> the value type.
*
* @author Gary Russell
* @author Wang Zhiyang
*
* @see org.springframework.kafka.annotation.KafkaHandler
* @see DelegatingInvocableHandler
*/
public class MultiMethodKafkaListenerEndpoint<K, V> extends MethodKafkaListenerEndpoint<K, V> {

private final List<Method> methods;
private List<Method> methods;

private final Method defaultMethod;
private Method defaultMethod;

private Validator validator;

Expand All @@ -60,6 +61,43 @@ public MultiMethodKafkaListenerEndpoint(List<Method> methods, @Nullable Method d
setBean(bean);
}


/**
* Get a method list.
* @return the method list.
* @since 3.2
*/
public List<Method> getMethods() {
return this.methods;
}

/**
* Set a method list.
* @param methods the methods.
* @since 3.2
*/
public void setMethods(List<Method> methods) {
this.methods = methods;
}

/**
* Get a default method.
* @return the default method.
* @since 3.2
*/
public Method getDefaultMethod() {
return this.defaultMethod;
}

/**
* Set a default method.
* @param defaultMethod the default method.
* @since 3.2
*/
public void setDefaultMethod(Method defaultMethod) {
this.defaultMethod = defaultMethod;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding new public methods in a public class. This deserves a brief mention in whats-new.

/**
* Set a payload validator.
* @param validator the validator.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,21 +24,25 @@
* Customizes main, retry and DLT endpoints in the Retry Topic functionality
* and returns the resulting topic names.
*
* @param <T> the listener endpoint type.
*
* @author Tomaz Fernandes
* @author Wang Zhiyang
*
* @since 2.7.2
*
* @see EndpointCustomizerFactory
*
*/
@FunctionalInterface
public interface EndpointCustomizer {
public interface EndpointCustomizer<T extends MethodKafkaListenerEndpoint<?, ?>> {

/**
* Customize the endpoint and return the topic names generated for this endpoint.
* @param listenerEndpoint The main, retry or DLT endpoint to be customized.
* @return A collection containing the topic names generated for this endpoint.
*/
Collection<TopicNamesHolder> customizeEndpointAndCollectTopics(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint);
Collection<TopicNamesHolder> customizeEndpointAndCollectTopics(T listenerEndpoint);

class TopicNamesHolder {

Copy link
Contributor

@sobychacko sobychacko Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modifying the interface signature, which is part of the public API. Although we are just adding a strong type here and it may not affect any implementations, we may also need to mention this in whats-new. - the fact that EndpointCustomizer is strictly for MethodKafkaListenerEndpoint types.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

import org.springframework.beans.factory.BeanFactory;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.support.EndpointHandlerMultiMethod;
import org.springframework.kafka.support.TopicPartitionOffset;

/**
Expand Down Expand Up @@ -63,41 +65,88 @@ public EndpointCustomizerFactory(DestinationTopic.Properties destinationProperti
this.retryTopicNamesProviderFactory = retryTopicNamesProviderFactory;
}

public final EndpointCustomizer createEndpointCustomizer() {
return addSuffixesAndMethod(this.destinationProperties, this.beanMethod.resolveBean(this.beanFactory),
this.beanMethod.getMethod());
public final EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> createEndpointCustomizer() {
return addSuffixesAndMethod(this.destinationProperties);
}

protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties properties, Object bean, Method method) {
/**
* Create MethodKafkaListenerEndpoint's EndpointCustomizer, but not support MultiMethodKafkaListenerEndpoint.
* Replace by {@link #addSuffixesAndMethod(DestinationTopic.Properties)}
* @param properties the destination-topic's properties.
* @param bean the bean.
* @param method the method.
* @return the endpoint customizer.
*/
@Deprecated(since = "3.2", forRemoval = true)
@SuppressWarnings("rawtypes")
protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties properties, Object bean,
Method method) {

RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider =
this.retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties);
return endpoint -> {
Collection<EndpointCustomizer.TopicNamesHolder> topics =
customizeAndRegisterTopics(namesProvider, endpoint);
configurationEndpoint(endpoint, namesProvider, properties, bean);
endpoint.setMethod(method);
return topics;
};
}

/**
* Create MethodKafkaListenerEndpoint's EndpointCustomizer and support MultiMethodKafkaListenerEndpoint.
* @param properties the destination-topic's properties.
* @return the endpoint customizer.
* @since 3.2
*/
protected EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> addSuffixesAndMethod(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Public API modified. It may theoretically break any external implementations (although very unlikely that some might be implementing this, we need to be mindful of that possibility. Why don't we deprecate the original and add the modification separately?

DestinationTopic.Properties properties) {

RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider =
this.retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties);
return endpoint -> {
Collection<EndpointCustomizer.TopicNamesHolder> topics = customizeAndRegisterTopics(namesProvider, endpoint);
endpoint.setId(namesProvider.getEndpointId(endpoint));
endpoint.setGroupId(namesProvider.getGroupId(endpoint));
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider,
endpoint.getTopicPartitionsToAssign()));
Collection<EndpointCustomizer.TopicNamesHolder> topics =
customizeAndRegisterTopics(namesProvider, endpoint);
configurationEndpoint(endpoint, namesProvider, properties, this.beanMethod.resolveBean(this.beanFactory));
if (endpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> multiMethodEndpoint
&& this.beanMethod instanceof EndpointHandlerMultiMethod beanMultiMethod) {
multiMethodEndpoint.setDefaultMethod(beanMultiMethod.getDefaultMethod());
multiMethodEndpoint.setMethods(beanMultiMethod.getMethods());
}
else {
endpoint.setTopics(endpoint.getTopics().stream()
.map(namesProvider::getTopicName).toArray(String[]::new));
}
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint));
endpoint.setGroup(namesProvider.getGroup(endpoint));
endpoint.setBean(bean);
endpoint.setMethod(method);
Boolean autoStartDltHandler = properties.autoStartDltHandler();
if (autoStartDltHandler != null && properties.isDltTopic()) {
endpoint.setAutoStartup(autoStartDltHandler);
endpoint.setMethod(this.beanMethod.getMethod());
}
return topics;
};
}

private void configurationEndpoint(MethodKafkaListenerEndpoint<?, ?> endpoint,
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
DestinationTopic.Properties properties, Object bean) {

endpoint.setId(namesProvider.getEndpointId(endpoint));
endpoint.setGroupId(namesProvider.getGroupId(endpoint));
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider,
endpoint.getTopicPartitionsToAssign()));
}
else {
endpoint.setTopics(endpoint.getTopics().stream()
.map(namesProvider::getTopicName).toArray(String[]::new));
}
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint));
endpoint.setGroup(namesProvider.getGroup(endpoint));
endpoint.setBean(bean);
Boolean autoStartDltHandler = properties.autoStartDltHandler();
if (autoStartDltHandler != null && properties.isDltTopic()) {
endpoint.setAutoStartup(autoStartDltHandler);
}
}

private static TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties,
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
TopicPartitionOffset[] topicPartitionOffsets) {

return Stream.of(topicPartitionOffsets)
.map(tpo -> properties.isMainEndpoint()
? getTPOForMainTopic(namesProvider, tpo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

import org.apache.commons.logging.LogFactory;
Expand All @@ -36,6 +37,7 @@
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.EndpointHandlerMethod;
import org.springframework.kafka.support.EndpointHandlerMultiMethod;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.TopicForRetryable;
import org.springframework.lang.NonNull;
Expand Down Expand Up @@ -150,6 +152,19 @@
* // ... message processing
* }</code>
*</pre>
* <p> Since 3.2 , {@link org.springframework.kafka.annotation.RetryableTopic} annotation support
* {@link org.springframework.kafka.annotation.KafkaListener} annotated class, such as:
* <pre>
* <code>@RetryableTopic(attempts = 3,
* backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))</code>
* <code>@KafkaListener(topics = "my-annotated-topic")
* static class ListenerBean {
* <code> @KafkaHandler
* public void processMessage(MyPojo message) {
* // ... message processing
* }</code>
* }</code>
*</pre>
* <p> Or through meta-annotations, such as:
* <pre>
* <code>@RetryableTopic(backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))</code>
Expand Down Expand Up @@ -281,7 +296,7 @@ public void processMainAndRetryListeners(EndpointProcessor endpointProcessor,
KafkaListenerEndpointRegistrar registrar,
@Nullable KafkaListenerContainerFactory<?> factory,
String defaultContainerFactoryBeanName) {
throwIfMultiMethodEndpoint(mainEndpoint);

String id = mainEndpoint.getId();
if (id == null) {
id = "no.id.provided";
Expand All @@ -300,6 +315,7 @@ private void configureEndpoints(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
RetryTopicConfiguration configuration,
DestinationTopicProcessor.Context context,
String defaultContainerFactoryBeanName) {

this.destinationTopicProcessor
.processDestinationTopicProperties(destinationTopicProperties ->
processAndRegisterEndpoint(mainEndpoint,
Expand Down Expand Up @@ -330,7 +346,13 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
endpoint = mainEndpoint;
}
else {
endpoint = new MethodKafkaListenerEndpoint<>();
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> multi) {
endpoint = new MultiMethodKafkaListenerEndpoint<>(multi.getMethods(), multi.getDefaultMethod(),
multi.getBean());
}
else {
endpoint = new MethodKafkaListenerEndpoint<>();
}
endpoint.setId(mainEndpoint.getId());
endpoint.setMainListenerId(mainEndpoint.getId());
}
Expand All @@ -345,12 +367,12 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
getEndpointHandlerMethod(mainEndpoint, configuration, destinationTopicProperties);

createEndpointCustomizer(endpointBeanMethod, destinationTopicProperties)
.customizeEndpointAndCollectTopics(endpoint)
.forEach(topicNamesHolder ->
this.destinationTopicProcessor
.registerDestinationTopic(topicNamesHolder.getMainTopic(),
topicNamesHolder.getCustomizedTopic(),
destinationTopicProperties, context));
.customizeEndpointAndCollectTopics(endpoint)
.forEach(topicNamesHolder ->
this.destinationTopicProcessor
.registerDestinationTopic(topicNamesHolder.getMainTopic(),
topicNamesHolder.getCustomizedTopic(),
destinationTopicProperties, context));

registrar.registerEndpoint(endpoint, resolvedFactory);
endpoint.setBeanFactory(this.beanFactory);
Expand All @@ -359,9 +381,10 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
protected EndpointHandlerMethod getEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
RetryTopicConfiguration configuration,
DestinationTopic.Properties props) {

EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod();
EndpointHandlerMethod retryBeanMethod = new EndpointHandlerMethod(mainEndpoint.getBean(), mainEndpoint.getMethod());
return props.isDltTopic() ? getDltEndpointHandlerMethodOrDefault(dltHandlerMethod) : retryBeanMethod;
return props.isDltTopic() ? getDltEndpointHandlerMethodOrDefault(mainEndpoint, dltHandlerMethod)
: getRetryEndpointHandlerMethod(mainEndpoint);
}

private Consumer<Collection<String>> getTopicCreationFunction(RetryTopicConfiguration config) {
Expand All @@ -383,7 +406,7 @@ protected void createNewTopicBeans(Collection<String> topics, RetryTopicConfigur
);
}

protected EndpointCustomizer createEndpointCustomizer(
protected EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> createEndpointCustomizer(
EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties) {

return new EndpointCustomizerFactory(destinationTopicProperties,
Expand All @@ -393,8 +416,28 @@ protected EndpointCustomizer createEndpointCustomizer(
.createEndpointCustomizer();
}

private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(EndpointHandlerMethod dltEndpointHandlerMethod) {
return dltEndpointHandlerMethod != null ? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
@Nullable EndpointHandlerMethod dltEndpointHandlerMethod) {

EndpointHandlerMethod dltHandlerMethod = dltEndpointHandlerMethod != null
? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
dltHandlerMethod = new EndpointHandlerMultiMethod(dltHandlerMethod.resolveBean(this.beanFactory),
dltHandlerMethod.getMethod(), List.of(dltHandlerMethod.getMethod()));
}
return dltHandlerMethod;
}

private EndpointHandlerMethod getRetryEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint) {
EndpointHandlerMethod retryBeanMethod;
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> multi) {
retryBeanMethod = new EndpointHandlerMultiMethod(multi.getBean(), multi.getDefaultMethod(),
multi.getMethods());
}
else {
retryBeanMethod = new EndpointHandlerMethod(mainEndpoint.getBean(), mainEndpoint.getMethod());
}
return retryBeanMethod;
}

private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForMainEndpoint(
Expand All @@ -419,12 +462,6 @@ private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForRetryEndpo
return this.listenerContainerFactoryConfigurer.decorateFactory(resolvedFactory);
}

private void throwIfMultiMethodEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEndpoint) {
if (mainEndpoint instanceof MultiMethodKafkaListenerEndpoint) {
throw new IllegalArgumentException("Retry Topic is not compatible with " + MultiMethodKafkaListenerEndpoint.class);
}
}

public static EndpointHandlerMethod createHandlerMethodWith(Object beanOrClass, String methodName) {
return new EndpointHandlerMethod(beanOrClass, methodName);
}
Expand Down
Loading