Skip to content

Commit 232a2d6

Browse files
committed
Fix list_consumer_groups() to query all brokers
Previously, this only queried the controller. In actuality, the Kafka protocol requires that the client query all brokers in order to get the full list of consumer groups. Note: The Java code (as best I can tell) doesn't allow limiting this to specific brokers. And on the surface, this makes sense... you typically don't care about specific brokers. However, the inverse is true... consumer groups care about knowing their group coordinator so they don't have to repeatedly query to find it. In fact, a Kafka broker will only return the groups that it's a coordinator for. While this is an implementation detail that is not guaranteed by the upstream broker code, and technically should not be relied upon, I think it very unlikely to change. So monitoring scripts that fetch the offsets or describe the consumers groups of all groups in the cluster can simply issue one call per broker to identify all the coordinators, rather than having to issue one call per consumer group. For an ad-hoc script this doesn't matter, but for a monitoring script that runs every couple of minutes, this can be a big deal. I know in the situations where I will use this, this matters more to me than the risk of the interface unexpectedly breaking.
1 parent cc8e914 commit 232a2d6

File tree

1 file changed

+39
-5
lines changed

1 file changed

+39
-5
lines changed

kafka/admin/kafka.py

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -575,20 +575,54 @@ def describe_consumer_groups(self, group_ids):
575575
# TODO this is completely broken, as it needs to send to the group coordinator
576576
# return self._send(request)
577577

578-
def list_consumer_groups(self):
578+
def list_consumer_groups(self, broker_ids=None):
579579
"""List all consumer groups known to the cluster.
580580
581-
:return: Appropriate version of ListGroupsResponse class
581+
This returns a list of Consumer Group tuples. The tuples are
582+
composed of the consumer group name and the consumer group protocol
583+
type.
584+
585+
Only consumer groups that store their offsets in Kafka are returned.
586+
The protocol type will be an empty string for groups created using
587+
Kafka < 0.9 APIs because, although they store their offsets in Kafka,
588+
they don't use Kafka for group coordination. For groups created using
589+
Kafka >= 0.9, the protocol type will typically be "consumer".
590+
591+
As soon as any error is encountered, it is immediately raised.
592+
593+
:param broker_ids: A list of broker node_ids to query for consumer
594+
groups. If set to None, will query all brokers in the cluster.
595+
Explicitly specifying broker(s) can be useful for determining which
596+
consumer groups are coordinated by those broker(s). Default: None
597+
:return list: List of tuples of Consumer Groups.
598+
:exception GroupCoordinatorNotAvailableError: The coordinator is not
599+
available, so cannot process requests.
600+
:exception GroupLoadInProgressError: The coordinator is loading and
601+
hence can't process requests.
582602
"""
603+
# While we return a list, internally use a set to prevent duplicates
604+
# because if a group coordinator fails after being queried, and its
605+
# consumer groups move to new brokers that haven't yet been queried,
606+
# then the same group could be returned by multiple brokers.
607+
consumer_groups = set()
608+
if broker_ids is None:
609+
broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
583610
version = self._matching_api_version(ListGroupsRequest)
584-
if version <= 1:
611+
if version <= 2:
585612
request = ListGroupsRequest[version]()
613+
for broker_id in broker_ids:
614+
response = self._send_request_to_node(broker_id, request)
615+
error_type = Errors.for_code(response.error_code)
616+
if error_type is not Errors.NoError:
617+
raise error_type(
618+
"Request '{}' failed with response '{}'."
619+
.format(request, response))
620+
consumer_groups.update(response.groups)
586621
else:
587622
raise NotImplementedError(
588623
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
589624
.format(version))
590-
# TODO this is completely broken, as it needs to send to the group coordinator
591-
# return self._send(request)
625+
return list(consumer_groups)
592626

593627
def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
594628
partitions=None):

0 commit comments

Comments
 (0)