Skip to content

Commit 1c286b5

Browse files
wbarnhaCourouge
authored andcommitted
Support Describe log dirs (dpkp#145)
I implemented API KEY 35 from the official Apache Kafka documentation. This functionality is requested in issue # 2163 and this is an implementation proposal. Co-authored-by: chopatate <[email protected]>
1 parent 02f1467 commit 1c286b5

File tree

2 files changed

+59
-1
lines changed

2 files changed

+59
-1
lines changed

kafka/admin/client.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from kafka.protocol.admin import (
1818
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
1919
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
20-
DeleteGroupsRequest
20+
DeleteGroupsRequest, DescribeLogDirsRequest
2121
)
2222
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
2323
from kafka.protocol.metadata import MetadataRequest
@@ -1342,3 +1342,19 @@ def _wait_for_futures(self, futures):
13421342

13431343
if future.failed():
13441344
raise future.exception # pylint: disable-msg=raising-bad-type
1345+
1346+
def describe_log_dirs(self):
1347+
"""Send a DescribeLogDirsRequest request to a broker.
1348+
1349+
:return: A message future
1350+
"""
1351+
version = self._matching_api_version(DescribeLogDirsRequest)
1352+
if version <= 1:
1353+
request = DescribeLogDirsRequest[version]()
1354+
future = self._send_request_to_node(self._client.least_loaded_node(), request)
1355+
self._wait_for_futures([future])
1356+
else:
1357+
raise NotImplementedError(
1358+
"Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient."
1359+
.format(version))
1360+
return future.value

kafka/protocol/admin.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,48 @@ class DescribeConfigsRequest_v2(Request):
788788
]
789789

790790

791+
class DescribeLogDirsResponse_v0(Response):
792+
API_KEY = 35
793+
API_VERSION = 0
794+
FLEXIBLE_VERSION = True
795+
SCHEMA = Schema(
796+
('throttle_time_ms', Int32),
797+
('log_dirs', Array(
798+
('error_code', Int16),
799+
('log_dir', String('utf-8')),
800+
('topics', Array(
801+
('name', String('utf-8')),
802+
('partitions', Array(
803+
('partition_index', Int32),
804+
('partition_size', Int64),
805+
('offset_lag', Int64),
806+
('is_future_key', Boolean)
807+
))
808+
))
809+
))
810+
)
811+
812+
813+
class DescribeLogDirsRequest_v0(Request):
814+
API_KEY = 35
815+
API_VERSION = 0
816+
RESPONSE_TYPE = DescribeLogDirsResponse_v0
817+
SCHEMA = Schema(
818+
('topics', Array(
819+
('topic', String('utf-8')),
820+
('partitions', Int32)
821+
))
822+
)
823+
824+
825+
DescribeLogDirsResponse = [
826+
DescribeLogDirsResponse_v0,
827+
]
828+
DescribeLogDirsRequest = [
829+
DescribeLogDirsRequest_v0,
830+
]
831+
832+
791833
class SaslAuthenticateResponse_v0(Response):
792834
API_KEY = 36
793835
API_VERSION = 0

0 commit comments

Comments
 (0)