Skip to content

Commit 86a7f86

Browse files
committed
Add metrics instrumentation celery
1 parent ffbbb4d commit 86a7f86

File tree

4 files changed

+150
-3
lines changed

4 files changed

+150
-3
lines changed

instrumentation/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
| [opentelemetry-instrumentation-boto](./opentelemetry-instrumentation-boto) | boto~=2.0 | No
1111
| [opentelemetry-instrumentation-boto3sqs](./opentelemetry-instrumentation-boto3sqs) | boto3 ~= 1.0 | No
1212
| [opentelemetry-instrumentation-botocore](./opentelemetry-instrumentation-botocore) | botocore ~= 1.0 | No
13-
| [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | No
13+
| [opentelemetry-instrumentation-celery](./opentelemetry-instrumentation-celery) | celery >= 4.0, < 6.0 | Yes
1414
| [opentelemetry-instrumentation-confluent-kafka](./opentelemetry-instrumentation-confluent-kafka) | confluent-kafka >= 1.8.2, < 2.0.0 | No
1515
| [opentelemetry-instrumentation-dbapi](./opentelemetry-instrumentation-dbapi) | dbapi | No
1616
| [opentelemetry-instrumentation-django](./opentelemetry-instrumentation-django) | django >= 1.10 | Yes

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def add(x, y):
6060
"""
6161

6262
import logging
63+
from timeit import default_timer
6364
from typing import Collection, Iterable
6465

6566
from celery import signals # pylint: disable=no-name-in-module
@@ -69,6 +70,7 @@ def add(x, y):
6970
from opentelemetry.instrumentation.celery.package import _instruments
7071
from opentelemetry.instrumentation.celery.version import __version__
7172
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
73+
from opentelemetry.metrics import get_meter
7274
from opentelemetry.propagate import extract, inject
7375
from opentelemetry.propagators.textmap import Getter
7476
from opentelemetry.semconv.trace import SpanAttributes
@@ -104,6 +106,11 @@ def keys(self, carrier):
104106

105107

106108
class CeleryInstrumentor(BaseInstrumentor):
109+
def __init__(self):
110+
super().__init__()
111+
self.metrics = None
112+
self.task_id_to_start_time = {}
113+
107114
def instrumentation_dependencies(self) -> Collection[str]:
108115
return _instruments
109116

@@ -113,6 +120,11 @@ def _instrument(self, **kwargs):
113120
# pylint: disable=attribute-defined-outside-init
114121
self._tracer = trace.get_tracer(__name__, __version__, tracer_provider)
115122

123+
meter_provider = kwargs.get("meter_provider")
124+
meter = get_meter(__name__, __version__, meter_provider)
125+
126+
self.create_celery_metrics(meter)
127+
116128
signals.task_prerun.connect(self._trace_prerun, weak=False)
117129
signals.task_postrun.connect(self._trace_postrun, weak=False)
118130
signals.before_task_publish.connect(
@@ -139,6 +151,7 @@ def _trace_prerun(self, *args, **kwargs):
139151
if task is None or task_id is None:
140152
return
141153

154+
self.update_task_start_time(task_id)
142155
request = task.request
143156
tracectx = extract(request, getter=celery_getter) or None
144157

@@ -153,8 +166,7 @@ def _trace_prerun(self, *args, **kwargs):
153166
activation.__enter__() # pylint: disable=E1101
154167
utils.attach_span(task, task_id, (span, activation))
155168

156-
@staticmethod
157-
def _trace_postrun(*args, **kwargs):
169+
def _trace_postrun(self, *args, **kwargs):
158170
task = utils.retrieve_task(kwargs)
159171
task_id = utils.retrieve_task_id(kwargs)
160172

@@ -178,6 +190,12 @@ def _trace_postrun(*args, **kwargs):
178190

179191
activation.__exit__(None, None, None)
180192
utils.detach_span(task, task_id)
193+
self.update_task_start_time(task_id)
194+
labels = {
195+
'task': task.name,
196+
'worker': task.request.hostname
197+
}
198+
self._record_histograms(task_id, labels)
181199

182200
def _trace_before_publish(self, *args, **kwargs):
183201
task = utils.retrieve_task_from_sender(kwargs)
@@ -270,3 +288,22 @@ def _trace_retry(*args, **kwargs):
270288
# Use `str(reason)` instead of `reason.message` in case we get
271289
# something that isn't an `Exception`
272290
span.set_attribute(_TASK_RETRY_REASON_KEY, str(reason))
291+
292+
def update_task_start_time(self, task_id):
293+
cur_time = default_timer()
294+
elapsed_time = cur_time - self.task_id_to_start_time[
295+
task_id] if task_id in self.task_id_to_start_time else cur_time
296+
self.task_id_to_start_time[task_id] = elapsed_time
297+
298+
def _record_histograms(self, task_id, metric_attributes):
299+
self.metrics["flower.task.runtime.seconds"].record(
300+
self.task_id_to_start_time.get(task_id), attributes=metric_attributes)
301+
302+
def create_celery_metrics(self, meter) -> None:
303+
self.metrics = {
304+
"flower.task.runtime.seconds": meter.create_histogram(
305+
name="flower.task.runtime.seconds",
306+
unit="seconds",
307+
description="The time it took to run the task.",
308+
)
309+
}

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/package.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@
1414

1515

1616
_instruments = ("celery >= 4.0, < 6.0",)
17+
_supports_metrics = True
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import time
2+
import threading
3+
from timeit import default_timer
4+
from typing import Union, Optional
5+
6+
from opentelemetry.sdk.metrics._internal.point import Metric
7+
from opentelemetry.sdk.metrics.export import (
8+
HistogramDataPoint,
9+
NumberDataPoint,
10+
)
11+
from opentelemetry.test.test_base import TestBase
12+
from opentelemetry.instrumentation.celery import CeleryInstrumentor
13+
14+
from .celery_test_tasks import task_add, app
15+
16+
17+
class TestMetrics(TestBase):
18+
def setUp(self):
19+
super().setUp()
20+
self._worker = app.Worker(app=app, pool="solo", concurrency=1,
21+
hostname='celery@akochavi')
22+
self._thread = threading.Thread(target=self._worker.start)
23+
self._thread.daemon = True
24+
self._thread.start()
25+
26+
def tearDown(self):
27+
super().tearDown()
28+
self._worker.stop()
29+
self._thread.join()
30+
31+
def get_metrics(self):
32+
CeleryInstrumentor().instrument()
33+
result = task_add.delay(1, 2)
34+
35+
timeout = time.time() + 60 * 1 # 1 minutes from now
36+
while not result.ready():
37+
if time.time() > timeout:
38+
break
39+
time.sleep(0.05)
40+
CeleryInstrumentor().uninstrument()
41+
resource_metrics = (
42+
self.memory_metrics_reader.get_metrics_data().resource_metrics
43+
)
44+
45+
all_metrics = []
46+
for metrics in resource_metrics:
47+
for scope_metrics in metrics.scope_metrics:
48+
all_metrics.extend(scope_metrics.metrics)
49+
50+
return all_metrics
51+
52+
def assert_metric_expected(
53+
self,
54+
metric: Metric,
55+
expected_value: Union[int, float],
56+
expected_attributes: dict = None,
57+
est_delta: Optional[float] = None,
58+
):
59+
data_point = next(iter(metric.data.data_points))
60+
61+
if isinstance(data_point, HistogramDataPoint):
62+
self.assertEqual(
63+
data_point.count,
64+
1,
65+
)
66+
if est_delta is None:
67+
self.assertEqual(
68+
data_point.sum,
69+
expected_value,
70+
)
71+
else:
72+
self.assertAlmostEqual(
73+
data_point.sum,
74+
expected_value,
75+
delta=est_delta,
76+
)
77+
elif isinstance(data_point, NumberDataPoint):
78+
self.assertEqual(
79+
data_point.value,
80+
expected_value,
81+
)
82+
83+
if expected_attributes:
84+
self.assertDictEqual(
85+
expected_attributes,
86+
dict(data_point.attributes),
87+
)
88+
89+
def test_basic_metric(self):
90+
start_time = default_timer()
91+
task_runtime_estimated = (default_timer() - start_time) * 1000
92+
93+
metrics = self.get_metrics()
94+
self.assertEqual(len(metrics), 1)
95+
96+
task_runtime = metrics[0]
97+
print(task_runtime)
98+
self.assertEqual(
99+
task_runtime.name, "flower.task.runtime.seconds"
100+
)
101+
self.assert_metric_expected(
102+
task_runtime,
103+
task_runtime_estimated,
104+
{
105+
'task': 'tests.celery_test_tasks.task_add',
106+
'worker': 'celery@akochavi',
107+
},
108+
est_delta=200,
109+
)

0 commit comments

Comments
 (0)