diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index 892bcc847fb67..ce734fbcda61b 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -89,6 +89,11 @@ public class SslConfigs { public static final String SSL_KEYSTORE_LOCATION_DOC = "The location of the key store file. " + "This is optional for client and can be used for two-way authentication for client."; + public static final String SSL_KEYSTORE_ALIAS_CONFIG = "ssl.keystore.alias"; + public static final String SSL_KEYSTORE_ALIAS_DOC = "This config is used to pick named alias from the keystore to build the SSL engine and authenticate the client with broker. " + + "This is an optional config and used only when you have multiple keys in the keystore and you need to control which key needs to be presented to server."; + + public static final String SSL_KEYSTORE_PASSWORD_CONFIG = "ssl.keystore.password"; public static final String SSL_KEYSTORE_PASSWORD_DOC = "The store password for the key store file. " + "This is optional for client and only needed if 'ssl.keystore.location' is configured. " @@ -142,6 +147,7 @@ public static void addClientSslSupport(ConfigDef config) { .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC) .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC) + .define(SslConfigs.SSL_KEYSTORE_ALIAS_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_ALIAS_DOC) .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC) .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC) .define(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_KEY_DOC) diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java index de23000215e60..999167e9bb8b4 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java @@ -26,10 +26,36 @@ import org.apache.kafka.common.security.auth.SslEngineFactory; import org.apache.kafka.common.utils.SecurityUtils; import org.apache.kafka.common.utils.Utils; - +import java.net.Socket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Reconfigurable; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.network.Mode; + +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.security.PrivateKey; +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509ExtendedKeyManager; +import javax.net.ssl.X509KeyManager; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import javax.net.ssl.X509ExtendedKeyManager; +import javax.net.ssl.X509KeyManager; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -47,6 +73,8 @@ import java.security.cert.CertificateFactory; import java.security.spec.InvalidKeySpecException; import java.security.spec.PKCS8EncodedKeySpec; +import java.security.Principal; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; @@ -171,7 +199,7 @@ public void configure(Map configs) { (Password) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG), (Password) configs.get(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG)); - this.sslContext = createSSLContext(keystore, truststore); + this.sslContext = createSSLContext(keystore, truststore, configs); } @Override @@ -234,7 +262,7 @@ private static SecureRandom createSecureRandom(String key) { } } - private SSLContext createSSLContext(SecurityStore keystore, SecurityStore truststore) { + private SSLContext createSSLContext(SecurityStore keystore, SecurityStore truststore, Map configs) { try { SSLContext sslContext; if (provider != null) @@ -258,7 +286,7 @@ private SSLContext createSSLContext(SecurityStore keystore, SecurityStore trusts String tmfAlgorithm = this.tmfAlgorithm != null ? this.tmfAlgorithm : TrustManagerFactory.getDefaultAlgorithm(); TrustManager[] trustManagers = getTrustManagers(truststore, tmfAlgorithm); - sslContext.init(keyManagers, trustManagers, this.secureRandomImplementation); + sslContext.init(applyAliasToKM(keyManagers, (String)configs.get(SslConfigs.SSL_KEYSTORE_ALIAS_CONFIG)), trustManagers, this.secureRandomImplementation); log.debug("Created SSL context with keystore {}, truststore {}, provider {}.", keystore, truststore, sslContext.getProvider().getName()); return sslContext; @@ -267,6 +295,63 @@ private SSLContext createSSLContext(SecurityStore keystore, SecurityStore trusts } } + private KeyManager[] applyAliasToKM(KeyManager[] kms, final String alias) { + if(alias == null || alias.isEmpty()){ + return kms; + } + + log.info("Applying the custom KeyManagers for alias: {}", alias); + + KeyManager[] updatedKMs = new KeyManager[kms.length]; + + int i=0; + for(KeyManager km : kms){ + final X509KeyManager origKM = (X509KeyManager)km; + X509ExtendedKeyManager exKM = new X509ExtendedKeyManager() { + /* (non-Javadoc) + * @see javax.net.ssl.X509ExtendedKeyManager#chooseEngineClientAlias(java.lang.String[], java.security.Principal[], javax.net.ssl.SSLEngine) + */ + @Override + public String chooseEngineClientAlias(String[] arg0, + Principal[] arg1, SSLEngine arg2) { + return alias; + } + + @Override + public String[] getServerAliases(String arg0, Principal[] arg1) { + return origKM.getServerAliases(arg0, arg1); + } + + @Override + public PrivateKey getPrivateKey(String arg0) { + return origKM.getPrivateKey(arg0); + } + + @Override + public String[] getClientAliases(String arg0, Principal[] arg1) { + return origKM.getClientAliases(arg0, arg1); + } + + @Override + public X509Certificate[] getCertificateChain(String arg0) { + return origKM.getCertificateChain(arg0); + } + + @Override + public String chooseServerAlias(String arg0, Principal[] arg1, Socket arg2) { + return origKM.chooseServerAlias(arg0, arg1, arg2); + } + + @Override + public String chooseClientAlias(String[] arg0, Principal[] arg1, Socket arg2) { + return alias; + } + }; + updatedKMs[i++] = exKM; + } + return updatedKMs; + } + protected TrustManager[] getTrustManagers(SecurityStore truststore, String tmfAlgorithm) throws NoSuchAlgorithmException, KeyStoreException { TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm); KeyStore ts = truststore == null ? null : truststore.get(); diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java index 75c1d6c0e7a6f..109e441d67b16 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactoryTest.java @@ -23,6 +23,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import javax.net.ssl.KeyManager; +import java.lang.reflect.Method; import java.security.KeyStore; import java.util.Arrays; import java.util.Collections; @@ -33,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; @SuppressWarnings("this-escape") public class DefaultSslEngineFactoryTest { @@ -324,6 +327,8 @@ private String pemFilePath(String pem) throws Exception { return TestUtils.tempFile(pem).getAbsolutePath(); } + + private Password pemAsConfigValue(String... pemValues) { StringBuilder builder = new StringBuilder(); for (String pem : pemValues) { @@ -332,4 +337,27 @@ private Password pemAsConfigValue(String... pemValues) { } return new Password(builder.toString().trim()); } + + @Test + void testApplyAliasToKM() throws Exception { + DefaultSslEngineFactory instance = new DefaultSslEngineFactory(); + // Mock KeyManager array + KeyManager mockKeyManager = mock(KeyManager.class); + KeyManager[] kms = new KeyManager[]{mockKeyManager}; + + // Define the alias + String alias = "testAlias"; + + // Use reflection to access the private method + Method method = DefaultSslEngineFactory.class.getDeclaredMethod("applyAliasToKM", KeyManager[].class, String.class); + method.setAccessible(true); + + // Invoke the method + KeyManager[] result = (KeyManager[]) method.invoke(instance, (Object) kms, alias); + + // Validate results (Modify based on actual method behavior) + assertNotNull(result); + assertEquals(1, result.length); + } + }