Commit c52f25a

Pass metrics_enabled=False to disable metrics (#2581)
1 parent 56eb39d commit c52f25a

File tree

11 files changed: +120 -71 lines
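
In short: KafkaConsumer and KafkaProducer gain a metrics_enabled option (default: True). Passing metrics_enabled=False skips creating the internal Metrics registry; the fetcher, coordinators, and sender then receive metrics=None and guard every sensor call. A minimal usage sketch (the broker address and topic name are placeholders):

    from kafka import KafkaConsumer, KafkaProducer

    # Disable metric collection entirely.
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             metrics_enabled=False)
    consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092',
                             metrics_enabled=False)

    # With metrics disabled, metrics() returns None instead of a dict.
    assert producer.metrics() is None
    assert consumer.metrics() is None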

kafka/consumer/fetcher.py

Lines changed: 16 additions & 7 deletions
@@ -56,12 +56,13 @@ class Fetcher(six.Iterator):
         'max_partition_fetch_bytes': 1048576,
         'max_poll_records': sys.maxsize,
         'check_crcs': True,
+        'metrics': None,
         'metric_group_prefix': 'consumer',
         'retry_backoff_ms': 100,
         'enable_incremental_fetch_sessions': True,
     }

-    def __init__(self, client, subscriptions, metrics, **configs):
+    def __init__(self, client, subscriptions, **configs):
         """Initialize a Kafka Message Fetcher.

         Keyword Arguments:

@@ -111,7 +112,10 @@ def __init__(self, client, subscriptions, metrics, **configs):
         self._next_partition_records = None  # Holds a single PartitionRecords until fully consumed
         self._iterator = None
         self._fetch_futures = collections.deque()
-        self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
+        if self.config['metrics']:
+            self._sensors = FetchManagerMetrics(self.config['metrics'], self.config['metric_group_prefix'])
+        else:
+            self._sensors = None
         self._isolation_level = READ_UNCOMMITTED
         self._session_handlers = {}
         self._nodes_with_pending_fetch_requests = set()

@@ -391,7 +395,7 @@ def _append(self, drained, part, max_records, update_offsets):
             # when each message is yielded). There may be edge cases where we re-fetch records
             # that we'll end up skipping, but for now we'll live with that.
             highwater = self._subscriptions.assignment[tp].highwater
-            if highwater is not None:
+            if highwater is not None and self._sensors:
                 self._sensors.records_fetch_lag.record(highwater - part.next_fetch_offset)
             if update_offsets or not part_records:
                 # TODO: save leader_epoch

@@ -705,7 +709,10 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
         partitions = set([TopicPartition(topic, partition_data[0])
                           for topic, partitions in response.topics
                           for partition_data in partitions])
-        metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions)
+        if self._sensors:
+            metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions)
+        else:
+            metric_aggregator = None

         for topic, partitions in response.topics:
             for partition_data in partitions:

@@ -719,7 +726,8 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
                 )
                 self._completed_fetches.append(completed_fetch)

-        self._sensors.fetch_latency.record((time.time() - send_time) * 1000)
+        if self._sensors:
+            self._sensors.fetch_latency.record((time.time() - send_time) * 1000)
         self._nodes_with_pending_fetch_requests.remove(node_id)

     def _handle_fetch_error(self, node_id, exception):

@@ -816,7 +824,7 @@ def _parse_fetched_data(self, completed_fetch):
                 raise error_type('Unexpected error while fetching data')

         finally:
-            if parsed_records is None:
+            if parsed_records is None and completed_fetch.metric_aggregator:
                 completed_fetch.metric_aggregator.record(tp, 0, 0)

             if error_type is not Errors.NoError:

@@ -873,7 +881,8 @@ def __bool__(self):
     def drain(self):
         if self.record_iterator is not None:
             self.record_iterator = None
-            self.metric_aggregator.record(self.topic_partition, self.bytes_read, self.records_read)
+            if self.metric_aggregator:
+                self.metric_aggregator.record(self.topic_partition, self.bytes_read, self.records_read)
             self.on_drain(self)

     def take(self, n=None):
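
The pattern above repeats in every component this commit touches: 'metrics' becomes a config key defaulting to None, sensor objects are built only when a Metrics instance is supplied, and each .record() call is guarded. A condensed, self-contained sketch of that pattern (Component and _Sensor are illustrative names, not from the diff):

    import copy
    import time

    class _Sensor(object):
        """Stand-in for a kafka.metrics sensor (illustrative only)."""
        def record(self, value):
            print('recorded', value)

    class Component(object):
        # 'metrics' defaults to None, as in Fetcher/BaseCoordinator/Sender.
        DEFAULT_CONFIG = {'metrics': None, 'metric_group_prefix': ''}

        def __init__(self, **configs):
            self.config = copy.copy(self.DEFAULT_CONFIG)
            for key in self.config:
                if key in configs:
                    self.config[key] = configs[key]
            # Sensors exist only when a Metrics instance was supplied.
            self._sensors = _Sensor() if self.config['metrics'] else None

        def _handle_response(self, send_time):
            # Guarded, so a metrics-disabled component never dereferences None.
            if self._sensors:
                self._sensors.record((time.time() - send_time) * 1000)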

kafka/consumer/group.py

Lines changed: 17 additions & 10 deletions
@@ -234,6 +234,7 @@ class KafkaConsumer(six.Iterator):
         metric_reporters (list): A list of classes to use as metrics reporters.
             Implementing the AbstractMetricsReporter interface allows plugging
             in classes that will be notified of new metric creation. Default: []
+        metrics_enabled (bool): Whether to track metrics on this instance. Default True.
         metrics_num_samples (int): The number of samples maintained to compute
             metrics. Default: 2
         metrics_sample_window_ms (int): The maximum age in milliseconds of

@@ -315,6 +316,7 @@ class KafkaConsumer(six.Iterator):
         'api_version_auto_timeout_ms': 2000,
         'connections_max_idle_ms': 9 * 60 * 1000,
         'metric_reporters': [],
+        'metrics_enabled': True,
         'metrics_num_samples': 2,
         'metrics_sample_window_ms': 30000,
         'metric_group_prefix': 'consumer',

@@ -358,13 +360,15 @@ def __init__(self, *topics, **configs):
                              "fetch_max_wait_ms ({})."
                              .format(connections_max_idle_ms, request_timeout_ms, fetch_max_wait_ms))

-        metrics_tags = {'client-id': self.config['client_id']}
-        metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
-                                     time_window_ms=self.config['metrics_sample_window_ms'],
-                                     tags=metrics_tags)
-        reporters = [reporter() for reporter in self.config['metric_reporters']]
-        self._metrics = Metrics(metric_config, reporters)
-        # TODO _metrics likely needs to be passed to KafkaClient, etc.
+        if self.config['metrics_enabled']:
+            metrics_tags = {'client-id': self.config['client_id']}
+            metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+                                         time_window_ms=self.config['metrics_sample_window_ms'],
+                                         tags=metrics_tags)
+            reporters = [reporter() for reporter in self.config['metric_reporters']]
+            self._metrics = Metrics(metric_config, reporters)
+        else:
+            self._metrics = None

         # api_version was previously a str. Accept old format for now
         if isinstance(self.config['api_version'], str):

@@ -402,9 +406,9 @@ def __init__(self, *topics, **configs):

         self._subscription = SubscriptionState(self.config['auto_offset_reset'])
         self._fetcher = Fetcher(
-            self._client, self._subscription, self._metrics, **self.config)
+            self._client, self._subscription, metrics=self._metrics, **self.config)
         self._coordinator = ConsumerCoordinator(
-            self._client, self._subscription, self._metrics,
+            self._client, self._subscription, metrics=self._metrics,
             assignors=self.config['partition_assignment_strategy'],
             **self.config)
         self._closed = False

@@ -485,7 +489,8 @@ def close(self, autocommit=True, timeout_ms=None):
         log.debug("Closing the KafkaConsumer.")
         self._closed = True
         self._coordinator.close(autocommit=autocommit, timeout_ms=timeout_ms)
-        self._metrics.close()
+        if self._metrics:
+            self._metrics.close()
         self._client.close()
         try:
             self.config['key_deserializer'].close()

@@ -989,6 +994,8 @@ def metrics(self, raw=False):
        This is an unstable interface. It may change in future
        releases without warning.
        """
+        if not self._metrics:
+            return
        if raw:
            return self._metrics.metrics.copy()
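
Note the new early return in metrics(): with metrics_enabled=False it returns None rather than a dict, so monitoring code should guard. A hedged example (the topic and broker address are placeholders):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092',
                             metrics_enabled=False)
    snapshot = consumer.metrics()
    if snapshot is not None:  # None whenever metrics are disabled
        for group, measurements in snapshot.items():
            print(group, measurements)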

kafka/coordinator/base.py

Lines changed: 13 additions & 6 deletions
@@ -84,10 +84,11 @@ class BaseCoordinator(object):
         'max_poll_interval_ms': 300000,
         'retry_backoff_ms': 100,
         'api_version': (0, 10, 1),
+        'metrics': None,
         'metric_group_prefix': '',
     }

-    def __init__(self, client, metrics, **configs):
+    def __init__(self, client, **configs):
         """
         Keyword Arguments:
             group_id (str): name of the consumer group to join for dynamic

@@ -130,8 +131,11 @@ def __init__(self, client, metrics, **configs):
         self.coordinator_id = None
         self._find_coordinator_future = None
         self._generation = Generation.NO_GENERATION
-        self.sensors = GroupCoordinatorMetrics(self.heartbeat, metrics,
-                                               self.config['metric_group_prefix'])
+        if self.config['metrics']:
+            self._sensors = GroupCoordinatorMetrics(self.heartbeat, self.config['metrics'],
+                                                    self.config['metric_group_prefix'])
+        else:
+            self._sensors = None

     @abc.abstractmethod
     def protocol_type(self):

@@ -531,7 +535,8 @@ def _handle_join_group_response(self, future, send_time, response):
         if error_type is Errors.NoError:
             log.debug("Received successful JoinGroup response for group %s: %s",
                       self.group_id, response)
-            self.sensors.join_latency.record((time.time() - send_time) * 1000)
+            if self._sensors:
+                self._sensors.join_latency.record((time.time() - send_time) * 1000)
             with self._lock:
                 if self.state is not MemberState.REBALANCING:
                     # if the consumer was woken up before a rebalance completes,

@@ -650,7 +655,8 @@ def _send_sync_group_request(self, request):
     def _handle_sync_group_response(self, future, send_time, response):
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
-            self.sensors.sync_latency.record((time.time() - send_time) * 1000)
+            if self._sensors:
+                self._sensors.sync_latency.record((time.time() - send_time) * 1000)
             future.success(response.member_assignment)
             return

@@ -856,7 +862,8 @@ def _send_heartbeat_request(self):
         return future

     def _handle_heartbeat_response(self, future, send_time, response):
-        self.sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
+        if self._sensors:
+            self._sensors.heartbeat_latency.record((time.time() - send_time) * 1000)
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             log.debug("Received successful heartbeat response for group %s",
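
Two details worth noting in this file: the public self.sensors attribute becomes the private self._sensors, and each response handler's latency recording is wrapped in a guard. The value recorded is simply the wall-clock round trip converted to milliseconds:

    import time

    send_time = time.time()                        # captured when the request is sent
    # ... request/response round trip happens ...
    latency_ms = (time.time() - send_time) * 1000  # elapsed seconds -> milliseconds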

kafka/coordinator/consumer.py

Lines changed: 10 additions & 5 deletions
@@ -39,10 +39,11 @@ class ConsumerCoordinator(BaseCoordinator):
         'retry_backoff_ms': 100,
         'api_version': (0, 10, 1),
         'exclude_internal_topics': True,
+        'metrics': None,
         'metric_group_prefix': 'consumer'
     }

-    def __init__(self, client, subscription, metrics, **configs):
+    def __init__(self, client, subscription, **configs):
         """Initialize the coordination manager.

         Keyword Arguments:

@@ -78,7 +79,7 @@ def __init__(self, client, subscription, metrics, **configs):
                 True the only way to receive records from an internal topic is
                 subscribing to it. Requires 0.10+. Default: True
         """
-        super(ConsumerCoordinator, self).__init__(client, metrics, **configs)
+        super(ConsumerCoordinator, self).__init__(client, **configs)

         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:

@@ -120,8 +121,11 @@ def __init__(self, client, subscription, metrics, **configs):
         else:
             self.next_auto_commit_deadline = time.time() + self.auto_commit_interval

-        self.consumer_sensors = ConsumerCoordinatorMetrics(
-            metrics, self.config['metric_group_prefix'], self._subscription)
+        if self.config['metrics']:
+            self._consumer_sensors = ConsumerCoordinatorMetrics(
+                self.config['metrics'], self.config['metric_group_prefix'], self._subscription)
+        else:
+            self._consumer_sensors = None

         self._cluster.request_update()
         self._cluster.add_listener(WeakMethod(self._handle_metadata_update))

@@ -686,7 +690,8 @@ def _send_offset_commit_request(self, offsets):

     def _handle_offset_commit_response(self, offsets, future, send_time, response):
         # TODO look at adding request_latency_ms to response (like java kafka)
-        self.consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
+        if self._consumer_sensors:
+            self._consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
         unauthorized_topics = set()

         for topic, partitions in response.topics:
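
Since ConsumerCoordinator forwards all keyword configs to BaseCoordinator via super().__init__(client, **configs), the metrics instance now travels through the config dict instead of a positional argument (and consumer_sensors is likewise renamed to _consumer_sensors). A toy sketch of that plumbing, with illustrative class names:

    class Base(object):
        DEFAULT_CONFIG = {'metrics': None}

        def __init__(self, **configs):
            self.config = dict(self.DEFAULT_CONFIG)
            for key in self.config:
                if key in configs:
                    self.config[key] = configs[key]

    class Derived(Base):
        def __init__(self, **configs):
            # No positional metrics argument; it rides along in **configs.
            super(Derived, self).__init__(**configs)

    assert Derived().config['metrics'] is None                # default: disabled
    assert Derived(metrics='a-metrics-instance').config['metrics'] == 'a-metrics-instance'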

kafka/producer/kafka.py

Lines changed: 18 additions & 9 deletions
@@ -267,6 +267,7 @@ class KafkaProducer(object):
         metric_reporters (list): A list of classes to use as metrics reporters.
             Implementing the AbstractMetricsReporter interface allows plugging
             in classes that will be notified of new metric creation. Default: []
+        metrics_enabled (bool): Whether to track metrics on this instance. Default True.
         metrics_num_samples (int): The number of samples maintained to compute
             metrics. Default: 2
         metrics_sample_window_ms (int): The maximum age in milliseconds of

@@ -336,6 +337,7 @@ class KafkaProducer(object):
         'api_version': None,
         'api_version_auto_timeout_ms': 2000,
         'metric_reporters': [],
+        'metrics_enabled': True,
         'metrics_num_samples': 2,
         'metrics_sample_window_ms': 30000,
         'selector': selectors.DefaultSelector,

@@ -393,12 +395,15 @@ def __init__(self, **configs):
                      str(self.config['api_version']), deprecated)

         # Configure metrics
-        metrics_tags = {'client-id': self.config['client_id']}
-        metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
-                                     time_window_ms=self.config['metrics_sample_window_ms'],
-                                     tags=metrics_tags)
-        reporters = [reporter() for reporter in self.config['metric_reporters']]
-        self._metrics = Metrics(metric_config, reporters)
+        if self.config['metrics_enabled']:
+            metrics_tags = {'client-id': self.config['client_id']}
+            metric_config = MetricConfig(samples=self.config['metrics_num_samples'],
+                                         time_window_ms=self.config['metrics_sample_window_ms'],
+                                         tags=metrics_tags)
+            reporters = [reporter() for reporter in self.config['metric_reporters']]
+            self._metrics = Metrics(metric_config, reporters)
+        else:
+            self._metrics = None

         client = self.config['kafka_client'](
             metrics=self._metrics, metric_group_prefix='producer',

@@ -424,11 +429,12 @@ def __init__(self, **configs):
         self.config['compression_attrs'] = compression_attrs

         message_version = self._max_usable_produce_magic()
-        self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
+        self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
         self._metadata = client.cluster
         guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
         self._sender = Sender(client, self._metadata,
-                              self._accumulator, self._metrics,
+                              self._accumulator,
+                              metrics=self._metrics,
                               guarantee_message_order=guarantee_message_order,
                               **self.config)
         self._sender.daemon = True

@@ -524,7 +530,8 @@ def __getattr__(self, name):
                           timeout)
             self._sender.force_close()

-        self._metrics.close()
+        if self._metrics:
+            self._metrics.close()
         try:
             self.config['key_serializer'].close()
         except AttributeError:

@@ -773,6 +780,8 @@ def metrics(self, raw=False):
        This is an unstable interface. It may change in future
        releases without warning.
        """
+        if not self._metrics:
+            return
        if raw:
            return self._metrics.metrics.copy()
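
With metrics disabled the producer still batches and sends normally; close() simply skips Metrics.close(). A hedged end-to-end sketch (the broker address and topic name are placeholders):

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             metrics_enabled=False)
    producer.send('my-topic', b'payload')  # returns a future, as usual
    producer.flush()
    producer.close()  # safe: Metrics.close() is skipped when metrics are off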

kafka/producer/record_accumulator.py

Lines changed: 0 additions & 2 deletions
@@ -162,8 +162,6 @@ class RecordAccumulator(object):
         'linger_ms': 0,
         'retry_backoff_ms': 100,
         'message_version': 0,
-        'metrics': None,
-        'metric_group_prefix': 'producer-metrics',
     }

     def __init__(self, **configs):

kafka/producer/sender.py

Lines changed: 14 additions & 7 deletions
@@ -29,11 +29,12 @@ class Sender(threading.Thread):
         'acks': 1,
         'retries': 0,
         'request_timeout_ms': 30000,
+        'metrics': None,
         'guarantee_message_order': False,
         'client_id': 'kafka-python-' + __version__,
     }

-    def __init__(self, client, metadata, accumulator, metrics, **configs):
+    def __init__(self, client, metadata, accumulator, **configs):
         super(Sender, self).__init__()
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:

@@ -47,7 +48,10 @@ def __init__(self, client, metadata, accumulator, metrics, **configs):
         self._running = True
         self._force_close = False
         self._topics_to_add = set()
-        self._sensors = SenderMetrics(metrics, self._client, self._metadata)
+        if self.config['metrics']:
+            self._sensors = SenderMetrics(self.config['metrics'], self._client, self._metadata)
+        else:
+            self._sensors = None

     def run(self):
         """The main run loop for the sender thread."""

@@ -123,10 +127,12 @@ def run_once(self):

         expired_batches = self._accumulator.abort_expired_batches(
             self.config['request_timeout_ms'], self._metadata)
-        for expired_batch in expired_batches:
-            self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count)

-        self._sensors.update_produce_request_metrics(batches_by_node)
+        if self._sensors:
+            for expired_batch in expired_batches:
+                self._sensors.record_errors(expired_batch.topic_partition.topic, expired_batch.record_count)
+            self._sensors.update_produce_request_metrics(batches_by_node)
+
         requests = self._create_produce_requests(batches_by_node)
         # If we have any nodes that are ready to send + have sendable data,
         # poll with 0 timeout so this can immediately loop and try sending more

@@ -237,15 +243,16 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_start_offset=None):
                              self.config['retries'] - batch.attempts - 1,
                              error)
             self._accumulator.reenqueue(batch)
-            self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
+            if self._sensors:
+                self._sensors.record_retries(batch.topic_partition.topic, batch.record_count)
         else:
             if error is Errors.TopicAuthorizationFailedError:
                 error = error(batch.topic_partition.topic)

             # tell the user the result of their request
             batch.done(base_offset, timestamp_ms, error, log_start_offset)
             self._accumulator.deallocate(batch)
-            if error is not None:
+            if error is not None and self._sensors:
                 self._sensors.record_errors(batch.topic_partition.topic, batch.record_count)

             if getattr(error, 'invalid_metadata', False):
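
When Sender is built with metrics=None, the accounting in run_once() is skipped as a block: expired batches are still aborted by the accumulator, but no error counters or produce-request metrics are updated. A standalone sketch of that guarded block (the locals stand in for the real ones):

    sensors = None        # what Sender holds under metrics_enabled=False
    expired_batches = []  # stand-in for accumulator.abort_expired_batches(...)
    batches_by_node = {}  # stand-in for the drained batches

    if sensors:
        for expired_batch in expired_batches:
            sensors.record_errors(expired_batch.topic_partition.topic,
                                  expired_batch.record_count)
        sensors.update_produce_request_metrics(batches_by_node)
    # When sensors is None, the whole block is a no-op.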

0 commit comments
