-
Notifications
You must be signed in to change notification settings - Fork 1.6k
@RetryableTopic support KL annotated on class part 2 #3112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright 2016-2023 the original author or authors. | ||
* Copyright 2016-2024 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
|
@@ -35,15 +35,16 @@ | |
* @param <V> the value type. | ||
* | ||
* @author Gary Russell | ||
* @author Wang Zhiyang | ||
* | ||
* @see org.springframework.kafka.annotation.KafkaHandler | ||
* @see DelegatingInvocableHandler | ||
*/ | ||
public class MultiMethodKafkaListenerEndpoint<K, V> extends MethodKafkaListenerEndpoint<K, V> { | ||
|
||
private final List<Method> methods; | ||
private List<Method> methods; | ||
|
||
private final Method defaultMethod; | ||
private Method defaultMethod; | ||
|
||
private Validator validator; | ||
|
||
|
@@ -60,6 +61,43 @@ public MultiMethodKafkaListenerEndpoint(List<Method> methods, @Nullable Method d | |
setBean(bean); | ||
} | ||
|
||
|
||
/** | ||
* Get a method list. | ||
* @return the method list. | ||
* @since 3.2 | ||
*/ | ||
public List<Method> getMethods() { | ||
return this.methods; | ||
} | ||
|
||
/** | ||
* Set a method list. | ||
* @param methods the methods. | ||
* @since 3.2 | ||
*/ | ||
public void setMethods(List<Method> methods) { | ||
this.methods = methods; | ||
} | ||
|
||
/** | ||
* Get a default method. | ||
* @return the default method. | ||
* @since 3.2 | ||
*/ | ||
public Method getDefaultMethod() { | ||
return this.defaultMethod; | ||
} | ||
|
||
/** | ||
* Set a default method. | ||
* @param defaultMethod the default method. | ||
* @since 3.2 | ||
*/ | ||
public void setDefaultMethod(Method defaultMethod) { | ||
this.defaultMethod = defaultMethod; | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding new public methods in a public class. This deserves a brief mention in |
||
/** | ||
* Set a payload validator. | ||
* @param validator the validator. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright 2021 the original author or authors. | ||
* Copyright 2021-2024 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
|
@@ -24,21 +24,25 @@ | |
* Customizes main, retry and DLT endpoints in the Retry Topic functionality | ||
* and returns the resulting topic names. | ||
* | ||
* @param <T> the listener endpoint type. | ||
* | ||
* @author Tomaz Fernandes | ||
* @author Wang Zhiyang | ||
* | ||
* @since 2.7.2 | ||
* | ||
* @see EndpointCustomizerFactory | ||
* | ||
*/ | ||
@FunctionalInterface | ||
public interface EndpointCustomizer { | ||
public interface EndpointCustomizer<T extends MethodKafkaListenerEndpoint<?, ?>> { | ||
|
||
/** | ||
* Customize the endpoint and return the topic names generated for this endpoint. | ||
* @param listenerEndpoint The main, retry or DLT endpoint to be customized. | ||
* @return A collection containing the topic names generated for this endpoint. | ||
*/ | ||
Collection<TopicNamesHolder> customizeEndpointAndCollectTopics(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint); | ||
Collection<TopicNamesHolder> customizeEndpointAndCollectTopics(T listenerEndpoint); | ||
|
||
class TopicNamesHolder { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Modifying the interface signature, which is part of the public API. Although we are just adding a strong type here and it may not affect any implementations, we may also need to mention this in |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,7 +23,9 @@ | |
|
||
import org.springframework.beans.factory.BeanFactory; | ||
import org.springframework.kafka.config.MethodKafkaListenerEndpoint; | ||
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; | ||
import org.springframework.kafka.support.EndpointHandlerMethod; | ||
import org.springframework.kafka.support.EndpointHandlerMultiMethod; | ||
import org.springframework.kafka.support.TopicPartitionOffset; | ||
|
||
/** | ||
|
@@ -63,41 +65,88 @@ public EndpointCustomizerFactory(DestinationTopic.Properties destinationProperti | |
this.retryTopicNamesProviderFactory = retryTopicNamesProviderFactory; | ||
} | ||
|
||
public final EndpointCustomizer createEndpointCustomizer() { | ||
return addSuffixesAndMethod(this.destinationProperties, this.beanMethod.resolveBean(this.beanFactory), | ||
this.beanMethod.getMethod()); | ||
public final EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> createEndpointCustomizer() { | ||
return addSuffixesAndMethod(this.destinationProperties); | ||
} | ||
|
||
protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties properties, Object bean, Method method) { | ||
/** | ||
* Create MethodKafkaListenerEndpoint's EndpointCustomizer, but not support MultiMethodKafkaListenerEndpoint. | ||
* Replace by {@link #addSuffixesAndMethod(DestinationTopic.Properties)} | ||
* @param properties the destination-topic's properties. | ||
* @param bean the bean. | ||
* @param method the method. | ||
* @return the endpoint customizer. | ||
*/ | ||
@Deprecated(since = "3.2", forRemoval = true) | ||
@SuppressWarnings("rawtypes") | ||
protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties properties, Object bean, | ||
Method method) { | ||
|
||
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider = | ||
this.retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties); | ||
return endpoint -> { | ||
Collection<EndpointCustomizer.TopicNamesHolder> topics = | ||
customizeAndRegisterTopics(namesProvider, endpoint); | ||
configurationEndpoint(endpoint, namesProvider, properties, bean); | ||
endpoint.setMethod(method); | ||
return topics; | ||
}; | ||
} | ||
|
||
/** | ||
* Create MethodKafkaListenerEndpoint's EndpointCustomizer and support MultiMethodKafkaListenerEndpoint. | ||
* @param properties the destination-topic's properties. | ||
* @return the endpoint customizer. | ||
* @since 3.2 | ||
*/ | ||
protected EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> addSuffixesAndMethod( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Public API modified. It may theoretically break any external implementations (although very unlikely that some might be implementing this, we need to be mindful of that possibility. Why don't we deprecate the original and add the modification separately? |
||
DestinationTopic.Properties properties) { | ||
|
||
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider = | ||
this.retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties); | ||
return endpoint -> { | ||
Collection<EndpointCustomizer.TopicNamesHolder> topics = customizeAndRegisterTopics(namesProvider, endpoint); | ||
endpoint.setId(namesProvider.getEndpointId(endpoint)); | ||
endpoint.setGroupId(namesProvider.getGroupId(endpoint)); | ||
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) { | ||
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, | ||
endpoint.getTopicPartitionsToAssign())); | ||
Collection<EndpointCustomizer.TopicNamesHolder> topics = | ||
customizeAndRegisterTopics(namesProvider, endpoint); | ||
configurationEndpoint(endpoint, namesProvider, properties, this.beanMethod.resolveBean(this.beanFactory)); | ||
if (endpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> multiMethodEndpoint | ||
&& this.beanMethod instanceof EndpointHandlerMultiMethod beanMultiMethod) { | ||
multiMethodEndpoint.setDefaultMethod(beanMultiMethod.getDefaultMethod()); | ||
multiMethodEndpoint.setMethods(beanMultiMethod.getMethods()); | ||
} | ||
else { | ||
endpoint.setTopics(endpoint.getTopics().stream() | ||
.map(namesProvider::getTopicName).toArray(String[]::new)); | ||
} | ||
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint)); | ||
endpoint.setGroup(namesProvider.getGroup(endpoint)); | ||
endpoint.setBean(bean); | ||
endpoint.setMethod(method); | ||
Boolean autoStartDltHandler = properties.autoStartDltHandler(); | ||
if (autoStartDltHandler != null && properties.isDltTopic()) { | ||
endpoint.setAutoStartup(autoStartDltHandler); | ||
endpoint.setMethod(this.beanMethod.getMethod()); | ||
} | ||
return topics; | ||
}; | ||
} | ||
|
||
private void configurationEndpoint(MethodKafkaListenerEndpoint<?, ?> endpoint, | ||
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, | ||
DestinationTopic.Properties properties, Object bean) { | ||
|
||
endpoint.setId(namesProvider.getEndpointId(endpoint)); | ||
endpoint.setGroupId(namesProvider.getGroupId(endpoint)); | ||
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) { | ||
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, | ||
endpoint.getTopicPartitionsToAssign())); | ||
} | ||
else { | ||
endpoint.setTopics(endpoint.getTopics().stream() | ||
.map(namesProvider::getTopicName).toArray(String[]::new)); | ||
} | ||
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint)); | ||
endpoint.setGroup(namesProvider.getGroup(endpoint)); | ||
endpoint.setBean(bean); | ||
Boolean autoStartDltHandler = properties.autoStartDltHandler(); | ||
if (autoStartDltHandler != null && properties.isDltTopic()) { | ||
endpoint.setAutoStartup(autoStartDltHandler); | ||
} | ||
} | ||
|
||
private static TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties, | ||
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, | ||
TopicPartitionOffset[] topicPartitionOffsets) { | ||
|
||
return Stream.of(topicPartitionOffsets) | ||
.map(tpo -> properties.isMainEndpoint() | ||
? getTPOForMainTopic(namesProvider, tpo) | ||
|
Uh oh!
There was an error while loading. Please reload this page.