Skip to content

Commit ab5f0a1

Browse files
authored
spring-projectsGH-3001: default clientIds with application name
Fixes: #spring-projectsGH-3001 * Use Spring Boot's `spring.application.name` property as part of the default clientIds for Consumers, Producers, and AdminClients. Helps with identifying problematic clients on the server side. * Only use as a fallback if clientId wasn't specified explicitly * Do not use for Consumers with a specified groupId because KafkaConsumer will use the groupId as clientId, which already is an identifiable default
1 parent a41c03a commit ab5f0a1

File tree

8 files changed

+200
-8
lines changed

8 files changed

+200
-8
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/connecting.adoc

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,37 @@ These listeners can be used, for example, to create and bind a Micrometer `Kafka
5858

5959
The framework provides listeners that do exactly that; see xref:kafka/micrometer.adoc#micrometer-native[Micrometer Native Metrics].
6060

61+
[[default-client-id-prefixes]]
62+
== Default client ID prefixes
63+
64+
Starting with version 3.2, for Spring Boot applications which define an application name using the `spring.application.name` property, this name is now used
65+
as a default prefix for auto-generated client IDs for these client types:
66+
67+
- consumer clients which don't use a consumer group
68+
- producer clients
69+
- admin clients
70+
71+
This makes it easier to identify these clients at server side for troubleshooting or applying quotas.
72+
73+
.Example client ids resulting for a Spring Boot application with `spring.application.name=myapp`
74+
[%autowidth]
75+
|===
76+
|Client Type |Without application name |With application name
77+
78+
|consumer without consumer group
79+
|consumer-null-1
80+
|myapp-consumer-1
81+
82+
|consumer with consumer group "mygroup"
83+
|consumer-mygroup-1
84+
|consumer-mygroup-1
85+
86+
|producer
87+
|producer-1
88+
|myapp-producer-1
89+
90+
|admin
91+
|adminclient-1
92+
|myapp-admin-1
93+
|===
94+

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,9 @@ See xref:kafka/seek.adoc#seek[Seek API Docs] for more details.
6767
When this constructor is used, the framework calls the function with the input argument of the current consumer offset position.
6868
See xref:kafka/seek.adoc#seek[Seek API Docs] for more details.
6969

70+
[[x32-default-clientid-prefix]]
71+
=== Spring Boot application name as default client ID prefix
72+
73+
For Spring Boot applications which define an application name, this name is now used
74+
as a default prefix for auto-generated client IDs for certain client types.
75+
See xref:kafka/connecting.adoc#default-client-id-prefixes[Default client ID prefixes] for more details.

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import java.util.Iterator;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.Optional;
2728
import java.util.Properties;
2829
import java.util.Set;
2930
import java.util.concurrent.ConcurrentHashMap;
@@ -36,7 +37,11 @@
3637
import org.apache.kafka.common.MetricName;
3738
import org.apache.kafka.common.serialization.Deserializer;
3839

40+
import org.springframework.beans.BeansException;
3941
import org.springframework.beans.factory.BeanNameAware;
42+
import org.springframework.context.ApplicationContext;
43+
import org.springframework.context.ApplicationContextAware;
44+
import org.springframework.core.env.EnvironmentCapable;
4045
import org.springframework.core.log.LogAccessor;
4146
import org.springframework.lang.Nullable;
4247
import org.springframework.util.Assert;
@@ -66,9 +71,10 @@
6671
* @author Murali Reddy
6772
* @author Artem Bilan
6873
* @author Chris Gilbert
74+
* @author Adrian Gygax
6975
*/
7076
public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
71-
implements ConsumerFactory<K, V>, BeanNameAware {
77+
implements ConsumerFactory<K, V>, BeanNameAware, ApplicationContextAware {
7278

7379
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaConsumerFactory.class));
7480

@@ -86,6 +92,8 @@ public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
8692

8793
private boolean configureDeserializers = true;
8894

95+
private ApplicationContext applicationContext;
96+
8997
/**
9098
* Construct a factory with the provided configuration.
9199
* @param configs the configuration.
@@ -371,6 +379,22 @@ protected Consumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nullable
371379
if (clientIdSuffix == null) {
372380
clientIdSuffix = "";
373381
}
382+
383+
final boolean hasGroupIdOrClientIdInProperties = properties != null
384+
&& (properties.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || properties.containsKey(ConsumerConfig.GROUP_ID_CONFIG));
385+
final boolean hasGroupIdOrClientIdInConfig = this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
386+
|| this.configs.containsKey(ConsumerConfig.GROUP_ID_CONFIG);
387+
if (!overrideClientIdPrefix && groupId == null && !hasGroupIdOrClientIdInProperties && !hasGroupIdOrClientIdInConfig) {
388+
final String applicationName = Optional.ofNullable(this.applicationContext)
389+
.map(EnvironmentCapable::getEnvironment)
390+
.map(environment -> environment.getProperty("spring.application.name"))
391+
.orElse(null);
392+
if (applicationName != null) {
393+
clientIdPrefix = applicationName + "-consumer";
394+
overrideClientIdPrefix = true;
395+
}
396+
}
397+
374398
boolean shouldModifyClientId = (this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG)
375399
&& StringUtils.hasText(clientIdSuffix)) || overrideClientIdPrefix;
376400
if (groupId == null
@@ -469,6 +493,11 @@ public boolean isAutoCommit() {
469493
: !(auto instanceof String) || Boolean.parseBoolean((String) auto);
470494
}
471495

496+
@Override
497+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
498+
this.applicationContext = applicationContext;
499+
}
500+
472501
protected class ExtendedKafkaConsumer extends KafkaConsumer<K, V> {
473502

474503
private String idForListeners;

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Iterator;
2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.Optional;
2627
import java.util.concurrent.BlockingQueue;
2728
import java.util.concurrent.ConcurrentHashMap;
2829
import java.util.concurrent.Future;
@@ -59,6 +60,7 @@
5960
import org.springframework.context.ApplicationListener;
6061
import org.springframework.context.SmartLifecycle;
6162
import org.springframework.context.event.ContextStoppedEvent;
63+
import org.springframework.core.env.EnvironmentCapable;
6264
import org.springframework.core.log.LogAccessor;
6365
import org.springframework.kafka.KafkaException;
6466
import org.springframework.lang.Nullable;
@@ -110,6 +112,7 @@
110112
* @author Artem Bilan
111113
* @author Chris Gilbert
112114
* @author Thomas Strauß
115+
* @author Adrian Gygax
113116
*/
114117
public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
115118
implements ProducerFactory<K, V>, ApplicationContextAware,
@@ -981,9 +984,22 @@ public void closeThreadBoundProducer() {
981984
protected Map<String, Object> getProducerConfigs() {
982985
final Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
983986
checkBootstrap(newProducerConfigs);
987+
988+
final String prefix;
984989
if (this.clientIdPrefix != null) {
990+
prefix = this.clientIdPrefix;
991+
}
992+
else {
993+
prefix = Optional.ofNullable(this.applicationContext)
994+
.map(EnvironmentCapable::getEnvironment)
995+
.map(environment -> environment.getProperty("spring.application.name"))
996+
.map(applicationName -> applicationName + "-producer")
997+
.orElse(null);
998+
}
999+
1000+
if (prefix != null) {
9851001
newProducerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,
986-
this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
1002+
prefix + "-" + this.clientIdCounter.incrementAndGet());
9871003
}
9881004
return newProducerConfigs;
9891005
}

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2023 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
3737

3838
import org.apache.commons.logging.LogFactory;
3939
import org.apache.kafka.clients.admin.AdminClient;
40+
import org.apache.kafka.clients.admin.AdminClientConfig;
4041
import org.apache.kafka.clients.admin.AlterConfigOp;
4142
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
4243
import org.apache.kafka.clients.admin.AlterConfigsResult;
@@ -59,6 +60,7 @@
5960
import org.springframework.beans.factory.SmartInitializingSingleton;
6061
import org.springframework.context.ApplicationContext;
6162
import org.springframework.context.ApplicationContextAware;
63+
import org.springframework.core.env.EnvironmentCapable;
6264
import org.springframework.core.log.LogAccessor;
6365
import org.springframework.kafka.KafkaException;
6466
import org.springframework.kafka.support.TopicForRetryable;
@@ -71,6 +73,7 @@
7173
*
7274
* @author Gary Russell
7375
* @author Artem Bilan
76+
* @author Adrian Gygax
7477
*
7578
* @since 1.3
7679
*/
@@ -86,6 +89,8 @@ public class KafkaAdmin extends KafkaResourceFactory
8689

8790
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaAdmin.class));
8891

92+
private static final AtomicInteger CLIENT_ID_COUNTER = new AtomicInteger();
93+
8994
private final Map<String, Object> configs;
9095

9196
private ApplicationContext applicationContext;
@@ -374,9 +379,23 @@ public Map<String, TopicDescription> describeTopics(String... topicNames) {
374379
}
375380

376381
AdminClient createAdmin() {
377-
Map<String, Object> configs2 = new HashMap<>(this.configs);
382+
return AdminClient.create(getAdminConfig());
383+
}
384+
385+
protected Map<String, Object> getAdminConfig() {
386+
final Map<String, Object> configs2 = new HashMap<>(this.configs);
378387
checkBootstrap(configs2);
379-
return AdminClient.create(configs2);
388+
389+
if (!configs2.containsKey(AdminClientConfig.CLIENT_ID_CONFIG)) {
390+
Optional.ofNullable(this.applicationContext)
391+
.map(EnvironmentCapable::getEnvironment)
392+
.map(environment -> environment.getProperty("spring.application.name"))
393+
.ifPresent(applicationName -> configs2.put(
394+
AdminClientConfig.CLIENT_ID_CONFIG,
395+
applicationName + "-admin-" + CLIENT_ID_COUNTER.getAndIncrement())
396+
);
397+
}
398+
return configs2;
380399
}
381400

382401
private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.core;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.BDDMockito.given;
2021
import static org.mockito.Mockito.mock;
2122
import static org.mockito.Mockito.verify;
2223

@@ -43,7 +44,9 @@
4344

4445
import org.springframework.aop.framework.ProxyFactory;
4546
import org.springframework.beans.factory.annotation.Autowired;
47+
import org.springframework.context.ApplicationContext;
4648
import org.springframework.context.annotation.Configuration;
49+
import org.springframework.core.env.Environment;
4750
import org.springframework.kafka.core.ConsumerFactory.Listener;
4851
import org.springframework.kafka.listener.ContainerProperties;
4952
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
@@ -60,6 +63,7 @@
6063
* @author Gary Russell
6164
* @author Chris Gilbert
6265
* @author Artem Bilan
66+
* @author Adrian Gygax
6367
*
6468
* @since 1.0.6
6569
*/
@@ -120,6 +124,7 @@ protected Consumer<String, String> createRawConsumer(Map<String, Object> configP
120124
return null;
121125
}
122126
};
127+
target.setApplicationContext(createApplicationContextWithApplicationName());
123128
target.setBootstrapServersSupplier(() -> "foo");
124129
target.createConsumer(null, null, null, null);
125130
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)).isEqualTo("foo");
@@ -143,6 +148,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
143148
return null;
144149
}
145150
};
151+
target.setApplicationContext(createApplicationContextWithApplicationName());
146152
target.createConsumer(null, null, null, overrides);
147153
assertThat(configPassedToKafkaConsumer.get("config1")).isEqualTo("overridden");
148154
assertThat(configPassedToKafkaConsumer.get("config2")).isSameAs(originalConfig.get("config2"));
@@ -165,6 +171,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
165171
return null;
166172
}
167173
};
174+
target.setApplicationContext(createApplicationContextWithApplicationName());
168175
target.createConsumer(null, null, "-1", null);
169176
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("original-1");
170177
}
@@ -198,6 +205,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
198205
return null;
199206
}
200207
};
208+
target.setApplicationContext(createApplicationContextWithApplicationName());
201209
target.createConsumer(null, "overridden", null, null);
202210
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden");
203211
}
@@ -214,6 +222,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
214222
return null;
215223
}
216224
};
225+
target.setApplicationContext(createApplicationContextWithApplicationName());
217226
target.createConsumer(null, "overridden", null, null);
218227
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden");
219228
}
@@ -231,6 +240,7 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
231240
return null;
232241
}
233242
};
243+
target.setApplicationContext(createApplicationContextWithApplicationName());
234244
target.createConsumer(null, "overridden", "-1", null);
235245
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden-1");
236246
}
@@ -250,10 +260,27 @@ protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object>
250260
return null;
251261
}
252262
};
263+
target.setApplicationContext(createApplicationContextWithApplicationName());
253264
target.createConsumer(null, "overridden", "-1", overrides);
254265
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("overridden-1");
255266
}
256267

268+
@Test
269+
public void testApplicationNameIfNoGroupIdAsClientIdWhenCreatingConsumer() {
270+
final Map<String, Object> configPassedToKafkaConsumer = new HashMap<>();
271+
DefaultKafkaConsumerFactory<String, String> target =
272+
new DefaultKafkaConsumerFactory<String, String>(Map.of()) {
273+
274+
@Override
275+
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configProps) {
276+
configPassedToKafkaConsumer.putAll(configProps);
277+
return null;
278+
}
279+
};
280+
target.setApplicationContext(createApplicationContextWithApplicationName());
281+
target.createConsumer(null, null, "-1", null);
282+
assertThat(configPassedToKafkaConsumer.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("appname-consumer-1");
283+
}
257284

258285
@Test
259286
public void testOverriddenGroupIdWhenCreatingConsumer() {
@@ -476,6 +503,14 @@ void configDeserializer() {
476503
verify(value).configure(config, false);
477504
}
478505

506+
private static ApplicationContext createApplicationContextWithApplicationName() {
507+
final Environment environment = mock(Environment.class);
508+
given(environment.getProperty("spring.application.name")).willReturn("appname");
509+
final ApplicationContext applicationContext = mock(ApplicationContext.class);
510+
given(applicationContext.getEnvironment()).willReturn(environment);
511+
return applicationContext;
512+
}
513+
479514
@Configuration
480515
public static class Config {
481516

0 commit comments

Comments
 (0)