Skip to content

KAFKA-19173: Add Feature for "streams" group #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from

Conversation

arvi18
Copy link

@arvi18 arvi18 commented Apr 21, 2025

Add new StreamsGroupFeature, disabled by default, and add "streams" as
default value to group.coordinator.rebalance.protocols.

Summary by CodeRabbit

  • New Features

    • Introduced support for "streams" groups, including a new feature flag and versioning for "streams.group.version".
    • Added "STREAMS" to the default enabled rebalance protocols.
  • Bug Fixes

    • Updated test cases and error messages to include "streams.group.version" and reflect the new supported metadata version range.
  • Documentation

    • Adjusted documentation strings to mention the early access status of the "STREAMS" rebalance protocol.
  • Tests

    • Enhanced and updated test assertions to verify the presence and correct behavior of the new "streams.group.version" feature and updated metadata versioning.

Add new StreamsGroupFeature, disabled by default,
and add "streams" as default value to `group.coordinator.rebalance.protocols`.
Copy link

coderabbitai bot commented Apr 21, 2025

Walkthrough

This update introduces support for the new "streams group" feature in Kafka, reflected across multiple modules. The changes add the StreamsGroupVersion enum and integrate it as a supported feature, update the default rebalance protocols to include "STREAMS", and adjust metadata versioning to account for the new feature. Corresponding test cases and error messages are updated to verify the presence and correct handling of the new feature and to reflect the updated supported version ranges.

Changes

File(s) Change Summary
server-common/src/main/java/org/apache/kafka/server/common/StreamsGroupVersion.java Introduced a new enum StreamsGroupVersion implementing FeatureVersion, with constants for disabling/enabling streams groups, feature metadata, and utility methods.
server-common/src/main/java/org/apache/kafka/server/common/Feature.java Added new enum constant STREAMS_GROUP_VERSION initialized with details from StreamsGroupVersion.
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java Added new metadata version constant IBP_4_1_IV2 (feature level 28) for early access streams groups; updated IBP_4_2_IV0 feature level from 28 to 29.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java Updated default rebalance protocols to include STREAMS group type in GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT.
core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala Increased expected finalized/supported feature counts, added assertions for StreamsGroupVersion, and updated imports.
server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java Included STREAMS_GROUP_VERSION in the set of default finalized features tested.
tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java Updated assertions and expected outputs to include "streams.group.version" in describe and downgrade tests; adjusted for new supported metadata version range.
metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java Updated expected error message to reflect supported metadata version range change from "7-28" to "7-29".
metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java Changed expected error message for unsupported feature flag to mention "streams.group.featrue" instead of "share.version".
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala Updated expected error message to include "streams.group.version" in supported features list.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Broker
    participant FeatureManager

    Client->>Broker: Request API Versions
    Broker->>FeatureManager: Get Supported Features
    FeatureManager-->>Broker: List includes StreamsGroupVersion
    Broker-->>Client: ApiVersionsResponse (includes StreamsGroupVersion details)
Loading
sequenceDiagram
    participant Admin
    participant Broker
    participant GroupCoordinatorConfig

    Admin->>Broker: Configure rebalance protocols
    Broker->>GroupCoordinatorConfig: Read default protocols
    GroupCoordinatorConfig-->>Broker: CLASSIC, CONSUMER, STREAMS
    Broker-->>Admin: Protocols set (STREAMS now supported)
Loading

Poem

In the warren of features, a new stream flows,
"Streams group" hops in, as every rabbit knows.
With enums and versions, the code now aligns,
Protocols rebalance, and everything shines.
Tests are updated, the burrow feels bright,
New features supported—oh, what a delight!
🐇✨

Tip

⚡💬 Agentic Chat (Pro Plan, General Availability)
  • We're introducing multi-step agentic chat in review comments and issue comments, within and outside of PR's. This feature enhances review and issue discussions with the CodeRabbit agentic chat by enabling advanced interactions, including the ability to create pull requests directly from comments and add commits to existing pull requests.
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@visz11
Copy link
Collaborator

visz11 commented Apr 21, 2025

@coderabbitai full review

Copy link

coderabbitai bot commented Apr 21, 2025

✅ Actions performed

Full review triggered.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed79212 and 687cecf.

📒 Files selected for processing (10)
  • core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala (3 hunks)
  • core/src/test/scala/unit/kafka/tools/StorageToolTest.scala (1 hunks)
  • group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java (1 hunks)
  • metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java (1 hunks)
  • metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java (1 hunks)
  • server-common/src/main/java/org/apache/kafka/server/common/Feature.java (1 hunks)
  • server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java (2 hunks)
  • server-common/src/main/java/org/apache/kafka/server/common/StreamsGroupVersion.java (1 hunks)
  • server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java (2 hunks)
  • tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java (4 hunks)
🔇 Additional comments (23)
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala (1)

328-328: Feature list updated to include the new streams group feature.

The error message in testFormatWithInvalidFeature has been updated to include "streams.group.version" in the list of supported features, which aligns with the introduction of the new streams group feature.

metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java (1)

390-390: Version range updated to accommodate the new feature.

