Skip to content

Commit 85ed902

Browse files
committed
Merge pull request #7672 from garyrussell/GH-7646
* pr/7672: Polish Kafka properties Support arbitrary Kafka properties
2 parents ef671e7 + 1f7b3ca commit 85ed902

File tree

5 files changed

+214
-43
lines changed

5 files changed

+214
-43
lines changed

spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.boot.context.properties.ConfigurationProperties;
3434
import org.springframework.core.io.Resource;
3535
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
36+
import org.springframework.util.CollectionUtils;
3637

3738
/**
3839
* Configuration properties for Spring for Apache Kafka.
@@ -47,18 +48,6 @@
4748
@ConfigurationProperties(prefix = "spring.kafka")
4849
public class KafkaProperties {
4950

50-
private final Consumer consumer = new Consumer();
51-
52-
private final Producer producer = new Producer();
53-
54-
private final Listener listener = new Listener();
55-
56-
private final Template template = new Template();
57-
58-
private final Ssl ssl = new Ssl();
59-
60-
// Apache Kafka Common Properties
61-
6251
/**
6352
* Comma-delimited list of host:port pairs to use for establishing the initial
6453
* connection to the Kafka cluster.
@@ -71,25 +60,20 @@ public class KafkaProperties {
7160
*/
7261
private String clientId;
7362

74-
public Consumer getConsumer() {
75-
return this.consumer;
76-
}
63+
/**
64+
* Additional properties used to configure the client.
65+
*/
66+
private Map<String, String> properties = new HashMap<String, String>();
7767

78-
public Producer getProducer() {
79-
return this.producer;
80-
}
68+
private final Consumer consumer = new Consumer();
8169

82-
public Listener getListener() {
83-
return this.listener;
84-
}
70+
private final Producer producer = new Producer();
8571

86-
public Ssl getSsl() {
87-
return this.ssl;
88-
}
72+
private final Listener listener = new Listener();
8973

90-
public Template getTemplate() {
91-
return this.template;
92-
}
74+
private final Ssl ssl = new Ssl();
75+
76+
private final Template template = new Template();
9377

