@@ -3524,44 +3524,62 @@ void handleResponse(AbstractResponse abstractResponse) {
3524
3524
for (final Node node : allNodes ) {
3525
3525
final long nowList = time .milliseconds ();
3526
3526
runnable .call (new Call ("listGroups" , deadline , new ConstantNodeIdProvider (node .id ())) {
3527
+
3528
+ // If only regular consumer group types are required, we can try an earlier request version if
3529
+ // UnsupportedVersionException is thrown
3530
+ final boolean canTryEarlierRequestVersion = options .regularConsumerGroupTypes ();
3531
+ boolean tryUsingEarlierRequestVersion = false ;
3532
+
3527
3533
@ Override
3528
3534
ListGroupsRequest .Builder createRequest (int timeoutMs ) {
3529
- List <String > groupTypes = options .types ()
3530
- .stream ()
3531
- .map (GroupType ::toString )
3532
- .collect (Collectors .toList ());
3533
- List <String > groupStates = options .groupStates ()
3534
- .stream ()
3535
- .map (GroupState ::toString )
3536
- .collect (Collectors .toList ());
3537
- return new ListGroupsRequest .Builder (new ListGroupsRequestData ()
3538
- .setTypesFilter (groupTypes )
3539
- .setStatesFilter (groupStates )
3540
- );
3535
+ if (tryUsingEarlierRequestVersion ) {
3536
+ List <String > groupStates = options .groupStates ()
3537
+ .stream ()
3538
+ .map (GroupState ::toString )
3539
+ .collect (Collectors .toList ());
3540
+ return new ListGroupsRequest .Builder (new ListGroupsRequestData ()
3541
+ .setStatesFilter (groupStates )
3542
+ );
3543
+ } else {
3544
+ List <String > groupTypes = options .types ()
3545
+ .stream ()
3546
+ .map (GroupType ::toString )
3547
+ .collect (Collectors .toList ());
3548
+ List <String > groupStates = options .groupStates ()
3549
+ .stream ()
3550
+ .map (GroupState ::toString )
3551
+ .collect (Collectors .toList ());
3552
+ return new ListGroupsRequest .Builder (new ListGroupsRequestData ()
3553
+ .setTypesFilter (groupTypes )
3554
+ .setStatesFilter (groupStates )
3555
+ );
3556
+ }
3541
3557
}
3542
3558
3543
3559
private void maybeAddGroup (ListGroupsResponseData .ListedGroup group ) {
3544
- final String groupId = group .groupId ();
3545
- final Optional <GroupType > type ;
3546
- if (group .groupType () == null || group .groupType ().isEmpty ()) {
3547
- type = Optional .empty ();
3548
- } else {
3549
- type = Optional .of (GroupType .parse (group .groupType ()));
3550
- }
3551
- final String protocolType = group .protocolType ();
3552
- final Optional <GroupState > groupState ;
3553
- if (group .groupState () == null || group .groupState ().isEmpty ()) {
3554
- groupState = Optional .empty ();
3555
- } else {
3556
- groupState = Optional .of (GroupState .parse (group .groupState ()));
3560
+ String protocolType = group .protocolType ();
3561
+ if (options .protocolTypes ().isEmpty () || options .protocolTypes ().contains (protocolType )) {
3562
+ final String groupId = group .groupId ();
3563
+ final Optional <GroupType > type ;
3564
+ if (group .groupType () == null || group .groupType ().isEmpty ()) {
3565
+ type = Optional .empty ();
3566
+ } else {
3567
+ type = Optional .of (GroupType .parse (group .groupType ()));
3568
+ }
3569
+ final Optional <GroupState > groupState ;
3570
+ if (group .groupState () == null || group .groupState ().isEmpty ()) {
3571
+ groupState = Optional .empty ();
3572
+ } else {
3573
+ groupState = Optional .of (GroupState .parse (group .groupState ()));
3574
+ }
3575
+ final GroupListing groupListing = new GroupListing (
3576
+ groupId ,
3577
+ type ,
3578
+ protocolType ,
3579
+ groupState
3580
+ );
3581
+ results .addListing (groupListing );
3557
3582
}
3558
- final GroupListing groupListing = new GroupListing (
3559
- groupId ,
3560
- type ,
3561
- protocolType ,
3562
- groupState
3563
- );
3564
- results .addListing (groupListing );
3565
3583
}
3566
3584
3567
3585
@ Override
@@ -3582,6 +3600,23 @@ void handleResponse(AbstractResponse abstractResponse) {
3582
3600
}
3583
3601
}
3584
3602
3603
+ @ Override
3604
+ boolean handleUnsupportedVersionException (final UnsupportedVersionException exception ) {
3605
+ // If we cannot try the earlier request version, give up
3606
+ if (!canTryEarlierRequestVersion ) {
3607
+ return false ;
3608
+ }
3609
+
3610
+ // If have already tried the earlier request version, give up
3611
+ if (tryUsingEarlierRequestVersion ) {
3612
+ return false ;
3613
+ }
3614
+
3615
+ // Have a try using the earlier request version
3616
+ tryUsingEarlierRequestVersion = true ;
3617
+ return true ;
3618
+ }
3619
+
3585
3620
@ Override
3586
3621
void handleFailure (Throwable throwable ) {
3587
3622
synchronized (results ) {
0 commit comments