Skip to content

Commit 4df9ad1

Browse files
committed
spring-projectsGH-3001: default clientIds with application name
Fixes: #spring-projectsGH-3001 Use Spring Boot's `spring.application.name` property as part of the default clientIds for Consumers, Producers and AdminClients. Helps with identifying problematic clients at server side. * Only use as a fallback if clientId wasn't specified explicitly * Do not use for Consumers with a specified groupId because KafkaConsumer will use the groupId as clientId which already is an identifiable default
1 parent a41c03a commit 4df9ad1

File tree

6 files changed

+156
-13
lines changed

6 files changed

+156
-13
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import java.util.Iterator;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.Optional;
2728
import java.util.Properties;
2829
import java.util.Set;
2930
import java.util.concurrent.ConcurrentHashMap;
@@ -36,7 +37,11 @@
3637
import org.apache.kafka.common.MetricName;
3738
import org.apache.kafka.common.serialization.Deserializer;
3839

40+
import org.springframework.beans.BeansException;
3941
import org.springframework.beans.factory.BeanNameAware;
42+
import org.springframework.context.ApplicationContext;
43+
import org.springframework.context.ApplicationContextAware;
44+
import org.springframework.core.env.EnvironmentCapable;
4045
import org.springframework.core.log.LogAccessor;
4146
import org.springframework.lang.Nullable;
4247
import org.springframework.util.Assert;
@@ -68,7 +73,7 @@
6873
* @author Chris Gilbert
6974
*/
7075
public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
71-
implements ConsumerFactory<K, V>, BeanNameAware {
76+
implements ConsumerFactory<K, V>, BeanNameAware, ApplicationContextAware {
7277

7378
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaConsumerFactory.class));
7479

@@ -86,6 +91,8 @@ public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
8691

8792
private boolean configureDeserializers = true;
8893

