Commit cc8e914
Add list_consumer_group_offsets()
Support fetching the offsets of a consumer group.

Note: As far as I can tell (the Java code is a little inscrutable), the Java AdminClient doesn't allow specifying the `coordinator_id` or the `partitions`. But I decided to include them because they provide a lot of additional flexibility:

1. Allowing users to specify the partitions lets this method be used even with older brokers that don't support OffsetFetchRequest_v2.
2. Allowing users to specify the coordinator ID gives them a way to bypass a network round trip. This method will frequently be used for monitoring, and if you've got 1,000 consumer groups being monitored once a minute, that's ~1.5M requests a day that are unnecessarily duplicated, since the coordinator doesn't change unless there's an error (see the sketch below).
1 parent 5069088 commit cc8e914
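
To make the two call styles concrete, here is a minimal usage sketch. The bootstrap address, group, and topic names are invented, and the constructor call follows kafka-python's usual client conventions; the coordinator-caching style is why the extra parameter pays off (1,000 groups polled once a minute is 1,000 × 60 × 24 = 1,440,000 coordinator lookups a day if not cached):

from kafka.admin.kafka import KafkaAdmin
from kafka.structs import TopicPartition

admin = KafkaAdmin(bootstrap_servers="localhost:9092")  # hypothetical address

# Simplest call: the client looks up the group coordinator itself (one extra
# network round trip); on brokers >= 0.10.2, partitions=None fetches all
# known offsets for the group.
offsets = admin.list_consumer_group_offsets("example-group")

# Monitoring-style call: resolve the coordinator once and reuse it on every
# poll, skipping the lookup round trip (private helper shown for illustration).
coordinator = admin._find_group_coordinator_id("example-group")
offsets = admin.list_consumer_group_offsets(
    "example-group",
    group_coordinator_id=coordinator,
    partitions=[TopicPartition("example-topic", 0),
                TopicPartition("example-topic", 1)])

for tp, om in offsets.items():
    print(tp.topic, tp.partition, om.offset, om.metadata)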

File tree

2 files changed: +77 −1 lines changed

kafka/admin/kafka.py
kafka/structs.py

kafka/admin/kafka.py

Lines changed: 76 additions & 1 deletion
@@ -1,8 +1,12 @@
 from __future__ import absolute_import
 
+from collections import defaultdict
 import copy
 import logging
 import socket
+
+from kafka.vendor import six
+
 from kafka.client_async import KafkaClient, selectors
 import kafka.errors as Errors
 from kafka.errors import (
@@ -12,8 +16,9 @@
 from kafka.protocol.admin import (
     CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
     ListGroupsRequest, DescribeGroupsRequest)
-from kafka.protocol.commit import GroupCoordinatorRequest
+from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
 from kafka.protocol.metadata import MetadataRequest
+from kafka.structs import TopicPartition, OffsetAndMetadata
 from kafka.version import __version__
 
 log = logging.getLogger(__name__)
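
An aside that may help when reading the next hunk: the request names imported above from kafka.protocol are version-indexed lists of classes, one per protocol version, which is why the new method selects OffsetFetchRequest[version]. A small sketch of that convention (group name invented):

from kafka.protocol.commit import OffsetFetchRequest

# OffsetFetchRequest is a list of request classes; the index is the API version.
version = 2  # the method below derives this via self._matching_api_version(OffsetFetchRequest)
# From v2 on, the topics field may be None, meaning "all partitions the group
# has offsets for" (KIP-88).
request = OffsetFetchRequest[version]("example-group", None)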
@@ -585,5 +590,75 @@ def list_consumer_groups(self):
         # TODO this is completely broken, as it needs to send to the group coordinator
         # return self._send(request)
 
+    def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
+                                    partitions=None):
+        """Fetch Consumer Group Offsets.
+
+        Note:
+        This does not verify that the group_id or partitions actually exist
+        in the cluster.
+
+        As soon as any error is encountered, it is immediately raised.
+
+        :param group_id: The consumer group id name for which to fetch offsets.
+        :param group_coordinator_id: The node_id of the group's coordinator
+            broker. If set to None, will query the cluster to find the group
+            coordinator. Explicitly specifying this can be useful to prevent
+            that extra network round trip if you already know the group
+            coordinator. Default: None.
+        :param partitions: A list of TopicPartitions for which to fetch
+            offsets. On brokers >= 0.10.2, this can be set to None to fetch all
+            known offsets for the consumer group. Default: None.
+        :return dictionary: A dictionary with TopicPartition keys and
+            OffsetAndMetadata values. Partitions that are not specified and for
+            which the group_id does not have a recorded offset are omitted. An
+            offset value of `-1` indicates the group_id has no offset for that
+            TopicPartition. A `-1` can only happen for partitions that are
+            explicitly specified.
+        """
+        group_offsets_listing = {}
+        if group_coordinator_id is None:
+            group_coordinator_id = self._find_group_coordinator_id(group_id)
+        version = self._matching_api_version(OffsetFetchRequest)
+        if version <= 3:
+            if partitions is None:
+                if version <= 1:
+                    raise ValueError(
+                        """OffsetFetchRequest_v{} requires specifying the
+                        partitions for which to fetch offsets. Omitting the
+                        partitions is only supported on brokers >= 0.10.2.
+                        For details, see KIP-88.""".format(version))
+                topics_partitions = None
+            else:
+                # transform from [TopicPartition("t1", 1), TopicPartition("t1", 2)] to [("t1", [1, 2])]
+                topics_partitions_dict = defaultdict(set)
+                for topic, partition in partitions:
+                    topics_partitions_dict[topic].add(partition)
+                topics_partitions = list(six.iteritems(topics_partitions_dict))
+            request = OffsetFetchRequest[version](group_id, topics_partitions)
+            response = self._send_request_to_node(group_coordinator_id, request)
+            if version > 1:  # OffsetFetchResponse_v1 lacks a top-level error_code
+                error_type = Errors.for_code(response.error_code)
+                if error_type is not Errors.NoError:
+                    # optionally we could retry if error_type.retriable
+                    raise error_type(
+                        "Request '{}' failed with response '{}'."
+                        .format(request, response))
+            # transform response into a dictionary with TopicPartition keys and
+            # OffsetAndMetadata values--this is what the Java AdminClient returns
+            for topic, partitions in response.topics:
+                for partition, offset, metadata, error_code in partitions:
+                    error_type = Errors.for_code(error_code)
+                    if error_type is not Errors.NoError:
+                        raise error_type(
+                            "Unable to fetch offsets for group_id {}, topic {}, partition {}"
+                            .format(group_id, topic, partition))
+                    group_offsets_listing[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, metadata)
+        else:
+            raise NotImplementedError(
+                "Support for OffsetFetch v{} has not yet been added to KafkaAdmin."
+                .format(version))
+        return group_offsets_listing
+
     # delete groups protocol not yet implemented
     # Note: send the request to the group's coordinator.
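
As a standalone illustration of the grouping step inside the method above: OffsetFetchRequest encodes one entry per topic rather than one per TopicPartition, so the method folds the caller's list into per-topic sets (sample partitions invented):

from collections import defaultdict

from kafka.structs import TopicPartition
from kafka.vendor import six

partitions = [TopicPartition("t1", 1), TopicPartition("t1", 2),
              TopicPartition("t2", 0)]

# Group partition ids by topic; using a set also deduplicates repeats.
topics_partitions_dict = defaultdict(set)
for topic, partition in partitions:
    topics_partitions_dict[topic].add(partition)

topics_partitions = list(six.iteritems(topics_partitions_dict))
# e.g. [("t1", {1, 2}), ("t2", {0})] -- the per-topic shape the request encodes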

kafka/structs.py

Lines changed: 1 addition & 0 deletions
@@ -72,6 +72,7 @@
     ["topic", "partition", "leader", "replicas", "isr", "error"])
 
 OffsetAndMetadata = namedtuple("OffsetAndMetadata",
+    # TODO add leaderEpoch: OffsetAndMetadata(offset, leaderEpoch, metadata)
     ["offset", "metadata"])
 
 OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
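
A quick sketch of how the new admin method's return value is keyed with these structs (offset and metadata values invented):

from kafka.structs import OffsetAndMetadata, TopicPartition

# list_consumer_group_offsets() returns a dict shaped like this:
offsets = {TopicPartition("t1", 0): OffsetAndMetadata(42, "")}

tp = TopicPartition("t1", 0)
print(offsets[tp].offset)    # 42 (-1 would mean no committed offset for an
                             # explicitly requested partition)
print(offsets[tp].metadata)  # ""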
