Skip to content

Commit 9cb5763

Browse files
committed
Add property to set changeConsumerThreadName for Kafka
Closes gh-36343
1 parent 49ae8c0 commit 9cb5763

File tree

3 files changed

+26
-1
lines changed

3 files changed

+26
-1
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Ob
199199
map.from(this.recordInterceptor).to(factory::setRecordInterceptor);
200200
map.from(this.batchInterceptor).to(factory::setBatchInterceptor);
201201
map.from(this.threadNameSupplier).to(factory::setThreadNameSupplier);
202+
map.from(properties::getChangeConsumerThreadName).to(factory::setChangeConsumerThreadName);
202203
}
203204

204205
private void configureContainer(ContainerProperties container) {

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,6 +1043,12 @@ public enum Type {
10431043
*/
10441044
private boolean autoStartup = true;
10451045

1046+
/**
1047+
* Whether to instruct the container to change the consumer thread name during
1048+
* initialization.
1049+
*/
1050+
private Boolean changeConsumerThreadName;
1051+
10461052
public Type getType() {
10471053
return this.type;
10481054
}
@@ -1179,6 +1185,14 @@ public void setAutoStartup(boolean autoStartup) {
11791185
this.autoStartup = autoStartup;
11801186
}
11811187

1188+
public Boolean getChangeConsumerThreadName() {
1189+
return this.changeConsumerThreadName;
1190+
}
1191+
1192+
public void setChangeConsumerThreadName(Boolean changeConsumerThreadName) {
1193+
this.changeConsumerThreadName = changeConsumerThreadName;
1194+
}
1195+
11821196
}
11831197

11841198
public static class Ssl {

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,14 @@ class ConcurrentKafkaListenerContainerFactoryConfigurerTests {
4242

4343
private ConsumerFactory<Object, Object> consumerFactory;
4444

45+
private KafkaProperties properties;
46+
4547
@BeforeEach
4648
@SuppressWarnings("unchecked")
4749
void setUp() {
4850
this.configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
49-
this.configurer.setKafkaProperties(new KafkaProperties());
51+
this.properties = new KafkaProperties();
52+
this.configurer.setKafkaProperties(this.properties);
5053
this.factory = spy(new ConcurrentKafkaListenerContainerFactory<>());
5154
this.consumerFactory = mock(ConsumerFactory.class);
5255

@@ -60,4 +63,11 @@ void shouldApplyThreadNameSupplier() {
6063
then(this.factory).should().setThreadNameSupplier(function);
6164
}
6265

66+
@Test
67+
void shouldApplyChangeConsumerThreadName() {
68+
this.properties.getListener().setChangeConsumerThreadName(true);
69+
this.configurer.configure(this.factory, this.consumerFactory);
70+
then(this.factory).should().setChangeConsumerThreadName(true);
71+
}
72+
6373
}

0 commit comments

Comments
 (0)