From 563f3472c54cd6a5020627fa3692750f7504fd60 Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Mon, 7 Apr 2025 12:17:55 +0800 Subject: [PATCH 01/10] KAFKA-18894: Add KIP-877 support for ConfigProvider --- .../kafka/common/config/AbstractConfig.java | 22 +++-- .../common/config/ConfigTransformer.java | 12 ++- .../config/provider/ConfigProvider.java | 4 + .../apache/kafka/common/internals/Plugin.java | 6 ++ .../common/config/ConfigTransformerTest.java | 17 ++-- .../connect/mirror/MirrorMakerConfig.java | 16 +-- .../apache/kafka/connect/runtime/Worker.java | 15 +-- .../runtime/WorkerConfigTransformer.java | 11 ++- .../connect/runtime/isolation/Plugins.java | 7 +- .../runtime/WorkerConfigTransformerTest.java | 20 ++-- .../kafka/connect/runtime/WorkerTest.java | 98 ++++++++++++++++++- .../runtime/isolation/PluginsTest.java | 17 ++-- 12 files changed, 181 insertions(+), 64 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 9188b1687a808..bb6d51e82c9ef 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -546,16 +547,16 @@ private Map extractPotentialVariables(Map configMap) { configProperties = configProviderProps; classNameFilter = ignored -> true; } - Map providers = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter); + Map> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter); - if (!providers.isEmpty()) { - ConfigTransformer configTransformer = new ConfigTransformer(providers); + if (!providerPlugins.isEmpty()) { + ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins); ConfigTransformerResult result = configTransformer.transform(indirectVariables); if (!result.data().isEmpty()) { resolvedOriginals.putAll(result.data()); } } - providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); + providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider")); return new ResolvingMap<>(resolvedOriginals, originals); } @@ -587,14 +588,14 @@ private Map configProviderProperties(String configProviderPrefix * config.providers : A comma-separated list of names for providers. * config.providers.{name}.class : The Java class name for a provider. * config.providers.{name}.param.{param-name} : A parameter to be passed to the above Java class on initialization. - * returns a map of config provider name and its instance. + * returns a map of config provider name and its instance wrapped in a {@link org.apache.kafka.common.internals.Plugin}. * * @param indirectConfigs The map of potential variable configs * @param providerConfigProperties The map of config provider configs * @param classNameFilter Filter for config provider class names - * @return map of config provider name and its instance. + * @return map of config provider name and its instance wrapped in a {@link org.apache.kafka.common.internals.Plugin}. */ - private Map instantiateConfigProviders( + private Map> instantiateConfigProviders( Map indirectConfigs, Map providerConfigProperties, Predicate classNameFilter @@ -620,21 +621,22 @@ private Map instantiateConfigProviders( } } // Instantiate Config Providers - Map configProviderInstances = new HashMap<>(); + Map> configProviderPluginInstances = new HashMap<>(); for (Map.Entry entry : providerMap.entrySet()) { try { String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM; Map configProperties = configProviderProperties(prefix, providerConfigProperties); ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class); provider.configure(configProperties); - configProviderInstances.put(entry.getKey(), provider); + Plugin providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG); + configProviderPluginInstances.put(entry.getKey(), providerPlugin); } catch (ClassNotFoundException e) { log.error("Could not load config provider class {}", entry.getValue(), e); throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies"); } } - return configProviderInstances; + return configProviderPluginInstances; } private static String providerClassProperty(String providerName) { diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java index dbf6c7bbfcec1..b4659a07da274 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.provider.FileConfigProvider; +import org.apache.kafka.common.internals.Plugin; import java.util.ArrayList; import java.util.HashMap; @@ -56,15 +57,16 @@ public class ConfigTransformer { public static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}"); private static final String EMPTY_PATH = ""; - private final Map configProviders; + private final Map> configProviderPlugins; /** * Creates a ConfigTransformer with the default pattern, of the form ${provider:[path:]key}. * - * @param configProviders a Map of provider names and {@link ConfigProvider} instances. + * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances, where each instance + * is wrapped in a {@link org.apache.kafka.common.internals.Plugin}. */ - public ConfigTransformer(Map configProviders) { - this.configProviders = configProviders; + public ConfigTransformer(Map> configProviderPlugins) { + this.configProviderPlugins = configProviderPlugins; } /** @@ -94,7 +96,7 @@ public ConfigTransformerResult transform(Map configs) { Map ttls = new HashMap<>(); for (Map.Entry>> entry : keysByProvider.entrySet()) { String providerName = entry.getKey(); - ConfigProvider provider = configProviders.get(providerName); + ConfigProvider provider = configProviderPlugins.get(providerName).get(); Map> keysByPath = entry.getValue(); if (provider != null && keysByPath != null) { for (Map.Entry> pathWithKeys : keysByPath.entrySet()) { diff --git a/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java index 81f0aac0d72d1..abe12237ca3e3 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java +++ b/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java @@ -29,6 +29,10 @@ *

Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism. * To support this, implementations of this interface should also contain a service provider configuration file in * {@code META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider}. + *

Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics. + * The following tags are automatically added to all metrics registered: config set to + * config.providers, class set to the ConfigProvider class name, + * and provider set to the provider name. */ public interface ConfigProvider extends Configurable, Closeable { diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java index 620cd0c07ec0f..587f9b74a8d91 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java @@ -44,6 +44,12 @@ public static Plugin wrapInstance(T instance, Metrics metrics, String key return wrapInstance(instance, metrics, () -> tags(key, instance)); } + public static Plugin wrapInstance(T instance, Metrics metrics, String key, Map extraTags) { + Map tags = tags(key, instance); + tags.putAll(extraTags); + return wrapInstance(instance, metrics, () -> tags); + } + private static Map tags(String key, T instance) { Map tags = new LinkedHashMap<>(); tags.put("config", key); diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java index 3a5ec650fab42..b9ced57b99a40 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.config; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.internals.Plugin; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -45,12 +46,12 @@ public class ConfigTransformerTest { @BeforeEach public void setup() { - configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider())); + configTransformer = new ConfigTransformer(Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers"))); } @Test public void testReplaceVariable() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKey}")); Map data = result.data(); Map ttls = result.ttls(); assertEquals(TEST_RESULT, data.get(MY_KEY)); @@ -59,7 +60,7 @@ public void testReplaceVariable() { @Test public void testReplaceVariableWithTTL() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}")); Map data = result.data(); Map ttls = result.ttls(); assertEquals(TEST_RESULT_WITH_TTL, data.get(MY_KEY)); @@ -68,28 +69,28 @@ public void testReplaceVariableWithTTL() { @Test public void testReplaceMultipleVariablesInValue() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!")); Map data = result.data(); assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", data.get(MY_KEY)); } @Test public void testNoReplacement() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:missingKey}")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:missingKey}")); Map data = result.data(); assertEquals("${test:testPath:missingKey}", data.get(MY_KEY)); } @Test public void testSingleLevelOfIndirection() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testIndirection}")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testIndirection}")); Map data = result.data(); assertEquals("${test:testPath:testResult}", data.get(MY_KEY)); } @Test public void testReplaceVariableNoPath() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testKey}")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testKey}")); Map data = result.data(); Map ttls = result.ttls(); assertEquals(TEST_RESULT_NO_PATH, data.get(MY_KEY)); @@ -98,7 +99,7 @@ public void testReplaceVariableNoPath() { @Test public void testReplaceMultipleVariablesWithoutPathInValue() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "first ${test:testKey}; second ${test:testKey}")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "first ${test:testKey}; second ${test:testKey}")); Map data = result.data(); assertEquals("first testResultNoPath; second testResultNoPath", data.get(MY_KEY)); } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java index aba62cf8464ff..9bc197af7d8a9 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigTransformer; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -269,18 +270,19 @@ List configProviders() { Map transform(Map props) { // transform worker config according to config.providers List providerNames = configProviders(); - Map providers = new HashMap<>(); + Map> providerPlugins = new HashMap<>(); for (String name : providerNames) { - ConfigProvider configProvider = plugins.newConfigProvider( + Plugin configProviderPlugin = plugins.newConfigProvider( this, - CONFIG_PROVIDERS_CONFIG + "." + name, - Plugins.ClassLoaderUsage.PLUGINS + name, + Plugins.ClassLoaderUsage.PLUGINS, + null ); - providers.put(name, configProvider); + providerPlugins.put(name, configProviderPlugin); } - ConfigTransformer transformer = new ConfigTransformer(providers); + ConfigTransformer transformer = new ConfigTransformer(providerPlugins); Map transformed = transformer.transform(props).data(); - providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); + providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider")); return transformed; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 4d5b23b63a826..fc0d38521aa35 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -209,16 +209,17 @@ public Worker( private WorkerConfigTransformer initConfigTransformer() { final List providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG); - Map providerMap = new HashMap<>(); + Map> providerPluginMap = new HashMap<>(); for (String providerName : providerNames) { - ConfigProvider configProvider = plugins.newConfigProvider( - config, - WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName, - ClassLoaderUsage.PLUGINS + Plugin configProviderPlugin = plugins.newConfigProvider( + config, + providerName, + ClassLoaderUsage.PLUGINS, + metrics.metrics() ); - providerMap.put(providerName, configProvider); + providerPluginMap.put(providerName, configProviderPlugin); } - return new WorkerConfigTransformer(this, providerMap); + return new WorkerConfigTransformer(this, providerPluginMap); } public WorkerConfigTransformer configTransformer() { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java index df16955c941a2..35dfb0a6ad36a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigTransformer; import org.apache.kafka.common.config.ConfigTransformerResult; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction; import org.apache.kafka.connect.util.Callback; @@ -42,12 +43,12 @@ public class WorkerConfigTransformer implements AutoCloseable { private final Worker worker; private final ConfigTransformer configTransformer; private final ConcurrentMap> requests = new ConcurrentHashMap<>(); - private final Map configProviders; + private final Map> configProviderPlugins; - public WorkerConfigTransformer(Worker worker, Map configProviders) { + public WorkerConfigTransformer(Worker worker, Map> configProviderPlugins) { this.worker = worker; - this.configProviders = configProviders; - this.configTransformer = new ConfigTransformer(configProviders); + this.configProviderPlugins = configProviderPlugins; + this.configTransformer = new ConfigTransformer(configProviderPlugins); } public Map transform(Map configs) { @@ -97,6 +98,6 @@ private void scheduleReload(String connectorName, String path, long ttl) { @Override public void close() { - configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider")); + configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider")); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 98f33ea582bf0..2574732d24e57 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -19,6 +19,8 @@ import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.Connector; @@ -627,7 +629,8 @@ private U newVersionedPlugin( return plugin; } - public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) { + public Plugin newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage, Metrics metrics) { + String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName; String classPropertyName = providerPrefix + ".class"; Map originalConfig = config.originalsStrings(); if (!originalConfig.containsKey(classPropertyName)) { @@ -643,7 +646,7 @@ public ConfigProvider newConfigProvider(AbstractConfig config, String providerPr try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) { plugin.configure(configProviderConfig); } - return plugin; + return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName)); } /** diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java index c3a8f151750ec..e099e2cabce53 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.config.ConfigData; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.internals.Plugin; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,7 +28,6 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -66,13 +66,13 @@ public class WorkerConfigTransformerTest { @BeforeEach public void setup() { - configTransformer = new WorkerConfigTransformer(worker, Collections.singletonMap("test", new TestConfigProvider())); + configTransformer = new WorkerConfigTransformer(worker, Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers"))); } @Test public void testReplaceVariable() { // Execution - Map result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKey}")); + Map result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKey}")); // Assertions assertEquals(TEST_RESULT, result.get(MY_KEY)); @@ -97,7 +97,7 @@ public void testReplaceVariableWithTTLAndScheduleRestart() { when(herder.restartConnector(eq(1L), eq(MY_CONNECTOR), notNull())).thenReturn(requestId); // Execution - Map result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}")); + Map result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}")); // Assertions assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY)); @@ -112,14 +112,14 @@ public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() { when(herder.restartConnector(eq(10L), eq(MY_CONNECTOR), notNull())).thenReturn(requestId); // Execution - Map result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}")); + Map result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}")); // Assertions assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY)); verify(herder).restartConnector(eq(1L), eq(MY_CONNECTOR), notNull()); // Execution - result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithLongerTTL}")); + result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithLongerTTL}")); // Assertions assertEquals(TEST_RESULT_WITH_LONGER_TTL, result.get(MY_KEY)); @@ -147,14 +147,14 @@ public ConfigData get(String path) { public ConfigData get(String path, Set keys) { if (path.equals(TEST_PATH)) { if (keys.contains(TEST_KEY)) { - return new ConfigData(Collections.singletonMap(TEST_KEY, TEST_RESULT)); + return new ConfigData(Map.of(TEST_KEY, TEST_RESULT)); } else if (keys.contains(TEST_KEY_WITH_TTL)) { - return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L); + return new ConfigData(Map.of(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L); } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) { - return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L); + return new ConfigData(Map.of(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L); } } - return new ConfigData(Collections.emptyMap()); + return new ConfigData(Map.of()); } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 2f7af629f0665..b95a34c39cf09 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -36,12 +36,19 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigData; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.provider.MockFileConfigProvider; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.Monitorable; +import org.apache.kafka.common.metrics.PluginMetrics; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.MockTime; @@ -91,6 +98,7 @@ import org.apache.maven.artifact.versioning.VersionRange; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.AdditionalAnswers; @@ -103,6 +111,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.quality.Strictness; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.Arrays; import java.util.Collection; @@ -110,6 +119,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -373,10 +383,14 @@ public void testStartAndStopConnector(boolean enableTopicCreation) throws Throwa private void mockFileConfigProvider() { MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider(); mockFileConfigProvider.configure(Collections.singletonMap("testId", mockFileProviderTestId)); + Plugin providerPlugin = Plugin.wrapInstance(mockFileConfigProvider, + null, + WorkerConfig.CONFIG_PROVIDERS_CONFIG, + Map.of("provider", "file")); when(plugins.newConfigProvider(any(AbstractConfig.class), - eq("config.providers.file"), - any(ClassLoaderUsage.class))) - .thenReturn(mockFileConfigProvider); + eq("file"), + any(ClassLoaderUsage.class), + any(Metrics.class))).thenReturn(providerPlugin); } @ParameterizedTest @@ -2891,6 +2905,56 @@ private void testStartTaskWithTooManyTaskConfigs(boolean enforced) { } } + @Test + public void testMonitorableConfigProvider() { + setup(false); + Map props = new HashMap<>(this.workerProps); + props.put("config.providers", "monitorable,monitorable2"); + props.put("config.providers.monitorable.class", MonitorableConfigProvider.class.getName()); + props.put("config.providers.monitorable2.class", MonitorableConfigProvider.class.getName()); + config = new StandaloneConfig(props); + mockKafkaClusterId(); + when(plugins.newConfigProvider(any(AbstractConfig.class), any(String.class), any(ClassLoaderUsage.class), any(Metrics.class))) + .thenAnswer(invocation -> { + String providerName = invocation.getArgument(1); + Metrics metrics = invocation.getArgument(3); + return Plugin.wrapInstance(new MonitorableConfigProvider(), metrics, + WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName)); + }); + + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + Metrics metrics = worker.metrics().metrics(); + assertMetrics(metrics, + 1, + expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable"))); + assertMetrics(metrics, + 1, + expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable2"))); + } + + private static Map expectedTags(String config, String clazz, Map extraTags) { + Map tags = new LinkedHashMap<>(); + tags.put("config", config); + tags.put("class", clazz); + tags.putAll(extraTags); + return tags; + } + + private void assertMetrics(Metrics metrics, int expected, Map expectedTags) { + int found = 0; + for (MetricName metricName : metrics.metrics().keySet()) { + if (metricName.group().equals("plugins")) { + Map tags = metricName.tags(); + if (expectedTags.equals(tags)) { + assertEquals(MonitorableConfigProvider.NAME, metricName.name()); + assertEquals(MonitorableConfigProvider.DESCRIPTION, metricName.description()); + found++; + } + } + } + assertEquals(expected, found); + } + private void assertTasksMaxExceededMessage(String connector, int numTasks, int maxTasks, String message) { String expectedPrefix = "The connector " + connector + " has generated " @@ -3217,4 +3281,32 @@ public void stop() { } + public static class MonitorableConfigProvider implements ConfigProvider, Monitorable { + private static final String NAME = "name"; + private static final String DESCRIPTION = "description"; + + @Override + public void withPluginMetrics(PluginMetrics metrics) { + MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of()); + metrics.addMetric(metricName, (Measurable) (config, now) -> 123); + } + + @Override + public ConfigData get(String path) { + return null; + } + + @Override + public ConfigData get(String path, Set keys) { + return null; + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map configs) { + } + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index ca4c29931d088..6ec2ce9677dc3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.components.Versioned; @@ -374,7 +375,8 @@ public void newConverterShouldConfigureWithPluginClassLoader() { @Test public void newConfigProviderShouldConfigureWithPluginClassLoader() { - String providerPrefix = "some.provider"; + String providerName = "customProvider"; + String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName; props.put(providerPrefix + ".class", TestPlugin.SAMPLING_CONFIG_PROVIDER.className()); PluginClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONFIG_PROVIDER.className()); @@ -383,16 +385,17 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() { createConfig(); } - ConfigProvider plugin = plugins.newConfigProvider( + Plugin plugin = plugins.newConfigProvider( config, - providerPrefix, - ClassLoaderUsage.PLUGINS + providerName, + ClassLoaderUsage.PLUGINS, + null ); - assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); - Map samples = ((SamplingTestPlugin) plugin).flatten(); + assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples"); + Map samples = ((SamplingTestPlugin) plugin.get()).flatten(); assertTrue(samples.containsKey("configure")); - assertPluginClassLoaderAlwaysActive(plugin); + assertPluginClassLoaderAlwaysActive(plugin.get()); } @Test From 08dff599d976d17c89a23c9c32b134561cb7cf9f Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Tue, 8 Apr 2025 01:52:45 +0800 Subject: [PATCH 02/10] spotlessApply --- .../test/java/org/apache/kafka/connect/runtime/WorkerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index b95a34c39cf09..50081409f4a84 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -118,8 +118,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; From 9e221e30c29690b2359e4d4fe3cb28952910e01f Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Tue, 8 Apr 2025 02:52:17 +0800 Subject: [PATCH 03/10] Update msg in Utils.closeQuietly --- .../java/org/apache/kafka/common/config/AbstractConfig.java | 2 +- .../java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java | 2 +- .../apache/kafka/connect/runtime/WorkerConfigTransformer.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index bb6d51e82c9ef..77921d7545a7e 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -556,7 +556,7 @@ private Map extractPotentialVariables(Map configMap) { resolvedOriginals.putAll(result.data()); } } - providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider")); + providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); return new ResolvingMap<>(resolvedOriginals, originals); } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java index 9bc197af7d8a9..7afcee06bd9e5 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java @@ -282,7 +282,7 @@ Map transform(Map props) { } ConfigTransformer transformer = new ConfigTransformer(providerPlugins); Map transformed = transformer.transform(props).data(); - providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider")); + providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); return transformed; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java index 35dfb0a6ad36a..596c0fb55e8ae 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java @@ -98,6 +98,6 @@ private void scheduleReload(String connectorName, String path, long ttl) { @Override public void close() { - configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider")); + configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); } } From 1dd8f41f953bb6dbca8aaf3b6519817ff311f868 Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Tue, 8 Apr 2025 03:26:43 +0800 Subject: [PATCH 04/10] Add a test to verify withPluginMetrics is called after configure --- .../runtime/isolation/PluginsTest.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 6ec2ce9677dc3..06d7844d071c8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -19,10 +19,14 @@ import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigData; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Monitorable; +import org.apache.kafka.common.metrics.PluginMetrics; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.components.Versioned; @@ -52,6 +56,7 @@ import org.junit.jupiter.api.Test; import java.io.File; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; @@ -398,6 +403,16 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() { assertPluginClassLoaderAlwaysActive(plugin.get()); } + @Test + public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() { + String providerName = "monitorable"; + String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName; + props.put(providerPrefix + ".class", MonitorableConfigProvider .class.getName()); + createConfig(); + Plugin plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics()); + assertInstanceOf(MonitorableConfigProvider.class, plugin.get()); + } + @Test public void newHeaderConverterShouldConfigureWithPluginClassLoader() { props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestPlugin.SAMPLING_HEADER_CONVERTER.className()); @@ -793,4 +808,33 @@ public void configure(Map configs) { super.configure(configs); } } + + public static class MonitorableConfigProvider implements ConfigProvider, Monitorable { + private boolean configured = false; + + @Override + public void withPluginMetrics(PluginMetrics metrics) { + assertTrue(configured); + } + + @Override + public ConfigData get(String path) { + return null; + } + + @Override + public ConfigData get(String path, Set keys) { + return null; + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map configs) { + configured = true; + } + } + } From 204c8a95d0bd67483afbd2732befdbabe8b450f2 Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Tue, 8 Apr 2025 03:57:03 +0800 Subject: [PATCH 05/10] Refactor the test --- .../provider/MonitorableConfigProvider.java | 58 +++++++++++++++++++ .../kafka/connect/runtime/WorkerTest.java | 35 +---------- .../runtime/isolation/PluginsTest.java | 30 ++-------- 3 files changed, 63 insertions(+), 60 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java new file mode 100644 index 0000000000000..e1759acc4e9fc --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.Monitorable; +import org.apache.kafka.common.metrics.PluginMetrics; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +public class MonitorableConfigProvider implements ConfigProvider, Monitorable { + public static final String NAME = "name"; + public static final String DESCRIPTION = "description"; + protected boolean configured = false; + + @Override + public void withPluginMetrics(PluginMetrics metrics) { + MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of()); + metrics.addMetric(metricName, (Measurable) (config, now) -> 123); + } + + @Override + public ConfigData get(String path) { + return null; + } + + @Override + public ConfigData get(String path, Set keys) { + return null; + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map configs) { + configured = true; + } +} \ No newline at end of file diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 50081409f4a84..d065f0ffa9f70 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -36,19 +36,16 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigData; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.provider.MockFileConfigProvider; +import org.apache.kafka.common.config.provider.MonitorableConfigProvider; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.metrics.JmxReporter; -import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.metrics.Monitorable; -import org.apache.kafka.common.metrics.PluginMetrics; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.MockTime; @@ -111,7 +108,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.quality.Strictness; -import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.Arrays; import java.util.Collection; @@ -3280,33 +3276,4 @@ public void stop() { } } - - public static class MonitorableConfigProvider implements ConfigProvider, Monitorable { - private static final String NAME = "name"; - private static final String DESCRIPTION = "description"; - - @Override - public void withPluginMetrics(PluginMetrics metrics) { - MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of()); - metrics.addMetric(metricName, (Measurable) (config, now) -> 123); - } - - @Override - public ConfigData get(String path) { - return null; - } - - @Override - public ConfigData get(String path, Set keys) { - return null; - } - - @Override - public void close() throws IOException { - } - - @Override - public void configure(Map configs) { - } - } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 06d7844d071c8..946467240c3de 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -19,13 +19,12 @@ import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigData; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.config.provider.MonitorableConfigProvider; import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Monitorable; import org.apache.kafka.common.metrics.PluginMetrics; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.Utils; @@ -56,7 +55,6 @@ import org.junit.jupiter.api.Test; import java.io.File; -import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; @@ -407,10 +405,10 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() { public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() { String providerName = "monitorable"; String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName; - props.put(providerPrefix + ".class", MonitorableConfigProvider .class.getName()); + props.put(providerPrefix + ".class", CustomMonitorableConfigProvider .class.getName()); createConfig(); Plugin plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics()); - assertInstanceOf(MonitorableConfigProvider.class, plugin.get()); + assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get()); } @Test @@ -809,32 +807,12 @@ public void configure(Map configs) { } } - public static class MonitorableConfigProvider implements ConfigProvider, Monitorable { - private boolean configured = false; + public static class CustomMonitorableConfigProvider extends MonitorableConfigProvider { @Override public void withPluginMetrics(PluginMetrics metrics) { assertTrue(configured); } - - @Override - public ConfigData get(String path) { - return null; - } - - @Override - public ConfigData get(String path, Set keys) { - return null; - } - - @Override - public void close() throws IOException { - } - - @Override - public void configure(Map configs) { - configured = true; - } } } From 96fd5a67e588eaf5ba56e6dc98d4b99ef8300117 Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Tue, 8 Apr 2025 05:37:58 +0800 Subject: [PATCH 06/10] Refactor --- .../test/java/org/apache/kafka/connect/runtime/WorkerTest.java | 2 -- .../org/apache/kafka/connect/runtime/isolation/PluginsTest.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index d065f0ffa9f70..4af8c0df96ae3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -2906,8 +2906,6 @@ public void testMonitorableConfigProvider() { setup(false); Map props = new HashMap<>(this.workerProps); props.put("config.providers", "monitorable,monitorable2"); - props.put("config.providers.monitorable.class", MonitorableConfigProvider.class.getName()); - props.put("config.providers.monitorable2.class", MonitorableConfigProvider.class.getName()); config = new StandaloneConfig(props); mockKafkaClusterId(); when(plugins.newConfigProvider(any(AbstractConfig.class), any(String.class), any(ClassLoaderUsage.class), any(Metrics.class))) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 946467240c3de..9c722402cc582 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -405,7 +405,7 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() { public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() { String providerName = "monitorable"; String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName; - props.put(providerPrefix + ".class", CustomMonitorableConfigProvider .class.getName()); + props.put(providerPrefix + ".class", CustomMonitorableConfigProvider.class.getName()); createConfig(); Plugin plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics()); assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get()); From 6ef4001d0346d750b0a51052c5c852d20aec85e3 Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Tue, 8 Apr 2025 05:40:11 +0800 Subject: [PATCH 07/10] Add a newline at the end of the file --- .../kafka/common/config/provider/MonitorableConfigProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java index e1759acc4e9fc..ddef7eda59599 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java +++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java @@ -55,4 +55,4 @@ public void close() throws IOException { public void configure(Map configs) { configured = true; } -} \ No newline at end of file +} From 44ecfdb5c63f4f63d9e35d4ade9f7a787f400ee2 Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Tue, 8 Apr 2025 11:11:09 +0800 Subject: [PATCH 08/10] Fix NPE in transform method --- .../java/org/apache/kafka/common/config/AbstractConfig.java | 1 + .../org/apache/kafka/common/config/ConfigTransformer.java | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 77921d7545a7e..369c546af9919 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -624,6 +624,7 @@ private Map> instantiateConfigProviders( Map> configProviderPluginInstances = new HashMap<>(); for (Map.Entry entry : providerMap.entrySet()) { try { + System.out.println("Checking:" + entry); String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM; Map configProperties = configProviderProperties(prefix, providerConfigProperties); ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class); diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java index b4659a07da274..adf97c5aaad3b 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java @@ -96,13 +96,13 @@ public ConfigTransformerResult transform(Map configs) { Map ttls = new HashMap<>(); for (Map.Entry>> entry : keysByProvider.entrySet()) { String providerName = entry.getKey(); - ConfigProvider provider = configProviderPlugins.get(providerName).get(); + Plugin providerPlugin = configProviderPlugins.get(providerName); Map> keysByPath = entry.getValue(); - if (provider != null && keysByPath != null) { + if (providerPlugin != null && keysByPath != null) { for (Map.Entry> pathWithKeys : keysByPath.entrySet()) { String path = pathWithKeys.getKey(); Set keys = new HashSet<>(pathWithKeys.getValue()); - ConfigData configData = provider.get(path, keys); + ConfigData configData = providerPlugin.get().get(path, keys); Map data = configData.data(); Long ttl = configData.ttl(); if (ttl != null && ttl >= 0) { From e26a06f634d0eae0f57a2d47df2c12e91e362e2d Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Tue, 8 Apr 2025 11:13:58 +0800 Subject: [PATCH 09/10] Remove debug print --- .../main/java/org/apache/kafka/common/config/AbstractConfig.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 369c546af9919..77921d7545a7e 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -624,7 +624,6 @@ private Map> instantiateConfigProviders( Map> configProviderPluginInstances = new HashMap<>(); for (Map.Entry entry : providerMap.entrySet()) { try { - System.out.println("Checking:" + entry); String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM; Map configProperties = configProviderProperties(prefix, providerConfigProperties); ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class); From b92a2bb2ddf809bfa0455bfb237ea55bb70016f0 Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Sat, 12 Apr 2025 10:02:08 +0800 Subject: [PATCH 10/10] Address m1a2st comments --- .../java/org/apache/kafka/common/config/AbstractConfig.java | 4 ++-- .../org/apache/kafka/common/config/ConfigTransformer.java | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 77921d7545a7e..19b5421d8ce59 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -588,12 +588,12 @@ private Map configProviderProperties(String configProviderPrefix * config.providers : A comma-separated list of names for providers. * config.providers.{name}.class : The Java class name for a provider. * config.providers.{name}.param.{param-name} : A parameter to be passed to the above Java class on initialization. - * returns a map of config provider name and its instance wrapped in a {@link org.apache.kafka.common.internals.Plugin}. + * returns a map of config provider name and its instance. * * @param indirectConfigs The map of potential variable configs * @param providerConfigProperties The map of config provider configs * @param classNameFilter Filter for config provider class names - * @return map of config provider name and its instance wrapped in a {@link org.apache.kafka.common.internals.Plugin}. + * @return map of config provider name and its instance. */ private Map> instantiateConfigProviders( Map indirectConfigs, diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java index adf97c5aaad3b..cfb63a65f5394 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java @@ -62,8 +62,7 @@ public class ConfigTransformer { /** * Creates a ConfigTransformer with the default pattern, of the form ${provider:[path:]key}. * - * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances, where each instance - * is wrapped in a {@link org.apache.kafka.common.internals.Plugin}. + * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances. */ public ConfigTransformer(Map> configProviderPlugins) { this.configProviderPlugins = configProviderPlugins;