6
6
import org .apache .kafka .clients .producer .KafkaProducer ;
7
7
import org .apache .kafka .clients .producer .ProducerConfig ;
8
8
import org .apache .kafka .clients .producer .ProducerRecord ;
9
+ import org .apache .kafka .common .config .SaslConfigs ;
9
10
import org .apache .kafka .common .config .SslConfigs ;
10
11
import org .apache .kafka .common .security .auth .SecurityProtocol ;
11
12
import react .auth .Authenticator ;
17
18
import play .mvc .Security ;
18
19
import react .graphql .PlayQueryContext ;
19
20
21
+ import java .util .Arrays ;
22
+ import java .util .Collections ;
23
+ import java .util .List ;
20
24
import java .util .Properties ;
21
25
22
26
public class TrackingController extends Controller {
23
27
28
+ private static final List <String > KAFKA_SSL_PROTOCOLS = Collections .unmodifiableList (
29
+ Arrays .asList (SecurityProtocol .SSL .name (),SecurityProtocol .SASL_SSL .name ()));
30
+
24
31
private final Boolean _isEnabled ;
25
32
private final Config _config ;
26
33
private final KafkaProducer <String , String > _producer ;
@@ -81,7 +88,7 @@ private KafkaProducer createKafkaProducer() {
81
88
82
89
final String securityProtocolConfig = "analytics.kafka.security.protocol" ;
83
90
if (_config .hasPath (securityProtocolConfig )
84
- && _config .getString (securityProtocolConfig ). equals ( SecurityProtocol . SSL )) {
91
+ && KAFKA_SSL_PROTOCOLS . contains ( _config .getString (securityProtocolConfig ))) {
85
92
props .put (CommonClientConfigs .SECURITY_PROTOCOL_CONFIG , _config .getString (securityProtocolConfig ));
86
93
props .put (SslConfigs .SSL_KEY_PASSWORD_CONFIG , _config .getString ("analytics.kafka.ssl.key.password" ));
87
94
@@ -95,6 +102,11 @@ private KafkaProducer createKafkaProducer() {
95
102
96
103
props .put (SslConfigs .SSL_PROTOCOL_CONFIG , _config .getString ("analytics.kafka.ssl.protocol" ));
97
104
props .put (SslConfigs .SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG , _config .getString ("analytics.kafka.ssl.endpoint.identification.algorithm" ));
105
+
106
+ if (_config .getString (securityProtocolConfig ).equals (SecurityProtocol .SASL_SSL .name ())) {
107
+ props .put (SaslConfigs .SASL_MECHANISM , _config .getString ("analytics.kafka.sasl.mechanism" ));
108
+ props .put (SaslConfigs .SASL_JAAS_CONFIG , _config .getString ("analytics.kafka.sasl.jaas.config" ));
109
+ }
98
110
}
99
111
100
112
return new KafkaProducer (props );
0 commit comments