diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index cdc8ca944a..608ffd84d7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2023 the original author or authors. + * Copyright 2014-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,6 +61,7 @@ * @author Stephane Nicoll * @author Gary Russell * @author Artem Bilan + * @author Wang Zhiyang * * @see MethodKafkaListenerEndpoint */ @@ -176,7 +177,7 @@ public String getId() { * @param groupId the group id. * @since 1.3 */ - public void setGroupId(String groupId) { + public void setGroupId(@Nullable String groupId) { this.groupId = groupId; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java index a0ac942b08..7e1e8ac8ab 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2023 the original author or authors. + * Copyright 2014-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ * @author Artem Bilan * @author Gary Russell * @author Filip Halemba + * @author Wang Zhiyang * * @see org.springframework.kafka.annotation.KafkaListenerConfigurer */ @@ -256,19 +257,16 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint) { } - private static final class KafkaListenerEndpointDescriptor { + private record KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint, + KafkaListenerContainerFactory containerFactory) { - private final KafkaListenerEndpoint endpoint; + private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint, + @Nullable KafkaListenerContainerFactory containerFactory) { - private final KafkaListenerContainerFactory containerFactory; - - private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint, - @Nullable KafkaListenerContainerFactory containerFactory) { + this.endpoint = endpoint; + this.containerFactory = containerFactory; + } - this.endpoint = endpoint; - this.containerFactory = containerFactory; } - } - } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java index 10841688cc..a788583c57 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2023 the original author or authors. + * Copyright 2014-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,6 +66,7 @@ * @author Artem Bilan * @author Gary Russell * @author Asi Bross + * @author Wang Zhiyang * * @see KafkaListenerEndpoint * @see MessageListenerContainer @@ -94,8 +95,8 @@ public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry, @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - if (applicationContext instanceof ConfigurableApplicationContext) { - this.applicationContext = (ConfigurableApplicationContext) applicationContext; + if (applicationContext instanceof ConfigurableApplicationContext cac) { + this.applicationContext = cac; } } @@ -170,8 +171,7 @@ public Collection getListenerContainers() { */ @Override public Collection getAllListenerContainers() { - List containers = new ArrayList<>(); - containers.addAll(getListenerContainers()); + List containers = new ArrayList<>(getListenerContainers()); refreshContextContainers(); containers.addAll(this.unregisteredContainers.values()); return containers; @@ -232,7 +232,7 @@ public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListe group = appContext.getBean(groupName + ".group", ContainerGroup.class); } else { - containerGroup = new ArrayList(); + containerGroup = new ArrayList<>(); appContext.getBeanFactory().registerSingleton(groupName, containerGroup); // NOSONAR - hasText group = new ContainerGroup(groupName); appContext.getBeanFactory().registerSingleton(groupName + ".group", group); @@ -274,11 +274,9 @@ public MessageListenerContainer unregisterListenerContainer(String id) { protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory factory) { - if (endpoint instanceof MethodKafkaListenerEndpoint) { - MethodKafkaListenerEndpoint mkle = (MethodKafkaListenerEndpoint) endpoint; + if (endpoint instanceof MethodKafkaListenerEndpoint mkle) { Object bean = mkle.getBean(); - if (bean instanceof EndpointHandlerMethod) { - EndpointHandlerMethod ehm = (EndpointHandlerMethod) bean; + if (bean instanceof EndpointHandlerMethod ehm) { ehm = new EndpointHandlerMethod(ehm.resolveBean(this.applicationContext), ehm.getMethodName()); mkle.setBean(ehm.resolveBean(this.applicationContext)); mkle.setMethod(ehm.getMethod()); @@ -286,9 +284,9 @@ protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint } MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint); - if (listenerContainer instanceof InitializingBean) { + if (listenerContainer instanceof InitializingBean initializingBean) { try { - ((InitializingBean) listenerContainer).afterPropertiesSet(); + initializingBean.afterPropertiesSet(); } catch (Exception ex) { throw new BeanInitializationException("Failed to initialize message listener container", ex); @@ -324,11 +322,6 @@ public int getPhase() { return this.phase; } - @Override - public boolean isAutoStartup() { - return true; - } - @Override public void start() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java index 4481ef5ca7..962128efec 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ import java.lang.reflect.Method; import java.util.Arrays; import java.util.Collection; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.springframework.beans.factory.BeanFactory; @@ -34,6 +33,8 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Wang Zhiyang + * * @since 2.7.2 * * @see RetryTopicConfigurer @@ -124,7 +125,7 @@ protected Collection customizeAndRegisterTo return getTopics(endpoint) .stream() .map(topic -> new EndpointCustomizer.TopicNamesHolder(topic, namesProvider.getTopicName(topic))) - .collect(Collectors.toList()); + .toList(); } private Collection getTopics(MethodKafkaListenerEndpoint endpoint) { @@ -135,7 +136,7 @@ private Collection getTopics(MethodKafkaListenerEndpoint endpoint) topics = Arrays.stream(topicPartitionsToAssign) .map(TopicPartitionOffset::getTopic) .distinct() - .collect(Collectors.toList()); + .toList(); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMethod.java b/spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMethod.java index 9d86bce5fd..f1d1968b0d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMethod.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMethod.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,8 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Wang Zhiyang + * * @since 2.7 * */ @@ -66,15 +68,15 @@ public EndpointHandlerMethod(Object bean, Method method) { * @return the method. */ public Method getMethod() { - if (this.beanOrClass instanceof Class) { - return forClass((Class) this.beanOrClass); + if (this.beanOrClass instanceof Class clazz) { + return forClass(clazz); } Assert.state(this.bean != null, "Bean must be resolved before accessing its method"); if (this.bean instanceof EndpointHandlerMethod) { try { return Object.class.getMethod("toString"); } - catch (NoSuchMethodException | SecurityException e) { + catch (NoSuchMethodException | SecurityException ignored) { } } return forClass(this.bean.getClass()); @@ -91,13 +93,12 @@ public String getMethodName() { } public Object resolveBean(BeanFactory beanFactory) { - if (this.bean instanceof EndpointHandlerMethod) { - return ((EndpointHandlerMethod) this.bean).beanOrClass; + if (this.bean instanceof EndpointHandlerMethod endpointHandlerMethod) { + return endpointHandlerMethod.beanOrClass; } if (this.bean == null) { try { - if (this.beanOrClass instanceof Class) { - Class clazz = (Class) this.beanOrClass; + if (this.beanOrClass instanceof Class clazz) { try { this.bean = beanFactory.getBean(clazz); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationProviderTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationProviderTests.java index 8d60c46e8d..798946427f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationProviderTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationProviderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +47,8 @@ * @author Tomaz Fernandes * @author Gary Russell * @author Fabio da Silva Jr. + * @author Wang Zhiyang + * * @since 2.7 */ @ExtendWith(MockitoExtension.class) @@ -56,9 +58,7 @@ class RetryTopicConfigurationProviderTests { { this.beanFactory = mock(ConfigurableListableBeanFactory.class); - willAnswer(invoc -> { - return invoc.getArgument(0); - }).given(this.beanFactory).resolveEmbeddedValue(anyString()); + willAnswer(invoc -> invoc.getArgument(0)).given(this.beanFactory).resolveEmbeddedValue(anyString()); } private final String[] topics = {"topic1", "topic2"}; @@ -81,18 +81,12 @@ private Method getAnnotatedMethod(String methodName) { @Mock Object bean; - @Mock - RetryableTopic annotation; - @Mock KafkaOperations kafkaOperations; @Mock RetryTopicConfiguration retryTopicConfiguration; - @Mock - RetryTopicConfiguration retryTopicConfiguration2; - @Test void shouldProvideFromAnnotation() { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java index cbfe355bbe..3928408aed 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2023 the original author or authors. + * Copyright 2021-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,6 +56,8 @@ /** * @author Tomaz Fernandes * @author Gary Russell + * @author Wang Zhiyang + * * @since 2.9 */ class RetryTopicConfigurationSupportTests { @@ -185,7 +187,6 @@ void testCreateBackOffManager() { ContainerPartitionPausingBackOffManagerFactory.class); KafkaConsumerBackoffManager backoffManagerMock = mock(KafkaConsumerBackoffManager.class); TaskScheduler taskSchedulerMock = mock(TaskScheduler.class); - Clock clock = mock(Clock.class); ApplicationContext ctx = mock(ApplicationContext.class); given(componentFactory.kafkaBackOffManagerFactory(registry, ctx)).willReturn(factory); given(factory.create()).willReturn(backoffManagerMock); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java index a481a82069..b4fdae85bf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java @@ -294,15 +294,6 @@ private void assertTopicNames(String retrySuffix, DestinationTopic.Properties de assertThat(retryTopicName.get(index + 1)).isEqualTo(secondTopicName); } - private void thenAssertEndpointProcessing(MethodKafkaListenerEndpoint endpoint) { - then(endpoint).should(times(1)).setTopics(topics.toArray(new String[]{})); - then(endpoint).should(times(1)).setId("testId"); - then(endpoint).should(times(1)).setGroup("testGroup"); - then(endpoint).should(times(1)).setGroupId("testGroupId"); - then(endpoint).should(times(1)).setClientIdPrefix("testClientPrefix"); - then(endpoint).should(times(1)).setBeanFactory(defaultListableBeanFactory); - } - public void noOpsMethod() { // noOps } @@ -349,7 +340,6 @@ void shouldInstantiateIfNotInContainer() { @LogLevels(classes = RetryTopicConfigurer.class, level = "info") @Test - @SuppressWarnings("deprecation") void shouldLogConsumerRecordMessage() { RetryTopicConfigurer.LoggingDltListenerHandlerMethod method = new RetryTopicConfigurer.LoggingDltListenerHandlerMethod(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java index a515aa6978..8366936b2f 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java @@ -332,7 +332,6 @@ public RetryTopicConfiguration blockingAndTopic(KafkaTemplate te } @Bean - @SuppressWarnings("deprecation") public RetryTopicConfiguration onlyTopic(KafkaTemplate template) { return RetryTopicConfigurationBuilder .newInstance()