Skip to content

Commit 7585f2e

Browse files
committed
* add @nullable at AbstractKafkaListenerEndpoint.setGroupId
* change `KafkaListenerEndpointDescriptor` type to `record` at `KafkaListenerEndpointRegistrar` * cleanup `KafkaListenerEndpointRegistry` * change `collect(Collectors.toList())` to `toList()` at `EndpointCustomizerFactory`. * cleanup `EndpointHandlerMethod`. * cleanup tests.
1 parent 29f8d18 commit 7585f2e

9 files changed

+30
-58
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public String getId() {
176176
* @param groupId the group id.
177177
* @since 1.3
178178
*/
179-
public void setGroupId(String groupId) {
179+
public void setGroupId(@Nullable String groupId) {
180180
this.groupId = groupId;
181181
}
182182

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -256,19 +256,16 @@ public void registerEndpoint(KafkaListenerEndpoint endpoint) {
256256
}
257257

258258

259-
private static final class KafkaListenerEndpointDescriptor {
259+
private record KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
260+
KafkaListenerContainerFactory<?> containerFactory) {
260261

261-
private final KafkaListenerEndpoint endpoint;
262+
private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
263+
@Nullable KafkaListenerContainerFactory<?> containerFactory) {
262264

263-
private final KafkaListenerContainerFactory<?> containerFactory;
264-
265-
private KafkaListenerEndpointDescriptor(KafkaListenerEndpoint endpoint,
266-
@Nullable KafkaListenerContainerFactory<?> containerFactory) {
265+
this.endpoint = endpoint;
266+
this.containerFactory = containerFactory;
267+
}
267268

268-
this.endpoint = endpoint;
269-
this.containerFactory = containerFactory;
270269
}
271270

272-
}
273-
274271
}

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
* @author Artem Bilan
6767
* @author Gary Russell
6868
* @author Asi Bross
69+
* @author Wang Zhiyang
6970
*
7071
* @see KafkaListenerEndpoint
7172
* @see MessageListenerContainer
@@ -94,8 +95,8 @@ public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry,
9495

9596
@Override
9697
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
97-
if (applicationContext instanceof ConfigurableApplicationContext) {
98-
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
98+
if (applicationContext instanceof ConfigurableApplicationContext cac) {
99+
this.applicationContext = cac;
99100
}
100101
}
101102

