Skip to content

Commit 687cecf

Browse files
committed
KAFKA-19173: Add Feature for "streams" group
Add new StreamsGroupFeature, disabled by default, and add "streams" as default value to `group.coordinator.rebalance.protocols`.
1 parent 810beef commit 687cecf

File tree

10 files changed

+106
-11
lines changed

10 files changed

+106
-11
lines changed

core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala

+6-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.kafka.common.protocol.ApiKeys
2525
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
2626
import org.apache.kafka.common.test.ClusterInstance
2727
import org.apache.kafka.common.utils.Utils
28-
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, TransactionVersion}
28+
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, ShareVersion, StreamsGroupVersion, TransactionVersion}
2929
import org.apache.kafka.test.TestUtils
3030
import org.junit.jupiter.api.Assertions._
3131
import org.junit.jupiter.api.Tag
@@ -64,11 +64,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
6464
apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
6565
): Unit = {
6666
if (apiVersion >= 3) {
67-
assertEquals(5, apiVersionsResponse.data().finalizedFeatures().size())
67+
assertEquals(6, apiVersionsResponse.data().finalizedFeatures().size())
6868
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
6969
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())
7070

71-
assertEquals(6, apiVersionsResponse.data().supportedFeatures().size())
71+
assertEquals(7, apiVersionsResponse.data().supportedFeatures().size())
7272
assertEquals(MetadataVersion.MINIMUM_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
7373
if (apiVersion < 4) {
7474
assertEquals(1, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
@@ -88,6 +88,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
8888

8989
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).minVersion())
9090
assertEquals(ShareVersion.SV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(ShareVersion.FEATURE_NAME).maxVersion())
91+
92+
assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(StreamsGroupVersion.FEATURE_NAME).minVersion())
93+
assertEquals(StreamsGroupVersion.SGV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(StreamsGroupVersion.FEATURE_NAME).maxVersion())
9194
}
9295
val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) {
9396
ApiVersionsResponse.collectApis(

core/src/test/scala/unit/kafka/tools/StorageToolTest.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ Found problem:
325325
properties.putAll(defaultStaticQuorumProperties)
326326
properties.setProperty("log.dirs", availableDirs.mkString(","))
327327
assertEquals("Unsupported feature: non.existent.feature. Supported features are: " +
328-
"eligible.leader.replicas.version, group.version, kraft.version, share.version, transaction.version",
328+
"eligible.leader.replicas.version, group.version, kraft.version, share.version, streams.group.version, transaction.version",
329329
assertThrows(classOf[FormatterException], () =>
330330
runFormatCommand(new ByteArrayOutputStream(), properties,
331331
Seq("--feature", "non.existent.feature=20"))).getMessage)

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ public class GroupCoordinatorConfig {
6363
"The " + Group.GroupType.SHARE + " and " + Group.GroupType.STREAMS + " rebalance protocols are in early access and therefore must not be used in production.";
6464
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
6565
Group.GroupType.CLASSIC.toString(),
66-
Group.GroupType.CONSUMER.toString());
66+
Group.GroupType.CONSUMER.toString(),
67+
Group.GroupType.STREAMS.toString());
6768
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
6869
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
6970
"wait for writes to accumulate before flushing them to disk. Increasing this value improves write efficiency and batch size, " +

metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ public void testCannotDowngradeBeforeMinimumKraftVersion() {
387387
build();
388388
manager.replay(new FeatureLevelRecord().setName(MetadataVersion.FEATURE_NAME).setFeatureLevel(MetadataVersion.MINIMUM_VERSION.featureLevel()));
389389
assertEquals(ControllerResult.of(List.of(), new ApiError(Errors.INVALID_UPDATE_VERSION,
390-
"Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-28")),
390+
"Invalid update version 6 for feature metadata.version. Local controller 0 only supports versions 7-29")),
391391
manager.updateFeatures(
392392
Map.of(MetadataVersion.FEATURE_NAME, MetadataVersionTestUtils.IBP_3_3_IV2_FEATURE_LEVEL),
393393
Map.of(MetadataVersion.FEATURE_NAME, FeatureUpdate.UpgradeType.UNSAFE_DOWNGRADE),

metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ public void testInvalidFeatureFlag() throws Exception {
368368
formatter1.formatter.setFeatureLevel("nonexistent.feature", (short) 1);
369369
assertEquals("Unsupported feature: nonexistent.feature. Supported features " +
370370
"are: eligible.leader.replicas.version, group.version, kraft.version, " +
371-
"share.version, test.feature.version, transaction.version",
371+
"share.version, streams.group.featrue, test.feature.version, transaction.version",
372372
assertThrows(FormatterException.class,
373373
() -> formatter1.formatter.run()).
374374
getMessage());

server-common/src/main/java/org/apache/kafka/server/common/Feature.java

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public enum Feature {
4848
GROUP_VERSION(GroupVersion.FEATURE_NAME, GroupVersion.values(), GroupVersion.LATEST_PRODUCTION),
4949
ELIGIBLE_LEADER_REPLICAS_VERSION(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.values(), EligibleLeaderReplicasVersion.LATEST_PRODUCTION),
5050
SHARE_VERSION(ShareVersion.FEATURE_NAME, ShareVersion.values(), ShareVersion.LATEST_PRODUCTION),
51+
STREAMS_GROUP_VERSION(StreamsGroupVersion.FEATURE_NAME, StreamsGroupVersion.values(), StreamsGroupVersion.LATEST_PRODUCTION),
5152

5253
/**
5354
* Features defined only for unit tests and are not used in production.

server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ public enum MetadataVersion {
117117
// Enables share groups. Note, share groups are for preview only in 4.1. (KIP-932).
118118
IBP_4_1_IV1(27, "4.1", "IV1", false),
119119

120+
// Enables "streams" groups. Note, streams groups are for early access only in 4.1. (KIP-1071).
121+
IBP_4_1_IV2(28, "4.1", "IV2", false),
122+
120123
// Insert any additional IBP_4_1_IVx versions above this comment, and bump the feature level of
121124
// IBP_4_2_IV0 accordingly. When 4.2 development begins, IBP_4_2_IV0 will cease to be
122125
// a placeholder.
@@ -127,7 +130,8 @@ public enum MetadataVersion {
127130
// *** SHARE GROUPS BECOME PRODUCTION-READY IN THE FUTURE. ITS DEFINITION ALLOWS A SHARE ***
128131
// *** GROUPS FEATURE TO BE DEFINED IN 4.1 BUT TURNED OFF BY DEFAULT, ABLE TO BE TURNED ON ***
129132
// *** DYNAMICALLY TO TRY OUT THE PREVIEW CAPABILITY. ***
130-
IBP_4_2_IV0(28, "4.2", "IV0", false);
133+
IBP_4_2_IV0(29, "4.2", "IV0", false);
134+
131135

132136
// NOTES when adding a new version:
133137
// Update the default version in @ClusterTest annotation to point to the latest version
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.common;
18+
19+
import java.util.Map;
20+
21+
public enum StreamsGroupVersion implements FeatureVersion {
22+
23+
// Version 0 does disable "streams" groups (KIP-1071).
24+
SGV_0(0, MetadataVersion.MINIMUM_VERSION, Map.of()),
25+
26+
// Version 1 enables "streams" groups (KIP-1071).
27+
SGV_1(1, MetadataVersion.IBP_4_1_IV2, Map.of());
28+
29+
public static final String FEATURE_NAME = "streams.group.version";
30+
31+
// Disabled by default in 4.1 (early access only).
32+
public static final StreamsGroupVersion LATEST_PRODUCTION = SGV_0;
33+
34+
private final short featureLevel;
35+
private final MetadataVersion bootstrapMetadataVersion;
36+
private final Map<String, Short> dependencies;
37+
38+
StreamsGroupVersion(
39+
int featureLevel,
40+
MetadataVersion bootstrapMetadataVersion,
41+
Map<String, Short> dependencies
42+
) {
43+
this.featureLevel = (short) featureLevel;
44+
this.bootstrapMetadataVersion = bootstrapMetadataVersion;
45+
this.dependencies = dependencies;
46+
}
47+
48+
@Override
49+
public short featureLevel() {
50+
return featureLevel;
51+
}
52+
53+
@Override
54+
public String featureName() {
55+
return FEATURE_NAME;
56+
}
57+
58+
@Override
59+
public MetadataVersion bootstrapMetadataVersion() {
60+
return bootstrapMetadataVersion;
61+
}
62+
63+
@Override
64+
public Map<String, Short> dependencies() {
65+
return dependencies;
66+
}
67+
68+
public boolean streamsGroupSupported() {
69+
return featureLevel >= SGV_1.featureLevel;
70+
}
71+
72+
public static StreamsGroupVersion fromFeatureLevel(short version) {
73+
return switch (version) {
74+
case 0 -> SGV_0;
75+
case 1 -> SGV_1;
76+
default -> throw new RuntimeException("Unknown streams group feature level: " + (int) version);
77+
};
78+
}
79+
}

server/src/test/java/org/apache/kafka/server/BrokerFeaturesTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.apache.kafka.server.common.Feature.ELIGIBLE_LEADER_REPLICAS_VERSION;
3131
import static org.apache.kafka.server.common.Feature.GROUP_VERSION;
3232
import static org.apache.kafka.server.common.Feature.SHARE_VERSION;
33+
import static org.apache.kafka.server.common.Feature.STREAMS_GROUP_VERSION;
3334
import static org.apache.kafka.server.common.Feature.TRANSACTION_VERSION;
3435
import static org.junit.jupiter.api.Assertions.assertEquals;
3536
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -99,6 +100,7 @@ public void testDefaultFinalizedFeatures() {
99100
GROUP_VERSION.featureName(), GROUP_VERSION.latestTesting(),
100101
ELIGIBLE_LEADER_REPLICAS_VERSION.featureName(), ELIGIBLE_LEADER_REPLICAS_VERSION.latestTesting(),
101102
SHARE_VERSION.featureName(), SHARE_VERSION.latestTesting(),
103+
STREAMS_GROUP_VERSION.featureName(), STREAMS_GROUP_VERSION.latestTesting(),
102104
"kraft.version", (short) 0,
103105
"test_feature_1", (short) 4,
104106
"test_feature_2", (short) 3,

tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ public void testDescribeWithKRaft(ClusterInstance cluster) {
6767
"SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.3-IV3\t", outputWithoutEpoch(features.get(3)));
6868
assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
6969
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
70+
assertEquals("Feature: streams.group.version\tSupportedMinVersion: 0\t" +
71+
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
7072
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
71-
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
73+
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(6)));
7274
}
7375

7476
// Use the first MetadataVersion that supports KIP-919
@@ -91,8 +93,10 @@ public void testDescribeWithKRaftAndBootstrapControllers(ClusterInstance cluster
9193
"SupportedMaxVersion: 4.2-IV0\tFinalizedVersionLevel: 3.7-IV0\t", outputWithoutEpoch(features.get(3)));
9294
assertEquals("Feature: share.version\tSupportedMinVersion: 0\t" +
9395
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(4)));
96+
assertEquals("Feature: streams.group.version\tSupportedMinVersion: 0\t" +
97+
"SupportedMaxVersion: 1\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
9498
assertEquals("Feature: transaction.version\tSupportedMinVersion: 0\t" +
95-
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(5)));
99+
"SupportedMaxVersion: 2\tFinalizedVersionLevel: 0\t", outputWithoutEpoch(features.get(6)));
96100
}
97101

98102
@ClusterTest(types = {Type.KRAFT}, metadataVersion = MetadataVersion.IBP_3_3_IV3)
@@ -118,7 +122,7 @@ public void testDowngradeMetadataVersionWithKRaft(ClusterInstance cluster) {
118122
);
119123
// Change expected message to reflect possible MetadataVersion range 1-N (N increases when adding a new version)
120124
assertEquals("Could not disable metadata.version. The update failed for all features since the following " +
121-
"feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-28", commandOutput);
125+
"feature had an error: Invalid update version 0 for feature metadata.version. Local controller 3000 only supports versions 7-29", commandOutput);
122126

123127
commandOutput = ToolsTestUtils.captureStandardOut(() ->
124128
assertEquals(1, FeatureCommand.mainNoExit("--bootstrap-server", cluster.bootstrapServers(),
@@ -182,6 +186,7 @@ public void testDowngradeWithReleaseVersion(ClusterInstance cluster) {
182186
"kraft.version was downgraded to 0.\n" +
183187
"metadata.version was downgraded to 18.\n" +
184188
"share.version was downgraded to 0.\n" +
189+
"streams.group.version was downgraded to 0.\n" +
185190
"transaction.version was downgraded to 0.", commandOutput);
186191
}
187192

0 commit comments

Comments
 (0)