Skip to content

Commit b046d25

Browse files
committed
Add KafkaAdmin class
Requires cluster version > 0.10.0.0, and uses new wire protocol classes to do many things via broker connection that previously needed to be done directly in zookeeper.
1 parent 9ac3cb1 commit b046d25

File tree

11 files changed

+683
-0
lines changed

11 files changed

+683
-0
lines changed

kafka/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def emit(self, record):
1818
logging.getLogger(__name__).addHandler(NullHandler())
1919

2020

21+
from kafka.admin import KafkaAdmin
2122
from kafka.consumer import KafkaConsumer
2223
from kafka.consumer.subscription_state import ConsumerRebalanceListener
2324
from kafka.producer import KafkaProducer
@@ -46,6 +47,7 @@ def __init__(self, *args, **kwargs):
4647

4748

4849
__all__ = [
50+
'KafkaAdmin',
4951
'KafkaConsumer', 'KafkaProducer', 'KafkaClient', 'BrokerConnection',
5052
'SimpleClient', 'SimpleProducer', 'KeyedProducer',
5153
'RoundRobinPartitioner', 'HashedPartitioner',

kafka/admin/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.admin.config_resource import ConfigResource, ConfigResourceType
4+
from kafka.admin.kafka import KafkaAdmin
5+
from kafka.admin.new_topic import NewTopic
6+
from kafka.admin.new_partitions import NewPartitions
7+
8+
__all__ = [
9+
'ConfigResource', 'ConfigResourceType', 'KafkaAdmin', 'NewTopic', 'NewPartitions'
10+
]

kafka/admin/config_resource.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from __future__ import absolute_import
2+
3+
import sys
4+
if sys.version_info < (3, 4):
5+
from enum34 import IntEnum # pylint: disable=import-error
6+
else:
7+
from enum import IntEnum
8+
9+
10+
class ConfigResourceType(IntEnum):
11+
"""An enumerated type of config resources"""
12+
13+
BROKER = 4,
14+
TOPIC = 2
15+
16+
17+
class ConfigResource(object):
18+
"""A class for specifying config resources.
19+
Arguments:
20+
resource_type (ConfigResourceType): the type of kafka resource
21+
name (string): The name of the kafka resource
22+
configs ({key : value}): A maps of config keys to values.
23+
"""
24+
25+
def __init__(
26+
self,
27+
resource_type,
28+
name,
29+
configs=None
30+
):
31+
if not isinstance(resource_type, (ConfigResourceType)):
32+
resource_type = ConfigResourceType[str(resource_type).upper()]
33+
self.resource_type = resource_type
34+
self.name = name
35+
self.configs = configs

kafka/admin/kafka.py

Lines changed: 504 additions & 0 deletions
Large diffs are not rendered by default.

kafka/admin/new_partitions.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from __future__ import absolute_import
2+
3+
4+
class NewPartitions(object):
5+
"""A class for new partition creation on existing topics. Note that the length of new_assignments, if specified,
6+
must be the difference between the new total number of partitions and the existing number of partitions.
7+
Arguments:
8+
total_count (int): the total number of partitions that should exist on the topic
9+
new_assignments ([[int]]): an array of arrays of replica assignments for new partitions.
10+
If not set, broker assigns replicas per an internal algorithm.
11+
"""
12+
13+
def __init__(
14+
self,
15+
total_count,
16+
new_assignments=None
17+
):
18+
self.total_count = total_count
19+
self.new_assignments = new_assignments

kafka/admin/new_topic.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from __future__ import absolute_import
2+
3+
from kafka.errors import IllegalArgumentError
4+
5+
6+
class NewTopic(object):
7+
""" A class for new topic creation
8+
Arguments:
9+
name (string): name of the topic
10+
num_partitions (int): number of partitions
11+
or -1 if replica_assignment has been specified
12+
replication_factor (int): replication factor or -1 if
13+
replica assignment is specified
14+
replica_assignment (dict of int: [int]): A mapping containing
15+
partition id and replicas to assign to it.
16+
topic_configs (dict of str: str): A mapping of config key
17+
and value for the topic.
18+
"""
19+
20+
def __init__(
21+
self,
22+
name,
23+
num_partitions,
24+
replication_factor,
25+
replica_assignments=None,
26+
topic_configs=None,
27+
):
28+
if not (num_partitions == -1 or replication_factor == -1) ^ (replica_assignments is None):
29+
raise IllegalArgumentError('either num_partitions/replication_factor or replica_assignment must be specified')
30+
self.name = name
31+
self.num_partitions = num_partitions
32+
self.replication_factor = replication_factor
33+
self.replica_assignments = replica_assignments or {}
34+
self.topic_configs = topic_configs or {}

kafka/client_async.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ def __init__(self, **configs):
192192
self._metadata_refresh_in_progress = False
193193
self._selector = self.config['selector']()
194194
self._conns = Dict() # object to support weakrefs
195+
self._api_versions = None
195196
self._connecting = set()
196197
self._refresh_on_disconnects = True
197198
self._last_bootstrap = 0
@@ -798,6 +799,17 @@ def refresh_done(val_or_error):
798799
# to let us know the selected connection might be usable again.
799800
return float('inf')
800801

802+
def get_api_versions(self):
803+
"""Return the ApiVersions map, if available.
804+
805+
Note: A call to check_version must previously have succeeded and returned
806+
version 0.10.0 or later
807+
808+
Returns: a map of dict mapping {api_key : (min_version, max_version)},
809+
or None if ApiVersion is not supported by the kafka cluster.
810+
"""
811+
return self._api_versions
812+
801813
def check_version(self, node_id=None, timeout=2, strict=False):
802814
"""Attempt to guess the version of a Kafka broker.
803815
@@ -831,6 +843,10 @@ def check_version(self, node_id=None, timeout=2, strict=False):
831843
try:
832844
remaining = end - time.time()
833845
version = conn.check_version(timeout=remaining, strict=strict)
846+
if version >= (0, 10, 0):
847+
# cache the api versions map if it's available (starting
848+
# in 0.10 cluster version)
849+
self._api_versions = conn.get_api_versions()
834850
return version
835851
except Errors.NodeNotReadyError:
836852
# Only raise to user if this is a node-specific request

kafka/conn.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -869,6 +869,16 @@ def _handle_api_version_response(self, response):
869869
])
870870
return self._api_versions
871871

872+
def get_api_versions(self):
873+
version = self.check_version()
874+
if version < (0, 10, 0):
875+
raise Errors.UnsupportedVersionError(
876+
"ApiVersion not supported by cluster version {} < 0.10.0"
877+
.format(version))
878+
# _api_versions is set as a side effect of check_versions() on a cluster
879+
# that supports 0.10.0 or later
880+
return self._api_versions;
881+
872882
def _infer_broker_version_from_api_versions(self, api_versions):
873883
# The logic here is to check the list of supported request versions
874884
# in reverse order. As soon as we find one that works, return it

kafka/protocol/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,9 @@
4444
33: 'AlterConfigs',
4545
36: 'SaslAuthenticate',
4646
37: 'CreatePartitions',
47+
38: 'CreateDelegationToken',
48+
39: 'RenewDelegationToken',
49+
40: 'ExpireDelegationToken',
50+
41: 'DescribeDelegationToken',
51+
42: 'DeleteGroups',
4752
}

test/test_admin.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import pytest
2+
3+
import kafka.admin
4+
from kafka.errors import IllegalArgumentError
5+
6+
7+
def test_config_resource():
8+
with pytest.raises(KeyError):
9+
bad_resource = kafka.admin.ConfigResource('something', 'foo')
10+
good_resource = kafka.admin.ConfigResource('broker', 'bar')
11+
assert(good_resource.resource_type == kafka.admin.ConfigResourceType.BROKER)
12+
assert(good_resource.name == 'bar')
13+
assert(good_resource.configs is None)
14+
good_resource = kafka.admin.ConfigResource(kafka.admin.ConfigResourceType.TOPIC, 'baz', {'frob' : 'nob'})
15+
assert(good_resource.resource_type == kafka.admin.ConfigResourceType.TOPIC)
16+
assert(good_resource.name == 'baz')
17+
assert(good_resource.configs == {'frob' : 'nob'})
18+
19+
20+
def test_new_partitions():
21+
good_partitions = kafka.admin.NewPartitions(6)
22+
assert(good_partitions.total_count == 6)
23+
assert(good_partitions.new_assignments is None)
24+
good_partitions = kafka.admin.NewPartitions(7, [[1, 2, 3]])
25+
assert(good_partitions.total_count == 7)
26+
assert(good_partitions.new_assignments == [[1, 2, 3]])
27+
28+
29+
def test_new_topic():
30+
with pytest.raises(IllegalArgumentError):
31+
bad_topic = kafka.admin.NewTopic('foo', -1, -1)
32+
with pytest.raises(IllegalArgumentError):
33+
bad_topic = kafka.admin.NewTopic('foo', 1, -1)
34+
with pytest.raises(IllegalArgumentError):
35+
bad_topic = kafka.admin.NewTopic('foo', 1, 1, {1 : [1, 1, 1]})
36+
good_topic = kafka.admin.NewTopic('foo', 1, 2)
37+
assert(good_topic.name == 'foo')
38+
assert(good_topic.num_partitions == 1)
39+
assert(good_topic.replication_factor == 2)
40+
assert(good_topic.replica_assignments == {})
41+
assert(good_topic.topic_configs == {})
42+
good_topic = kafka.admin.NewTopic('bar', -1, -1, {1 : [1, 2, 3]}, {'key' : 'value'})
43+
assert(good_topic.name == 'bar')
44+
assert(good_topic.num_partitions == -1)
45+
assert(good_topic.replication_factor == -1)
46+
assert(good_topic.replica_assignments == {1: [1, 2, 3]})
47+
assert(good_topic.topic_configs == {'key' : 'value'})

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ deps =
2121
crc32c
2222
py26: unittest2
2323
decorator
24+
py27: enum34
2425
commands =
2526
py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}
2627
setenv =

0 commit comments

Comments
 (0)