@@ -170,8 +171,7 @@ public Collection<MessageListenerContainer> getListenerContainers() {
170171
*/
171172
@Override
172173
public Collection<MessageListenerContainer> getAllListenerContainers() {
173-
List<MessageListenerContainer> containers = new ArrayList<>();
174-
containers.addAll(getListenerContainers());
174+
List<MessageListenerContainer> containers = new ArrayList<>(getListenerContainers());
175175
refreshContextContainers();
176176
containers.addAll(this.unregisteredContainers.values());
177177
return containers;
@@ -232,7 +232,7 @@ public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListe
232232
group = appContext.getBean(groupName + ".group", ContainerGroup.class);
233233
}
234234
else {
235-
containerGroup = new ArrayList<MessageListenerContainer>();
235+
containerGroup = new ArrayList<>();
236236
appContext.getBeanFactory().registerSingleton(groupName, containerGroup); // NOSONAR - hasText
237237
group = new ContainerGroup(groupName);
238238
appContext.getBeanFactory().registerSingleton(groupName + ".group", group);
@@ -274,21 +274,19 @@ public MessageListenerContainer unregisterListenerContainer(String id) {
274274
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
275275
KafkaListenerContainerFactory<?> factory) {
276276

277-
if (endpoint instanceof MethodKafkaListenerEndpoint) {
278-
MethodKafkaListenerEndpoint<?, ?> mkle = (MethodKafkaListenerEndpoint<?, ?>) endpoint;
277+
if (endpoint instanceof MethodKafkaListenerEndpoint<?, ?> mkle) {
279278
Object bean = mkle.getBean();
280-
if (bean instanceof EndpointHandlerMethod) {
281-
EndpointHandlerMethod ehm = (EndpointHandlerMethod) bean;
279+
if (bean instanceof EndpointHandlerMethod ehm) {
282280
ehm = new EndpointHandlerMethod(ehm.resolveBean(this.applicationContext), ehm.getMethodName());
283281
mkle.setBean(ehm.resolveBean(this.applicationContext));
284282
mkle.setMethod(ehm.getMethod());
285283
}
286284
}
287285
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
288286

289-
if (listenerContainer instanceof InitializingBean) {
287+
if (listenerContainer instanceof InitializingBean initializingBean) {
290288
try {
291-
((InitializingBean) listenerContainer).afterPropertiesSet();
289+
initializingBean.afterPropertiesSet();
292290
}
293291
catch (Exception ex) {
294292
throw new BeanInitializationException("Failed to initialize message listener container", ex);
@@ -324,11 +322,6 @@ public int getPhase() {
324322
return this.phase;
325323
}
326324

327-
@Override
328-
public boolean isAutoStartup() {
329-
return true;
330-
}
331-
332325
@Override
333326
public void start() {
334327
for (MessageListenerContainer listenerContainer : getListenerContainers()) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.lang.reflect.Method;
2020
import java.util.Arrays;
2121
import java.util.Collection;
22-
import java.util.stream.Collectors;
2322
import java.util.stream.Stream;
2423

2524
import org.springframework.beans.factory.BeanFactory;
@@ -124,7 +123,7 @@ protected Collection<EndpointCustomizer.TopicNamesHolder> customizeAndRegisterTo
124123
return getTopics(endpoint)
125124
.stream()
126125
.map(topic -> new EndpointCustomizer.TopicNamesHolder(topic, namesProvider.getTopicName(topic)))
127-
.collect(Collectors.toList());
126+
.toList();
128127
}
129128

130129
private Collection<String> getTopics(MethodKafkaListenerEndpoint<?, ?> endpoint) {
@@ -135,7 +134,7 @@ private Collection<String> getTopics(MethodKafkaListenerEndpoint<?, ?> endpoint)
135134
topics = Arrays.stream(topicPartitionsToAssign)
136135
.map(TopicPartitionOffset::getTopic)
137136
.distinct()
138-
.collect(Collectors.toList());
137+
.toList();
139138
}
140139
}
141140

spring-kafka/src/main/java/org/springframework/kafka/support/EndpointHandlerMethod.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
*
3333
* @author Tomaz Fernandes
3434
* @author Gary Russell
35+
* @author Wang Zhiyang
36+
*
3537
* @since 2.7
3638
*
3739
*/
@@ -66,15 +68,15 @@ public EndpointHandlerMethod(Object bean, Method method) {
6668
* @return the method.
6769
*/
6870
public Method getMethod() {
69-
if (this.beanOrClass instanceof Class) {
70-
return forClass((Class<?>) this.beanOrClass);
71+
if (this.beanOrClass instanceof Class<?> clazz) {
72+
return forClass(clazz);
7173
}
7274
Assert.state(this.bean != null, "Bean must be resolved before accessing its method");
7375
if (this.bean instanceof EndpointHandlerMethod) {
7476
try {
7577
return Object.class.getMethod("toString");
7678
}
77-
catch (NoSuchMethodException | SecurityException e) {
79+
catch (NoSuchMethodException | SecurityException ignored) {
7880
}
7981
}
8082
return forClass(this.bean.getClass());
@@ -91,13 +93,12 @@ public String getMethodName() {
9193
}
9294

9395
public Object resolveBean(BeanFactory beanFactory) {
94-
if (this.bean instanceof EndpointHandlerMethod) {
95-
return ((EndpointHandlerMethod) this.bean).beanOrClass;
96+
if (this.bean instanceof EndpointHandlerMethod endpointHandlerMethod) {
97+
return endpointHandlerMethod.beanOrClass;
9698
}
9799
if (this.bean == null) {
98100
try {
99-
if (this.beanOrClass instanceof Class) {
100-
Class<?> clazz = (Class<?>) this.beanOrClass;
101+
if (this.beanOrClass instanceof Class<?> clazz) {
101102
try {
102103
this.bean = beanFactory.getBean(clazz);
103104
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationProviderTests.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
* @author Tomaz Fernandes
4848
* @author Gary Russell
4949
* @author Fabio da Silva Jr.
50+
* @author Wang Zhiyang
51+
*
5052
* @since 2.7
5153
*/
5254
@ExtendWith(MockitoExtension.class)
@@ -56,9 +58,7 @@ class RetryTopicConfigurationProviderTests {
5658

5759
{
5860
this.beanFactory = mock(ConfigurableListableBeanFactory.class);
59-
willAnswer(invoc -> {
60-
return invoc.getArgument(0);
61-
}).given(this.beanFactory).resolveEmbeddedValue(anyString());
61+
willAnswer(invoc -> invoc.getArgument(0)).given(this.beanFactory).resolveEmbeddedValue(anyString());
6262
}
6363

6464
private final String[] topics = {"topic1", "topic2"};
@@ -81,18 +81,12 @@ private Method getAnnotatedMethod(String methodName) {
8181
@Mock
8282
Object bean;
8383

84-
@Mock
85-
RetryableTopic annotation;
86-
8784
@Mock
8885
KafkaOperations<?, ?> kafkaOperations;
8986

9087
@Mock
9188
RetryTopicConfiguration retryTopicConfiguration;
9289

93-
@Mock
94-
RetryTopicConfiguration retryTopicConfiguration2;
95-
9690
@Test
9791
void shouldProvideFromAnnotation() {
9892

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,6 @@ void testCreateBackOffManager() {
185185
ContainerPartitionPausingBackOffManagerFactory.class);
186186
KafkaConsumerBackoffManager backoffManagerMock = mock(KafkaConsumerBackoffManager.class);
187187
TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
188-
Clock clock = mock(Clock.class);
189188
ApplicationContext ctx = mock(ApplicationContext.class);
190189
given(componentFactory.kafkaBackOffManagerFactory(registry, ctx)).willReturn(factory);
191190
given(factory.create()).willReturn(backoffManagerMock);

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -294,15 +294,6 @@ private void assertTopicNames(String retrySuffix, DestinationTopic.Properties de
294294
assertThat(retryTopicName.get(index + 1)).isEqualTo(secondTopicName);
295295
}
296296

297-
private void thenAssertEndpointProcessing(MethodKafkaListenerEndpoint<?, ?> endpoint) {
298-
then(endpoint).should(times(1)).setTopics(topics.toArray(new String[]{}));
299-
then(endpoint).should(times(1)).setId("testId");
300-
then(endpoint).should(times(1)).setGroup("testGroup");
301-
then(endpoint).should(times(1)).setGroupId("testGroupId");
302-
then(endpoint).should(times(1)).setClientIdPrefix("testClientPrefix");
303-
then(endpoint).should(times(1)).setBeanFactory(defaultListableBeanFactory);
304-
}
305-
306297
public void noOpsMethod() {
307298
// noOps
308299
}
@@ -349,7 +340,6 @@ void shouldInstantiateIfNotInContainer() {
349340

350341
@LogLevels(classes = RetryTopicConfigurer.class, level = "info")
351342
@Test
352-
@SuppressWarnings("deprecation")
353343
void shouldLogConsumerRecordMessage() {
354344
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
355345
new RetryTopicConfigurer.LoggingDltListenerHandlerMethod();

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicExceptionRoutingIntegrationTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,6 @@ public RetryTopicConfiguration blockingAndTopic(KafkaTemplate<String, String> te
332332
}
333333

334334
@Bean
335-
@SuppressWarnings("deprecation")
336335
public RetryTopicConfiguration onlyTopic(KafkaTemplate<String, String> template) {
337336
return RetryTopicConfigurationBuilder
338337
.newInstance()

0 commit comments

Comments
 (0)