diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 9188b1687a808..19b5421d8ce59 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; @@ -546,16 +547,16 @@ private Map extractPotentialVariables(Map configMap) { configProperties = configProviderProps; classNameFilter = ignored -> true; } - Map providers = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter); + Map> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter); - if (!providers.isEmpty()) { - ConfigTransformer configTransformer = new ConfigTransformer(providers); + if (!providerPlugins.isEmpty()) { + ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins); ConfigTransformerResult result = configTransformer.transform(indirectVariables); if (!result.data().isEmpty()) { resolvedOriginals.putAll(result.data()); } } - providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); + providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); return new ResolvingMap<>(resolvedOriginals, originals); } @@ -594,7 +595,7 @@ private Map configProviderProperties(String configProviderPrefix * @param classNameFilter Filter for config provider class names * @return map of config provider name and its instance. */ - private Map instantiateConfigProviders( + private Map> instantiateConfigProviders( Map indirectConfigs, Map providerConfigProperties, Predicate classNameFilter @@ -620,21 +621,22 @@ private Map instantiateConfigProviders( } } // Instantiate Config Providers - Map configProviderInstances = new HashMap<>(); + Map> configProviderPluginInstances = new HashMap<>(); for (Map.Entry entry : providerMap.entrySet()) { try { String prefix = CONFIG_PROVIDERS_CONFIG + "." + entry.getKey() + CONFIG_PROVIDERS_PARAM; Map configProperties = configProviderProperties(prefix, providerConfigProperties); ConfigProvider provider = Utils.newInstance(entry.getValue(), ConfigProvider.class); provider.configure(configProperties); - configProviderInstances.put(entry.getKey(), provider); + Plugin providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG); + configProviderPluginInstances.put(entry.getKey(), providerPlugin); } catch (ClassNotFoundException e) { log.error("Could not load config provider class {}", entry.getValue(), e); throw new ConfigException(providerClassProperty(entry.getKey()), entry.getValue(), "Could not load config provider class or one of its dependencies"); } } - return configProviderInstances; + return configProviderPluginInstances; } private static String providerClassProperty(String providerName) { diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java index dbf6c7bbfcec1..cfb63a65f5394 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.provider.FileConfigProvider; +import org.apache.kafka.common.internals.Plugin; import java.util.ArrayList; import java.util.HashMap; @@ -56,15 +57,15 @@ public class ConfigTransformer { public static final Pattern DEFAULT_PATTERN = Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}"); private static final String EMPTY_PATH = ""; - private final Map configProviders; + private final Map> configProviderPlugins; /** * Creates a ConfigTransformer with the default pattern, of the form ${provider:[path:]key}. * - * @param configProviders a Map of provider names and {@link ConfigProvider} instances. + * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances. */ - public ConfigTransformer(Map configProviders) { - this.configProviders = configProviders; + public ConfigTransformer(Map> configProviderPlugins) { + this.configProviderPlugins = configProviderPlugins; } /** @@ -94,13 +95,13 @@ public ConfigTransformerResult transform(Map configs) { Map ttls = new HashMap<>(); for (Map.Entry>> entry : keysByProvider.entrySet()) { String providerName = entry.getKey(); - ConfigProvider provider = configProviders.get(providerName); + Plugin providerPlugin = configProviderPlugins.get(providerName); Map> keysByPath = entry.getValue(); - if (provider != null && keysByPath != null) { + if (providerPlugin != null && keysByPath != null) { for (Map.Entry> pathWithKeys : keysByPath.entrySet()) { String path = pathWithKeys.getKey(); Set keys = new HashSet<>(pathWithKeys.getValue()); - ConfigData configData = provider.get(path, keys); + ConfigData configData = providerPlugin.get().get(path, keys); Map data = configData.data(); Long ttl = configData.ttl(); if (ttl != null && ttl >= 0) { diff --git a/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java index 81f0aac0d72d1..abe12237ca3e3 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java +++ b/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java @@ -29,6 +29,10 @@ *

Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism. * To support this, implementations of this interface should also contain a service provider configuration file in * {@code META-INF/services/org.apache.kafka.common.config.provider.ConfigProvider}. + *

Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics. + * The following tags are automatically added to all metrics registered: config set to + * config.providers, class set to the ConfigProvider class name, + * and provider set to the provider name. */ public interface ConfigProvider extends Configurable, Closeable { diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java index 3a5ec650fab42..b9ced57b99a40 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.config; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.internals.Plugin; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -45,12 +46,12 @@ public class ConfigTransformerTest { @BeforeEach public void setup() { - configTransformer = new ConfigTransformer(Collections.singletonMap("test", new TestConfigProvider())); + configTransformer = new ConfigTransformer(Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers"))); } @Test public void testReplaceVariable() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKey}")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKey}")); Map data = result.data(); Map ttls = result.ttls(); assertEquals(TEST_RESULT, data.get(MY_KEY)); @@ -59,7 +60,7 @@ public void testReplaceVariable() { @Test public void testReplaceVariableWithTTL() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}")); Map data = result.data(); Map ttls = result.ttls(); assertEquals(TEST_RESULT_WITH_TTL, data.get(MY_KEY)); @@ -68,28 +69,28 @@ public void testReplaceVariableWithTTL() { @Test public void testReplaceMultipleVariablesInValue() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "hello, ${test:testPath:testKey}; goodbye, ${test:testPath:testKeyWithTTL}!!!")); Map data = result.data(); assertEquals("hello, testResult; goodbye, testResultWithTTL!!!", data.get(MY_KEY)); } @Test public void testNoReplacement() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:missingKey}")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:missingKey}")); Map data = result.data(); assertEquals("${test:testPath:missingKey}", data.get(MY_KEY)); } @Test public void testSingleLevelOfIndirection() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testPath:testIndirection}")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testPath:testIndirection}")); Map data = result.data(); assertEquals("${test:testPath:testResult}", data.get(MY_KEY)); } @Test public void testReplaceVariableNoPath() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "${test:testKey}")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "${test:testKey}")); Map data = result.data(); Map ttls = result.ttls(); assertEquals(TEST_RESULT_NO_PATH, data.get(MY_KEY)); @@ -98,7 +99,7 @@ public void testReplaceVariableNoPath() { @Test public void testReplaceMultipleVariablesWithoutPathInValue() { - ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, "first ${test:testKey}; second ${test:testKey}")); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, "first ${test:testKey}; second ${test:testKey}")); Map data = result.data(); assertEquals("first testResultNoPath; second testResultNoPath", data.get(MY_KEY)); } diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java new file mode 100644 index 0000000000000..ddef7eda59599 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.Monitorable; +import org.apache.kafka.common.metrics.PluginMetrics; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +public class MonitorableConfigProvider implements ConfigProvider, Monitorable { + public static final String NAME = "name"; + public static final String DESCRIPTION = "description"; + protected boolean configured = false; + + @Override + public void withPluginMetrics(PluginMetrics metrics) { + MetricName metricName = metrics.metricName(NAME, DESCRIPTION, Map.of()); + metrics.addMetric(metricName, (Measurable) (config, now) -> 123); + } + + @Override + public ConfigData get(String path) { + return null; + } + + @Override + public ConfigData get(String path, Set keys) { + return null; + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map configs) { + configured = true; + } +} diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java index aba62cf8464ff..7afcee06bd9e5 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigTransformer; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.WorkerConfig; @@ -269,18 +270,19 @@ List configProviders() { Map transform(Map props) { // transform worker config according to config.providers List providerNames = configProviders(); - Map providers = new HashMap<>(); + Map> providerPlugins = new HashMap<>(); for (String name : providerNames) { - ConfigProvider configProvider = plugins.newConfigProvider( + Plugin configProviderPlugin = plugins.newConfigProvider( this, - CONFIG_PROVIDERS_CONFIG + "." + name, - Plugins.ClassLoaderUsage.PLUGINS + name, + Plugins.ClassLoaderUsage.PLUGINS, + null ); - providers.put(name, configProvider); + providerPlugins.put(name, configProviderPlugin); } - ConfigTransformer transformer = new ConfigTransformer(providers); + ConfigTransformer transformer = new ConfigTransformer(providerPlugins); Map transformed = transformer.transform(props).data(); - providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); + providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); return transformed; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 4d5b23b63a826..fc0d38521aa35 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -209,16 +209,17 @@ public Worker( private WorkerConfigTransformer initConfigTransformer() { final List providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG); - Map providerMap = new HashMap<>(); + Map> providerPluginMap = new HashMap<>(); for (String providerName : providerNames) { - ConfigProvider configProvider = plugins.newConfigProvider( - config, - WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName, - ClassLoaderUsage.PLUGINS + Plugin configProviderPlugin = plugins.newConfigProvider( + config, + providerName, + ClassLoaderUsage.PLUGINS, + metrics.metrics() ); - providerMap.put(providerName, configProvider); + providerPluginMap.put(providerName, configProviderPlugin); } - return new WorkerConfigTransformer(this, providerMap); + return new WorkerConfigTransformer(this, providerPluginMap); } public WorkerConfigTransformer configTransformer() { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java index df16955c941a2..596c0fb55e8ae 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigTransformer; import org.apache.kafka.common.config.ConfigTransformerResult; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction; import org.apache.kafka.connect.util.Callback; @@ -42,12 +43,12 @@ public class WorkerConfigTransformer implements AutoCloseable { private final Worker worker; private final ConfigTransformer configTransformer; private final ConcurrentMap> requests = new ConcurrentHashMap<>(); - private final Map configProviders; + private final Map> configProviderPlugins; - public WorkerConfigTransformer(Worker worker, Map configProviders) { + public WorkerConfigTransformer(Worker worker, Map> configProviderPlugins) { this.worker = worker; - this.configProviders = configProviders; - this.configTransformer = new ConfigTransformer(configProviders); + this.configProviderPlugins = configProviderPlugins; + this.configTransformer = new ConfigTransformer(configProviderPlugins); } public Map transform(Map configs) { @@ -97,6 +98,6 @@ private void scheduleReload(String connectorName, String path, long ttl) { @Override public void close() { - configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider")); + configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 98f33ea582bf0..2574732d24e57 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -19,6 +19,8 @@ import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.Connector; @@ -627,7 +629,8 @@ private U newVersionedPlugin( return plugin; } - public ConfigProvider newConfigProvider(AbstractConfig config, String providerPrefix, ClassLoaderUsage classLoaderUsage) { + public Plugin newConfigProvider(AbstractConfig config, String providerName, ClassLoaderUsage classLoaderUsage, Metrics metrics) { + String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName; String classPropertyName = providerPrefix + ".class"; Map originalConfig = config.originalsStrings(); if (!originalConfig.containsKey(classPropertyName)) { @@ -643,7 +646,7 @@ public ConfigProvider newConfigProvider(AbstractConfig config, String providerPr try (LoaderSwap loaderSwap = safeLoaderSwapper().apply(plugin.getClass().getClassLoader())) { plugin.configure(configProviderConfig); } - return plugin; + return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName)); } /** diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java index c3a8f151750ec..e099e2cabce53 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.config.ConfigData; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.internals.Plugin; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -27,7 +28,6 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -66,13 +66,13 @@ public class WorkerConfigTransformerTest { @BeforeEach public void setup() { - configTransformer = new WorkerConfigTransformer(worker, Collections.singletonMap("test", new TestConfigProvider())); + configTransformer = new WorkerConfigTransformer(worker, Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers"))); } @Test public void testReplaceVariable() { // Execution - Map result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKey}")); + Map result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKey}")); // Assertions assertEquals(TEST_RESULT, result.get(MY_KEY)); @@ -97,7 +97,7 @@ public void testReplaceVariableWithTTLAndScheduleRestart() { when(herder.restartConnector(eq(1L), eq(MY_CONNECTOR), notNull())).thenReturn(requestId); // Execution - Map result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}")); + Map result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}")); // Assertions assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY)); @@ -112,14 +112,14 @@ public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() { when(herder.restartConnector(eq(10L), eq(MY_CONNECTOR), notNull())).thenReturn(requestId); // Execution - Map result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}")); + Map result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithTTL}")); // Assertions assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY)); verify(herder).restartConnector(eq(1L), eq(MY_CONNECTOR), notNull()); // Execution - result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithLongerTTL}")); + result = configTransformer.transform(MY_CONNECTOR, Map.of(MY_KEY, "${test:testPath:testKeyWithLongerTTL}")); // Assertions assertEquals(TEST_RESULT_WITH_LONGER_TTL, result.get(MY_KEY)); @@ -147,14 +147,14 @@ public ConfigData get(String path) { public ConfigData get(String path, Set keys) { if (path.equals(TEST_PATH)) { if (keys.contains(TEST_KEY)) { - return new ConfigData(Collections.singletonMap(TEST_KEY, TEST_RESULT)); + return new ConfigData(Map.of(TEST_KEY, TEST_RESULT)); } else if (keys.contains(TEST_KEY_WITH_TTL)) { - return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L); + return new ConfigData(Map.of(TEST_KEY_WITH_TTL, TEST_RESULT_WITH_TTL), 1L); } else if (keys.contains(TEST_KEY_WITH_LONGER_TTL)) { - return new ConfigData(Collections.singletonMap(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L); + return new ConfigData(Map.of(TEST_KEY_WITH_LONGER_TTL, TEST_RESULT_WITH_LONGER_TTL), 10L); } } - return new ConfigData(Collections.emptyMap()); + return new ConfigData(Map.of()); } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 2f7af629f0665..4af8c0df96ae3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -37,10 +37,14 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.provider.MockFileConfigProvider; +import org.apache.kafka.common.config.provider.MonitorableConfigProvider; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.utils.LogCaptureAppender; @@ -91,6 +95,7 @@ import org.apache.maven.artifact.versioning.VersionRange; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.AdditionalAnswers; @@ -109,6 +114,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -373,10 +379,14 @@ public void testStartAndStopConnector(boolean enableTopicCreation) throws Throwa private void mockFileConfigProvider() { MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider(); mockFileConfigProvider.configure(Collections.singletonMap("testId", mockFileProviderTestId)); + Plugin providerPlugin = Plugin.wrapInstance(mockFileConfigProvider, + null, + WorkerConfig.CONFIG_PROVIDERS_CONFIG, + Map.of("provider", "file")); when(plugins.newConfigProvider(any(AbstractConfig.class), - eq("config.providers.file"), - any(ClassLoaderUsage.class))) - .thenReturn(mockFileConfigProvider); + eq("file"), + any(ClassLoaderUsage.class), + any(Metrics.class))).thenReturn(providerPlugin); } @ParameterizedTest @@ -2891,6 +2901,54 @@ private void testStartTaskWithTooManyTaskConfigs(boolean enforced) { } } + @Test + public void testMonitorableConfigProvider() { + setup(false); + Map props = new HashMap<>(this.workerProps); + props.put("config.providers", "monitorable,monitorable2"); + config = new StandaloneConfig(props); + mockKafkaClusterId(); + when(plugins.newConfigProvider(any(AbstractConfig.class), any(String.class), any(ClassLoaderUsage.class), any(Metrics.class))) + .thenAnswer(invocation -> { + String providerName = invocation.getArgument(1); + Metrics metrics = invocation.getArgument(3); + return Plugin.wrapInstance(new MonitorableConfigProvider(), metrics, + WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName)); + }); + + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); + Metrics metrics = worker.metrics().metrics(); + assertMetrics(metrics, + 1, + expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable"))); + assertMetrics(metrics, + 1, + expectedTags(WorkerConfig.CONFIG_PROVIDERS_CONFIG, MonitorableConfigProvider.class.getSimpleName(), Map.of("provider", "monitorable2"))); + } + + private static Map expectedTags(String config, String clazz, Map extraTags) { + Map tags = new LinkedHashMap<>(); + tags.put("config", config); + tags.put("class", clazz); + tags.putAll(extraTags); + return tags; + } + + private void assertMetrics(Metrics metrics, int expected, Map expectedTags) { + int found = 0; + for (MetricName metricName : metrics.metrics().keySet()) { + if (metricName.group().equals("plugins")) { + Map tags = metricName.tags(); + if (expectedTags.equals(tags)) { + assertEquals(MonitorableConfigProvider.NAME, metricName.name()); + assertEquals(MonitorableConfigProvider.DESCRIPTION, metricName.description()); + found++; + } + } + } + assertEquals(expected, found); + } + private void assertTasksMaxExceededMessage(String connector, int numTasks, int maxTasks, String message) { String expectedPrefix = "The connector " + connector + " has generated " @@ -3216,5 +3274,4 @@ public void stop() { } } - } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index ca4c29931d088..9c722402cc582 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -22,6 +22,10 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.config.provider.MonitorableConfigProvider; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.PluginMetrics; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.components.Versioned; @@ -374,7 +378,8 @@ public void newConverterShouldConfigureWithPluginClassLoader() { @Test public void newConfigProviderShouldConfigureWithPluginClassLoader() { - String providerPrefix = "some.provider"; + String providerName = "customProvider"; + String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName; props.put(providerPrefix + ".class", TestPlugin.SAMPLING_CONFIG_PROVIDER.className()); PluginClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugin.SAMPLING_CONFIG_PROVIDER.className()); @@ -383,16 +388,27 @@ public void newConfigProviderShouldConfigureWithPluginClassLoader() { createConfig(); } - ConfigProvider plugin = plugins.newConfigProvider( + Plugin plugin = plugins.newConfigProvider( config, - providerPrefix, - ClassLoaderUsage.PLUGINS + providerName, + ClassLoaderUsage.PLUGINS, + null ); - assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); - Map samples = ((SamplingTestPlugin) plugin).flatten(); + assertInstanceOf(SamplingTestPlugin.class, plugin.get(), "Cannot collect samples"); + Map samples = ((SamplingTestPlugin) plugin.get()).flatten(); assertTrue(samples.containsKey("configure")); - assertPluginClassLoaderAlwaysActive(plugin); + assertPluginClassLoaderAlwaysActive(plugin.get()); + } + + @Test + public void newConfigProviderShouldCallWithPluginMetricsAfterConfigure() { + String providerName = "monitorable"; + String providerPrefix = WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName; + props.put(providerPrefix + ".class", CustomMonitorableConfigProvider.class.getName()); + createConfig(); + Plugin plugin = plugins.newConfigProvider(config, providerName, ClassLoaderUsage.PLUGINS, new Metrics()); + assertInstanceOf(CustomMonitorableConfigProvider.class, plugin.get()); } @Test @@ -790,4 +806,13 @@ public void configure(Map configs) { super.configure(configs); } } + + public static class CustomMonitorableConfigProvider extends MonitorableConfigProvider { + + @Override + public void withPluginMetrics(PluginMetrics metrics) { + assertTrue(configured); + } + } + }