diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 907cba953fb21..23acdc43b5736 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup; @@ -71,7 +72,8 @@ public Builder(String groupId, boolean requireStable, List partitions, boolean throwOnFetchStableOffsetsUnsupported) { - super(ApiKeys.OFFSET_FETCH); + // It can only be used with topic names. + super(ApiKeys.OFFSET_FETCH, ApiKeys.OFFSET_FETCH.oldestVersion(), (short) 9); OffsetFetchRequestData.OffsetFetchRequestGroup group = new OffsetFetchRequestData.OffsetFetchRequestGroup() @@ -103,7 +105,8 @@ public Builder(String groupId, public Builder(Map> groupIdToTopicPartitionMap, boolean requireStable, boolean throwOnFetchStableOffsetsUnsupported) { - super(ApiKeys.OFFSET_FETCH); + // It can only be used with topic names. + super(ApiKeys.OFFSET_FETCH, ApiKeys.OFFSET_FETCH.oldestVersion(), (short) 9); List groups = new ArrayList<>(); for (Entry> entry : groupIdToTopicPartitionMap.entrySet()) { @@ -134,6 +137,12 @@ public Builder(Map> groupIdToTopicPartitionMap, this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported; } + public Builder(OffsetFetchRequestData data, boolean throwOnFetchStableOffsetsUnsupported) { + super(ApiKeys.OFFSET_FETCH); + this.data = data; + this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported; + } + @Override public OffsetFetchRequest build(short version) { if (data.groups().size() > 1 && version < 8) { @@ -350,4 +359,8 @@ public boolean isAllPartitionsForGroup(String groupId) { public OffsetFetchRequestData data() { return data; } + + public static boolean useTopicIds(short version) { + return version >= 10; + } } diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json b/clients/src/main/resources/common/message/OffsetFetchRequest.json index 88f5b568d724c..0fac6ad1c573f 100644 --- a/clients/src/main/resources/common/message/OffsetFetchRequest.json +++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json @@ -38,8 +38,11 @@ // // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). It adds // the MemberId and MemberEpoch fields. Those are filled in and validated when the new consumer protocol is used. - "validVersions": "1-9", + // + // Version 10 adds support for topic ids (KIP-848). + "validVersions": "1-10", "flexibleVersions": "6+", + "latestVersionUnstable": true, "fields": [ { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId", "about": "The group to fetch offsets for." }, @@ -60,8 +63,10 @@ "about": "The member epoch if using the new consumer protocol (KIP-848)." }, { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+", "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [ - { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "8-9", "entityType": "topicName", "ignorable": true, "about": "The topic name."}, + { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, + "about": "The topic ID." }, { "name": "PartitionIndexes", "type": "[]int32", "versions": "8+", "about": "The partition indexes we would like to fetch offsets for." } ]} diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json b/clients/src/main/resources/common/message/OffsetFetchResponse.json index 9f0a5157cc424..c55466cdda72f 100644 --- a/clients/src/main/resources/common/message/OffsetFetchResponse.json +++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json @@ -38,7 +38,9 @@ // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is // the same as version 8 but can return STALE_MEMBER_EPOCH and UNKNOWN_MEMBER_ID errors when the new consumer group // protocol is used. - "validVersions": "1-9", + // + // Version 10 adds support for topic ids (KIP-848). + "validVersions": "1-10", "flexibleVersions": "6+", // Supported errors: // - GROUP_AUTHORIZATION_FAILED (version 0+) @@ -49,6 +51,7 @@ // - UNSTABLE_OFFSET_COMMIT (version 7+) // - UNKNOWN_MEMBER_ID (version 9+) // - STALE_MEMBER_EPOCH (version 9+) + // - UNKNOWN_TOPIC_ID (version 10+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, @@ -78,8 +81,10 @@ "about": "The group ID." }, { "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+", "about": "The responses per topic.", "fields": [ - { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName", + { "name": "Name", "type": "string", "versions": "8-9", "entityType": "topicName", "ignorable": true, "about": "The topic name." }, + { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, + "about": "The topic ID." }, { "name": "Partitions", "type": "[]OffsetFetchResponsePartitions", "versions": "8+", "about": "The responses per partition.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "8+", diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6a22963ac7d6a..b257ebfccd03c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1027,6 +1027,8 @@ class KafkaApis(val requestChannel: RequestChannel, offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, requireStable: Boolean ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { + val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion) + groupCoordinator.fetchAllOffsets( requestContext, offsetFetchRequest, @@ -1040,13 +1042,33 @@ class KafkaApis(val requestChannel: RequestChannel, offsetFetchResponse } else { // Clients are not allowed to see offsets for topics that are not authorized for Describe. - val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized( + val authorizedNames = authHelper.filterByAuthorized( requestContext, DESCRIBE, TOPIC, offsetFetchResponse.topics.asScala )(_.name) - offsetFetchResponse.setTopics(authorizedOffsets.asJava) + + val topics = new mutable.ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseTopics] + offsetFetchResponse.topics.forEach { topic => + if (authorizedNames.contains(topic.name)) { + if (useTopicIds) { + // If the topic is not provided by the group coordinator, we set it + // using the metadata cache. + if (topic.topicId == Uuid.ZERO_UUID) { + topic.setTopicId(metadataCache.getTopicId(topic.name)) + } + // If we don't have the topic id at all, we skip the topic because + // we can not serialize it without it. + if (topic.topicId != Uuid.ZERO_UUID) { + topics += topic + } + } else { + topics += topic + } + } + } + offsetFetchResponse.setTopics(topics.asJava) } } } @@ -1056,14 +1078,53 @@ class KafkaApis(val requestChannel: RequestChannel, offsetFetchRequest: OffsetFetchRequestData.OffsetFetchRequestGroup, requireStable: Boolean ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = { + val useTopicIds = OffsetFetchRequest.useTopicIds(requestContext.apiVersion) + + if (useTopicIds) { + offsetFetchRequest.topics.forEach { topic => + if (topic.topicId != Uuid.ZERO_UUID) { + metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name)) + } + } + } + // Clients are not allowed to see offsets for topics that are not authorized for Describe. - val (authorizedTopics, unauthorizedTopics) = authHelper.partitionSeqByAuthorized( + val authorizedTopicNames = authHelper.filterByAuthorized( requestContext, DESCRIBE, TOPIC, offsetFetchRequest.topics.asScala )(_.name) + val authorizedTopics = new mutable.ArrayBuffer[OffsetFetchRequestData.OffsetFetchRequestTopics] + val errorTopics = new mutable.ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseTopics] + + def buildErrorResponse( + topic: OffsetFetchRequestData.OffsetFetchRequestTopics, + error: Errors + ): OffsetFetchResponseData.OffsetFetchResponseTopics = { + val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setTopicId(topic.topicId) + .setName(topic.name) + topic.partitionIndexes.forEach { partitionIndex => + topicResponse.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(partitionIndex) + .setCommittedOffset(-1) + .setErrorCode(error.code)) + } + topicResponse + } + + offsetFetchRequest.topics.forEach { topic => + if (useTopicIds && topic.name.isEmpty) { + errorTopics += buildErrorResponse(topic, Errors.UNKNOWN_TOPIC_ID) + } else if (!authorizedTopicNames.contains(topic.name)) { + errorTopics += buildErrorResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED) + } else { + authorizedTopics += topic + } + } + groupCoordinator.fetchOffsets( requestContext, new OffsetFetchRequestData.OffsetFetchRequestGroup() @@ -1081,19 +1142,10 @@ class KafkaApis(val requestChannel: RequestChannel, offsetFetchResponse } else { val topics = new util.ArrayList[OffsetFetchResponseData.OffsetFetchResponseTopics]( - offsetFetchResponse.topics.size + unauthorizedTopics.size + offsetFetchResponse.topics.size + errorTopics.size ) topics.addAll(offsetFetchResponse.topics) - unauthorizedTopics.foreach { topic => - val topicResponse = new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name) - topic.partitionIndexes.forEach { partitionIndex => - topicResponse.partitions.add(new OffsetFetchResponseData.OffsetFetchResponsePartitions() - .setPartitionIndex(partitionIndex) - .setCommittedOffset(-1) - .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)) - } - topics.add(topicResponse) - } + topics.addAll(errorTopics.asJava) offsetFetchResponse.setTopics(topics) } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index c644043168438..2d52dd6301ac5 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -8136,17 +8136,37 @@ class KafkaApisTest extends Logging { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = { + val foo = "foo" + val bar = "bar" + val fooId = Uuid.randomUuid() + addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2) + def makeRequest(version: Short): RequestChannel.Request = { - val groups = Map( - "group-1" -> List( - new TopicPartition("foo", 0), - new TopicPartition("foo", 1) - ).asJava, - "group-2" -> null, - "group-3" -> null, - "group-4" -> null, - ).asJava - buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version)) + buildRequest( + new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(foo) + .setTopicId(fooId) + .setPartitionIndexes(List[Integer](0, 1).asJava) + ).asJava), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-2") + .setTopics(null), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-3") + .setTopics(null), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-4") + .setTopics(null), + ).asJava), + false + ).build(version) + ) } if (version < 8) { @@ -8162,6 +8182,7 @@ class KafkaApisTest extends Logging { .setGroupId("group-1") .setTopics(List( new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID) .setName("foo") .setPartitionIndexes(List[Integer](0, 1).asJava)).asJava), false @@ -8194,13 +8215,14 @@ class KafkaApisTest extends Logging { false )).thenReturn(group4Future) kafkaApis = createKafkaApis() - kafkaApis.handleOffsetFetchRequest(requestChannelRequest) + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() .setGroupId("group-1") .setTopics(List( new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("foo") + .setTopicId(fooId) + .setName(foo) .setPartitions(List( new OffsetFetchResponseData.OffsetFetchResponsePartitions() .setPartitionIndex(0) @@ -8213,11 +8235,30 @@ class KafkaApisTest extends Logging { ).asJava) ).asJava) + val expectedGroup1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID) + .setName(if (version < 10) foo else "") + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(100) + .setCommittedLeaderEpoch(1), + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(1) + .setCommittedOffset(200) + .setCommittedLeaderEpoch(2) + ).asJava) + ).asJava) + + val group2Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() .setGroupId("group-2") .setTopics(List( new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("bar") + .setName(bar) .setPartitions(List( new OffsetFetchResponseData.OffsetFetchResponsePartitions() .setPartitionIndex(0) @@ -8242,7 +8283,7 @@ class KafkaApisTest extends Logging { .setGroupId("group-4") .setErrorCode(Errors.INVALID_GROUP_ID.code) - val expectedGroups = List(group1Response, group2Response, group3Response, group4Response) + val expectedGroups = List(expectedGroup1Response, group2Response, group3Response, group4Response) group1Future.complete(group1Response) group2Future.complete(group2Response) @@ -8250,13 +8291,161 @@ class KafkaApisTest extends Logging { group4Future.complete(group4Response) val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest) - assertEquals(expectedGroups.toSet, response.data.groups().asScala.toSet) + assertEquals(expectedGroups.toSet, response.data.groups.asScala.toSet) } } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + def testHandleOffsetFetchWithUnknownTopicIds(version: Short): Unit = { + // We only test with topic ids. + if (version < 10) return + + val foo = "foo" + val bar = "bar" + val fooId = Uuid.randomUuid() + val barId = Uuid.randomUuid() + addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2) + + def makeRequest(version: Short): RequestChannel.Request = { + buildRequest( + new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(foo) + .setTopicId(fooId) + .setPartitionIndexes(List[Integer](0).asJava), + // bar does not exist so it must return UNKNOWN_TOPIC_ID. + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(bar) + .setTopicId(barId) + .setPartitionIndexes(List[Integer](0).asJava) + ).asJava), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-2") + .setTopics(null) + ).asJava), + false + ).build(version) + ) + } + + val requestChannelRequest = makeRequest(version) + + val group1Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() + when(groupCoordinator.fetchOffsets( + requestChannelRequest.context, + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setTopicId(fooId) + .setName("foo") + .setPartitionIndexes(List[Integer](0).asJava)).asJava), + false + )).thenReturn(group1Future) + + val group2Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() + when(groupCoordinator.fetchAllOffsets( + requestChannelRequest.context, + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-2") + .setTopics(null), + false + )).thenReturn(group2Future) + + kafkaApis = createKafkaApis() + kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching) + + val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setTopicId(fooId) + .setName(foo) + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(100) + .setCommittedLeaderEpoch(1) + ).asJava) + ).asJava) + + val group2Response = new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("group-2") + .setTopics(List( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName(foo) + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(100) + .setCommittedLeaderEpoch(1) + ).asJava), + // bar does not exist so it must be filtered out. + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName(bar) + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(100) + .setCommittedLeaderEpoch(1) + ).asJava) + ).asJava) + + val expectedResponse = new OffsetFetchResponseData() + .setGroups(List( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setTopicId(fooId) + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(100) + .setCommittedLeaderEpoch(1) + ).asJava), + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setTopicId(barId) + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(-1) + .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) + ).asJava) + ).asJava), + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("group-2") + .setTopics(List( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setTopicId(fooId) + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(100) + .setCommittedLeaderEpoch(1) + ).asJava) + ).asJava) + ).asJava) + + group1Future.complete(group1Response) + group2Future.complete(group2Response) + + val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest) + assertEquals(expectedResponse, response.data) + } + @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) def testHandleOffsetFetchWithSingleGroup(version: Short): Unit = { + // The single group builder does not support topic ids. + if (version >= 10) return + def makeRequest(version: Short): RequestChannel.Request = { buildRequest(new OffsetFetchRequest.Builder( "group-1", @@ -8331,10 +8520,14 @@ class KafkaApisTest extends Logging { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) def testHandleOffsetFetchAllOffsetsWithSingleGroup(version: Short): Unit = { - // Version 0 gets offsets from Zookeeper. Version 1 does not support fetching all - // offsets request. We are not interested in testing these here. + // Version 1 does not support fetching all offsets request. We are not + // interested in testing these here. if (version < 2) return + val foo = "foo" + val fooId = Uuid.randomUuid() + addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2) + def makeRequest(version: Short): RequestChannel.Request = { buildRequest(new OffsetFetchRequest.Builder( "group-1", @@ -8361,7 +8554,7 @@ class KafkaApisTest extends Logging { .setGroupId("group-1") .setTopics(List( new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("foo") + .setName(foo) .setPartitions(List( new OffsetFetchResponseData.OffsetFetchResponsePartitions() .setPartitionIndex(0) @@ -8376,7 +8569,25 @@ class KafkaApisTest extends Logging { val expectedOffsetFetchResponse = if (version >= 8) { new OffsetFetchResponseData() - .setGroups(List(group1Response).asJava) + .setGroups(List( + new OffsetFetchResponseData.OffsetFetchResponseGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName(if (version < 10) foo else "") + .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID) + .setPartitions(List( + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(0) + .setCommittedOffset(100) + .setCommittedLeaderEpoch(1), + new OffsetFetchResponseData.OffsetFetchResponsePartitions() + .setPartitionIndex(1) + .setCommittedOffset(200) + .setCommittedLeaderEpoch(2) + ).asJava) + ).asJava) + ).asJava) } else { new OffsetFetchResponseData() .setTopics(List( @@ -8401,25 +8612,61 @@ class KafkaApisTest extends Logging { assertEquals(expectedOffsetFetchResponse, response.data) } - @Test - def testHandleOffsetFetchAuthorization(): Unit = { + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + def testHandleOffsetFetchAuthorization(version: Short): Unit = { + // We don't test the non batched API. + if (version < 8) return + + val foo = "foo" + val bar = "bar" + val fooId = Uuid.randomUuid() + val barId = Uuid.randomUuid() + addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2) + addTopicToMetadataCache(bar, topicId = barId, numPartitions = 2) + def makeRequest(version: Short): RequestChannel.Request = { - val groups = Map( - "group-1" -> List( - new TopicPartition("foo", 0), - new TopicPartition("bar", 0) - ).asJava, - "group-2" -> List( - new TopicPartition("foo", 0), - new TopicPartition("bar", 0) - ).asJava, - "group-3" -> null, - "group-4" -> null, - ).asJava - buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version)) + buildRequest( + new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(foo) + .setTopicId(fooId) + .setPartitionIndexes(List[Integer](0).asJava), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(bar) + .setTopicId(barId) + .setPartitionIndexes(List[Integer](0).asJava) + ).asJava), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-2") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(foo) + .setTopicId(fooId) + .setPartitionIndexes(List[Integer](0).asJava), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(bar) + .setTopicId(barId) + .setPartitionIndexes(List[Integer](0).asJava) + ).asJava), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-3") + .setTopics(null), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-4") + .setTopics(null), + ).asJava), + false + ).build(version) + ) } - val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion) + val requestChannelRequest = makeRequest(version) val authorizer: Authorizer = mock(classOf[Authorizer]) @@ -8449,7 +8696,8 @@ class KafkaApisTest extends Logging { new OffsetFetchRequestData.OffsetFetchRequestGroup() .setGroupId("group-1") .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics() - .setName("bar") + .setName(bar) + .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID) .setPartitionIndexes(List[Integer](0).asJava)).asJava), false )).thenReturn(group1Future) @@ -8470,7 +8718,8 @@ class KafkaApisTest extends Logging { .setGroupId("group-1") .setTopics(List( new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("bar") + .setName(bar) + .setTopicId(barId) .setPartitions(List( new OffsetFetchResponseData.OffsetFetchResponsePartitions() .setPartitionIndex(0) @@ -8484,7 +8733,8 @@ class KafkaApisTest extends Logging { .setTopics(List( // foo should be filtered out. new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("foo") + .setName(foo) + .setTopicId(fooId) .setPartitions(List( new OffsetFetchResponseData.OffsetFetchResponsePartitions() .setPartitionIndex(0) @@ -8492,7 +8742,8 @@ class KafkaApisTest extends Logging { .setCommittedLeaderEpoch(1) ).asJava), new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("bar") + .setName(bar) + .setTopicId(barId) .setPartitions(List( new OffsetFetchResponseData.OffsetFetchResponsePartitions() .setPartitionIndex(0) @@ -8508,7 +8759,8 @@ class KafkaApisTest extends Logging { .setGroupId("group-1") .setTopics(List( new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("bar") + .setName(if (version < 10) bar else "") + .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID) .setPartitions(List( new OffsetFetchResponseData.OffsetFetchResponsePartitions() .setPartitionIndex(0) @@ -8516,7 +8768,8 @@ class KafkaApisTest extends Logging { .setCommittedLeaderEpoch(1) ).asJava), new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("foo") + .setName(if (version < 10) foo else "") + .setTopicId(if (version >= 10) fooId else Uuid.ZERO_UUID) .setPartitions(List( new OffsetFetchResponseData.OffsetFetchResponsePartitions() .setPartitionIndex(0) @@ -8533,7 +8786,8 @@ class KafkaApisTest extends Logging { .setGroupId("group-3") .setTopics(List( new OffsetFetchResponseData.OffsetFetchResponseTopics() - .setName("bar") + .setName(if (version < 10) bar else "") + .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID) .setPartitions(List( new OffsetFetchResponseData.OffsetFetchResponsePartitions() .setPartitionIndex(0) @@ -8554,23 +8808,55 @@ class KafkaApisTest extends Logging { assertEquals(expectedOffsetFetchResponse, response.data) } - @Test - def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = { + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH) + def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(version: Short): Unit = { + // We don't test the non batched API. + if (version < 8) return + + val foo = "foo" + val bar = "bar" + val fooId = Uuid.randomUuid() + val barId = Uuid.randomUuid() + addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2) + addTopicToMetadataCache(bar, topicId = barId, numPartitions = 2) + def makeRequest(version: Short): RequestChannel.Request = { - val groups = Map( - "group-1" -> List( - new TopicPartition("foo", 0), - new TopicPartition("bar", 0) - ).asJava, - "group-2" -> List( - new TopicPartition("foo", 0), - new TopicPartition("bar", 0) - ).asJava - ).asJava - buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version)) + buildRequest( + new OffsetFetchRequest.Builder( + new OffsetFetchRequestData() + .setGroups(List( + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-1") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(foo) + .setTopicId(fooId) + .setPartitionIndexes(List[Integer](0).asJava), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(bar) + .setTopicId(barId) + .setPartitionIndexes(List[Integer](0).asJava) + ).asJava), + new OffsetFetchRequestData.OffsetFetchRequestGroup() + .setGroupId("group-2") + .setTopics(List( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(foo) + .setTopicId(fooId) + .setPartitionIndexes(List[Integer](0).asJava), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName(bar) + .setTopicId(barId) + .setPartitionIndexes(List[Integer](0).asJava) + ).asJava) + ).asJava), + false + ).build(version) + ) } - val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion) + val requestChannelRequest = makeRequest(version) val authorizer: Authorizer = mock(classOf[Authorizer]) @@ -8598,7 +8884,8 @@ class KafkaApisTest extends Logging { new OffsetFetchRequestData.OffsetFetchRequestGroup() .setGroupId("group-1") .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics() - .setName("bar") + .setName(bar) + .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID) .setPartitionIndexes(List[Integer](0).asJava)).asJava), false )).thenReturn(group1Future) @@ -8609,7 +8896,8 @@ class KafkaApisTest extends Logging { new OffsetFetchRequestData.OffsetFetchRequestGroup() .setGroupId("group-2") .setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics() - .setName("bar") + .setName(bar) + .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID) .setPartitionIndexes(List[Integer](0).asJava)).asJava), false )).thenReturn(group1Future) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 2b50071a7f771..eb5f1b07b8216 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.StaleMemberEpochException; @@ -739,7 +740,9 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchOffsets( request.topics().forEach(topic -> { final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse = - new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic.name()); + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setTopicId(topic.topicId()) + .setName(topic.name()); topicResponses.add(topicResponse); final TimelineHashMap topicOffsets = groupOffsets == null ? @@ -809,7 +812,11 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( final TimelineHashMap topicOffsets = topicEntry.getValue(); final OffsetFetchResponseData.OffsetFetchResponseTopics topicResponse = - new OffsetFetchResponseData.OffsetFetchResponseTopics().setName(topic); + new OffsetFetchResponseData.OffsetFetchResponseTopics() + // It is set to zero for now but it will be set to the persisted + // topic id along the committed offset, if present. + .setTopicId(Uuid.ZERO_UUID) + .setName(topic); topicResponses.add(topicResponse); topicOffsets.entrySet(lastCommittedOffset).forEach(partitionEntry -> { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 6f788d84fd009..90d1a74122f88 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -1765,6 +1765,44 @@ public void testFetchOffsetsWithUnknownGroup() { assertEquals(expectedResponse, context.fetchOffsets("group", request, Long.MAX_VALUE)); } + @Test + public void testFetchOffsetsWithTopicIds() { + Uuid fooId = Uuid.randomUuid(); + Uuid barId = Uuid.randomUuid(); + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true); + + context.commitOffset("group", "foo", 0, 100L, 1); + context.commitOffset("group", "bar", 0, 200L, 1); + + List request = Arrays.asList( + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("foo") + .setTopicId(fooId) + .setPartitionIndexes(List.of(0)), + new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("bar") + .setTopicId(barId) + .setPartitionIndexes(List.of(0)) + ); + + assertEquals(List.of( + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("foo") + .setTopicId(fooId) + .setPartitions(List.of( + mkOffsetPartitionResponse(0, 100L, 1, "metadata") + )), + new OffsetFetchResponseData.OffsetFetchResponseTopics() + .setName("bar") + .setTopicId(barId) + .setPartitions(List.of( + mkOffsetPartitionResponse(0, 200L, 1, "metadata") + )) + ), context.fetchOffsets("group", request, Long.MAX_VALUE)); + } + @Test public void testFetchOffsetsAtDifferentCommittedOffset() { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();