Skip to content

Commit 438e896

Browse files
committed
* add @nullable at AbstractKafkaListenerEndpoint.setGroupId
* change `KafkaListenerEndpointDescriptor` type to `record` at `KafkaListenerEndpointRegistrar` * cleanup `KafkaListenerEndpointRegistry` * change `collect(Collectors.toList())` to `toList()` at `EndpointCustomizerFactory`. * cleanup `EndpointHandlerMethod`. * cleanup tests.
1 parent e3abc0e commit 438e896

9 files changed

+43
-65
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2023 the original author or authors.
2+
* Copyright 2014-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -61,6 +61,7 @@
6161
* @author Stephane Nicoll
6262
* @author Gary Russell
6363
* @author Artem Bilan
64+
* @author Wang Zhiyang
6465
*
6566
* @see MethodKafkaListenerEndpoint
6667
*/
@@ -176,7 +177,7 @@ public String getId() {
176177
* @param groupId the group id.
177178
* @since 1.3
178179
*/
179-
public void setGroupId(String groupId) {
180+
public void setGroupId(@Nullable String groupId) {
180181
this.groupId = groupId;
181182
}
182183

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2023 the original author or authors.
2+
* Copyright 2014-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@
3939
* @author Artem Bilan
4040
* @author Gary Russell
4141
* @author Filip Halemba
42+
* @author Wang Zhiyang
4243
*
4344
* @see org.springframework.kafka.annotation.KafkaListenerConfigurer
4445
*/
@@ -256,19 +257,16 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint) {
256257
}
257258

258259

259-
private static final class KafkaListenerEndpointDescriptor {
260+
private record KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
261+
KafkaListenerContainerFactory<?> containerFactory) {
260262

261-
private final KafkaListenerEndpoint endpoint;
263+
private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
264+
@Nullable KafkaListenerContainerFactory<?> containerFactory) {
262265

263-
private final KafkaListenerContainerFactory<?> containerFactory;
264-
265-
private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
266-
@Nullable KafkaListenerContainerFactory<?> containerFactory) {
266+
this.endpoint = endpoint;
267+
this.containerFactory = containerFactory;
268+
}
267269

268-
this.endpoint = endpoint;
269-
this.containerFactory = containerFactory;
270270
}
271271

272-
}
273-
274272
}

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2023 the original author or authors.
2+
* Copyright 2014-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,6 +66,7 @@
6666
* @author Artem Bilan
6767
* @author Gary Russell
6868
* @author Asi Bross
69+
* @author Wang Zhiyang
6970
*
7071
* @see KafkaListenerEndpoint
7172
* @see MessageListenerContainer
@@ -94,8 +95,8 @@ public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry,
9495

9596
@Override
9697
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
97-
if (applicationContext instanceof ConfigurableApplicationContext) {
98-
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
98+
if (applicationContext instanceof ConfigurableApplicationContext cac) {
99+
this.applicationContext = cac;
99100
}
100101
}
101102

