-
Notifications
You must be signed in to change notification settings - Fork 1.6k
@RetryableTopic support KL annotated on class part 2 #3112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright 2021 the original author or authors. | ||
* Copyright 2021-2024 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
|
@@ -24,21 +24,25 @@ | |
* Customizes main, retry and DLT endpoints in the Retry Topic functionality | ||
* and returns the resulting topic names. | ||
* | ||
* @param <T> the listener endpoint type. | ||
* | ||
* @author Tomaz Fernandes | ||
* @author Wang Zhiyang | ||
* | ||
* @since 2.7.2 | ||
* | ||
* @see EndpointCustomizerFactory | ||
* | ||
*/ | ||
@FunctionalInterface | ||
public interface EndpointCustomizer { | ||
public interface EndpointCustomizer<T extends MethodKafkaListenerEndpoint<?, ?>> { | ||
|
||
/** | ||
* Customize the endpoint and return the topic names generated for this endpoint. | ||
* @param listenerEndpoint The main, retry or DLT endpoint to be customized. | ||
* @return A collection containing the topic names generated for this endpoint. | ||
*/ | ||
Collection<TopicNamesHolder> customizeEndpointAndCollectTopics(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint); | ||
Collection<TopicNamesHolder> customizeEndpointAndCollectTopics(T listenerEndpoint); | ||
|
||
class TopicNamesHolder { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Modifying the interface signature, which is part of the public API. Although we are just adding a strong type here and it may not affect any implementations, we may also need to mention this in |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,14 +16,15 @@ | |
|
||
package org.springframework.kafka.retrytopic; | ||
|
||
import java.lang.reflect.Method; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.stream.Stream; | ||
|
||
import org.springframework.beans.factory.BeanFactory; | ||
import org.springframework.kafka.config.MethodKafkaListenerEndpoint; | ||
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; | ||
import org.springframework.kafka.support.EndpointHandlerMethod; | ||
import org.springframework.kafka.support.EndpointHandlerMultiMethod; | ||
import org.springframework.kafka.support.TopicPartitionOffset; | ||
|
||
/** | ||
|
@@ -63,41 +64,58 @@ public EndpointCustomizerFactory(DestinationTopic.Properties destinationProperti | |
this.retryTopicNamesProviderFactory = retryTopicNamesProviderFactory; | ||
} | ||
|
||
public final EndpointCustomizer createEndpointCustomizer() { | ||
return addSuffixesAndMethod(this.destinationProperties, this.beanMethod.resolveBean(this.beanFactory), | ||
this.beanMethod.getMethod()); | ||
public final EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> createEndpointCustomizer() { | ||
return addSuffixesAndMethod(this.destinationProperties); | ||
} | ||
|
||
protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties properties, Object bean, Method method) { | ||
protected EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> addSuffixesAndMethod( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Public API modified. It may theoretically break any external implementations (although very unlikely that some might be implementing this, we need to be mindful of that possibility. Why don't we deprecate the original and add the modification separately? |
||
DestinationTopic.Properties properties) { | ||
|
||
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider = | ||
this.retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties); | ||
return endpoint -> { | ||
Collection<EndpointCustomizer.TopicNamesHolder> topics = customizeAndRegisterTopics(namesProvider, endpoint); | ||
endpoint.setId(namesProvider.getEndpointId(endpoint)); | ||
endpoint.setGroupId(namesProvider.getGroupId(endpoint)); | ||
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) { | ||
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, | ||
endpoint.getTopicPartitionsToAssign())); | ||
Collection<EndpointCustomizer.TopicNamesHolder> topics = | ||
customizeAndRegisterTopics(namesProvider, endpoint); | ||
configurationEndpoint(endpoint, namesProvider, properties, this.beanMethod.resolveBean(this.beanFactory)); | ||
if (endpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> multiMethodEndpoint | ||
&& this.beanMethod instanceof EndpointHandlerMultiMethod beanMultiMethod) { | ||
multiMethodEndpoint.setDefaultMethod(beanMultiMethod.getDefaultMethod()); | ||
multiMethodEndpoint.setMethods(beanMultiMethod.getMethods()); | ||
} | ||
else { | ||
endpoint.setTopics(endpoint.getTopics().stream() | ||
.map(namesProvider::getTopicName).toArray(String[]::new)); | ||
} | ||
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint)); | ||
endpoint.setGroup(namesProvider.getGroup(endpoint)); | ||
endpoint.setBean(bean); | ||
endpoint.setMethod(method); | ||
Boolean autoStartDltHandler = properties.autoStartDltHandler(); | ||
if (autoStartDltHandler != null && properties.isDltTopic()) { | ||
endpoint.setAutoStartup(autoStartDltHandler); | ||
endpoint.setMethod(this.beanMethod.getMethod()); | ||
} | ||
return topics; | ||
}; | ||
} | ||
|
||
private void configurationEndpoint(MethodKafkaListenerEndpoint<?, ?> endpoint, | ||
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, | ||
DestinationTopic.Properties properties, Object bean) { | ||
|
||
endpoint.setId(namesProvider.getEndpointId(endpoint)); | ||
endpoint.setGroupId(namesProvider.getGroupId(endpoint)); | ||
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) { | ||
endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, | ||
endpoint.getTopicPartitionsToAssign())); | ||
} | ||
else { | ||
endpoint.setTopics(endpoint.getTopics().stream() | ||
.map(namesProvider::getTopicName).toArray(String[]::new)); | ||
} | ||
endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint)); | ||
endpoint.setGroup(namesProvider.getGroup(endpoint)); | ||
endpoint.setBean(bean); | ||
Boolean autoStartDltHandler = properties.autoStartDltHandler(); | ||
if (autoStartDltHandler != null && properties.isDltTopic()) { | ||
endpoint.setAutoStartup(autoStartDltHandler); | ||
} | ||
} | ||
|
||
private static TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties, | ||
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, | ||
TopicPartitionOffset[] topicPartitionOffsets) { | ||
|
||
return Stream.of(topicPartitionOffsets) | ||
.map(tpo -> properties.isMainEndpoint() | ||
? getTPOForMainTopic(namesProvider, tpo) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,7 +41,7 @@ public class EndpointHandlerMethod { | |
|
||
private final Object beanOrClass; | ||
|
||
private final String methodName; | ||
private String methodName; | ||
|
||
private Object bean; | ||
|
||
|
@@ -54,6 +54,12 @@ public EndpointHandlerMethod(Object beanOrClass, String methodName) { | |
this.methodName = methodName; | ||
} | ||
|
||
public EndpointHandlerMethod(Object bean) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. new public constructor that needs to be mentioned in docs (or |
||
Assert.notNull(bean, () -> "No bean for destination provided!"); | ||
this.bean = bean; | ||
this.beanOrClass = bean.getClass(); | ||
} | ||
|
||
public EndpointHandlerMethod(Object bean, Method method) { | ||
Assert.notNull(bean, () -> "No bean for destination provided!"); | ||
Assert.notNull(method, () -> "No method for destination bean class provided!"); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding new public methods in a public class. This deserves a brief mention in
whats-new
.