Skip to content

KAFKA-19173: Add Feature for "streams" group #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, TransactionVersion}
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, StreamsGroupVersion, TransactionVersion}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
Expand Down Expand Up @@ -64,11 +64,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
): Unit = {
if (apiVersion >= 3) {
assertEquals(5, apiVersionsResponse.data().finalizedFeatures().size())
assertEquals(6, apiVersionsResponse.data().finalizedFeatures().size())
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())

assertEquals(6, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(7, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(MetadataVersion.MINIMUM_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
if (apiVersion < 4) {
assertEquals(1, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
Expand All @@ -88,6 +88,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {

assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion())
assertEquals(ShareVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())

assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(StreamsGroupVersion.FEATURE_NAME).minVersion())
assertEquals(StreamsGroupVersion.SGV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(StreamsGroupVersion.FEATURE_NAME).maxVersion())
}
val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) {
ApiVersionsResponse.collectApis(
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ Found problem:
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
assertEquals("Unsupported feature: non.existent.feature. Supported features are: " +
"eligible.leader.replicas.version, group.version, kraft.version, share.version, transaction.version",
"eligible.leader.replicas.version, group.version, kraft.version, share.version, streams.group.version, transaction.version",
assertThrows(classOf[FormatterException], () =>
runFormatCommand(new ByteArrayOutputStream(), properties,
Seq("--feature", "non.existent.feature=20"))).getMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public class GroupCoordinatorConfig {
"The " + Group.GroupType.SHARE + " and " + Group.GroupType.STREAMS + " rebalance protocols are in early access and therefore must not be used in production.";
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
Group.GroupType.CLASSIC.toString(),
Group.GroupType.CONSUMER.toString());
Group.GroupType.CONSUMER.toString(),
Group.GroupType.STREAMS.toString());
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
"wait for writes to accumulate before flushing them to disk. Increasing this value improves write efficiency and batch size, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public void testCannotDowngradeBeforeMinimumKraftVersion() {
build();
manager.replay(new FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
assertEquals(ControllerResult.of(List.of(), new ApiError(Errors.INVALID_UPDATE_VERSION,
"Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-28")),
"Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-29")),
manager.updateFeatures(
Map.of(MetadataVersion.FEATURE_NAME, MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
Map.of(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ public void testInvalidFeatureFlag() throws Exception {
formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1);
assertEquals("Unsupported feature: nonexistent.feature. Supported features " +
"are: eligible.leader.replicas.version, group.version, kraft.version, " +
"share.version, test.feature.version, transaction.version",
"share.version, streams.group.featrue, test.feature.version, transaction.version",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix typo in feature name in error message.

There's a typo in the feature name included in the error message: "streams.group.featrue" should be "streams.group.version" to match the actual feature name used elsewhere in the codebase.

-                    "share.version, streams.group.featrue, test.feature.version, transaction.version",
+                    "share.version, streams.group.version, test.feature.version, transaction.version",
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"share.version, streams.group.featrue, test.feature.version, transaction.version",
"share.version, streams.group.version, test.feature.version, transaction.version",

assertThrows(FormatterException.class,
() -> formatter1.formatter.run()).
getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public enum Feature {
GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(), GroupVersion.LATEST_PRODUCTION),
ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.values(), EligibleLeaderReplicasVersion.LATEST_PRODUCTION),
SHARE_VERSION(ShareVersion.FEATURE_NAME, ShareVersion.values(), ShareVersion.LATEST_PRODUCTION),
STREAMS_GROUP_VERSION(StreamsGroupVersion.FEATURE_NAME, StreamsGroupVersion.values(), StreamsGroupVersion.LATEST_PRODUCTION),

/**
* Features defined only for unit tests and are not used in production.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ public enum MetadataVersion {
// Enables share groups. Note, share groups are for preview only in 4.1. (KIP-932).
IBP_4_1_IV1(27, "4.1", "IV1", false),

// Enables "streams" groups. Note, streams groups are for early access only in 4.1. (KIP-1071).
IBP_4_1_IV2(28, "4.1", "IV2", false),

// Insert any additional IBP_4_1_IVx versions above this comment, and bump the feature level of
// IBP_4_2_IV0 accordingly. When 4.2 development begins, IBP_4_2_IV0 will cease to be
// a placeholder.
Expand All @@ -127,7 +130,8 @@ public enum MetadataVersion {
// *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A SHARE ***
// *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON ***
// *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY. ***
IBP_4_2_IV0(28, "4.2", "IV0", false);
IBP_4_2_IV0(29, "4.2", "IV0", false);


// NOTES when adding a new version:
// Update the default version in @ClusterTest annotation to point to the latest version
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;

import java.util.Map;

public enum StreamsGroupVersion implements FeatureVersion {

// Version 0 does disable "streams" groups (KIP-1071).
SGV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()),

// Version 1 enables "streams" groups (KIP-1071).
SGV_1(1, MetadataVersion.IBP_4_1_IV2, Map.of());

public static final String FEATURE_NAME = "streams.group.version";

// Disabled by default in 4.1 (early access only).
public static final StreamsGroupVersion LATEST_PRODUCTION = SGV_0;

private final short featureLevel;
private final MetadataVersion bootstrapMetadataVersion;
private final Map<String, Short> dependencies;

StreamsGroupVersion(
int featureLevel,
MetadataVersion bootstrapMetadataVersion,
Map<String, Short> dependencies
) {
this.featureLevel = (short) featureLevel;
this.bootstrapMetadataVersion = bootstrapMetadataVersion;
this.dependencies = dependencies;
}

@Override
public short featureLevel() {
return featureLevel;
}

@Override
public String featureName() {
return FEATURE_NAME;
}

@Override
public MetadataVersion bootstrapMetadataVersion() {
return bootstrapMetadataVersion;
}

@Override
public Map<String, Short> dependencies() {
return dependencies;
}

public boolean streamsGroupSupported() {
return featureLevel >= SGV_1.featureLevel;
}

public static StreamsGroupVersion fromFeatureLevel(short version) {
return switch (version) {
case 0 -> SGV_0;
case 1 -> SGV_1;
default -> throw new RuntimeException("Unknown streams group feature level: " + (int) version);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION;
import static org.apache.kafka.server.common.Feature.GROUP_VERSION;
import static org.apache.kafka.server.common.Feature.SHARE_VERSION;
import static org.apache.kafka.server.common.Feature.STREAMS_GROUP_VERSION;
import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -99,6 +100,7 @@ public void testDefaultFinalizedFeatures() {
GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting(),
ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(), ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting(),
SHARE_VERSION.featureName(), SHARE_VERSION.latestTesting(),
STREAMS_GROUP_VERSION.featureName(), STREAMS_GROUP_VERSION.latestTesting(),
"kraft.version", (short) 0,
"test_feature_1", (short) 4,
"test_feature_2", (short) 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ public void testDescribeWithKRaft(ClusterInstance cluster) {
"SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.3-IV3\t", outputWithoutEpoch(features.get(3)));
assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
assertEquals("Feature: streams.group.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(6)));
}

// Use the first MetadataVersion that supports KIP-919
Expand All @@ -91,8 +93,10 @@ public void testDescribeWithKRaftAndBootstrapControllers(ClusterInstance cluster
"SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3)));
assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
assertEquals("Feature: streams.group.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(6)));
}

@ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV3)
Expand All @@ -118,7 +122,7 @@ public void testDowngradeMetadataVersionWithKRaft(ClusterInstance cluster) {
);
// Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
assertEquals("Could not disable metadata.version. The update failed for all features since the following " +
"feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-28", commandOutput);
"feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-29", commandOutput);

commandOutput = ToolsTestUtils.captureStandardOut(() ->
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
Expand Down Expand Up @@ -182,6 +186,7 @@ public void testDowngradeWithReleaseVersion(ClusterInstance cluster) {
"kraft.version was downgraded to 0.\n" +
"metadata.version was downgraded to 18.\n" +
"share.version was downgraded to 0.\n" +
"streams.group.version was downgraded to 0.\n" +
"transaction.version was downgraded to 0.", commandOutput);
}

Expand Down