@@ -8295,6 +8295,151 @@ class KafkaApisTest extends Logging {
8295
8295
}
8296
8296
}
8297
8297
8298
+ @ParameterizedTest
8299
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
8300
+ def testHandleOffsetFetchWithUnknownTopicIds(version: Short): Unit = {
8301
+ // We only test with topic ids.
8302
+ if (version < 10) return
8303
+
8304
+ val foo = "foo"
8305
+ val bar = "bar"
8306
+ val fooId = Uuid.randomUuid()
8307
+ val barId = Uuid.randomUuid()
8308
+ addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2)
8309
+
8310
+ def makeRequest(version: Short): RequestChannel.Request = {
8311
+ buildRequest(
8312
+ new OffsetFetchRequest.Builder(
8313
+ new OffsetFetchRequestData()
8314
+ .setGroups(List(
8315
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
8316
+ .setGroupId("group-1")
8317
+ .setTopics(List(
8318
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8319
+ .setName(foo)
8320
+ .setTopicId(fooId)
8321
+ .setPartitionIndexes(List[Integer](0).asJava),
8322
+ // bar does not exist so it must return UNKNOWN_TOPIC_ID.
8323
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8324
+ .setName(bar)
8325
+ .setTopicId(barId)
8326
+ .setPartitionIndexes(List[Integer](0).asJava)
8327
+ ).asJava),
8328
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
8329
+ .setGroupId("group-2")
8330
+ .setTopics(null)
8331
+ ).asJava),
8332
+ false
8333
+ ).build(version)
8334
+ )
8335
+ }
8336
+
8337
+ val requestChannelRequest = makeRequest(version)
8338
+
8339
+ val group1Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
8340
+ when(groupCoordinator.fetchOffsets(
8341
+ requestChannelRequest.context,
8342
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
8343
+ .setGroupId("group-1")
8344
+ .setTopics(List(
8345
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8346
+ .setTopicId(fooId)
8347
+ .setName("foo")
8348
+ .setPartitionIndexes(List[Integer](0).asJava)).asJava),
8349
+ false
8350
+ )).thenReturn(group1Future)
8351
+
8352
+ val group2Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
8353
+ when(groupCoordinator.fetchAllOffsets(
8354
+ requestChannelRequest.context,
8355
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
8356
+ .setGroupId("group-2")
8357
+ .setTopics(null),
8358
+ false
8359
+ )).thenReturn(group2Future)
8360
+
8361
+ kafkaApis = createKafkaApis()
8362
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
8363
+
8364
+ val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
8365
+ .setGroupId("group-1")
8366
+ .setTopics(List(
8367
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
8368
+ .setTopicId(fooId)
8369
+ .setName(foo)
8370
+ .setPartitions(List(
8371
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
8372
+ .setPartitionIndex(0)
8373
+ .setCommittedOffset(100)
8374
+ .setCommittedLeaderEpoch(1)
8375
+ ).asJava)
8376
+ ).asJava)
8377
+
8378
+ val group2Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
8379
+ .setGroupId("group-2")
8380
+ .setTopics(List(
8381
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
8382
+ .setName(foo)
8383
+ .setPartitions(List(
8384
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
8385
+ .setPartitionIndex(0)
8386
+ .setCommittedOffset(100)
8387
+ .setCommittedLeaderEpoch(1)
8388
+ ).asJava),
8389
+ // bar does not exist so it must be filtered out.
8390
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
8391
+ .setName(bar)
8392
+ .setPartitions(List(
8393
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
8394
+ .setPartitionIndex(0)
8395
+ .setCommittedOffset(100)
8396
+ .setCommittedLeaderEpoch(1)
8397
+ ).asJava)
8398
+ ).asJava)
8399
+
8400
+ val expectedResponse = new OffsetFetchResponseData()
8401
+ .setGroups(List(
8402
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
8403
+ .setGroupId("group-1")
8404
+ .setTopics(List(
8405
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
8406
+ .setTopicId(fooId)
8407
+ .setPartitions(List(
8408
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
8409
+ .setPartitionIndex(0)
8410
+ .setCommittedOffset(100)
8411
+ .setCommittedLeaderEpoch(1)
8412
+ ).asJava),
8413
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
8414
+ .setTopicId(barId)
8415
+ .setPartitions(List(
8416
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
8417
+ .setPartitionIndex(0)
8418
+ .setCommittedOffset(-1)
8419
+ .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
8420
+ ).asJava)
8421
+ ).asJava),
8422
+ new OffsetFetchResponseData.OffsetFetchResponseGroup()
8423
+ .setGroupId("group-2")
8424
+ .setTopics(List(
8425
+ new OffsetFetchResponseData.OffsetFetchResponseTopics()
8426
+ .setTopicId(fooId)
8427
+ .setPartitions(List(
8428
+ new OffsetFetchResponseData.OffsetFetchResponsePartitions()
8429
+ .setPartitionIndex(0)
8430
+ .setCommittedOffset(100)
8431
+ .setCommittedLeaderEpoch(1)
8432
+ ).asJava)
8433
+ ).asJava)
8434
+ ).asJava)
8435
+
8436
+ group1Future.complete(group1Response)
8437
+ group2Future.complete(group2Response)
8438
+
8439
+ val response = verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
8440
+ assertEquals(expectedResponse, response.data)
8441
+ }
8442
+
8298
8443
@ParameterizedTest
8299
8444
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
8300
8445
def testHandleOffsetFetchWithSingleGroup(version: Short): Unit = {
@@ -8663,27 +8808,55 @@ class KafkaApisTest extends Logging {
8663
8808
assertEquals(expectedOffsetFetchResponse, response.data)
8664
8809
}
8665
8810
8666
- // TODO Add test for unknown topic id
8667
- // TODO Add test for topic id provided by coordinator that should not be overriden.
8811
+ @ParameterizedTest
8812
+ @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
8813
+ def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(version: Short): Unit = {
8814
+ // We don't test the non batched API.
8815
+ if (version < 8) return
8816
+
8817
+ val foo = "foo"
8818
+ val bar = "bar"
8819
+ val fooId = Uuid.randomUuid()
8820
+ val barId = Uuid.randomUuid()
8821
+ addTopicToMetadataCache(foo, topicId = fooId, numPartitions = 2)
8822
+ addTopicToMetadataCache(bar, topicId = barId, numPartitions = 2)
8668
8823
8669
- // TODO Parameterize it.
8670
- @Test
8671
- def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = {
8672
8824
def makeRequest(version: Short): RequestChannel.Request = {
8673
- val groups = Map(
8674
- "group-1" -> List(
8675
- new TopicPartition("foo", 0),
8676
- new TopicPartition("bar", 0)
8677
- ).asJava,
8678
- "group-2" -> List(
8679
- new TopicPartition("foo", 0),
8680
- new TopicPartition("bar", 0)
8681
- ).asJava
8682
- ).asJava
8683
- buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version))
8825
+ buildRequest(
8826
+ new OffsetFetchRequest.Builder(
8827
+ new OffsetFetchRequestData()
8828
+ .setGroups(List(
8829
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
8830
+ .setGroupId("group-1")
8831
+ .setTopics(List(
8832
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8833
+ .setName(foo)
8834
+ .setTopicId(fooId)
8835
+ .setPartitionIndexes(List[Integer](0).asJava),
8836
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8837
+ .setName(bar)
8838
+ .setTopicId(barId)
8839
+ .setPartitionIndexes(List[Integer](0).asJava)
8840
+ ).asJava),
8841
+ new OffsetFetchRequestData.OffsetFetchRequestGroup()
8842
+ .setGroupId("group-2")
8843
+ .setTopics(List(
8844
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8845
+ .setName(foo)
8846
+ .setTopicId(fooId)
8847
+ .setPartitionIndexes(List[Integer](0).asJava),
8848
+ new OffsetFetchRequestData.OffsetFetchRequestTopics()
8849
+ .setName(bar)
8850
+ .setTopicId(barId)
8851
+ .setPartitionIndexes(List[Integer](0).asJava)
8852
+ ).asJava)
8853
+ ).asJava),
8854
+ false
8855
+ ).build(version)
8856
+ )
8684
8857
}
8685
8858
8686
- val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion(false) )
8859
+ val requestChannelRequest = makeRequest(version )
8687
8860
8688
8861
val authorizer: Authorizer = mock(classOf[Authorizer])
8689
8862
@@ -8711,7 +8884,8 @@ class KafkaApisTest extends Logging {
8711
8884
new OffsetFetchRequestData.OffsetFetchRequestGroup()
8712
8885
.setGroupId("group-1")
8713
8886
.setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
8714
- .setName("bar")
8887
+ .setName(bar)
8888
+ .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
8715
8889
.setPartitionIndexes(List[Integer](0).asJava)).asJava),
8716
8890
false
8717
8891
)).thenReturn(group1Future)
@@ -8722,7 +8896,8 @@ class KafkaApisTest extends Logging {
8722
8896
new OffsetFetchRequestData.OffsetFetchRequestGroup()
8723
8897
.setGroupId("group-2")
8724
8898
.setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
8725
- .setName("bar")
8899
+ .setName(bar)
8900
+ .setTopicId(if (version >= 10) barId else Uuid.ZERO_UUID)
8726
8901
.setPartitionIndexes(List[Integer](0).asJava)).asJava),
8727
8902
false
8728
8903
)).thenReturn(group1Future)
0 commit comments