21
21
from kafka .structs import TopicPartition , OffsetAndMetadata
22
22
from kafka .version import __version__
23
23
24
+
24
25
log = logging .getLogger (__name__ )
25
26
27
+
26
28
class KafkaAdmin (object ):
27
- """An class for administering the kafka cluster.
29
+ """A class for administering the Kafka cluster.
28
30
29
31
Warning:
30
32
This is an unstable interface that was recently added and is subject to
@@ -35,10 +37,9 @@ class KafkaAdmin(object):
35
37
36
38
The KafkaAdmin class will negotiate for the latest version of each message
37
39
protocol format supported by both the kafka-python client library and the
38
- kafka broker. Usage of optional fields from protocol versions that are not
40
+ Kafka broker. Usage of optional fields from protocol versions that are not
39
41
supported by the broker will result in IncompatibleBrokerVersion exceptions.
40
42
41
-
42
43
Use of this class requires a minimum broker version >= 0.10.0.0.
43
44
44
45
Keyword Arguments:
@@ -167,16 +168,16 @@ class KafkaAdmin(object):
167
168
'sasl_kerberos_service_name' : 'kafka' ,
168
169
169
170
# metrics configs
170
- 'metric_reporters' : [],
171
+ 'metric_reporters' : [],
171
172
'metrics_num_samples' : 2 ,
172
173
'metrics_sample_window_ms' : 30000 ,
173
174
}
174
175
175
176
def __init__ (self , ** configs ):
176
- log .debug ("Starting Kafka administration interface" )
177
+ log .debug ("Starting KafkaAdmin interface. " )
177
178
extra_configs = set (configs ).difference (self .DEFAULT_CONFIG )
178
179
if extra_configs :
179
- raise KafkaConfigurationError ("Unrecognized configs: %s" % (extra_configs , ))
180
+ raise KafkaConfigurationError ("Unrecognized configs: {}" . format (extra_configs ))
180
181
181
182
self .config = copy .copy (self .DEFAULT_CONFIG )
182
183
self .config .update (configs )
@@ -189,55 +190,59 @@ def __init__(self, **configs):
189
190
reporters = [reporter () for reporter in self .config ['metric_reporters' ]]
190
191
self ._metrics = Metrics (metric_config , reporters )
191
192
192
- self ._client = KafkaClient (metrics = self ._metrics , metric_group_prefix = 'admin' ,
193
- ** self .config )
193
+ self ._client = KafkaClient (metrics = self ._metrics ,
194
+ metric_group_prefix = 'admin' ,
195
+ ** self .config )
194
196
195
197
# Get auto-discovered version from client if necessary
196
198
if self .config ['api_version' ] is None :
197
199
self .config ['api_version' ] = self ._client .config ['api_version' ]
198
200
199
201
self ._closed = False
200
202
self ._refresh_controller_id ()
201
- log .debug ('Kafka administration interface started' )
203
+ log .debug ("KafkaAdmin interface started." )
202
204
203
205
def close (self ):
204
- """Close the administration connection to the kafka broker"""
206
+ """Close the KafkaAdmin connection to the Kafka broker. """
205
207
if not hasattr (self , '_closed' ) or self ._closed :
206
- log .info ('Kafka administration interface already closed' )
208
+ log .info ("KafkaAdmin interface already closed." )
207
209
return
208
210
209
211
self ._metrics .close ()
210
212
self ._client .close ()
211
213
self ._closed = True
212
- log .debug ('Kafka administration interface has closed' )
214
+ log .debug ("KafkaAdmin interface has closed." )
213
215
214
216
def _matching_api_version (self , operation ):
215
- """Find matching api version, the lesser of either the latest api version the library supports, or
216
- the max version supported by the broker
217
+ """Find the latest version of the protocol operation supported by both
218
+ this library and the broker.
219
+
220
+ This resolves to the lesser of either the latest api version this
221
+ library supports, or the max version supported by the broker.
217
222
218
- :param operation: An operation array from kafka.protocol
219
- :return: The max matching version number between client and broker
223
+ :param operation: A list of protocol operation versions from kafka.protocol.
224
+ :return: The max matching version number between client and broker.
220
225
"""
221
226
version = min (len (operation ) - 1 ,
222
227
self ._client .get_api_versions ()[operation [0 ].API_KEY ][1 ])
223
228
if version < self ._client .get_api_versions ()[operation [0 ].API_KEY ][0 ]:
224
- # max library version is less than min broker version. Not sure any brokers
225
- # actually set a min version greater than 0 right now, tho. But maybe in the future?
229
+ # max library version is less than min broker version. Currently,
230
+ # no Kafka versions specify a min msg version. Maybe in the future?
226
231
raise IncompatibleBrokerVersion (
227
- "No version of the '{}' kafka protocol is supported by both the client and broker."
232
+ "No version of the '{}' Kafka protocol is supported by both the client and broker."
228
233
.format (operation .__name__ ))
229
234
return version
230
235
231
236
def _validate_timeout (self , timeout_ms ):
232
- """Validate the timeout is set or use the configuration default
237
+ """Validate the timeout is set or use the configuration default.
233
238
234
- :param timeout_ms: The timeout provided by api call, in milliseconds
235
- :return: The timeout to use for the operation
239
+ :param timeout_ms: The timeout provided by api call, in milliseconds.
240
+ :return: The timeout to use for the operation.
236
241
"""
237
242
return timeout_ms or self .config ['request_timeout_ms' ]
238
243
239
244
def _refresh_controller_id (self ):
240
- """Determine the kafka cluster controller."""
245
+ """Determine the Kafka cluster controller."""
241
246
version = self ._matching_api_version (MetadataRequest )
242
247
if 1 <= version <= 6 :
243
248
request = MetadataRequest [version ]()
@@ -293,31 +298,34 @@ def _find_group_coordinator_id(self, group_id):
293
298
assert group_coordinator != - 1
294
299
return group_coordinator
295
300
296
- def _send_request_to_node (self , node , request ):
297
- """Send a kafka protocol message to a specific broker. Will block until the message result is received .
301
+ def _send_request_to_node (self , node_id , request ):
302
+ """Send a Kafka protocol message to a specific broker.
298
303
299
- :param node: The broker id to which to send the message
300
- :param request: The message to send
301
- :return: The kafka protocol response for the message
302
- :exception: The exception if the message could not be sent
304
+ Will block until the message result is received.
305
+
306
+ :param node_id: The broker id to which to send the message.
307
+ :param request: The message to send.
308
+ :return: The Kafka protocol response for the message.
309
+ :exception: The exception if the message could not be sent.
303
310
"""
304
- while not self ._client .ready (node ):
305
- # connection to broker not ready, poll until it is or send will fail with NodeNotReadyError
311
+ while not self ._client .ready (node_id ):
312
+ # poll until the connection to broker is ready, otherwise send()
313
+ # will fail with NodeNotReadyError
306
314
self ._client .poll ()
307
- future = self ._client .send (node , request )
315
+ future = self ._client .send (node_id , request )
308
316
self ._client .poll (future = future )
309
317
if future .succeeded ():
310
318
return future .value
311
319
else :
312
- raise future .exception # pylint: disable-msg=raising-bad-type
320
+ raise future .exception # pylint: disable-msg=raising-bad-type
313
321
314
322
def _send_request_to_controller (self , request ):
315
- """Send a kafka protocol message to the cluster controller.
323
+ """Send a Kafka protocol message to the cluster controller.
316
324
317
325
Will block until the message result is received.
318
326
319
- :param request: The message to send
320
- :return: The kafka protocol response for the message
327
+ :param request: The message to send.
328
+ :return: The Kafka protocol response for the message.
321
329
"""
322
330
tries = 2 # in case our cached self._controller_id is outdated
323
331
while tries :
@@ -357,11 +365,12 @@ def _convert_new_topic_request(new_topic):
357
365
def create_topics (self , new_topics , timeout_ms = None , validate_only = False ):
358
366
"""Create new topics in the cluster.
359
367
360
- :param new_topics: Array of NewTopic objects
361
- :param timeout_ms: Milliseconds to wait for new topics to be created before broker returns
368
+ :param new_topics: A list of NewTopic objects.
369
+ :param timeout_ms: Milliseconds to wait for new topics to be created
370
+ before the broker returns.
362
371
:param validate_only: If True, don't actually create new topics.
363
372
Not supported by all versions. Default: False
364
- :return: Appropriate version of CreateTopicResponse class
373
+ :return: Appropriate version of CreateTopicResponse class.
365
374
"""
366
375
version = self ._matching_api_version (CreateTopicsRequest )
367
376
timeout_ms = self ._validate_timeout (timeout_ms )
@@ -371,40 +380,44 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
371
380
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
372
381
.format (self .config ['api_version' ]))
373
382
request = CreateTopicsRequest [version ](
374
- create_topic_requests = [self ._convert_new_topic_request (new_topic ) for new_topic in new_topics ],
375
- timeout = timeout_ms
383
+ create_topic_requests = [self ._convert_new_topic_request (new_topic ) for new_topic in new_topics ],
384
+ timeout = timeout_ms
376
385
)
377
386
elif version <= 2 :
378
387
request = CreateTopicsRequest [version ](
379
- create_topic_requests = [self ._convert_new_topic_request (new_topic ) for new_topic in new_topics ],
380
- timeout = timeout_ms ,
381
- validate_only = validate_only
388
+ create_topic_requests = [self ._convert_new_topic_request (new_topic ) for new_topic in new_topics ],
389
+ timeout = timeout_ms ,
390
+ validate_only = validate_only
382
391
)
383
392
else :
384
393
raise NotImplementedError (
385
394
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
386
395
.format (version ))
396
+ # TODO convert structs to a more pythonic interface
397
+ # TODO raise exceptions if errors
387
398
return self ._send_request_to_controller (request )
388
399
389
400
def delete_topics (self , topics , timeout_ms = None ):
390
- """Delete topics from the cluster
401
+ """Delete topics from the cluster.
391
402
392
- :param topics: Array of topic name strings
393
- :param timeout_ms: Milliseconds to wait for topics to be deleted before broker returns
394
- :return: Appropriate version of DeleteTopicsResponse class
403
+ :param topics: A list of topic name strings.
404
+ :param timeout_ms: Milliseconds to wait for topics to be deleted
405
+ before the broker returns.
406
+ :return: Appropriate version of DeleteTopicsResponse class.
395
407
"""
396
408
version = self ._matching_api_version (DeleteTopicsRequest )
397
409
timeout_ms = self ._validate_timeout (timeout_ms )
398
410
if version <= 1 :
399
411
request = DeleteTopicsRequest [version ](
400
- topics = topics ,
401
- timeout = timeout_ms
412
+ topics = topics ,
413
+ timeout = timeout_ms
402
414
)
415
+ response = self ._send_request_to_controller (request )
403
416
else :
404
417
raise NotImplementedError (
405
418
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
406
419
.format (version ))
407
- return self . _send_request_to_controller ( request )
420
+ return response
408
421
409
422
# list topics functionality is in ClusterMetadata
410
423
# Note: if implemented here, send the request to the least_loaded_node()
@@ -435,14 +448,15 @@ def _convert_describe_config_resource_request(config_resource):
435
448
)
436
449
437
450
def describe_configs (self , config_resources , include_synonyms = False ):
438
- """Fetch configuration parameters for one or more kafka resources.
451
+ """Fetch configuration parameters for one or more Kafka resources.
439
452
440
- :param config_resources: An array of ConfigResource objects.
441
- Any keys in ConfigResource.configs dict will be used to filter the result. The configs dict should be None
442
- to get all values. An empty dict will get zero values (as per kafka protocol).
443
- :param include_synonyms: If True, return synonyms in response. Not
453
+ :param config_resources: An list of ConfigResource objects.
454
+ Any keys in ConfigResource.configs dict will be used to filter the
455
+ result. Setting the configs dict to None will get all values. An
456
+ empty dict will get zero values (as per Kafka protocol).
457
+ :param include_synonyms: If True, return synonyms in response. Not
444
458
supported by all versions. Default: False.
445
- :return: Appropriate version of DescribeConfigsResponse class
459
+ :return: Appropriate version of DescribeConfigsResponse class.
446
460
"""
447
461
version = self ._matching_api_version (DescribeConfigsRequest )
448
462
if version == 0 :
@@ -451,12 +465,12 @@ def describe_configs(self, config_resources, include_synonyms=False):
451
465
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
452
466
.format (self .config ['api_version' ]))
453
467
request = DescribeConfigsRequest [version ](
454
- resources = [self ._convert_describe_config_resource_request (config_resource ) for config_resource in config_resources ]
468
+ resources = [self ._convert_describe_config_resource_request (config_resource ) for config_resource in config_resources ]
455
469
)
456
- elif version < = 1 :
470
+ elif version = = 1 :
457
471
request = DescribeConfigsRequest [version ](
458
- resources = [self ._convert_describe_config_resource_request (config_resource ) for config_resource in config_resources ],
459
- include_synonyms = include_synonyms
472
+ resources = [self ._convert_describe_config_resource_request (config_resource ) for config_resource in config_resources ],
473
+ include_synonyms = include_synonyms
460
474
)
461
475
else :
462
476
raise NotImplementedError (
@@ -475,21 +489,21 @@ def _convert_alter_config_resource_request(config_resource):
475
489
)
476
490
477
491
def alter_configs (self , config_resources ):
478
- """Alter configuration parameters of one or more kafka resources.
492
+ """Alter configuration parameters of one or more Kafka resources.
479
493
480
494
Warning:
481
495
This is currently broken for BROKER resources because those must be
482
496
sent to that specific broker, versus this always picks the
483
497
least-loaded node. See the comment in the source code for details.
484
498
We would happily accept a PR fixing this.
485
499
486
- :param config_resources: An array of ConfigResource objects.
487
- :return: Appropriate version of AlterConfigsResponse class
500
+ :param config_resources: A list of ConfigResource objects.
501
+ :return: Appropriate version of AlterConfigsResponse class.
488
502
"""
489
503
version = self ._matching_api_version (AlterConfigsRequest )
490
504
if version == 0 :
491
505
request = AlterConfigsRequest [version ](
492
- resources = [self ._convert_alter_config_resource_request (config_resource ) for config_resource in config_resources ]
506
+ resources = [self ._convert_alter_config_resource_request (config_resource ) for config_resource in config_resources ]
493
507
)
494
508
else :
495
509
raise NotImplementedError (
@@ -522,19 +536,20 @@ def _convert_create_partitions_request(topic_name, new_partitions):
522
536
def create_partitions (self , topic_partitions , timeout_ms = None , validate_only = False ):
523
537
"""Create additional partitions for an existing topic.
524
538
525
- :param topic_partitions: A map of topic name strings to NewPartition objects
526
- :param timeout_ms: Milliseconds to wait for new partitions to be created before broker returns
539
+ :param topic_partitions: A map of topic name strings to NewPartition objects.
540
+ :param timeout_ms: Milliseconds to wait for new partitions to be
541
+ created before the broker returns.
527
542
:param validate_only: If True, don't actually create new partitions.
528
543
Default: False
529
- :return: Appropriate version of CreatePartitionsResponse class
544
+ :return: Appropriate version of CreatePartitionsResponse class.
530
545
"""
531
546
version = self ._matching_api_version (CreatePartitionsRequest )
532
547
timeout_ms = self ._validate_timeout (timeout_ms )
533
548
if version == 0 :
534
549
request = CreatePartitionsRequest [version ](
535
- topic_partitions = [self ._convert_create_partitions_request (topic_name , new_partitions ) for topic_name , new_partitions in topic_partitions .items ()],
536
- timeout = timeout_ms ,
537
- validate_only = validate_only
550
+ topic_partitions = [self ._convert_create_partitions_request (topic_name , new_partitions ) for topic_name , new_partitions in topic_partitions .items ()],
551
+ timeout = timeout_ms ,
552
+ validate_only = validate_only
538
553
)
539
554
else :
540
555
raise NotImplementedError (
0 commit comments