Skip to content

Commit f8ce714

Browse files
committed
Merge pull request #14021 from garyrussell:kafkaStreams
* pr/14021: Polish "Add Kafka Streams auto-configuration" Add Kafka Streams auto-configuration
2 parents f9207dd + 6d4bab9 commit f8ce714

File tree

10 files changed

+540
-12
lines changed

10 files changed

+540
-12
lines changed

spring-boot-project/spring-boot-autoconfigure/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@
132132
<artifactId>jest</artifactId>
133133
<optional>true</optional>
134134
</dependency>
135+
<dependency>
136+
<groupId>org.apache.kafka</groupId>
137+
<artifactId>kafka-streams</artifactId>
138+
<optional>true</optional>
139+
</dependency>
135140
<dependency>
136141
<groupId>org.flywaydb</groupId>
137142
<artifactId>flyway-core</artifactId>

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@
5252
@Configuration
5353
@ConditionalOnClass(KafkaTemplate.class)
5454
@EnableConfigurationProperties(KafkaProperties.class)
55-
@Import(KafkaAnnotationDrivenConfiguration.class)
55+
@Import({ KafkaAnnotationDrivenConfiguration.class,
56+
KafkaStreamsAnnotationDrivenConfiguration.class })
5657
public class KafkaAutoConfiguration {
5758

5859
private final KafkaProperties properties;

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 153 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public class KafkaProperties {
5757

5858
/**
5959
* Comma-delimited list of host:port pairs to use for establishing the initial
60-
* connection to the Kafka cluster.
60+
* connection to the Kafka cluster. Applies to all components unless overridden.
6161
*/
6262
private List<String> bootstrapServers = new ArrayList<>(
6363
Collections.singletonList("localhost:9092"));
@@ -79,6 +79,8 @@ public class KafkaProperties {
7979

8080
private final Admin admin = new Admin();
8181

82+
private final Streams streams = new Streams();
83+
8284
private final Listener listener = new Listener();
8385

8486
private final Ssl ssl = new Ssl();
@@ -123,6 +125,10 @@ public Admin getAdmin() {
123125
return this.admin;
124126
}
125127

128+
public Streams getStreams() {
129+
return this.streams;
130+
}
131+
126132
public Ssl getSsl() {
127133
return this.ssl;
128134
}
@@ -193,6 +199,19 @@ public Map<String, Object> buildAdminProperties() {
193199
return properties;
194200
}
195201

202+
/**
203+
* Create an initial map of streams properties from the state of this instance.
204+
* <p>
205+
* This allows you to add additional properties, if necessary.
206+
* @return the streams properties initialized with the customizations defined on this
207+
* instance
208+
*/
209+
public Map<String, Object> buildStreamsProperties() {
210+
Map<String, Object> properties = buildCommonProperties();
211+
properties.putAll(this.streams.buildProperties());
212+
return properties;
213+
}
214+
196215
public static class Consumer {
197216

198217
private final Ssl ssl = new Ssl();
@@ -211,7 +230,7 @@ public static class Consumer {
211230

212231
/**
213232
* Comma-delimited list of host:port pairs to use for establishing the initial
214-
* connection to the Kafka cluster.
233+
* connection to the Kafka cluster. Overrides the global property, for consumers.
215234
*/
216235
private List<String> bootstrapServers;
217236

@@ -421,7 +440,7 @@ public static class Producer {
421440

422441
/**
423442
* Comma-delimited list of host:port pairs to use for establishing the initial
424-
* connection to the Kafka cluster.
443+
* connection to the Kafka cluster. Overrides the global property, for producers.
425444
*/
426445
private List<String> bootstrapServers;
427446

@@ -631,6 +650,136 @@ public Map<String, Object> buildProperties() {
631650

632651
}
633652

653+
/**
654+
* High (and some medium) priority Streams properties and a general properties bucket.
655+
*/
656+
public static class Streams {
657+
658+
private final Ssl ssl = new Ssl();
659+
660+
/**
661+
* Kafka streams application.id property; default spring.application.name.
662+
*/
663+
private String applicationId;
664+
665+
/**
666+
* Whether or not to auto-start the streams factory bean.
667+
*/
668+
private boolean autoStartup = true;
669+
670+
/**
671+
* Comma-delimited list of host:port pairs to use for establishing the initial
672+
* connection to the Kafka cluster. Overrides the global property, for streams.
673+
*/
674+
private List<String> bootstrapServers;
675+
676+
/**
677+
* Maximum number of memory bytes to be used for buffering across all threads.
678+
*/
679+
private Integer cacheMaxBytesBuffering;
680+
681+
/**
682+
* ID to pass to the server when making requests. Used for server-side logging.
683+
*/
684+
private String clientId;
685+
686+
/**
687+
* The replication factor for change log topics and repartition topics created by
688+
* the stream processing application.
689+
*/
690+
private Integer replicationFactor;
691+
692+
/**
693+
* Directory location for the state store.
694+
*/
695+
private String stateDir;
696+
697+
/**
698+
* Additional Kafka properties used to configure the streams.
699+
*/
700+
private final Map<String, String> properties = new HashMap<>();
701+
702+
public Ssl getSsl() {
703+
return this.ssl;
704+
}
705+
706+
public String getApplicationId() {
707+
return this.applicationId;
708+
}
709+
710+
public void setApplicationId(String applicationId) {
711+
this.applicationId = applicationId;
712+
}
713+
714+
public boolean isAutoStartup() {
715+
return this.autoStartup;
716+
}
717+
718+
public void setAutoStartup(boolean autoStartup) {
719+
this.autoStartup = autoStartup;
720+
}
721+
722+
public List<String> getBootstrapServers() {
723+
return this.bootstrapServers;
724+
}
725+
726+
public void setBootstrapServers(List<String> bootstrapServers) {
727+
this.bootstrapServers = bootstrapServers;
728+
}
729+
730+
public Integer getCacheMaxBytesBuffering() {
731+
return this.cacheMaxBytesBuffering;
732+
}
733+
734+
public void setCacheMaxBytesBuffering(Integer cacheMaxBytesBuffering) {
735+
this.cacheMaxBytesBuffering = cacheMaxBytesBuffering;
736+
}
737+
738+
public String getClientId() {
739+
return this.clientId;
740+
}
741+
742+
public void setClientId(String clientId) {
743+
this.clientId = clientId;
744+
}
745+
746+
public Integer getReplicationFactor() {
747+
return this.replicationFactor;
748+
}
749+
750+
public void setReplicationFactor(Integer replicationFactor) {
751+
this.replicationFactor = replicationFactor;
752+
}
753+
754+
public String getStateDir() {
755+
return this.stateDir;
756+
}
757+
758+
public void setStateDir(String stateDir) {
759+
this.stateDir = stateDir;
760+
}
761+
762+
public Map<String, String> getProperties() {
763+
return this.properties;
764+
}
765+
766+
public Map<String, Object> buildProperties() {
767+
Properties properties = new Properties();
768+
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
769+
map.from(this::getApplicationId).to(properties.in("application.id"));
770+
map.from(this::getBootstrapServers)
771+
.to(properties.in(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
772+
map.from(this::getCacheMaxBytesBuffering)
773+
.to(properties.in("cache.max.bytes.buffering"));
774+
map.from(this::getClientId)
775+
.to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG));
776+
map.from(this::getReplicationFactor).to(properties.in("replication.factor"));
777+
map.from(this::getStateDir).to(properties.in("state.dir"));
778+
return properties.with(this.ssl, this.properties);
779+
}
780+
781+
}
782+
634783
public static class Template {
635784

636785
/**
@@ -1011,6 +1160,7 @@ public void setOptions(Map<String, String> options) {
10111160

10121161
}
10131162

1163+
@SuppressWarnings("serial")
10141164
private static class Properties extends HashMap<String, Object> {
10151165

10161166
public <V> java.util.function.Consumer<V> in(String key) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.kafka;
18+
19+
import java.util.Map;
20+
21+
import org.apache.kafka.streams.StreamsBuilder;
22+
import org.apache.kafka.streams.StreamsConfig;
23+
24+
import org.springframework.beans.factory.InitializingBean;
25+
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
26+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
27+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
28+
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
29+
import org.springframework.context.annotation.Bean;
30+
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.core.env.Environment;
32+
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
33+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
34+
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
35+
36+
/**
37+
* Configuration for Kafka Streams annotation-driven support.
38+
*
39+
* @author Gary Russell
40+
* @author Stephane Nicoll
41+
*/
42+
@Configuration
43+
@ConditionalOnClass(StreamsBuilder.class)
44+
@ConditionalOnBean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
45+
class KafkaStreamsAnnotationDrivenConfiguration {
46+
47+
private final KafkaProperties properties;
48+
49+
KafkaStreamsAnnotationDrivenConfiguration(KafkaProperties properties) {
50+
this.properties = properties;
51+
}
52+
53+
@ConditionalOnMissingBean
54+
@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
55+
public KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment) {
56+
Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
57+
if (this.properties.getStreams().getApplicationId() == null) {
58+
String applicationName = environment.getProperty("spring.application.name");
59+
if (applicationName != null) {
60+
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG,
61+
applicationName);
62+
}
63+
else {
64+
throw new InvalidConfigurationPropertyValueException(
65+
"spring.kafka.streams.application-id", null,
66+
"This property is mandatory and fallback 'spring.application.name' is not set either.");
67+
}
68+
}
69+
return new KafkaStreamsConfiguration(streamsProperties);
70+
}
71+
72+
@Bean
73+
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
74+
StreamsBuilderFactoryBean factoryBean) {
75+
return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
76+
}
77+
78+
// Separate class required to avoid BeanCurrentlyInCreationException
79+
static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {
80+
81+
private final KafkaProperties properties;
82+
83+
private final StreamsBuilderFactoryBean factoryBean;
84+
85+
KafkaStreamsFactoryBeanConfigurer(KafkaProperties properties,
86+
StreamsBuilderFactoryBean factoryBean) {
87+
this.properties = properties;
88+
this.factoryBean = factoryBean;
89+
}
90+
91+
@Override
92+
public void afterPropertiesSet() {
93+
this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
94+
}
95+
96+
}
97+
98+
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@
2828
import org.springframework.boot.test.util.TestPropertyValues;
2929
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
3030
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.kafka.annotation.EnableKafkaStreams;
3133
import org.springframework.kafka.annotation.KafkaListener;
3234
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3335
import org.springframework.kafka.core.KafkaTemplate;
36+
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
3437
import org.springframework.kafka.support.KafkaHeaders;
3538
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
3639
import org.springframework.messaging.handler.annotation.Header;
@@ -41,6 +44,7 @@
4144
* Integration tests for {@link KafkaAutoConfiguration}.
4245
*
4346
* @author Gary Russell
47+
* @author Stephane Nicoll
4448
*/
4549
public class KafkaAutoConfigurationIntegrationTests {
4650

@@ -83,6 +87,14 @@ public void testEndToEnd() throws Exception {
8387
producer.close();
8488
}
8589

90+
@Test
91+
public void testStreams() {
92+
load(KafkaStreamsConfig.class, "spring.application.name:my-app",
93+
"spring.kafka.bootstrap-servers:" + getEmbeddedKafkaBrokersAsString());
94+
assertThat(this.context.getBean(StreamsBuilderFactoryBean.class).isAutoStartup())
95+
.isTrue();
96+
}
97+
8698
private void load(Class<?> config, String... environment) {
8799
this.context = doLoad(new Class<?>[] { config }, environment);
88100
}
@@ -101,7 +113,8 @@ private String getEmbeddedKafkaBrokersAsString() {
101113
return embeddedKafka.getEmbeddedKafka().getBrokersAsString();
102114
}
103115

104-
public static class KafkaConfig {
116+
@Configuration
117+
static class KafkaConfig {
105118

106119
@Bean
107120
public Listener listener() {
@@ -115,6 +128,12 @@ public NewTopic adminCreated() {
115128

116129
}
117130

131+
@Configuration
132+
@EnableKafkaStreams
133+
static class KafkaStreamsConfig {
134+
135+
}
136+
118137
public static class Listener {
119138

120139
private final CountDownLatch latch = new CountDownLatch(1);

0 commit comments

Comments
 (0)