File tree Expand file tree Collapse file tree 1 file changed +5
-4
lines changed Expand file tree Collapse file tree 1 file changed +5
-4
lines changed Original file line number Diff line number Diff line change @@ -189,10 +189,6 @@ def __init__(self, **configs):
189
189
if self .config ['api_version' ] is None :
190
190
self .config ['api_version' ] = self ._client .config ['api_version' ]
191
191
192
- if self .config ['api_version' ] < (0 , 10 , 0 ):
193
- raise UnsupportedVersionError (
194
- "Kafka Admin interface not supported for cluster version {} < 0.10.0.0"
195
- .format (self .config ['api_version' ]))
196
192
self ._closed = False
197
193
self ._refresh_controller_id ()
198
194
log .debug ('Kafka administration interface started' )
@@ -241,6 +237,11 @@ def _refresh_controller_id(self):
241
237
MetadataRequest [1 ]([])
242
238
)
243
239
self ._controller_id = response .controller_id
240
+ version = self ._client .check_version (self ._controller_id )
241
+ if version < (0 , 10 , 0 ):
242
+ raise UnsupportedVersionError (
243
+ "Kafka Admin interface not supported for cluster controller version {} < 0.10.0.0"
244
+ .format (version ))
244
245
245
246
def _send_request_to_node (self , node , request ):
246
247
"""Send a kafka protocol message to a specific broker. Will block until the message result is received.
You can’t perform that action at this time.
0 commit comments