Skip to content

polish retryable topic. #3104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2023 the original author or authors.
* Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,6 +61,7 @@
* @author Stephane Nicoll
* @author Gary Russell
* @author Artem Bilan
* @author Wang Zhiyang
*
* @see MethodKafkaListenerEndpoint
*/
Expand Down Expand Up @@ -176,7 +177,7 @@ public String getId() {
* @param groupId the group id.
* @since 1.3
*/
public void setGroupId(String groupId) {
public void setGroupId(@Nullable String groupId) {
this.groupId = groupId;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2023 the original author or authors.
* Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,6 +39,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author Filip Halemba
* @author Wang Zhiyang
*
* @see org.springframework.kafka.annotation.KafkaListenerConfigurer
*/
Expand Down Expand Up @@ -256,19 +257,16 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint) {
}


private static final class KafkaListenerEndpointDescriptor {
private record KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> containerFactory) {

private final KafkaListenerEndpoint endpoint;
private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
@Nullable KafkaListenerContainerFactory<?> containerFactory) {

private final KafkaListenerContainerFactory<?> containerFactory;

private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
@Nullable KafkaListenerContainerFactory<?> containerFactory) {
this.endpoint = endpoint;
this.containerFactory = containerFactory;
}

this.endpoint = endpoint;
this.containerFactory = containerFactory;
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2023 the original author or authors.
* Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,6 +66,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author Asi Bross
* @author Wang Zhiyang
*
* @see KafkaListenerEndpoint
* @see MessageListenerContainer
Expand Down Expand Up @@ -94,8 +95,8 @@ public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry,

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (applicationContext instanceof ConfigurableApplicationContext) {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
if (applicationContext instanceof ConfigurableApplicationContext cac) {
this.applicationContext = cac;
}
}

Expand Down Expand Up @@ -170,8 +171,7 @@ public Collection<MessageListenerContainer> getListenerContainers() {
*/
@Override
public Collection<MessageListenerContainer> getAllListenerContainers() {
List<MessageListenerContainer> containers = new ArrayList<>();
containers.addAll(getListenerContainers());
List<MessageListenerContainer> containers = new ArrayList<>(getListenerContainers());
refreshContextContainers();
containers.addAll(this.unregisteredContainers.values());
return containers;
Expand Down Expand Up @@ -232,7 +232,7 @@ public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListe
group = appContext.getBean(groupName + ".group", ContainerGroup.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
containerGroup = new ArrayList<>();
appContext.getBeanFactory().registerSingleton(groupName, containerGroup); // NOSONAR - hasText
group = new ContainerGroup(groupName);
appContext.getBeanFactory().registerSingleton(groupName + ".group", group);
Expand Down Expand Up @@ -274,21 +274,19 @@ public MessageListenerContainer unregisterListenerContainer(String id) {
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> factory) {

if (endpoint instanceof MethodKafkaListenerEndpoint) {
MethodKafkaListenerEndpoint<?, ?> mkle = (MethodKafkaListenerEndpoint<?, ?>) endpoint;
if (endpoint instanceof MethodKafkaListenerEndpoint<?, ?> mkle) {
Object bean = mkle.getBean();
if (bean instanceof EndpointHandlerMethod) {
EndpointHandlerMethod ehm = (EndpointHandlerMethod) bean;
if (bean instanceof EndpointHandlerMethod ehm) {
ehm = new EndpointHandlerMethod(ehm.resolveBean(this.applicationContext), ehm.getMethodName());
mkle.setBean(ehm.resolveBean(this.applicationContext));
mkle.setMethod(ehm.getMethod());
}
}
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);

if (listenerContainer instanceof InitializingBean) {
if (listenerContainer instanceof InitializingBean initializingBean) {
try {
((InitializingBean) listenerContainer).afterPropertiesSet();
initializingBean.afterPropertiesSet();
}
catch (Exception ex) {
throw new BeanInitializationException("Failed to initialize message listener container", ex);
Expand Down Expand Up @@ -324,11 +322,6 @@ public int getPhase() {
return this.phase;
}

@Override
public boolean isAutoStartup() {
return true;
}

@Override
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,6 @@
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.springframework.beans.factory.BeanFactory;
Expand All @@ -34,6 +33,8 @@
*
* @author Tomaz Fernandes
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.7.2
*
* @see RetryTopicConfigurer
Expand Down Expand Up @@ -124,7 +125,7 @@ protected Collection<EndpointCustomizer.TopicNamesHolder> customizeAndRegisterTo
return getTopics(endpoint)
.stream()
.map(topic -> new EndpointCustomizer.TopicNamesHolder(topic, namesProvider.getTopicName(topic)))
.collect(Collectors.toList());
.toList();
}

private Collection<String> getTopics(MethodKafkaListenerEndpoint<?, ?> endpoint) {
Expand All @@ -135,7 +136,7 @@ private Collection<String> getTopics(MethodKafkaListenerEndpoint<?, ?> endpoint)
topics = Arrays.stream(topicPartitionsToAssign)
.map(TopicPartitionOffset::getTopic)
.distinct()
.collect(Collectors.toList());
.toList();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,8 @@
*
* @author Tomaz Fernandes
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.7
*
*/
Expand Down Expand Up @@ -66,15 +68,15 @@ public EndpointHandlerMethod(Object bean, Method method) {
* @return the method.
*/
public Method getMethod() {
if (this.beanOrClass instanceof Class) {
return forClass((Class<?>) this.beanOrClass);
if (this.beanOrClass instanceof Class<?> clazz) {
return forClass(clazz);
}
Assert.state(this.bean != null, "Bean must be resolved before accessing its method");
if (this.bean instanceof EndpointHandlerMethod) {
try {
return Object.class.getMethod("toString");
}
catch (NoSuchMethodException | SecurityException e) {
catch (NoSuchMethodException | SecurityException ignored) {
}
}
return forClass(this.bean.getClass());
Expand All @@ -91,13 +93,12 @@ public String getMethodName() {
}

public Object resolveBean(BeanFactory beanFactory) {
if (this.bean instanceof EndpointHandlerMethod) {
return ((EndpointHandlerMethod) this.bean).beanOrClass;
if (this.bean instanceof EndpointHandlerMethod endpointHandlerMethod) {
return endpointHandlerMethod.beanOrClass;
}
if (this.bean == null) {
try {
if (this.beanOrClass instanceof Class) {
Class<?> clazz = (Class<?>) this.beanOrClass;
if (this.beanOrClass instanceof Class<?> clazz) {
try {
this.bean = beanFactory.getBean(clazz);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,6 +47,8 @@
* @author Tomaz Fernandes
* @author Gary Russell
* @author Fabio da Silva Jr.
* @author Wang Zhiyang
*
* @since 2.7
*/
@ExtendWith(MockitoExtension.class)
Expand All @@ -56,9 +58,7 @@ class RetryTopicConfigurationProviderTests {

{
this.beanFactory = mock(ConfigurableListableBeanFactory.class);
willAnswer(invoc -> {
return invoc.getArgument(0);
}).given(this.beanFactory).resolveEmbeddedValue(anyString());
willAnswer(invoc -> invoc.getArgument(0)).given(this.beanFactory).resolveEmbeddedValue(anyString());
}

private final String[] topics = {"topic1", "topic2"};
Expand All @@ -81,18 +81,12 @@ private Method getAnnotatedMethod(String methodName) {
@Mock
Object bean;

@Mock
RetryableTopic annotation;

@Mock
KafkaOperations<?, ?> kafkaOperations;

@Mock
RetryTopicConfiguration retryTopicConfiguration;

@Mock
RetryTopicConfiguration retryTopicConfiguration2;

@Test
void shouldProvideFromAnnotation() {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,6 +56,8 @@
/**
* @author Tomaz Fernandes
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.9
*/
class RetryTopicConfigurationSupportTests {
Expand Down Expand Up @@ -185,7 +187,6 @@ void testCreateBackOffManager() {
ContainerPartitionPausingBackOffManagerFactory.class);
KafkaConsumerBackoffManager backoffManagerMock = mock(KafkaConsumerBackoffManager.class);
TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
Clock clock = mock(Clock.class);
ApplicationContext ctx = mock(ApplicationContext.class);
given(componentFactory.kafkaBackOffManagerFactory(registry, ctx)).willReturn(factory);
given(factory.create()).willReturn(backoffManagerMock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,6 @@ private void assertTopicNames(String retrySuffix, DestinationTopic.Properties de
assertThat(retryTopicName.get(index + 1)).isEqualTo(secondTopicName);
}

private void thenAssertEndpointProcessing(MethodKafkaListenerEndpoint<?, ?> endpoint) {
then(endpoint).should(times(1)).setTopics(topics.toArray(new String[]{}));
then(endpoint).should(times(1)).setId("testId");
then(endpoint).should(times(1)).setGroup("testGroup");
then(endpoint).should(times(1)).setGroupId("testGroupId");
then(endpoint).should(times(1)).setClientIdPrefix("testClientPrefix");
then(endpoint).should(times(1)).setBeanFactory(defaultListableBeanFactory);
}

public void noOpsMethod() {
// noOps
}
Expand Down Expand Up @@ -349,7 +340,6 @@ void shouldInstantiateIfNotInContainer() {

@LogLevels(classes = RetryTopicConfigurer.class, level = "info")
@Test
@SuppressWarnings("deprecation")
void shouldLogConsumerRecordMessage() {
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
new RetryTopicConfigurer.LoggingDltListenerHandlerMethod();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ public RetryTopicConfiguration blockingAndTopic(KafkaTemplate<String, String> te
}

@Bean
@SuppressWarnings("deprecation")
public RetryTopicConfiguration onlyTopic(KafkaTemplate<String, String> template) {
return RetryTopicConfigurationBuilder
.newInstance()
Expand Down