-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-18894: Add KIP-877 support for ConfigProvider #14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
Thanks for this patch, please use |
WalkthroughThis set of changes refactors the handling of configuration providers in the Kafka codebase by introducing the use of Changes
Sequence Diagram(s)sequenceDiagram
participant Client as Client Code (e.g. Worker)
participant Plugins as Plugins
participant Plugin as Plugin<ConfigProvider>
participant Provider as ConfigProvider
Client->>Plugins: newConfigProvider(config, providerName, classLoader, metrics)
Plugins->>Provider: Instantiate provider class
Plugins->>Provider: Configure with parameters
Plugins->>Provider: If Monitorable, call withPluginMetrics(metrics)
Plugins->>Plugin: Plugin.wrapInstance(provider, metrics, ...)
Plugins-->>Client: Return Plugin<ConfigProvider>
Client->>Plugin: get() (when needed)
Plugin-->>Client: Return ConfigProvider instance
Client->>Provider: Use provider methods (e.g., get, close)
Client->>Plugin: close() (on shutdown)
Poem
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ast-grep (0.31.1)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.javaTip ⚡💬 Agentic Chat (Pro Plan, General Availability)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (5)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
109-109
: This line still uses Collections.singletonMap instead of Map.of.This line was not updated to use
Map.of()
like the other instances in the file.- ConfigTransformerResult result = configTransformer.transform(Collections.singletonMap(MY_KEY, null)); + ConfigTransformerResult result = configTransformer.transform(Map.of(MY_KEY, null));clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
98-105
: Cache the provider instance once per loop iterationInside the inner loop we repeatedly call
providerPlugin.get()
.
Although inexpensive, this does anAtomicReference#get()
each time and clutters the hot path.
Grabbing the provider once makes the intent clearer and avoids redundant indirections:-ConfigData configData = providerPlugin.get().get(path, keys); +ConfigProvider provider = providerPlugin.get(); +ConfigData configData = provider.get(path, keys);(NB: no functional change – purely readability & micro‑perf.)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)
100-102
: Close the underlying providers, not just the plugin wrapper
Plugin
already delegatesclose()
to the wrapped object, so this works today, but
using the wrapper hides possible provider‑specific exceptions and makes stack‑traces
less obvious. Prefer closing the inner provider (and then the wrapper) or at least
logging the provider’sclazz()
to aid debugging.Not blocking – informational only.
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
625-633
: Pass the actual class‑loader toPlugin.wrapInstance
Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG)
recordsnull
as the plugin’s class‑loader.
Some subsystems (e.g. metrics tagging and class‑loader leak detection) rely on
that field; keeping itnull
forfeits that information.-Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG); +Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance( + provider, + provider.getClass().getClassLoader(), + CONFIG_PROVIDERS_CONFIG);This is a one‑liner, keeps behaviour unchanged, and improves diagnostics.
550-560
: Avoid double close when callers already shut the plugins
resolveConfigVariables
closes every provider plugin immediately after use.
Higher‑level callers (e.g.WorkerConfigTransformer
) also close the same
plugins in their ownclose()
methods, which is safe but redundant.Not urgent, yet trimming the extra
Utils.closeQuietly
here reduces log noise
and makes the lifecycle simpler.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
(4 hunks)clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
(3 hunks)clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java
(1 hunks)clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java
(5 hunks)clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java
(1 hunks)connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
(2 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
(1 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
(3 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
(3 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
(5 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
(5 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
(4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (4)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin
(33-92)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
Plugins
(59-696)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin
(33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin
(33-92)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
withPluginMetrics
(6299-6301)
🔇 Additional comments (29)
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)
32-35
: Documentation update for metrics support is clear and comprehensive.The added JavaDoc clearly explains how to implement the
Monitorable
interface for enabling metrics in config providers. It also specifies the three tags that will be automatically added to all metrics registered by the config provider:config
,class
, andprovider
. This documentation complements the broader changes for KIP-877 support.connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (4)
19-21
: Import for Plugin class added correctly.The
Plugin
import is necessary for the new approach of wrapping config providers.
69-69
: Config provider instance is now properly wrapped with Plugin.wrapInstance.The test setup now correctly wraps the
TestConfigProvider
instance in aPlugin
wrapper, which aligns with the broader implementation changes for KIP-877 support. The provider is properly wrapped with the"config.providers"
category.
75-75
: Collections.singletonMap replaced with Map.of.This modernizes the code by using the newer, more concise
Map.of()
method instead of the olderCollections.singletonMap()
.Also applies to: 100-100, 115-115, 122-122
150-158
: Collections.singletonMap replaced with Map.of in test data methods.The implementation of the test data methods in
TestConfigProvider
has been updated to use the more modernMap.of()
API, which is a good cleanup.clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (3)
20-20
: Import for Plugin class added correctly.The
Plugin
import is needed for the new approach of wrapping config providers.
49-49
: Config provider instance is now properly wrapped with Plugin.wrapInstance.The test setup now correctly wraps the
TestConfigProvider
instance in aPlugin
wrapper, consistent with the implementation changes for KIP-877. The provider is properly wrapped with the"config.providers"
category.
54-54
: Collections.singletonMap replaced with Map.of in test methods.This modernizes the code by using the newer, more concise
Map.of()
method instead of the olderCollections.singletonMap()
.Also applies to: 63-63, 72-72, 79-79, 86-86, 93-93, 102-102
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (5)
26-26
: Import for Plugin class added correctly.The
Plugin
import is necessary for the new approach of wrapping config providers.
273-273
: Updated map type to use Plugin wrappers.The map type has been correctly updated from
Map<String, ConfigProvider>
toMap<String, Plugin<ConfigProvider>>
to reflect the new design that uses plugin wrappers.
275-282
: Updated to use the new Plugin-based ConfigProvider API.The code has been updated to use the new signature of
plugins.newConfigProvider()
which now:
- Takes the provider name directly instead of a concatenated config key
- Adds the
Plugins.ClassLoaderUsage.PLUGINS
parameter- Passes
null
for the metrics parameter (which is fine for this context)- Returns a
Plugin<ConfigProvider>
wrapper that's stored in the mapThis correctly implements the KIP-877 changes for ConfigProvider support.
283-283
: Updated ConfigTransformer constructor to use plugins map.The
ConfigTransformer
constructor is correctly updated to accept the map of Plugin-wrapped ConfigProviders.
285-285
: Updated cleanup to close plugin wrappers.The cleanup code now correctly closes the plugin wrappers instead of the raw providers. The
Utils.closeQuietly
method ensures that any exceptions during closure are properly handled.connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (3)
22-23
: New imports for Plugin wrapping supportThese imports are required for the KIP-877 implementation, which wraps ConfigProvider instances as Plugin objects to support metrics and lifecycle management.
632-633
: Method signature and parameter changes to support KIP-877The method now returns a
Plugin<ConfigProvider>
instead of a rawConfigProvider
and accepts a newMetrics
parameter. The parameter was also renamed fromproviderPrefix
toproviderName
for clarity, with the actual prefix being constructed inside the method.
649-649
:❌ Incorrect review comment
Add null check for metrics parameter
The metrics parameter can be null (as seen in tests), but there's no null check before using it in
Plugin.wrapInstance()
. This could lead to a NullPointerException if metrics is null in production code.- return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName)); + return Plugin.wrapInstance(plugin, metrics, WorkerConfig.CONFIG_PROVIDERS_CONFIG, Map.of("provider", providerName != null ? providerName : "unknown"));Check if
Plugin.wrapInstance
handles null metrics gracefully:
🏁 Script executed:
#!/bin/bash # Check if Plugin.wrapInstance handles null metrics rg -A 5 -B 5 "wrapInstance" --glob "*.java"Length of output: 42611
Plugin.wrapInstance already guards against null metrics
ThewrapInstance
implementation inclients/src/main/java/org/apache/kafka/common/internals/Plugin.java
explicitly checksmetrics != null
before using it, so passing a nullmetrics
parameter will not trigger a NullPointerException. The suggested diff is both unnecessary and addressesproviderName
rather than themetrics
parameter.No changes required.
Likely an incorrect or invalid review comment.
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (4)
25-28
: Add imports for monitorable config provider supportAdded imports for the new MonitorableConfigProvider class and related metrics classes to test the KIP-877 implementation.
381-402
: Updated test to handle Plugin-wrapped ConfigProviderThe test has been correctly updated to work with the new Plugin-wrapped ConfigProvider return type. Parameter renaming from
providerPrefix
toproviderName
aligns with the method signature change.
404-412
: Test for metrics integration with config providersThis new test verifies an important aspect of KIP-877 - that the
withPluginMetrics
method is called after configuration when creating a config provider. This ensures metrics are properly registered.
810-816
: Test implementation for monitorable config providerThe CustomMonitorableConfigProvider class provides a minimal implementation to verify that withPluginMetrics is called after configure. The assertion checks that the configured flag is true when withPluginMetrics is called.
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (3)
19-28
: Imports for the MonitorableConfigProvider implementationThe imports correctly include all the necessary classes for implementing both ConfigProvider and Monitorable interfaces.
29-38
: Implementation of Monitorable interfaceThe class implements the Monitorable interface which is key for KIP-877. The withPluginMetrics method registers a simple metric with a constant value, which is sufficient for testing purposes.
40-57
: ConfigProvider implementationThe ConfigProvider implementation is minimal but sufficient for testing. It tracks configuration state with the
configured
flag that is used by tests to verify the order of operations.connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (6)
40-46
: New imports added for KIP-877 support.These imports are necessary to support the new functionality for wrapping ConfigProviders in Plugin instances and monitoring them via metrics. This aligns with the KIP-877 implementation goal stated in the PR.
382-385
: Modernized code with Map.of() instead of Collections.singletonMap().Great update to use the more modern
Map.of()
method instead ofCollections.singletonMap()
, which aligns with the code cleanup goal mentioned in the PR objectives.
379-390
: Updated mockFileConfigProvider to use Plugin wrapper.The method now properly wraps the MockFileConfigProvider in a Plugin wrapper and correctly mocks the plugin system to expect a provider name and Metrics argument, returning the wrapped plugin instance. This change supports the KIP-877 implementation for ConfigProvider.
2904-2927
: Added test for monitorable config providers with metrics integration.This new test validates that the Worker correctly registers metrics for monitorable config providers. It properly sets up the configuration with two providers, mocks the plugin system to return wrapped instances with metrics, and verifies the expected metrics are registered in the worker's metrics registry.
2929-2935
: Added helper method for constructing expected metric tags.Clean implementation of a helper method to create the expected tags map with a consistent structure. This improves test readability and maintainability.
2937-2950
: Added helper method for verifying metrics in the registry.Well-implemented helper method that counts and asserts metrics in the worker's metrics registry that match the expected tags. The verification includes checking both the metric name and description against the expected values from MonitorableConfigProvider.
private WorkerConfigTransformer initConfigTransformer() { | ||
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG); | ||
Map<String, ConfigProvider> providerMap = new HashMap<>(); | ||
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>(); | ||
for (String providerName : providerNames) { | ||
ConfigProvider configProvider = plugins.newConfigProvider( | ||
config, | ||
WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName, | ||
ClassLoaderUsage.PLUGINS | ||
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( | ||
config, | ||
providerName, | ||
ClassLoaderUsage.PLUGINS, | ||
metrics.metrics() | ||
); | ||
providerMap.put(providerName, configProvider); | ||
providerPluginMap.put(providerName, configProviderPlugin); | ||
} | ||
return new WorkerConfigTransformer(this, providerMap); | ||
return new WorkerConfigTransformer(this, providerPluginMap); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Guard against resource leaks when provider instantiation fails
plugins.newConfigProvider(...)
may throw (e.g. bad class‑name, constructor error).
If that happens after a few providers are already created, those plugin instances stay open because initConfigTransformer()
aborts before workerConfigTransformer
is constructed and the close()
call that normally releases them is never reached.
Consider wrapping the loop in a try/catch that closes any already‑created plugins before re‑throwing, e.g.:
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>();
try {
for (String providerName : providerNames) {
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider(
config, providerName, ClassLoaderUsage.PLUGINS, metrics.metrics());
providerPluginMap.put(providerName, configProviderPlugin);
}
} catch (Throwable t) {
+ providerPluginMap.values().forEach(p -> Utils.closeQuietly(p, "config provider plugin"));
throw t;
}
return new WorkerConfigTransformer(this, Collections.unmodifiableMap(providerPluginMap));
This keeps the constructor exception‑safe and prevents dangling threads/metrics.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
private WorkerConfigTransformer initConfigTransformer() { | |
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG); | |
Map<String, ConfigProvider> providerMap = new HashMap<>(); | |
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>(); | |
for (String providerName : providerNames) { | |
ConfigProvider configProvider = plugins.newConfigProvider( | |
config, | |
WorkerConfig.CONFIG_PROVIDERS_CONFIG + "." + providerName, | |
ClassLoaderUsage.PLUGINS | |
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( | |
config, | |
providerName, | |
ClassLoaderUsage.PLUGINS, | |
metrics.metrics() | |
); | |
providerMap.put(providerName, configProvider); | |
providerPluginMap.put(providerName, configProviderPlugin); | |
} | |
return new WorkerConfigTransformer(this, providerMap); | |
return new WorkerConfigTransformer(this, providerPluginMap); | |
} | |
private WorkerConfigTransformer initConfigTransformer() { | |
final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG); | |
Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>(); | |
try { | |
for (String providerName : providerNames) { | |
Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( | |
config, | |
providerName, | |
ClassLoaderUsage.PLUGINS, | |
metrics.metrics() | |
); | |
providerPluginMap.put(providerName, configProviderPlugin); | |
} | |
} catch (Throwable t) { | |
providerPluginMap.values().forEach(p -> | |
Utils.closeQuietly(p, "config provider plugin")); | |
throw t; | |
} | |
return new WorkerConfigTransformer( | |
this, | |
Collections.unmodifiableMap(providerPluginMap) | |
); | |
} |
Add KIP-877 support for
ConfigProvider.
Cleanup: Collections.singletonMap() -> Map.of()
Jira: https://issues.apache.org/jira/browse/KAFKA-18894
Summary by CodeRabbit