Skip to content

KAFKA-17897: Deprecate Admin.listConsumerGroups [2/N] #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3524,44 +3524,62 @@ void handleResponse(AbstractResponse abstractResponse) {
for (final Node node : allNodes) {
final long nowList = time.milliseconds();
runnable.call(new Call("listGroups", deadline, new ConstantNodeIdProvider(node.id())) {

// If only regular consumer group types are required, we can try an earlier request version if
// UnsupportedVersionException is thrown
final boolean canTryEarlierRequestVersion = options.regularConsumerGroupTypes();
boolean tryUsingEarlierRequestVersion = false;

@Override
ListGroupsRequest.Builder createRequest(int timeoutMs) {
List<String> groupTypes = options.types()
.stream()
.map(GroupType::toString)
.collect(Collectors.toList());
List<String> groupStates = options.groupStates()
.stream()
.map(GroupState::toString)
.collect(Collectors.toList());
return new ListGroupsRequest.Builder(new ListGroupsRequestData()
.setTypesFilter(groupTypes)
.setStatesFilter(groupStates)
);
if (tryUsingEarlierRequestVersion) {
List<String> groupStates = options.groupStates()
.stream()
.map(GroupState::toString)
.collect(Collectors.toList());
return new ListGroupsRequest.Builder(new ListGroupsRequestData()
.setStatesFilter(groupStates)
);
} else {
List<String> groupTypes = options.types()
.stream()
.map(GroupType::toString)
.collect(Collectors.toList());
List<String> groupStates = options.groupStates()
.stream()
.map(GroupState::toString)
.collect(Collectors.toList());
return new ListGroupsRequest.Builder(new ListGroupsRequestData()
.setTypesFilter(groupTypes)
.setStatesFilter(groupStates)
);
}
}

private void maybeAddGroup(ListGroupsResponseData.ListedGroup group) {
final String groupId = group.groupId();
final Optional<GroupType> type;
if (group.groupType() == null || group.groupType().isEmpty()) {
type = Optional.empty();
} else {
type = Optional.of(GroupType.parse(group.groupType()));
}
final String protocolType = group.protocolType();
final Optional<GroupState> groupState;
if (group.groupState() == null || group.groupState().isEmpty()) {
groupState = Optional.empty();
} else {
groupState = Optional.of(GroupState.parse(group.groupState()));
String protocolType = group.protocolType();
if (options.protocolTypes().isEmpty() || options.protocolTypes().contains(protocolType)) {
final String groupId = group.groupId();
final Optional<GroupType> type;
if (group.groupType() == null || group.groupType().isEmpty()) {
type = Optional.empty();
} else {
type = Optional.of(GroupType.parse(group.groupType()));
}
final Optional<GroupState> groupState;
if (group.groupState() == null || group.groupState().isEmpty()) {
groupState = Optional.empty();
} else {
groupState = Optional.of(GroupState.parse(group.groupState()));
}
final GroupListing groupListing = new GroupListing(
groupId,
type,
protocolType,
groupState
);
results.addListing(groupListing);
}
final GroupListing groupListing = new GroupListing(
groupId,
type,
protocolType,
groupState
);
results.addListing(groupListing);
}

@Override
Expand All @@ -3582,6 +3600,23 @@ void handleResponse(AbstractResponse abstractResponse) {
}
}

@Override
boolean handleUnsupportedVersionException(final UnsupportedVersionException exception) {
// If we cannot try the earlier request version, give up
if (!canTryEarlierRequestVersion) {
return false;
}

// If have already tried the earlier request version, give up
if (tryUsingEarlierRequestVersion) {
return false;
}

// Have a try using the earlier request version
tryUsingEarlierRequestVersion = true;
return true;
}

@Override
void handleFailure(Throwable throwable) {
synchronized (results) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,38 @@ public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
private Set<GroupType> types = Set.of();
private Set<String> protocolTypes = Set.of();

// Types filter is supported by brokers with version 4.0.0 or later. Older brokers only support
// classic groups, so listing consumer groups on an older broker does not need to use a types filter.
private boolean regularConsumerGroupTypes = false;

/**
* Only consumer groups will be returned by listGroups().
* This operation sets filters on group type and protocol type which select consumer groups.
*/
public static ListGroupsOptions forConsumerGroups() {
return new ListGroupsOptions()
.withTypes(Set.of(GroupType.CLASSIC, GroupType.CONSUMER))
.withTypes(Set.of(GroupType.CLASSIC, GroupType.CONSUMER), true)
.withProtocolTypes(Set.of("", ConsumerProtocol.PROTOCOL_TYPE));
}

/**
* Only share groups will be returned by listGroups().
* This operation sets a filter on group type which select share groups.
*/
public static ListGroupsOptions forShareGroups() {
return new ListGroupsOptions()
.withTypes(Set.of(GroupType.SHARE));
}

/**
* Only streams groups will be returned by listGroups().
* This operation sets a filter on group type which select streams groups.
*/
public static ListGroupsOptions forStreamsGroups() {
return new ListGroupsOptions()
.withTypes(Set.of(GroupType.STREAMS));
}

/**
* If groupStates is set, only groups in these states will be returned by listGroups().
* Otherwise, all groups are returned.
Expand All @@ -56,6 +78,10 @@ public ListGroupsOptions inGroupStates(Set<GroupState> groupStates) {
return this;
}

/**
* If protocol types is set, only groups of these protocol types will be returned by listGroups().
* Otherwise, all groups are returned.
*/
public ListGroupsOptions withProtocolTypes(Set<String> protocolTypes) {
this.protocolTypes = (protocolTypes == null || protocolTypes.isEmpty()) ? Set.of() : Set.copyOf(protocolTypes);
return this;
Expand All @@ -66,7 +92,12 @@ public ListGroupsOptions withProtocolTypes(Set<String> protocolTypes) {
* Otherwise, all groups are returned.
*/
public ListGroupsOptions withTypes(Set<GroupType> types) {
return this.withTypes(types, false);
}

ListGroupsOptions withTypes(Set<GroupType> types, boolean regularConsumerGroupTypes) {
this.types = (types == null || types.isEmpty()) ? Set.of() : Set.copyOf(types);
this.regularConsumerGroupTypes = regularConsumerGroupTypes;
return this;
}

Expand All @@ -90,4 +121,8 @@ public Set<String> protocolTypes() {
public Set<GroupType> types() {
return types;
}

boolean regularConsumerGroupTypes() {
return regularConsumerGroupTypes;
}
}
Loading