Skip to content

Commit 6a88ec0

Browse files
llamahunterjeffwidman
authored andcommitted
Check broker protocol version on each connection to controller.
1 parent 6eeaf63 commit 6a88ec0

File tree

1 file changed

+5
-4
lines changed

1 file changed

+5
-4
lines changed

kafka/admin/kafka.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,6 @@ def __init__(self, **configs):
189189
if self.config['api_version'] is None:
190190
self.config['api_version'] = self._client.config['api_version']
191191

192-
if self.config['api_version'] < (0, 10, 0):
193-
raise UnsupportedVersionError(
194-
"Kafka Admin interface not supported for cluster version {} < 0.10.0.0"
195-
.format(self.config['api_version']))
196192
self._closed = False
197193
self._refresh_controller_id()
198194
log.debug('Kafka administration interface started')
@@ -241,6 +237,11 @@ def _refresh_controller_id(self):
241237
MetadataRequest[1]([])
242238
)
243239
self._controller_id = response.controller_id
240+
version = self._client.check_version(self._controller_id)
241+
if version < (0, 10, 0):
242+
raise UnsupportedVersionError(
243+
"Kafka Admin interface not supported for cluster controller version {} < 0.10.0.0"
244+
.format(version))
244245

245246
def _send_request_to_node(self, node, request):
246247
"""Send a kafka protocol message to a specific broker. Will block until the message result is received.

0 commit comments

Comments
 (0)