diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala index 88d25b65d934c..2198cb806c78e 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala @@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils} import org.apache.kafka.common.test.ClusterInstance import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, TransactionVersion} +import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, StreamsGroupVersion, TransactionVersion} import org.apache.kafka.test.TestUtils import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Tag @@ -64,11 +64,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion ): Unit = { if (apiVersion >= 3) { - assertEquals(5, apiVersionsResponse.data().finalizedFeatures().size()) + assertEquals(6, apiVersionsResponse.data().finalizedFeatures().size()) assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel()) assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel()) - assertEquals(6, apiVersionsResponse.data().supportedFeatures().size()) + assertEquals(7, apiVersionsResponse.data().supportedFeatures().size()) assertEquals(MetadataVersion.MINIMUM_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion()) if (apiVersion < 4) { assertEquals(1, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion()) @@ -88,6 +88,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion()) assertEquals(ShareVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion()) + + assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(StreamsGroupVersion.FEATURE_NAME).minVersion()) + assertEquals(StreamsGroupVersion.SGV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(StreamsGroupVersion.FEATURE_NAME).maxVersion()) } val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) { ApiVersionsResponse.collectApis( diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 101b8f43bc48a..cfc51da1f56b2 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -325,7 +325,7 @@ Found problem: properties.putAll(defaultStaticQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) assertEquals("Unsupported feature: non.existent.feature. Supported features are: " + - "eligible.leader.replicas.version, group.version, kraft.version, share.version, transaction.version", + "eligible.leader.replicas.version, group.version, kraft.version, share.version, streams.group.version, transaction.version", assertThrows(classOf[FormatterException], () => runFormatCommand(new ByteArrayOutputStream(), properties, Seq("--feature", "non.existent.feature=20"))).getMessage) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index ece60a36dbaf9..3603e5736d567 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -63,7 +63,8 @@ public class GroupCoordinatorConfig { "The " + Group.GroupType.SHARE + " and " + Group.GroupType.STREAMS + " rebalance protocols are in early access and therefore must not be used in production."; public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of( Group.GroupType.CLASSIC.toString(), - Group.GroupType.CONSUMER.toString()); + Group.GroupType.CONSUMER.toString(), + Group.GroupType.STREAMS.toString()); public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms"; public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " + "wait for writes to accumulate before flushing them to disk. Increasing this value improves write efficiency and batch size, " + diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java index f494ebddcf8dc..29205446b7bf0 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java @@ -387,7 +387,7 @@ public void testCannotDowngradeBeforeMinimumKraftVersion() { build(); manager.replay(new FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel())); assertEquals(ControllerResult.of(List.of(), new ApiError(Errors.INVALID_UPDATE_VERSION, - "Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-28")), + "Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-29")), manager.updateFeatures( Map.of(MetadataVersion.FEATURE_NAME, MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL), Map.of(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE), diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index 880bea07a9e7e..f642383990ce8 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -368,7 +368,7 @@ public void testInvalidFeatureFlag() throws Exception { formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1); assertEquals("Unsupported feature: nonexistent.feature. Supported features " + "are: eligible.leader.replicas.version, group.version, kraft.version, " + - "share.version, test.feature.version, transaction.version", + "share.version, streams.group.featrue, test.feature.version, transaction.version", assertThrows(FormatterException.class, () -> formatter1.formatter.run()). getMessage()); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java index 25bb654577c7f..96abf7ac201bd 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/Feature.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/Feature.java @@ -48,6 +48,7 @@ public enum Feature { GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(), GroupVersion.LATEST_PRODUCTION), ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.values(), EligibleLeaderReplicasVersion.LATEST_PRODUCTION), SHARE_VERSION(ShareVersion.FEATURE_NAME, ShareVersion.values(), ShareVersion.LATEST_PRODUCTION), + STREAMS_GROUP_VERSION(StreamsGroupVersion.FEATURE_NAME, StreamsGroupVersion.values(), StreamsGroupVersion.LATEST_PRODUCTION), /** * Features defined only for unit tests and are not used in production. diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 7e64fa648f522..79dd307599aa2 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -117,6 +117,9 @@ public enum MetadataVersion { // Enables share groups. Note, share groups are for preview only in 4.1. (KIP-932). IBP_4_1_IV1(27, "4.1", "IV1", false), + // Enables "streams" groups. Note, streams groups are for early access only in 4.1. (KIP-1071). + IBP_4_1_IV2(28, "4.1", "IV2", false), + // Insert any additional IBP_4_1_IVx versions above this comment, and bump the feature level of // IBP_4_2_IV0 accordingly. When 4.2 development begins, IBP_4_2_IV0 will cease to be // a placeholder. @@ -127,7 +130,8 @@ public enum MetadataVersion { // *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A SHARE *** // *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON *** // *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY. *** - IBP_4_2_IV0(28, "4.2", "IV0", false); + IBP_4_2_IV0(29, "4.2", "IV0", false); + // NOTES when adding a new version: // Update the default version in @ClusterTest annotation to point to the latest version diff --git a/server-common/src/main/java/org/apache/kafka/server/common/StreamsGroupVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/StreamsGroupVersion.java new file mode 100644 index 0000000000000..bd3a69ac8babb --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/common/StreamsGroupVersion.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Map; + +public enum StreamsGroupVersion implements FeatureVersion { + + // Version 0 does disable "streams" groups (KIP-1071). + SGV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()), + + // Version 1 enables "streams" groups (KIP-1071). + SGV_1(1, MetadataVersion.IBP_4_1_IV2, Map.of()); + + public static final String FEATURE_NAME = "streams.group.version"; + + // Disabled by default in 4.1 (early access only). + public static final StreamsGroupVersion LATEST_PRODUCTION = SGV_0; + + private final short featureLevel; + private final MetadataVersion bootstrapMetadataVersion; + private final Map dependencies; + + StreamsGroupVersion( + int featureLevel, + MetadataVersion bootstrapMetadataVersion, + Map dependencies + ) { + this.featureLevel = (short) featureLevel; + this.bootstrapMetadataVersion = bootstrapMetadataVersion; + this.dependencies = dependencies; + } + + @Override + public short featureLevel() { + return featureLevel; + } + + @Override + public String featureName() { + return FEATURE_NAME; + } + + @Override + public MetadataVersion bootstrapMetadataVersion() { + return bootstrapMetadataVersion; + } + + @Override + public Map dependencies() { + return dependencies; + } + + public boolean streamsGroupSupported() { + return featureLevel >= SGV_1.featureLevel; + } + + public static StreamsGroupVersion fromFeatureLevel(short version) { + return switch (version) { + case 0 -> SGV_0; + case 1 -> SGV_1; + default -> throw new RuntimeException("Unknown streams group feature level: " + (int) version); + }; + } +} diff --git a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java index 31ce9c596eea6..5edcfb08526ea 100644 --- a/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java +++ b/server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java @@ -30,6 +30,7 @@ import static org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION; import static org.apache.kafka.server.common.Feature.GROUP_VERSION; import static org.apache.kafka.server.common.Feature.SHARE_VERSION; +import static org.apache.kafka.server.common.Feature.STREAMS_GROUP_VERSION; import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -99,6 +100,7 @@ public void testDefaultFinalizedFeatures() { GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting(), ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(), ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting(), SHARE_VERSION.featureName(), SHARE_VERSION.latestTesting(), + STREAMS_GROUP_VERSION.featureName(), STREAMS_GROUP_VERSION.latestTesting(), "kraft.version", (short) 0, "test_feature_1", (short) 4, "test_feature_2", (short) 3, diff --git a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java index a1ef4ff2e3929..02c3932425f8f 100644 --- a/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java @@ -67,8 +67,10 @@ public void testDescribeWithKRaft(ClusterInstance cluster) { "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.3-IV3\t", outputWithoutEpoch(features.get(3))); assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4))); + assertEquals("Feature: streams.group.version\tSupportedMinVersion: 0\t" + + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5))); assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" + - "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5))); + "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(6))); } // Use the first MetadataVersion that supports KIP-919 @@ -91,8 +93,10 @@ public void testDescribeWithKRaftAndBootstrapControllers(ClusterInstance cluster "SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3))); assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4))); + assertEquals("Feature: streams.group.version\tSupportedMinVersion: 0\t" + + "SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5))); assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" + - "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5))); + "SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(6))); } @ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV3) @@ -118,7 +122,7 @@ public void testDowngradeMetadataVersionWithKRaft(ClusterInstance cluster) { ); // Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version) assertEquals("Could not disable metadata.version. The update failed for all features since the following " + - "feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-28", commandOutput); + "feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-29", commandOutput); commandOutput = ToolsTestUtils.captureStandardOut(() -> assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(), @@ -182,6 +186,7 @@ public void testDowngradeWithReleaseVersion(ClusterInstance cluster) { "kraft.version was downgraded to 0.\n" + "metadata.version was downgraded to 18.\n" + "share.version was downgraded to 0.\n" + + "streams.group.version was downgraded to 0.\n" + "transaction.version was downgraded to 0.", commandOutput); }