Skip to content

Commit 8dab14b

Browse files
committed
Fix describe_groups
This was completely broken previously because it didn't lookup the group coordinator of the consumer group. Also added basic error handling/raising. Note: I added the `group_coordinator_id` as an optional kwarg. As best I can tell, the Java client doesn't include this and instead looks it up every time. However, if we add this, it allows the caller the flexibility to bypass the network round trip of the lookup if for some reason they already know the `group_coordinator_id`.
1 parent 665f1e4 commit 8dab14b

File tree

1 file changed

+50
-13
lines changed

1 file changed

+50
-13
lines changed

kafka/admin/kafka.py

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -557,23 +557,60 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
557557
# describe delegation_token protocol not yet implemented
558558
# Note: send the request to the least_loaded_node()
559559

560-
def describe_consumer_groups(self, group_ids):
560+
def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
561561
"""Describe a set of consumer groups.
562562
563-
:param group_ids: A list of consumer group id names
564-
:return: Appropriate version of DescribeGroupsResponse class
563+
Any errors are immediately raised.
564+
565+
:param group_ids: A list of consumer group IDs. These are typically the
566+
group names as strings.
567+
:param group_coordinator_id: The node_id of the groups' coordinator
568+
broker. If set to None, it will query the cluster for each group to
569+
find that group's coordinator. Explicitly specifying this can be
570+
useful for avoiding extra network round trips if you already know
571+
the group coordinator. This is only useful when all the group_ids
572+
have the same coordinator, otherwise it will error. Default: None.
573+
:return: A list of group descriptions. For now the group descriptions
574+
are the raw results from the DescribeGroupsResponse. Long-term, we
575+
plan to change this to return namedtuples as well as decoding the
576+
partition assignments.
565577
"""
578+
group_descriptions = []
566579
version = self._matching_api_version(DescribeGroupsRequest)
567-
if version <= 1:
568-
request = DescribeGroupsRequest[version](
569-
groups = group_ids
570-
)
571-
else:
572-
raise NotImplementedError(
573-
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
574-
.format(version))
575-
# TODO this is completely broken, as it needs to send to the group coordinator
576-
# return self._send(request)
580+
for group_id in group_ids:
581+
if group_coordinator_id is None:
582+
this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
583+
if version <= 1:
584+
# Note: KAFKA-6788 A potential optimization is to group the
585+
# request per coordinator and send one request with a list of
586+
# all consumer groups. Java still hasn't implemented this
587+
# because the error checking is hard to get right when some
588+
# groups error and others don't.
589+
request = DescribeGroupsRequest[version](groups=(group_id,))
590+
response = self._send_request_to_node(this_groups_coordinator_id, request)
591+
assert len(response.groups) == 1
592+
# TODO need to implement converting the response tuple into
593+
# a more accessible interface like a namedtuple and then stop
594+
# hardcoding tuple indices here. Several Java examples,
595+
# including KafkaAdminClient.java
596+
group_description = response.groups[0]
597+
error_code = group_description[0]
598+
error_type = Errors.for_code(error_code)
599+
# Java has the note: KAFKA-6789, we can retry based on the error code
600+
if error_type is not Errors.NoError:
601+
raise error_type(
602+
"Request '{}' failed with response '{}'."
603+
.format(request, response))
604+
# TODO Java checks the group protocol type, and if consumer
605+
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
606+
# the members' partition assignments... that hasn't yet been
607+
# implemented here so just return the raw struct results
608+
group_descriptions.append(group_description)
609+
else:
610+
raise NotImplementedError(
611+
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
612+
.format(version))
613+
return group_descriptions
577614

578615
def list_consumer_groups(self, broker_ids=None):
579616
"""List all consumer groups known to the cluster.

0 commit comments

Comments
 (0)