94+
private ApplicationContext applicationContext;
95+
8996
/**
9097
* Construct a factory with the provided configuration.
9198
* @param configs the configuration.
@@ -371,6 +378,22 @@ protected Consumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nullable
371378
if (clientIdSuffix == null) {
372379
clientIdSuffix = "";
373380
}
381+
382+
final boolean hasGroupIdOrClientIdInProperties = properties != null
383+
&& (properties.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || properties.containsKey(ConsumerConfig.GROUP_ID_CONFIG));
384+
final boolean hasGroupIdOrClientIdInConfig = this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
385+
|| this.configs.containsKey(ConsumerConfig.GROUP_ID_CONFIG);
386+
if (!overrideClientIdPrefix && groupId == null && !hasGroupIdOrClientIdInProperties && !hasGroupIdOrClientIdInConfig) {
387+
final String applicationName = Optional.ofNullable(this.applicationContext)
388+
.map(EnvironmentCapable::getEnvironment)
389+
.map(environment -> environment.getProperty("spring.application.name"))
390+
.orElse(null);
391+
if (applicationName != null) {
392+
clientIdPrefix = applicationName + "-consumer";
393+
overrideClientIdPrefix = true;
394+
}
395+
}
396+
374397
boolean shouldModifyClientId = (this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
375398
&& StringUtils.hasText(clientIdSuffix)) || overrideClientIdPrefix;
376399
if (groupId == null
@@ -469,6 +492,11 @@ public boolean isAutoCommit() {
469492
: !(auto instanceof String) || Boolean.parseBoolean((String) auto);
470493
}
471494

495+
@Override
496+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
497+
this.applicationContext = applicationContext;
498+
}
499+
472500
protected class ExtendedKafkaConsumer extends KafkaConsumer<K, V> {
473501

474502
private String idForListeners;

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Iterator;
2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.Optional;
2627
import java.util.concurrent.BlockingQueue;
2728
import java.util.concurrent.ConcurrentHashMap;
2829
import java.util.concurrent.Future;
@@ -59,6 +60,7 @@
5960
import org.springframework.context.ApplicationListener;
6061
import org.springframework.context.SmartLifecycle;
6162
import org.springframework.context.event.ContextStoppedEvent;
63+
import org.springframework.core.env.EnvironmentCapable;
6264
import org.springframework.core.log.LogAccessor;
6365
import org.springframework.kafka.KafkaException;
6466
import org.springframework.lang.Nullable;
@@ -981,9 +983,21 @@ public void closeThreadBoundProducer() {
981983
protected Map<String, Object> getProducerConfigs() {
982984
final Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
983985
checkBootstrap(newProducerConfigs);
986+
987+
final String prefix;
984988
if (this.clientIdPrefix != null) {
989+
prefix = this.clientIdPrefix;
990+
} else {
991+
prefix = Optional.ofNullable(this.applicationContext)
992+
.map(EnvironmentCapable::getEnvironment)
993+
.map(environment -> environment.getProperty("spring.application.name"))
994+
.map(applicationName -> applicationName + "-producer")
995+
.orElse(null);
996+
}
997+
998+
if (prefix != null) {
985999
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
986-
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
1000+
prefix + "-" + this.clientIdCounter.incrementAndGet());
9871001
}
9881002
return newProducerConfigs;
9891003
}

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2023 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
3737

3838
import org.apache.commons.logging.LogFactory;
3939
import org.apache.kafka.clients.admin.AdminClient;
40+
import org.apache.kafka.clients.admin.AdminClientConfig;
4041
import org.apache.kafka.clients.admin.AlterConfigOp;
4142
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
4243
import org.apache.kafka.clients.admin.AlterConfigsResult;
@@ -55,10 +56,12 @@
5556
import org.apache.kafka.common.errors.TopicExistsException;
5657
import org.apache.kafka.common.errors.UnsupportedVersionException;
5758

59+
import org.jetbrains.annotations.NotNull;
5860
import org.springframework.beans.BeansException;
5961
import org.springframework.beans.factory.SmartInitializingSingleton;
6062
import org.springframework.context.ApplicationContext;
6163
import org.springframework.context.ApplicationContextAware;
64+
import org.springframework.core.env.EnvironmentCapable;
6265
import org.springframework.core.log.LogAccessor;
6366
import org.springframework.kafka.KafkaException;
6467
import org.springframework.kafka.support.TopicForRetryable;
@@ -86,6 +89,8 @@ public class KafkaAdmin extends KafkaResourceFactory
8689

8790
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaAdmin.class));
8891

92+
private static final AtomicInteger CLIENT_ID_COUNTER = new AtomicInteger();
93+
8994
private final Map<String, Object> configs;
9095

9196
private ApplicationContext applicationContext;
@@ -374,9 +379,23 @@ public Map<String, TopicDescription> describeTopics(String... topicNames) {
374379
}
375380

376381
AdminClient createAdmin() {
377-
Map<String, Object> configs2 = new HashMap<>(this.configs);
382+
return AdminClient.create(getAdminConfig());
383+
}
384+
385+
protected Map<String, Object> getAdminConfig() {
386+
final Map<String, Object> configs2 = new HashMap<>(this.configs);
378387
checkBootstrap(configs2);
379-
return AdminClient.create(configs2);
388+
389+
if (!configs2.containsKey(AdminClientConfig.CLIENT_ID_CONFIG)) {
390+
Optional.ofNullable(this.applicationContext)
391+
.map(EnvironmentCapable::getEnvironment)
392+
.map(environment -> environment.getProperty("spring.application.name"))
393+
.ifPresent(applicationName -> configs2.put(
394+
AdminClientConfig.CLIENT_ID_CONFIG,
395+
applicationName + "-admin-" + CLIENT_ID_COUNTER.getAndIncrement())
396+
);
397+
}
398+
return configs2;
380399
}
381400

382401
private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package org.springframework.kafka.core;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.verify;
20+
import static org.mockito.Mockito.*;
21+
import static org.mockito.Mockito.when;
2222

2323
import java.util.AbstractMap;
2424
import java.util.ArrayList;
@@ -43,7 +43,9 @@
4343

4444
import org.springframework.aop.framework.ProxyFactory;
4545
import org.springframework.beans.factory.annotation.Autowired;
46+
import org.springframework.context.ApplicationContext;
4647
import org.springframework.context.annotation.Configuration;
48+
import org.springframework.core.env.Environment;
4749
import org.springframework.kafka.core.ConsumerFactory.Listener;
4850
import org.springframework.kafka.listener.ContainerProperties;
4951
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
@@ -120,6 +122,7 @@ protected Consumer<String, String> createRawConsumer(Map<String, Object> configP
120122
return null;
121123
}
122124
};
125+
target.setApplicationContext(createApplicationContextWithApplicationName());
123126
target.setBootstrapServersSupplier(() -> "foo");
124127
target.createConsumer(null, null, null, null);
125128
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)).isEqualTo("foo");
@@ -143,6 +146,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
143146
return null;
144147
}
145148
};
149+
target.setApplicationContext(createApplicationContextWithApplicationName());
146150
target.createConsumer(null, null, null, overrides);
147151
assertThat(configPassedToKafkaConsumer.get("config1")).isEqualTo("overridden");
148152
assertThat(configPassedToKafkaConsumer.get("config2")).isSameAs(originalConfig.get("config2"));
@@ -165,6 +169,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
165169
return null;
166170
}
167171
};
172+
target.setApplicationContext(createApplicationContextWithApplicationName());
168173
target.createConsumer(null, null, "-1", null);
169174
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("original-1");
170175
}
@@ -198,6 +203,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
198203
return null;
199204
}
200205
};
206+
target.setApplicationContext(createApplicationContextWithApplicationName());
201207
target.createConsumer(null, "overridden", null, null);
202208
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden");
203209
}
@@ -214,6 +220,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
214220
return null;
215221
}
216222
};
223+
target.setApplicationContext(createApplicationContextWithApplicationName());
217224
target.createConsumer(null, "overridden", null, null);
218225
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden");
219226
}
@@ -231,6 +238,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
231238
return null;
232239
}
233240
};
241+
target.setApplicationContext(createApplicationContextWithApplicationName());
234242
target.createConsumer(null, "overridden", "-1", null);
235243
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden-1");
236244
}
@@ -250,10 +258,27 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
250258
return null;
251259
}
252260
};
261+
target.setApplicationContext(createApplicationContextWithApplicationName());
253262
target.createConsumer(null, "overridden", "-1", overrides);
254263
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden-1");
255264
}
256265

266+
@Test
267+
public void testApplicationNameIfNoGroupIdAsClientIdWhenCreatingConsumer() {
268+
final Map<String, Object> configPassedToKafkaConsumer = new HashMap<>();
269+
DefaultKafkaConsumerFactory<String, String> target =
270+
new DefaultKafkaConsumerFactory<String, String>(Map.of()) {
271+
272+
@Override
273+
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
274+
configPassedToKafkaConsumer.putAll(configProps);
275+
return null;
276+
}
277+
};
278+
target.setApplicationContext(createApplicationContextWithApplicationName());
279+
target.createConsumer(null, null, "-1", null);
280+
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("appname-consumer-1");
281+
}
257282

258283
@Test
259284
public void testOverriddenGroupIdWhenCreatingConsumer() {
@@ -476,6 +501,14 @@ void configDeserializer() {
476501
verify(value).configure(config, false);
477502
}
478503

504+
private static ApplicationContext createApplicationContextWithApplicationName() {
505+
final Environment environment = mock(Environment.class);
506+
when(environment.getProperty("spring.application.name")).thenReturn("appname");
507+
final ApplicationContext applicationContext = mock(ApplicationContext.class);
508+
when(applicationContext.getEnvironment()).thenReturn(environment);
509+
return applicationContext;
510+
}
511+
479512
@Configuration
480513
public static class Config {
481514

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,7 @@
2525
import static org.mockito.ArgumentMatchers.isNull;
2626
import static org.mockito.BDDMockito.given;
2727
import static org.mockito.BDDMockito.willAnswer;
28-
import static org.mockito.Mockito.inOrder;
29-
import static org.mockito.Mockito.mock;
30-
import static org.mockito.Mockito.times;
31-
import static org.mockito.Mockito.verify;
28+
import static org.mockito.Mockito.*;
3229

3330
import java.time.Duration;
3431
import java.util.ArrayList;
@@ -53,8 +50,10 @@
5350
import org.junit.jupiter.api.Test;
5451
import org.mockito.InOrder;
5552

53+
import org.mockito.Mockito;
5654
import org.springframework.context.ApplicationContext;
5755
import org.springframework.context.event.ContextStoppedEvent;
56+
import org.springframework.core.env.Environment;
5857
import org.springframework.kafka.core.ProducerFactory.Listener;
5958
import org.springframework.kafka.test.utils.KafkaTestUtils;
6059
import org.springframework.kafka.transaction.KafkaTransactionManager;
@@ -778,4 +777,26 @@ protected Producer<String, String> createRawProducer(Map<String, Object> rawConf
778777
assertThat(producerConfigs).containsEntry("linger.ms", 200);
779778
}
780779

780+
@Test
781+
void testDefaultClientIdPrefixIsSpringBootApplicationName() {
782+
final DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(Map.of());
783+
final Environment environment = mock(Environment.class);
784+
when(environment.getProperty("spring.application.name")).thenReturn("appname");
785+
final ApplicationContext applicationContext = mock(ApplicationContext.class);
786+
when(applicationContext.getEnvironment()).thenReturn(environment);
787+
pf.setApplicationContext(applicationContext);
788+
assertThat(pf.getProducerConfigs()).containsEntry(ProducerConfig.CLIENT_ID_CONFIG, "appname-producer-1");
789+
}
790+
791+
@Test
792+
void testExplicitClientIdPrefixOverridesDefault() {
793+
final DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(Map.of(ProducerConfig.CLIENT_ID_CONFIG, "clientId"));
794+
final Environment environment = mock(Environment.class);
795+
when(environment.getProperty("spring.application.name")).thenReturn("appname");
796+
final ApplicationContext applicationContext = mock(ApplicationContext.class);
797+
when(applicationContext.getEnvironment()).thenReturn(environment);
798+
pf.setApplicationContext(applicationContext);
799+
assertThat(pf.getProducerConfigs()).containsEntry(ProducerConfig.CLIENT_ID_CONFIG, "clientId-1");
800+
}
801+
781802
}

0 commit comments

Comments
 (0)