Skip to content

Commit 3fdd4e6

Browse files
committed
wip
1 parent c542dc3 commit 3fdd4e6

File tree

5 files changed

+169
-47
lines changed

5 files changed

+169
-47
lines changed

clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.common.requests;
1818

1919
import org.apache.kafka.common.TopicPartition;
20+
import org.apache.kafka.common.Uuid;
2021
import org.apache.kafka.common.errors.UnsupportedVersionException;
2122
import org.apache.kafka.common.message.OffsetFetchRequestData;
2223
import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
@@ -71,7 +72,8 @@ public Builder(String groupId,
7172
boolean requireStable,
7273
List<TopicPartition> partitions,
7374
boolean throwOnFetchStableOffsetsUnsupported) {
74-
super(ApiKeys.OFFSET_FETCH);
75+
// It can only be used with topic names.
76+
super(ApiKeys.OFFSET_FETCH, ApiKeys.OFFSET_FETCH.oldestVersion(), (short) 9);
7577

7678
OffsetFetchRequestData.OffsetFetchRequestGroup group =
7779
new OffsetFetchRequestData.OffsetFetchRequestGroup()
@@ -103,7 +105,8 @@ public Builder(String groupId,
103105
public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
104106
boolean requireStable,
105107
boolean throwOnFetchStableOffsetsUnsupported) {
106-
super(ApiKeys.OFFSET_FETCH);
108+
// It can only be used with topic names.
109+
super(ApiKeys.OFFSET_FETCH, ApiKeys.OFFSET_FETCH.oldestVersion(), (short) 9);
107110

108111
List<OffsetFetchRequestGroup> groups = new ArrayList<>();
109112
for (Entry<String, List<TopicPartition>> entry : groupIdToTopicPartitionMap.entrySet()) {
@@ -134,6 +137,12 @@ public Builder(Map<String, List<TopicPartition>> groupIdToTopicPartitionMap,
134137
this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
135138
}
136139

140+
public Builder(OffsetFetchRequestData data, boolean throwOnFetchStableOffsetsUnsupported) {
141+
super(ApiKeys.OFFSET_FETCH);
142+
this.data = data;
143+
this.throwOnFetchStableOffsetsUnsupported = throwOnFetchStableOffsetsUnsupported;
144+
}
145+
137146
@Override
138147
public OffsetFetchRequest build(short version) {
139148
if (data.groups().size() > 1 && version < 8) {

clients/src/main/resources/common/message/OffsetFetchRequest.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
"about": "The member epoch if using the new consumer protocol (KIP-848)." },
6464
{ "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
6565
"about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
66-
{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName", "ignorable": true,
66+
{ "name": "Name", "type": "string", "versions": "8-9", "entityType": "topicName", "ignorable": true,
6767
"about": "The topic name."},
6868
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
6969
"about": "The topic ID." },

clients/src/main/resources/common/message/OffsetFetchResponse.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@
8181
"about": "The group ID." },
8282
{ "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+",
8383
"about": "The responses per topic.", "fields": [
84-
{ "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName", "ignorable": true,
84+
{ "name": "Name", "type": "string", "versions": "8-9", "entityType": "topicName", "ignorable": true,
8585
"about": "The topic name." },
8686
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true,
8787
"about": "The topic ID." },

core/src/main/scala/kafka/server/KafkaApis.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -1056,7 +1056,7 @@ class KafkaApis(val requestChannel: RequestChannel,
10561056
// If the topic is not provided by the group coordinator, we set it
10571057
// using the metadata cache.
10581058
if (topic.topicId == Uuid.ZERO_UUID) {
1059-
metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name))
1059+
topic.setTopicId(metadataCache.getTopicId(topic.name))
10601060
}
10611061
// If we don't have the topic id at all, we skip the topic because
10621062
// we can not serialize it without it.

0 commit comments

Comments
 (0)