9478
public List<String> getBootstrapServers() {
9579
return this.bootstrapServers;
@@ -107,6 +91,34 @@ public void setClientId(String clientId) {
10791
this.clientId = clientId;
10892
}
10993

94+
public Map<String, String> getProperties() {
95+
return this.properties;
96+
}
97+
98+
public void setProperties(Map<String, String> properties) {
99+
this.properties = properties;
100+
}
101+
102+
public Consumer getConsumer() {
103+
return this.consumer;
104+
}
105+
106+
public Producer getProducer() {
107+
return this.producer;
108+
}
109+
110+
public Listener getListener() {
111+
return this.listener;
112+
}
113+
114+
public Ssl getSsl() {
115+
return this.ssl;
116+
}
117+
118+
public Template getTemplate() {
119+
return this.template;
120+
}
121+
110122
private Map<String, Object> buildCommonProperties() {
111123
Map<String, Object> properties = new HashMap<String, Object>();
112124
if (this.bootstrapServers != null) {
@@ -135,6 +147,9 @@ private Map<String, Object> buildCommonProperties() {
135147
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
136148
this.ssl.getTruststorePassword());
137149
}
150+
if (!CollectionUtils.isEmpty(this.properties)) {
151+
properties.putAll(this.properties);
152+
}
138153
return properties;
139154
}
140155

@@ -147,9 +162,9 @@ private Map<String, Object> buildCommonProperties() {
147162
* instance
148163
*/
149164
public Map<String, Object> buildConsumerProperties() {
150-
Map<String, Object> props = buildCommonProperties();
151-
props.putAll(this.consumer.buildProperties());
152-
return props;
165+
Map<String, Object> properties = buildCommonProperties();
166+
properties.putAll(this.consumer.buildProperties());
167+
return properties;
153168
}
154169

155170
/**
@@ -161,9 +176,9 @@ public Map<String, Object> buildConsumerProperties() {
161176
* instance
162177
*/
163178
public Map<String, Object> buildProducerProperties() {
164-
Map<String, Object> props = buildCommonProperties();
165-
props.putAll(this.producer.buildProperties());
166-
return props;
179+
Map<String, Object> properties = buildCommonProperties();
180+
properties.putAll(this.producer.buildProperties());
181+
return properties;
167182
}
168183

169184
private static String resourceToPath(Resource resource) {
@@ -240,6 +255,11 @@ public static class Consumer {
240255
*/
241256
private Class<?> valueDeserializer = StringDeserializer.class;
242257

258+
/**
259+
* Maximum number of records returned in a single call to poll().
260+
*/
261+
private Integer maxPollRecords;
262+
243263
public Ssl getSsl() {
244264
return this.ssl;
245265
}
@@ -332,6 +352,14 @@ public void setValueDeserializer(Class<?> valueDeserializer) {
332352
this.valueDeserializer = valueDeserializer;
333353
}
334354

355+
public Integer getMaxPollRecords() {
356+
return this.maxPollRecords;
357+
}
358+
359+
public void setMaxPollRecords(Integer maxPollRecords) {
360+
this.maxPollRecords = maxPollRecords;
361+
}
362+
335363
public Map<String, Object> buildProperties() {
336364
Map<String, Object> properties = new HashMap<String, Object>();
337365
if (this.autoCommitInterval != null) {
@@ -395,6 +423,10 @@ public Map<String, Object> buildProperties() {
395423
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
396424
this.valueDeserializer);
397425
}
426+
if (this.maxPollRecords != null) {
427+
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
428+
this.maxPollRecords);
429+
}
398430
return properties;
399431
}
400432

spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,16 @@ public void closeContext() {
6060

6161
@Test
6262
public void consumerProperties() {
63-
load("spring.kafka.bootstrap-servers=foo:1234",
63+
load("spring.kafka.bootstrap-servers=foo:1234", "spring.kafka.properties.foo=bar",
64+
"spring.kafka.properties.baz=qux",
65+
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
6466
"spring.kafka.ssl.key-password=p1",
6567
"spring.kafka.ssl.keystore-location=classpath:ksLoc",
6668
"spring.kafka.ssl.keystore-password=p2",
6769
"spring.kafka.ssl.truststore-location=classpath:tsLoc",
6870
"spring.kafka.ssl.truststore-password=p3",
6971
"spring.kafka.consumer.auto-commit-interval=123",
72+
"spring.kafka.consumer.max-poll-records=42",
7073
"spring.kafka.consumer.auto-offset-reset=earliest",
7174
"spring.kafka.consumer.client-id=ccid", // test override common
7275
"spring.kafka.consumer.enable-auto-commit=false",
@@ -109,6 +112,10 @@ public void consumerProperties() {
109112
.isEqualTo(LongDeserializer.class);
110113
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
111114
.isEqualTo(IntegerDeserializer.class);
115+
assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)).isEqualTo(42);
116+
assertThat(configs.get("foo")).isEqualTo("bar");
117+
assertThat(configs.get("baz")).isEqualTo("qux");
118+
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
112119
}
113120

114121
@Test

spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -878,6 +878,7 @@ content into your application; rather pick only the properties that you need.
878878
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group this consumer belongs to.
879879
spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator.
880880
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
881+
spring.kafka.consumer.max-poll-messages= # Maximum number of records returned in a single call to poll().
881882
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
882883
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
883884
spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation.
@@ -893,6 +894,7 @@ content into your application; rather pick only the properties that you need.
893894
spring.kafka.producer.key-serializer= # Serializer class for keys.
894895
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
895896
spring.kafka.producer.value-serializer= # Serializer class for values.
897+
spring.kafka.properties.*= # Additional properties used to configure the client.
896898
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
897899
spring.kafka.ssl.keystore-location= # Location of the key store file.
898900
spring.kafka.ssl.keystore-password= # Store password for the key store file.

spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4643,18 +4643,23 @@ auto configuration supports all HIGH importance properties, some selected MEDIUM
46434643
and any that do not have a default value.
46444644

46454645
Only a subset of the properties supported by Kafka are available via the `KafkaProperties`
4646-
class. If you wish to configure the producer or consumer with additional properties, you
4647-
can override the producer factory and/or consumer factory bean, adding additional
4648-
properties, for example:
4646+
class. If you wish to configure the producer or consumer with additional properties that
4647+
are not directly supported, use the following:
4648+
4649+
[source,properties,indent=0]
4650+
----
4651+
spring.kafka.properties.foo.bar=baz
4652+
----
4653+
4654+
This sets the common `foo.bar` Kafka property to `baz`.
4655+
4656+
These properties will be shared by both the consumer and producer factory beans.
4657+
If you wish to customize these components with different properties, such as to use a
4658+
different metrics reader for each, you can override the bean definitions, as follows:
46494659

46504660
[source,java,indent=0]
46514661
----
4652-
@Bean
4653-
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
4654-
Map<String, Object> producerProperties = properties.buildProducerProperties();
4655-
producerProperties.put("some.property", "some.value");
4656-
return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
4657-
}
4662+
include::{code-examples}/kafka/KafkaSpecialProducerConsumerConfigExample.java[tag=configuration]
46584663
----
46594664

46604665

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright 2016-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.kafka;
18+
19+
import java.util.List;
20+
import java.util.Map;
21+
22+
import org.apache.kafka.clients.CommonClientConfigs;
23+
import org.apache.kafka.common.metrics.KafkaMetric;
24+
import org.apache.kafka.common.metrics.MetricsReporter;
25+
26+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.kafka.core.ConsumerFactory;
30+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
31+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
32+
import org.springframework.kafka.core.ProducerFactory;
33+
34+
/**
35+
* Example custom kafka configuration beans used when the user wants to apply different
36+
* common properties to the producer and consumer.
37+
*
38+
* @author Gary Russell
39+
* @since 1.5
40+
*/
41+
public class KafkaSpecialProducerConsumerConfigExample {
42+
43+
// tag::configuration[]
44+
@Configuration
45+
public static class CustomKafkaBeans {
46+
47+
/**
48+
* Customized ProducerFactory bean.
49+
* @param properties the kafka properties.
50+
* @return the bean.
51+
*/
52+
@Bean
53+
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
54+
Map<String, Object> producerProperties = properties.buildProducerProperties();
55+
producerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
56+
MyProducerMetricsReporter.class);
57+
return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
58+
}
59+
60+
/**
61+
* Customized ConsumerFactory bean.
62+
* @param properties the kafka properties.
63+
* @return the bean.
64+
*/
65+
@Bean
66+
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
67+
Map<String, Object> consumererProperties = properties
68+
.buildConsumerProperties();
69+
consumererProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
70+
MyConsumerMetricsReporter.class);
71+
return new DefaultKafkaConsumerFactory<Object, Object>(consumererProperties);
72+
}
73+
74+
}
75+
// end::configuration[]
76+
77+
public static class MyConsumerMetricsReporter implements MetricsReporter {
78+
79+
@Override
80+
public void configure(Map<String, ?> configs) {
81+
}
82+
83+
@Override
84+
public void init(List<KafkaMetric> metrics) {
85+
}
86+
87+
@Override
88+
public void metricChange(KafkaMetric metric) {
89+
}
90+
91+
@Override
92+
public void metricRemoval(KafkaMetric metric) {
93+
}
94+
95+
@Override
96+
public void close() {
97+
}
98+
99+
}
100+
101+
public static class MyProducerMetricsReporter implements MetricsReporter {
102+
103+
@Override
104+
public void configure(Map<String, ?> configs) {
105+
}
106+
107+
@Override
108+
public void init(List<KafkaMetric> metrics) {
109+
}
110+
111+
@Override
112+
public void metricChange(KafkaMetric metric) {
113+
}
114+
115+
@Override
116+
public void metricRemoval(KafkaMetric metric) {
117+
}
118+
119+
@Override
120+
public void close() {
121+
}
122+
123+
}
124+
125+
}

0 commit comments

Comments
 (0)