Skip to content

Commit 5069088

Browse files
committed
Fix send to controller
The controller send error handling was completely broken. It also pinned the metadata version unnecessarily. Additionally, several of the methods were sending to the controller but either that was unnecessary, or just plain wrong. So updated following the pattern of the Java Admin client.
1 parent ac1a2a0 commit 5069088

File tree

1 file changed

+89
-44
lines changed

1 file changed

+89
-44
lines changed

kafka/admin/kafka.py

Lines changed: 89 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
from kafka.client_async import KafkaClient, selectors
77
import kafka.errors as Errors
88
from kafka.errors import (
9-
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
10-
NodeNotReadyError, NotControllerError)
9+
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
10+
UnrecognizedBrokerVersion)
1111
from kafka.metrics import MetricConfig, Metrics
1212
from kafka.protocol.admin import (
1313
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
@@ -232,17 +232,22 @@ def _validate_timeout(self, timeout_ms):
232232
return timeout_ms or self.config['request_timeout_ms']
233233

234234
def _refresh_controller_id(self):
235-
"""Determine the kafka cluster controller
236-
"""
237-
response = self._send_request_to_node(
238-
self._client.least_loaded_node(),
239-
MetadataRequest[1]([])
240-
)
241-
self._controller_id = response.controller_id
242-
version = self._client.check_version(self._controller_id)
243-
if version < (0, 10, 0):
244-
raise IncompatibleBrokerVersion(
245-
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
235+
"""Determine the kafka cluster controller."""
236+
version = self._matching_api_version(MetadataRequest)
237+
if 1 <= version <= 6:
238+
request = MetadataRequest[version]()
239+
response = self._send_request_to_node(self._client.least_loaded_node(), request)
240+
controller_id = response.controller_id
241+
# verify the controller is new enough to support our requests
242+
controller_version = self._client.check_version(controller_id)
243+
if controller_version < (0, 10, 0):
244+
raise IncompatibleBrokerVersion(
245+
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
246+
.format(controller_version))
247+
self._controller_id = controller_id
248+
else:
249+
raise UnrecognizedBrokerVersion(
250+
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
246251
.format(version))
247252

248253
def _find_group_coordinator_id(self, group_id):
@@ -301,22 +306,34 @@ def _send_request_to_node(self, node, request):
301306
else:
302307
raise future.exception # pylint: disable-msg=raising-bad-type
303308

304-
def _send(self, request):
305-
"""Send a kafka protocol message to the cluster controller. Will block until the message result is received.
309+
def _send_request_to_controller(self, request):
310+
"""Send a kafka protocol message to the cluster controller.
311+
312+
Will block until the message result is received.
306313
307314
:param request: The message to send
308-
:return The kafka protocol response for the message
309-
:exception NodeNotReadyError: If the controller connection can't be established
315+
:return: The kafka protocol response for the message
310316
"""
311-
remaining_tries = 2
312-
while remaining_tries > 0:
313-
remaining_tries = remaining_tries - 1
314-
try:
315-
return self._send_request_to_node(self._controller_id, request)
316-
except (NotControllerError, KafkaConnectionError) as e:
317-
# controller changed? refresh it
318-
self._refresh_controller_id()
319-
raise NodeNotReadyError(self._controller_id)
317+
tries = 2 # in case our cached self._controller_id is outdated
318+
while tries:
319+
tries -= 1
320+
response = self._send_request_to_node(self._controller_id, request)
321+
# DeleteTopicsResponse returns topic_error_codes rather than topic_errors
322+
for topic, error_code in getattr(response, "topic_errors", response.topic_error_codes):
323+
error_type = Errors.for_code(error_code)
324+
if tries and isinstance(error_type, NotControllerError):
325+
# No need to inspect the rest of the errors for
326+
# non-retriable errors because NotControllerError should
327+
# either be thrown for all errors or no errors.
328+
self._refresh_controller_id()
329+
break
330+
elif error_type is not Errors.NoError:
331+
raise error_type(
332+
"Request '{}' failed with response '{}'."
333+
.format(request, response))
334+
else:
335+
return response
336+
raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")
320337

321338
@staticmethod
322339
def _convert_new_topic_request(new_topic):
@@ -362,7 +379,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
362379
raise NotImplementedError(
363380
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
364381
.format(version))
365-
return self._send(request)
382+
return self._send_request_to_controller(request)
366383

367384
def delete_topics(self, topics, timeout_ms=None):
368385
"""Delete topics from the cluster
@@ -382,19 +399,25 @@ def delete_topics(self, topics, timeout_ms=None):
382399
raise NotImplementedError(
383400
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
384401
.format(version))
385-
return self._send(request)
402+
return self._send_request_to_controller(request)
386403

387404
# list topics functionality is in ClusterMetadata
405+
# Note: if implemented here, send the request to the least_loaded_node()
388406

389407
# describe topics functionality is in ClusterMetadata
408+
# Note: if implemented here, send the request to the controller
390409

391410
# describe cluster functionality is in ClusterMetadata
411+
# Note: if implemented here, send the request to the least_loaded_node()
392412

393-
# describe_acls protocol not implemented
413+
# describe_acls protocol not yet implemented
414+
# Note: send the request to the least_loaded_node()
394415

395-
# create_acls protocol not implemented
416+
# create_acls protocol not yet implemented
417+
# Note: send the request to the least_loaded_node()
396418

397-
# delete_acls protocol not implemented
419+
# delete_acls protocol not yet implemented
420+
# Note: send the request to the least_loaded_node()
398421

399422
@staticmethod
400423
def _convert_describe_config_resource_request(config_resource):
@@ -434,7 +457,7 @@ def describe_configs(self, config_resources, include_synonyms=None):
434457
raise NotImplementedError(
435458
"Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
436459
.format(version))
437-
return self._send(request)
460+
return self._send_request_to_node(self._client.least_loaded_node(), request)
438461

439462
@staticmethod
440463
def _convert_alter_config_resource_request(config_resource):
@@ -449,6 +472,12 @@ def _convert_alter_config_resource_request(config_resource):
449472
def alter_configs(self, config_resources):
450473
"""Alter configuration parameters of one or more kafka resources.
451474
475+
Warning:
476+
This is currently broken for BROKER resources because those must be
477+
sent to that specific broker, versus this always picks the
478+
least-loaded node. See the comment in the source code for details.
479+
We would happily accept a PR fixing this.
480+
452481
:param config_resources: An array of ConfigResource objects.
453482
:return: Appropriate version of AlterConfigsResponse class
454483
"""
@@ -461,11 +490,19 @@ def alter_configs(self, config_resources):
461490
raise NotImplementedError(
462491
"Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
463492
.format(version))
464-
return self._send(request)
493+
# TODO the Java client has the note:
494+
# // We must make a separate AlterConfigs request for every BROKER resource we want to alter
495+
# // and send the request to that specific broker. Other resources are grouped together into
496+
# // a single request that may be sent to any broker.
497+
#
498+
# So this is currently broken as it always sends to the least_loaded_node()
499+
return self._send_request_to_node(self._client.least_loaded_node(), request)
465500

466-
# alter replica logs dir protocol not implemented
501+
# alter replica logs dir protocol not yet implemented
502+
# Note: have to lookup the broker with the replica assignment and send the request to that broker
467503

468-
# describe log dirs protocol not implemented
504+
# describe log dirs protocol not yet implemented
505+
# Note: have to lookup the broker with the replica assignment and send the request to that broker
469506

470507
@staticmethod
471508
def _convert_create_partitions_request(topic_name, new_partitions):
@@ -498,17 +535,22 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
498535
raise NotImplementedError(
499536
"Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
500537
.format(version))
501-
return self._send(request)
538+
return self._send_request_to_controller(request)
502539

503-
# delete records protocol not implemented
540+
# delete records protocol not yet implemented
541+
# Note: send the request to the partition leaders
504542

505-
# create delegation token protocol not implemented
543+
# create delegation token protocol not yet implemented
544+
# Note: send the request to the least_loaded_node()
506545

507-
# renew delegation token protocol not implemented
546+
# renew delegation token protocol not yet implemented
547+
# Note: send the request to the least_loaded_node()
508548

509-
# expire delegation_token protocol not implemented
549+
# expire delegation_token protocol not yet implemented
550+
# Note: send the request to the least_loaded_node()
510551

511-
# describe delegation_token protocol not implemented
552+
# describe delegation_token protocol not yet implemented
553+
# Note: send the request to the least_loaded_node()
512554

513555
def describe_consumer_groups(self, group_ids):
514556
"""Describe a set of consumer groups.
@@ -525,7 +567,8 @@ def describe_consumer_groups(self, group_ids):
525567
raise NotImplementedError(
526568
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
527569
.format(version))
528-
return self._send(request)
570+
# TODO this is completely broken, as it needs to send to the group coordinator
571+
# return self._send(request)
529572

530573
def list_consumer_groups(self):
531574
"""List all consumer groups known to the cluster.
@@ -539,6 +582,8 @@ def list_consumer_groups(self):
539582
raise NotImplementedError(
540583
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
541584
.format(version))
542-
return self._send(request)
585+
# TODO this is completely broken, as it needs to send to the group coordinator
586+
# return self._send(request)
543587

544-
# delete groups protocol not implemented
588+
# delete groups protocol not yet implemented
589+
# Note: send the request to the group's coordinator.

0 commit comments

Comments
 (0)