23
23
import org .apache .kafka .clients .CommonClientConfigs ;
24
24
import org .apache .kafka .clients .consumer .ConsumerConfig ;
25
25
import org .apache .kafka .clients .producer .ProducerConfig ;
26
+ import org .apache .kafka .common .config .SslConfigs ;
26
27
27
28
import org .springframework .beans .factory .ObjectProvider ;
28
29
import org .springframework .boot .autoconfigure .AutoConfiguration ;
32
33
import org .springframework .boot .autoconfigure .condition .ConditionalOnMissingBean ;
33
34
import org .springframework .boot .autoconfigure .condition .ConditionalOnProperty ;
34
35
import org .springframework .boot .autoconfigure .condition .ConditionalOnSingleCandidate ;
36
+ import org .springframework .boot .autoconfigure .kafka .KafkaConnectionDetails .Configuration ;
35
37
import org .springframework .boot .autoconfigure .kafka .KafkaProperties .Jaas ;
36
38
import org .springframework .boot .autoconfigure .kafka .KafkaProperties .Retry .Topic .Backoff ;
37
39
import org .springframework .boot .context .properties .EnableConfigurationProperties ;
38
40
import org .springframework .boot .context .properties .PropertyMapper ;
41
+ import org .springframework .boot .ssl .SslBundle ;
39
42
import org .springframework .boot .ssl .SslBundles ;
40
43
import org .springframework .context .annotation .Bean ;
41
44
import org .springframework .context .annotation .Import ;
54
57
import org .springframework .kafka .transaction .KafkaTransactionManager ;
55
58
import org .springframework .retry .backoff .BackOffPolicyBuilder ;
56
59
import org .springframework .retry .backoff .SleepingBackOffPolicy ;
60
+ import org .springframework .util .StringUtils ;
57
61
58
62
/**
59
63
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
@@ -84,8 +88,9 @@ public class KafkaAutoConfiguration {
84
88
85
89
@ Bean
86
90
@ ConditionalOnMissingBean (KafkaConnectionDetails .class )
87
- PropertiesKafkaConnectionDetails kafkaConnectionDetails (KafkaProperties properties ) {
88
- return new PropertiesKafkaConnectionDetails (properties );
91
+ PropertiesKafkaConnectionDetails kafkaConnectionDetails (KafkaProperties properties ,
92
+ ObjectProvider <SslBundles > sslBundles ) {
93
+ return new PropertiesKafkaConnectionDetails (properties , sslBundles .getIfAvailable ());
89
94
}
90
95
91
96
@ Bean
@@ -111,9 +116,9 @@ public LoggingProducerListener<Object, Object> kafkaProducerListener() {
111
116
112
117
@ Bean
113
118
@ ConditionalOnMissingBean (ConsumerFactory .class )
114
- public DefaultKafkaConsumerFactory <?, ?> kafkaConsumerFactory (KafkaConnectionDetails connectionDetails ,
115
- ObjectProvider <DefaultKafkaConsumerFactoryCustomizer > customizers , ObjectProvider < SslBundles > sslBundles ) {
116
- Map <String , Object > properties = this .properties .buildConsumerProperties (sslBundles . getIfAvailable () );
119
+ DefaultKafkaConsumerFactory <?, ?> kafkaConsumerFactory (KafkaConnectionDetails connectionDetails ,
120
+ ObjectProvider <DefaultKafkaConsumerFactoryCustomizer > customizers ) {
121
+ Map <String , Object > properties = this .properties .buildConsumerProperties ();
117
122
applyKafkaConnectionDetailsForConsumer (properties , connectionDetails );
118
123
DefaultKafkaConsumerFactory <Object , Object > factory = new DefaultKafkaConsumerFactory <>(properties );
119
124
customizers .orderedStream ().forEach ((customizer ) -> customizer .customize (factory ));
@@ -122,9 +127,9 @@ public LoggingProducerListener<Object, Object> kafkaProducerListener() {
122
127
123
128
@ Bean
124
129
@ ConditionalOnMissingBean (ProducerFactory .class )
125
- public DefaultKafkaProducerFactory <?, ?> kafkaProducerFactory (KafkaConnectionDetails connectionDetails ,
126
- ObjectProvider <DefaultKafkaProducerFactoryCustomizer > customizers , ObjectProvider < SslBundles > sslBundles ) {
127
- Map <String , Object > properties = this .properties .buildProducerProperties (sslBundles . getIfAvailable () );
130
+ DefaultKafkaProducerFactory <?, ?> kafkaProducerFactory (KafkaConnectionDetails connectionDetails ,
131
+ ObjectProvider <DefaultKafkaProducerFactoryCustomizer > customizers ) {
132
+ Map <String , Object > properties = this .properties .buildProducerProperties ();
128
133
applyKafkaConnectionDetailsForProducer (properties , connectionDetails );
129
134
DefaultKafkaProducerFactory <?, ?> factory = new DefaultKafkaProducerFactory <>(properties );
130
135
String transactionIdPrefix = this .properties .getProducer ().getTransactionIdPrefix ();
@@ -160,8 +165,8 @@ public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException
160
165
161
166
@ Bean
162
167
@ ConditionalOnMissingBean
163
- public KafkaAdmin kafkaAdmin (KafkaConnectionDetails connectionDetails , ObjectProvider < SslBundles > sslBundles ) {
164
- Map <String , Object > properties = this .properties .buildAdminProperties (sslBundles . getIfAvailable () );
168
+ KafkaAdmin kafkaAdmin (KafkaConnectionDetails connectionDetails ) {
169
+ Map <String , Object > properties = this .properties .buildAdminProperties (null );
165
170
applyKafkaConnectionDetailsForAdmin (properties , connectionDetails );
166
171
KafkaAdmin kafkaAdmin = new KafkaAdmin (properties );
167
172
KafkaProperties .Admin admin = this .properties .getAdmin ();
@@ -193,26 +198,26 @@ public RetryTopicConfiguration kafkaRetryTopicConfiguration(KafkaTemplate<?, ?>
193
198
194
199
private void applyKafkaConnectionDetailsForConsumer (Map <String , Object > properties ,
195
200
KafkaConnectionDetails connectionDetails ) {
196
- properties . put ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG , connectionDetails .getConsumerBootstrapServers () );
197
- if (!( connectionDetails instanceof PropertiesKafkaConnectionDetails )) {
198
- properties . put ( CommonClientConfigs . SECURITY_PROTOCOL_CONFIG , "PLAINTEXT" );
199
- }
201
+ Configuration consumer = connectionDetails .getConsumer ( );
202
+ properties . put ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG , consumer . getBootstrapServers ());
203
+ applySecurityProtocol ( properties , connectionDetails . getSecurityProtocol () );
204
+ applySslBundle ( properties , consumer . getSslBundle ());
200
205
}
201
206
202
207
private void applyKafkaConnectionDetailsForProducer (Map <String , Object > properties ,
203
208
KafkaConnectionDetails connectionDetails ) {
204
- properties . put ( ProducerConfig . BOOTSTRAP_SERVERS_CONFIG , connectionDetails .getProducerBootstrapServers () );
205
- if (!( connectionDetails instanceof PropertiesKafkaConnectionDetails )) {
206
- properties . put ( CommonClientConfigs . SECURITY_PROTOCOL_CONFIG , "PLAINTEXT" );
207
- }
209
+ Configuration producer = connectionDetails .getProducer ( );
210
+ properties . put ( ProducerConfig . BOOTSTRAP_SERVERS_CONFIG , producer . getBootstrapServers ());
211
+ applySecurityProtocol ( properties , producer . getSecurityProtocol () );
212
+ applySslBundle ( properties , producer . getSslBundle ());
208
213
}
209
214
210
215
private void applyKafkaConnectionDetailsForAdmin (Map <String , Object > properties ,
211
216
KafkaConnectionDetails connectionDetails ) {
212
- properties . put ( CommonClientConfigs . BOOTSTRAP_SERVERS_CONFIG , connectionDetails .getAdminBootstrapServers () );
213
- if (!( connectionDetails instanceof PropertiesKafkaConnectionDetails )) {
214
- properties . put ( CommonClientConfigs . SECURITY_PROTOCOL_CONFIG , "PLAINTEXT" );
215
- }
217
+ Configuration admin = connectionDetails .getAdmin ( );
218
+ properties . put ( CommonClientConfigs . BOOTSTRAP_SERVERS_CONFIG , admin . getBootstrapServers ());
219
+ applySecurityProtocol ( properties , admin . getSecurityProtocol () );
220
+ applySslBundle ( properties , admin . getSslBundle ());
216
221
}
217
222
218
223
private static void setBackOffPolicy (RetryTopicConfigurationBuilder builder , Backoff retryTopicBackoff ) {
@@ -231,4 +236,17 @@ private static void setBackOffPolicy(RetryTopicConfigurationBuilder builder, Bac
231
236
}
232
237
}
233
238
239
+ static void applySslBundle (Map <String , Object > properties , SslBundle sslBundle ) {
240
+ if (sslBundle != null ) {
241
+ properties .put (SslConfigs .SSL_ENGINE_FACTORY_CLASS_CONFIG , SslBundleSslEngineFactory .class .getName ());
242
+ properties .put (SslBundle .class .getName (), sslBundle );
243
+ }
244
+ }
245
+
246
+ static void applySecurityProtocol (Map <String , Object > properties , String securityProtocol ) {
247
+ if (StringUtils .hasLength (securityProtocol )) {
248
+ properties .put (CommonClientConfigs .SECURITY_PROTOCOL_CONFIG , securityProtocol );
249
+ }
250
+ }
251
+
234
252
}
0 commit comments