-
Notifications
You must be signed in to change notification settings - Fork 0
KAFKA-19173: Add Feature
for "streams" group
#5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
Add new StreamsGroupFeature, disabled by default, and add "streams" as default value to `group.coordinator.rebalance.protocols`.
WalkthroughThis update introduces support for the new "streams group" feature in Kafka, reflected across multiple modules. The changes add the Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Broker
participant FeatureManager
Client->>Broker: Request API Versions
Broker->>FeatureManager: Get Supported Features
FeatureManager-->>Broker: List includes StreamsGroupVersion
Broker-->>Client: ApiVersionsResponse (includes StreamsGroupVersion details)
sequenceDiagram
participant Admin
participant Broker
participant GroupCoordinatorConfig
Admin->>Broker: Configure rebalance protocols
Broker->>GroupCoordinatorConfig: Read default protocols
GroupCoordinatorConfig-->>Broker: CLASSIC, CONSUMER, STREAMS
Broker-->>Admin: Protocols set (STREAMS now supported)
Poem
Tip ⚡💬 Agentic Chat (Pro Plan, General Availability)
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
@coderabbitai full review |
✅ Actions performedFull review triggered. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
(3 hunks)core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
(1 hunks)group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
(1 hunks)metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
(1 hunks)metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
(1 hunks)server-common/src/main/java/org/apache/kafka/server/common/Feature.java
(1 hunks)server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
(2 hunks)server-common/src/main/java/org/apache/kafka/server/common/StreamsGroupVersion.java
(1 hunks)server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java
(2 hunks)tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java
(4 hunks)
🔇 Additional comments (23)
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala (1)
328-328
: Feature list updated to include the new streams group feature.The error message in
testFormatWithInvalidFeature
has been updated to include "streams.group.version" in the list of supported features, which aligns with the introduction of the new streams group feature.metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java (1)
390-390
: Version range updated to accommodate the new feature.The error message in
testCannotDowngradeBeforeMinimumKraftVersion
has been updated to reflect the extended version range from "7-28" to "7-29". This change correctly aligns with the addition of the new streams group feature that incremented the metadata version.server-common/src/main/java/org/apache/kafka/server/common/Feature.java (1)
51-51
: Addition of the STREAMS_GROUP_VERSION feature.The new feature constant has been added correctly, following the same pattern as the other features. This implements the core functionality mentioned in the PR objectives of adding support for the "streams group" feature.
server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java (2)
33-33
: New import for the StreamsGroupVersion featureThe import statement correctly includes the new
STREAMS_GROUP_VERSION
feature that was introduced as part of KIP-1071.
103-103
: Added StreamsGroupVersion to default finalized featuresThe test is updated to include
STREAMS_GROUP_VERSION
in the expected default finalized features, which correctly reflects the addition of the new streams group functionality.core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala (4)
28-28
: Updated import to include StreamsGroupVersionThe import statement correctly includes the new
StreamsGroupVersion
enum to access it in the test.
67-67
: Updated number of finalized featuresThe expected count of finalized features has been increased from 5 to 6, correctly accounting for the addition of the StreamsGroupVersion feature in the finalized features list.
71-71
: Updated number of supported featuresThe expected count of supported features has been increased from 6 to 7, correctly accounting for the addition of the StreamsGroupVersion feature in the supported features list.
92-93
: Added assertions for StreamsGroupVersion featureNew assertions verify that the StreamsGroupVersion feature has a minimum version of 0 and a maximum version equal to SGV_1.featureLevel(), which ensures proper version range checking for the newly added streams group feature.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java (1)
66-67
: Updated default rebalance protocols to include streamsThe default rebalance protocols now include the STREAMS group type, which aligns with the PR objective to integrate streams groups into the group coordinator configuration by default. This change enables streams group functionality while keeping it in early access as described in the documentation.
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java (2)
120-121
: Added new metadata version for streams groupsA new metadata version
IBP_4_1_IV2
has been added with feature level 28 to mark the introduction of streams groups for early access in Kafka 4.1. The comment appropriately documents that this feature is for early access only.
133-133
: Updated feature level for IBP_4_2_IV0The feature level of the placeholder version
IBP_4_2_IV0
has been incremented from 28 to 29 to maintain the correct sequence after adding the newIBP_4_1_IV2
version. This ensures proper version ordering in the enum.tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java (6)
70-71
: LGTM: Properly adding streams.group.version feature to testsThe assertion correctly checks for the presence and expected output format of the new streams.group.version feature that's being introduced.
73-73
: Index adjustment for transaction.version is correctThe index has been appropriately adjusted to account for the new streams.group.version feature.
96-97
: LGTM: Consistent implementation for bootstrap controllers testThe assertion correctly verifies the streams.group.version feature in the bootstrap controllers test, maintaining consistency with the bootstrap servers test.
99-99
: Index adjustment is correctly applied here as wellThe adjusted index for transaction.version is consistent with the changes in the other test method.
125-125
: Metadata version range correctly updatedThe metadata version range has been properly updated to 7-29 to reflect the addition of a new metadata version that supports the streams group feature.
189-189
: LGTM: Downgrade test properly includes the new featureThe test output now correctly includes the verification that streams.group.version is downgraded to 0 when downgrading to an earlier release version.
server-common/src/main/java/org/apache/kafka/server/common/StreamsGroupVersion.java (5)
21-33
: LGTM: Well-structured feature version enum implementationThe StreamsGroupVersion enum correctly implements the FeatureVersion interface and defines appropriate versions with clear comments explaining their purpose. The feature is properly disabled by default (LATEST_PRODUCTION = SGV_0), which aligns with the PR objective.
34-46
: LGTM: Clean implementation of enum constructor and stateThe implementation properly stores the feature level, bootstrap metadata version, and dependencies for each enum constant.
48-66
: LGTM: Interface method implementations are correctAll required methods from the FeatureVersion interface are correctly implemented.
68-70
: LGTM: Helper method for checking feature supportThe streamsGroupSupported() method provides a clean way to determine if the streams group feature is enabled.
72-78
: LGTM: Proper feature level conversion methodThe fromFeatureLevel method correctly handles all defined versions and throws an appropriate exception for unknown levels.
@@ -368,7 +368,7 @@ public void testInvalidFeatureFlag() throws Exception { | |||
formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1); | |||
assertEquals("Unsupported feature: nonexistent.feature. Supported features " + | |||
"are: eligible.leader.replicas.version, group.version, kraft.version, " + | |||
"share.version, test.feature.version, transaction.version", | |||
"share.version, streams.group.featrue, test.feature.version, transaction.version", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix typo in feature name in error message.
There's a typo in the feature name included in the error message: "streams.group.featrue" should be "streams.group.version" to match the actual feature name used elsewhere in the codebase.
- "share.version, streams.group.featrue, test.feature.version, transaction.version",
+ "share.version, streams.group.version, test.feature.version, transaction.version",
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
"share.version, streams.group.featrue, test.feature.version, transaction.version", | |
"share.version, streams.group.version, test.feature.version, transaction.version", |
Add new StreamsGroupFeature, disabled by default, and add "streams" as
default value to
group.coordinator.rebalance.protocols
.Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Tests