Skip to content

Commit c48817e

Browse files
author
Tincu Gabriel
authored
Support configuration of custom kafka client for Admin/Consumer/Producer (dpkp#2144)
1 parent 53dc740 commit c48817e

File tree

3 files changed

+17
-8
lines changed

3 files changed

+17
-8
lines changed

kafka/admin/client.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ class KafkaAdminClient(object):
146146
sasl mechanism handshake. Default: one of bootstrap servers
147147
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
148148
instance. (See kafka.oauth.abstract). Default: None
149+
kafka_client (callable): Custom class / callable for creating KafkaClient instances
149150
150151
"""
151152
DEFAULT_CONFIG = {
@@ -186,6 +187,7 @@ class KafkaAdminClient(object):
186187
'metric_reporters': [],
187188
'metrics_num_samples': 2,
188189
'metrics_sample_window_ms': 30000,
190+
'kafka_client': KafkaClient,
189191
}
190192

191193
def __init__(self, **configs):
@@ -205,9 +207,11 @@ def __init__(self, **configs):
205207
reporters = [reporter() for reporter in self.config['metric_reporters']]
206208
self._metrics = Metrics(metric_config, reporters)
207209

208-
self._client = KafkaClient(metrics=self._metrics,
209-
metric_group_prefix='admin',
210-
**self.config)
210+
self._client = self.config['kafka_client'](
211+
metrics=self._metrics,
212+
metric_group_prefix='admin',
213+
**self.config
214+
)
211215
self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
212216

213217
# Get auto-discovered version from client if necessary

kafka/consumer/group.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ class KafkaConsumer(six.Iterator):
244244
sasl mechanism handshake. Default: one of bootstrap servers
245245
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
246246
instance. (See kafka.oauth.abstract). Default: None
247+
kafka_client (callable): Custom class / callable for creating KafkaClient instances
247248
248249
Note:
249250
Configuration parameters are described in more detail at
@@ -306,6 +307,7 @@ class KafkaConsumer(six.Iterator):
306307
'sasl_kerberos_domain_name': None,
307308
'sasl_oauth_token_provider': None,
308309
'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
310+
'kafka_client': KafkaClient,
309311
}
310312
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
311313

@@ -353,7 +355,7 @@ def __init__(self, *topics, **configs):
353355
log.warning('use api_version=%s [tuple] -- "%s" as str is deprecated',
354356
str(self.config['api_version']), str_version)
355357

356-
self._client = KafkaClient(metrics=self._metrics, **self.config)
358+
self._client = self.config['kafka_client'](metrics=self._metrics, **self.config)
357359

358360
# Get auto-discovered version from client if necessary
359361
if self.config['api_version'] is None:

kafka/producer/kafka.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ class KafkaProducer(object):
280280
sasl mechanism handshake. Default: one of bootstrap servers
281281
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
282282
instance. (See kafka.oauth.abstract). Default: None
283+
kafka_client (callable): Custom class / callable for creating KafkaClient instances
283284
284285
Note:
285286
Configuration parameters are described in more detail at
@@ -332,7 +333,8 @@ class KafkaProducer(object):
332333
'sasl_plain_password': None,
333334
'sasl_kerberos_service_name': 'kafka',
334335
'sasl_kerberos_domain_name': None,
335-
'sasl_oauth_token_provider': None
336+
'sasl_oauth_token_provider': None,
337+
'kafka_client': KafkaClient,
336338
}
337339

338340
_COMPRESSORS = {
@@ -378,9 +380,10 @@ def __init__(self, **configs):
378380
reporters = [reporter() for reporter in self.config['metric_reporters']]
379381
self._metrics = Metrics(metric_config, reporters)
380382

381-
client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
382-
wakeup_timeout_ms=self.config['max_block_ms'],
383-
**self.config)
383+
client = self.config['kafka_client'](
384+
metrics=self._metrics, metric_group_prefix='producer',
385+
wakeup_timeout_ms=self.config['max_block_ms'],
386+
**self.config)
384387

385388
# Get auto-discovered version from client if necessary
386389
if self.config['api_version'] is None:

0 commit comments

Comments
 (0)