The error message in testCannotDowngradeBeforeMinimumKraftVersion has been updated to reflect the extended version range from "7-28" to "7-29". This change correctly aligns with the addition of the new streams group feature that incremented the metadata version.

server-common/src/main/java/org/apache/kafka/server/common/Feature.java (1)

51-51: Addition of the STREAMS_GROUP_VERSION feature.

The new feature constant has been added correctly, following the same pattern as the other features. This implements the core functionality mentioned in the PR objectives of adding support for the "streams group" feature.

server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java (2)

33-33: New import for the StreamsGroupVersion feature

The import statement correctly includes the new STREAMS_GROUP_VERSION feature that was introduced as part of KIP-1071.


103-103: Added StreamsGroupVersion to default finalized features

The test is updated to include STREAMS_GROUP_VERSION in the expected default finalized features, which correctly reflects the addition of the new streams group functionality.

core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala (4)

28-28: Updated import to include StreamsGroupVersion

The import statement correctly includes the new StreamsGroupVersion enum to access it in the test.


67-67: Updated number of finalized features

The expected count of finalized features has been increased from 5 to 6, correctly accounting for the addition of the StreamsGroupVersion feature in the finalized features list.


71-71: Updated number of supported features

The expected count of supported features has been increased from 6 to 7, correctly accounting for the addition of the StreamsGroupVersion feature in the supported features list.


92-93: Added assertions for StreamsGroupVersion feature

New assertions verify that the StreamsGroupVersion feature has a minimum version of 0 and a maximum version equal to SGV_1.featureLevel(), which ensures proper version range checking for the newly added streams group feature.

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java (1)

66-67: Updated default rebalance protocols to include streams

The default rebalance protocols now include the STREAMS group type, which aligns with the PR objective to integrate streams groups into the group coordinator configuration by default. This change enables streams group functionality while keeping it in early access as described in the documentation.

server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java (2)

120-121: Added new metadata version for streams groups

A new metadata version IBP_4_1_IV2 has been added with feature level 28 to mark the introduction of streams groups for early access in Kafka 4.1. The comment appropriately documents that this feature is for early access only.


133-133: Updated feature level for IBP_4_2_IV0

The feature level of the placeholder version IBP_4_2_IV0 has been incremented from 28 to 29 to maintain the correct sequence after adding the new IBP_4_1_IV2 version. This ensures proper version ordering in the enum.

tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java (6)

70-71: LGTM: Properly adding streams.group.version feature to tests

The assertion correctly checks for the presence and expected output format of the new streams.group.version feature that's being introduced.


73-73: Index adjustment for transaction.version is correct

The index has been appropriately adjusted to account for the new streams.group.version feature.


96-97: LGTM: Consistent implementation for bootstrap controllers test

The assertion correctly verifies the streams.group.version feature in the bootstrap controllers test, maintaining consistency with the bootstrap servers test.


99-99: Index adjustment is correctly applied here as well

The adjusted index for transaction.version is consistent with the changes in the other test method.


125-125: Metadata version range correctly updated

The metadata version range has been properly updated to 7-29 to reflect the addition of a new metadata version that supports the streams group feature.


189-189: LGTM: Downgrade test properly includes the new feature

The test output now correctly includes the verification that streams.group.version is downgraded to 0 when downgrading to an earlier release version.

server-common/src/main/java/org/apache/kafka/server/common/StreamsGroupVersion.java (5)

21-33: LGTM: Well-structured feature version enum implementation

The StreamsGroupVersion enum correctly implements the FeatureVersion interface and defines appropriate versions with clear comments explaining their purpose. The feature is properly disabled by default (LATEST_PRODUCTION = SGV_0), which aligns with the PR objective.


34-46: LGTM: Clean implementation of enum constructor and state

The implementation properly stores the feature level, bootstrap metadata version, and dependencies for each enum constant.


48-66: LGTM: Interface method implementations are correct

All required methods from the FeatureVersion interface are correctly implemented.


68-70: LGTM: Helper method for checking feature support

The streamsGroupSupported() method provides a clean way to determine if the streams group feature is enabled.


72-78: LGTM: Proper feature level conversion method

The fromFeatureLevel method correctly handles all defined versions and throws an appropriate exception for unknown levels.

@@ -368,7 +368,7 @@ public void testInvalidFeatureFlag() throws Exception {
formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1);
assertEquals("Unsupported feature: nonexistent.feature. Supported features " +
"are: eligible.leader.replicas.version, group.version, kraft.version, " +
"share.version, test.feature.version, transaction.version",
"share.version, streams.group.featrue, test.feature.version, transaction.version",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix typo in feature name in error message.

There's a typo in the feature name included in the error message: "streams.group.featrue" should be "streams.group.version" to match the actual feature name used elsewhere in the codebase.

-                    "share.version, streams.group.featrue, test.feature.version, transaction.version",
+                    "share.version, streams.group.version, test.feature.version, transaction.version",
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"share.version, streams.group.featrue, test.feature.version, transaction.version",
"share.version, streams.group.version, test.feature.version, transaction.version",

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants