@@ -1027,6 +1027,8 @@ class KafkaApis(val requestChannel: RequestChannel,
1027
1027
offsetFetchRequest : OffsetFetchRequestData .OffsetFetchRequestGroup ,
1028
1028
requireStable : Boolean
1029
1029
): CompletableFuture [OffsetFetchResponseData .OffsetFetchResponseGroup ] = {
1030
+ val useTopicIds = OffsetFetchRequest .useTopicIds(requestContext.apiVersion)
1031
+
1030
1032
groupCoordinator.fetchAllOffsets(
1031
1033
requestContext,
1032
1034
offsetFetchRequest,
@@ -1040,13 +1042,33 @@ class KafkaApis(val requestChannel: RequestChannel,
1040
1042
offsetFetchResponse
1041
1043
} else {
1042
1044
// Clients are not allowed to see offsets for topics that are not authorized for Describe.
1043
- val (authorizedOffsets, _) = authHelper.partitionSeqByAuthorized (
1045
+ val authorizedNames = authHelper.filterByAuthorized (
1044
1046
requestContext,
1045
1047
DESCRIBE ,
1046
1048
TOPIC ,
1047
1049
offsetFetchResponse.topics.asScala
1048
1050
)(_.name)
1049
- offsetFetchResponse.setTopics(authorizedOffsets.asJava)
1051
+
1052
+ val topics = new mutable.ArrayBuffer [OffsetFetchResponseData .OffsetFetchResponseTopics ]
1053
+ offsetFetchResponse.topics.forEach { topic =>
1054
+ if (authorizedNames.contains(topic.name)) {
1055
+ if (useTopicIds) {
1056
+ // If the topic is not provided by the group coordinator, we set it
1057
+ // using the metadata cache.
1058
+ if (topic.topicId == Uuid .ZERO_UUID ) {
1059
+ metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name))
1060
+ }
1061
+ // If we don't have the topic id at all, we skip the topic because
1062
+ // we can not serialize it without it.
1063
+ if (topic.topicId != Uuid .ZERO_UUID ) {
1064
+ topics += topic
1065
+ }
1066
+ } else {
1067
+ topics += topic
1068
+ }
1069
+ }
1070
+ }
1071
+ offsetFetchResponse.setTopics(topics.asJava)
1050
1072
}
1051
1073
}
1052
1074
}
@@ -1056,14 +1078,53 @@ class KafkaApis(val requestChannel: RequestChannel,
1056
1078
offsetFetchRequest : OffsetFetchRequestData .OffsetFetchRequestGroup ,
1057
1079
requireStable : Boolean
1058
1080
): CompletableFuture [OffsetFetchResponseData .OffsetFetchResponseGroup ] = {
1081
+ val useTopicIds = OffsetFetchRequest .useTopicIds(requestContext.apiVersion)
1082
+
1083
+ if (useTopicIds) {
1084
+ offsetFetchRequest.topics.forEach { topic =>
1085
+ if (topic.topicId != Uuid .ZERO_UUID ) {
1086
+ metadataCache.getTopicName(topic.topicId).ifPresent(name => topic.setName(name))
1087
+ }
1088
+ }
1089
+ }
1090
+
1059
1091
// Clients are not allowed to see offsets for topics that are not authorized for Describe.
1060
- val (authorizedTopics, unauthorizedTopics) = authHelper.partitionSeqByAuthorized (
1092
+ val authorizedTopicNames = authHelper.filterByAuthorized (
1061
1093
requestContext,
1062
1094
DESCRIBE ,
1063
1095
TOPIC ,
1064
1096
offsetFetchRequest.topics.asScala
1065
1097
)(_.name)
1066
1098
1099
+ val authorizedTopics = new mutable.ArrayBuffer [OffsetFetchRequestData .OffsetFetchRequestTopics ]
1100
+ val errorTopics = new mutable.ArrayBuffer [OffsetFetchResponseData .OffsetFetchResponseTopics ]
1101
+
1102
+ def buildErrorResponse (
1103
+ topic : OffsetFetchRequestData .OffsetFetchRequestTopics ,
1104
+ error : Errors
1105
+ ): OffsetFetchResponseData .OffsetFetchResponseTopics = {
1106
+ val topicResponse = new OffsetFetchResponseData .OffsetFetchResponseTopics ()
1107
+ .setTopicId(topic.topicId)
1108
+ .setName(topic.name)
1109
+ topic.partitionIndexes.forEach { partitionIndex =>
1110
+ topicResponse.partitions.add(new OffsetFetchResponseData .OffsetFetchResponsePartitions ()
1111
+ .setPartitionIndex(partitionIndex)
1112
+ .setCommittedOffset(- 1 )
1113
+ .setErrorCode(error.code))
1114
+ }
1115
+ topicResponse
1116
+ }
1117
+
1118
+ offsetFetchRequest.topics.forEach { topic =>
1119
+ if (useTopicIds && topic.name.isEmpty) {
1120
+ errorTopics += buildErrorResponse(topic, Errors .UNKNOWN_TOPIC_ID )
1121
+ } else if (! authorizedTopicNames.contains(topic.name)) {
1122
+ errorTopics += buildErrorResponse(topic, Errors .TOPIC_AUTHORIZATION_FAILED )
1123
+ } else {
1124
+ authorizedTopics += topic
1125
+ }
1126
+ }
1127
+
1067
1128
groupCoordinator.fetchOffsets(
1068
1129
requestContext,
1069
1130
new OffsetFetchRequestData .OffsetFetchRequestGroup ()
@@ -1081,19 +1142,10 @@ class KafkaApis(val requestChannel: RequestChannel,
1081
1142
offsetFetchResponse
1082
1143
} else {
1083
1144
val topics = new util.ArrayList [OffsetFetchResponseData .OffsetFetchResponseTopics ](
1084
- offsetFetchResponse.topics.size + unauthorizedTopics .size
1145
+ offsetFetchResponse.topics.size + errorTopics .size
1085
1146
)
1086
1147
topics.addAll(offsetFetchResponse.topics)
1087
- unauthorizedTopics.foreach { topic =>
1088
- val topicResponse = new OffsetFetchResponseData .OffsetFetchResponseTopics ().setName(topic.name)
1089
- topic.partitionIndexes.forEach { partitionIndex =>
1090
- topicResponse.partitions.add(new OffsetFetchResponseData .OffsetFetchResponsePartitions ()
1091
- .setPartitionIndex(partitionIndex)
1092
- .setCommittedOffset(- 1 )
1093
- .setErrorCode(Errors .TOPIC_AUTHORIZATION_FAILED .code))
1094
- }
1095
- topics.add(topicResponse)
1096
- }
1148
+ topics.addAll(errorTopics.asJava)
1097
1149
offsetFetchResponse.setTopics(topics)
1098
1150
}
1099
1151
}
0 commit comments