@@ -170,8 +171,7 @@ public Collection<MessageListenerContainer> getListenerContainers() {
170171
*/
171172
@Override
172173
public Collection<MessageListenerContainer> getAllListenerContainers() {
173-
List<MessageListenerContainer> containers = new ArrayList<>();
174-
containers.addAll(getListenerContainers());
174+
List<MessageListenerContainer> containers = new ArrayList<>(getListenerContainers());
175175
refreshContextContainers();
176176
containers.addAll(this.unregisteredContainers.values());
177177
return containers;
@@ -232,7 +232,7 @@ public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListe
232232
group = appContext.getBean(groupName + ".group", ContainerGroup.class);
233233
}
234234
else {
235-
containerGroup = new ArrayList<MessageListenerContainer>();
235+
containerGroup = new ArrayList<>();
236236
appContext.getBeanFactory().registerSingleton(groupName, containerGroup); // NOSONAR - hasText
237237
group = new ContainerGroup(groupName);
238238
appContext.getBeanFactory().registerSingleton(groupName + ".group", group);
@@ -274,21 +274,19 @@ public MessageListenerContainer unregisterListenerContainer(String id) {
274274
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
275275
KafkaListenerContainerFactory<?> factory) {
276276

277-
if (endpoint instanceof MethodKafkaListenerEndpoint) {
278-
MethodKafkaListenerEndpoint<?, ?> mkle = (MethodKafkaListenerEndpoint<?, ?>) endpoint;
277+
if (endpoint instanceof MethodKafkaListenerEndpoint<?, ?> mkle) {
279278
Object bean = mkle.getBean();
280-
if (bean instanceof EndpointHandlerMethod) {
281-
EndpointHandlerMethod ehm = (EndpointHandlerMethod) bean;
279+
if (bean instanceof EndpointHandlerMethod ehm) {
282280
ehm = new EndpointHandlerMethod(ehm.resolveBean(this.applicationContext), ehm.getMethodName());
283281
mkle.setBean(ehm.resolveBean(this.applicationContext));
284282
mkle.setMethod(ehm.getMethod());
285283
}
286284
}
287285
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
288286

289-
if (listenerContainer instanceof InitializingBean) {
287+
if (listenerContainer instanceof InitializingBean initializingBean) {
290288
try {
291-
((InitializingBean) listenerContainer).afterPropertiesSet();
289+
initializingBean.afterPropertiesSet();
292290
}
293291
catch (Exception ex) {
294292
throw new BeanInitializationException("Failed to initialize message listener container", ex);
@@ -324,11 +322,6 @@ public int getPhase() {
324322
return this.phase;
325323
}
326324

327-
@Override
328-
public boolean isAutoStartup() {
329-
return true;
330-
}
331-
332325
@Override
333326
public void start() {
334327
for (MessageListenerContainer listenerContainer : getListenerContainers()) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,6 @@
1919
import java.lang.reflect.Method;
2020
import java.util.Arrays;
2121
import java.util.Collection;
22-
import java.util.stream.Collectors;
2322
import java.util.stream.Stream;
2423

2524
import org.springframework.beans.factory.BeanFactory;
@@ -34,6 +33,8 @@
3433
*
3534
* @author Tomaz Fernandes
3635
* @author Gary Russell
36+
* @author Wang Zhiyang
37+
*
3738
* @since 2.7.2
3839
*
3940
* @see RetryTopicConfigurer
@@ -124,7 +125,7 @@ protected Collection<EndpointCustomizer.TopicNamesHolder> customizeAndRegisterTo
124125
return getTopics(endpoint)
125126
.stream()
126127
.map(topic -> new EndpointCustomizer.TopicNamesHolder(topic, namesProvider.getTopicName(topic)))
127-
.collect(Collectors.toList());
128+
.toList();
128129
}
129130

130131
private Collection<String> getTopics(MethodKafkaListenerEndpoint<?, ?> endpoint) {
@@ -135,7 +136,7 @@ private Collection<String> getTopics(MethodKafkaListenerEndpoint<?, ?> endpoint)
135136
topics = Arrays.stream(topicPartitionsToAssign)
136137
.map(TopicPartitionOffset::getTopic)
137138
.distinct()
138-
.collect(Collectors.toList());
139+
.toList();
139140
}
140141
}
141142

spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMethod.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,6 +32,8 @@
3232
*
3333
* @author Tomaz Fernandes
3434
* @author Gary Russell
35+
* @author Wang Zhiyang
36+
*
3537
* @since 2.7
3638
*
3739
*/
@@ -66,15 +68,15 @@ public EndpointHandlerMethod(Object bean, Method method) {
6668
* @return the method.
6769
*/
6870
public Method getMethod() {
69-
if (this.beanOrClass instanceof Class) {
70-
return forClass((Class<?>) this.beanOrClass);
71+
if (this.beanOrClass instanceof Class<?> clazz) {
72+
return forClass(clazz);
7173
}
7274
Assert.state(this.bean != null, "Bean must be resolved before accessing its method");
7375
if (this.bean instanceof EndpointHandlerMethod) {
7476
try {
7577
return Object.class.getMethod("toString");
7678
}
77-
catch (NoSuchMethodException | SecurityException e) {
79+
catch (NoSuchMethodException | SecurityException ignored) {
7880
}
7981
}
8082
return forClass(this.bean.getClass());
@@ -91,13 +93,12 @@ public String getMethodName() {
9193
}
9294

9395
public Object resolveBean(BeanFactory beanFactory) {
94-
if (this.bean instanceof EndpointHandlerMethod) {
95-
return ((EndpointHandlerMethod) this.bean).beanOrClass;
96+
if (this.bean instanceof EndpointHandlerMethod endpointHandlerMethod) {
97+
return endpointHandlerMethod.beanOrClass;
9698
}
9799
if (this.bean == null) {
98100
try {
99-
if (this.beanOrClass instanceof Class) {
100-
Class<?> clazz = (Class<?>) this.beanOrClass;
101+
if (this.beanOrClass instanceof Class<?> clazz) {
101102
try {
102103
this.bean = beanFactory.getBean(clazz);
103104
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationProviderTests.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -47,6 +47,8 @@
4747
* @author Tomaz Fernandes
4848
* @author Gary Russell
4949
* @author Fabio da Silva Jr.
50+
* @author Wang Zhiyang
51+
*
5052
* @since 2.7
5153
*/
5254
@ExtendWith(MockitoExtension.class)
@@ -56,9 +58,7 @@ class RetryTopicConfigurationProviderTests {
5658

5759
{
5860
this.beanFactory = mock(ConfigurableListableBeanFactory.class);
59-
willAnswer(invoc -> {
60-
return invoc.getArgument(0);
61-
}).given(this.beanFactory).resolveEmbeddedValue(anyString());
61+
willAnswer(invoc -> invoc.getArgument(0)).given(this.beanFactory).resolveEmbeddedValue(anyString());
6262
}
6363

6464
private final String[] topics = {"topic1", "topic2"};
@@ -81,18 +81,12 @@ private Method getAnnotatedMethod(String methodName) {
8181
@Mock
8282
Object bean;
8383

84-
@Mock
85-
RetryableTopic annotation;
86-
8784
@Mock
8885
KafkaOperations<?, ?> kafkaOperations;
8986

9087
@Mock
9188
RetryTopicConfiguration retryTopicConfiguration;
9289

93-
@Mock
94-
RetryTopicConfiguration retryTopicConfiguration2;
95-
9690
@Test
9791
void shouldProvideFromAnnotation() {
9892

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,6 +56,8 @@
5656
/**
5757
* @author Tomaz Fernandes
5858
* @author Gary Russell
59+
* @author Wang Zhiyang
60+
*
5961
* @since 2.9
6062
*/
6163
class RetryTopicConfigurationSupportTests {
@@ -185,7 +187,6 @@ void testCreateBackOffManager() {
185187
ContainerPartitionPausingBackOffManagerFactory.class);
186188
KafkaConsumerBackoffManager backoffManagerMock = mock(KafkaConsumerBackoffManager.class);
187189
TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
188-
Clock clock = mock(Clock.class);
189190
ApplicationContext ctx = mock(ApplicationContext.class);
190191
given(componentFactory.kafkaBackOffManagerFactory(registry, ctx)).willReturn(factory);
191192
given(factory.create()).willReturn(backoffManagerMock);

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -294,15 +294,6 @@ private void assertTopicNames(String retrySuffix, DestinationTopic.Properties de
294294
assertThat(retryTopicName.get(index + 1)).isEqualTo(secondTopicName);
295295
}
296296

297-
private void thenAssertEndpointProcessing(MethodKafkaListenerEndpoint<?, ?> endpoint) {
298-
then(endpoint).should(times(1)).setTopics(topics.toArray(new String[]{}));
299-
then(endpoint).should(times(1)).setId("testId");
300-
then(endpoint).should(times(1)).setGroup("testGroup");
301-
then(endpoint).should(times(1)).setGroupId("testGroupId");
302-
then(endpoint).should(times(1)).setClientIdPrefix("testClientPrefix");
303-
then(endpoint).should(times(1)).setBeanFactory(defaultListableBeanFactory);
304-
}
305-
306297
public void noOpsMethod() {
307298
// noOps
308299
}
@@ -349,7 +340,6 @@ void shouldInstantiateIfNotInContainer() {
349340

350341
@LogLevels(classes = RetryTopicConfigurer.class, level = "info")
351342
@Test
352-
@SuppressWarnings("deprecation")
353343
void shouldLogConsumerRecordMessage() {
354344
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
355345
new RetryTopicConfigurer.LoggingDltListenerHandlerMethod();

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,6 @@ public RetryTopicConfiguration blockingAndTopic(KafkaTemplate<String, String> te
332332
}
333333

334334
@Bean
335-
@SuppressWarnings("deprecation")
336335
public RetryTopicConfiguration onlyTopic(KafkaTemplate<String, String> template) {
337336
return RetryTopicConfigurationBuilder
338337
.newInstance()

0 commit comments

Comments
 (0)