@@ -3190,6 +3190,42 @@ public void testListGroupsEmptyGroupType() throws Exception {
3190
3190
}
3191
3191
}
3192
3192
3193
+ @Test
3194
+ public void testListGroupsWithProtocolTypes() throws Exception {
3195
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
3196
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
3197
+
3198
+ // Test with list group options.
3199
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
3200
+
3201
+ env.kafkaClient().prepareResponseFrom(
3202
+ expectListGroupsRequestWithFilters(Set.of(), Set.of()),
3203
+ new ListGroupsResponse(new ListGroupsResponseData()
3204
+ .setErrorCode(Errors.NONE.code())
3205
+ .setGroups(List.of(
3206
+ new ListGroupsResponseData.ListedGroup()
3207
+ .setGroupId("group-1")
3208
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
3209
+ .setGroupState("Stable")
3210
+ .setGroupType(GroupType.CONSUMER.toString()),
3211
+ new ListGroupsResponseData.ListedGroup()
3212
+ .setGroupId("group-2")
3213
+ .setGroupState("Empty")
3214
+ .setGroupType(GroupType.CONSUMER.toString())))),
3215
+ env.cluster().nodeById(0));
3216
+
3217
+ final ListGroupsOptions options = new ListGroupsOptions().withProtocolTypes(Set.of(""));
3218
+ final ListGroupsResult result = env.adminClient().listGroups(options);
3219
+ Collection<GroupListing> listing = result.valid().get();
3220
+
3221
+ assertEquals(1, listing.size());
3222
+ List<GroupListing> expected = new ArrayList<>();
3223
+ expected.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), "", Optional.of(GroupState.EMPTY)));
3224
+ assertEquals(expected, listing);
3225
+ assertEquals(0, result.errors().get().size());
3226
+ }
3227
+ }
3228
+
3193
3229
@Test
3194
3230
public void testListGroupsWithTypes() throws Exception {
3195
3231
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
@@ -3432,6 +3468,42 @@ public void testListConsumerGroupsWithStates() throws Exception {
3432
3468
}
3433
3469
}
3434
3470
3471
+ @Test
3472
+ public void testListConsumerGroupsWithProtocolTypes() throws Exception {
3473
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
3474
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
3475
+
3476
+ // Test with a specific protocol type filter in list consumer group options.
3477
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
3478
+
3479
+ env.kafkaClient().prepareResponseFrom(
3480
+ expectListGroupsRequestWithFilters(Set.of(), Set.of(GroupType.CONSUMER.toString(), GroupType.CLASSIC.toString())),
3481
+ new ListGroupsResponse(new ListGroupsResponseData()
3482
+ .setErrorCode(Errors.NONE.code())
3483
+ .setGroups(List.of(
3484
+ new ListGroupsResponseData.ListedGroup()
3485
+ .setGroupId("group-1")
3486
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
3487
+ .setGroupState("Stable")
3488
+ .setGroupType(GroupType.CONSUMER.toString()),
3489
+ new ListGroupsResponseData.ListedGroup()
3490
+ .setGroupId("group-2")
3491
+ .setGroupState("Empty")
3492
+ .setGroupType(GroupType.CONSUMER.toString())))),
3493
+ env.cluster().nodeById(0));
3494
+
3495
+ final ListGroupsOptions options = ListGroupsOptions.forConsumerGroups().withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE));
3496
+ final ListGroupsResult result = env.adminClient().listGroups(options);
3497
+ Collection<GroupListing> listings = result.valid().get();
3498
+
3499
+ assertEquals(1, listings.size());
3500
+ List<GroupListing> expected = new ArrayList<>();
3501
+ expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE)));
3502
+ assertEquals(expected, listings);
3503
+ assertEquals(0, result.errors().get().size());
3504
+ }
3505
+ }
3506
+
3435
3507
@Test
3436
3508
public void testListConsumerGroupsWithTypes() throws Exception {
3437
3509
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
0 commit comments