Skip to content

Commit 702d536

Browse files
committed
Address review feedback
Use the VERSION attribute exposed by Billiard to decide whether to import ExceptionWithTraceback. Add a test for a failing task and check that the exceptions' type and message are preserved.
1 parent 6166d1f commit 702d536

File tree

4 files changed

+91
-9
lines changed

4 files changed

+91
-9
lines changed

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,6 @@ def add(x, y):
6666
from billiard.einfo import ExceptionInfo
6767
from celery import signals # pylint: disable=no-name-in-module
6868

69-
try:
70-
from billiard.einfo import ( # pylint: disable=no-name-in-module
71-
ExceptionWithTraceback,
72-
)
73-
except ImportError:
74-
ExceptionWithTraceback = None
75-
7669
from opentelemetry import trace
7770
from opentelemetry.instrumentation.celery import utils
7871
from opentelemetry.instrumentation.celery.package import _instruments
@@ -84,6 +77,8 @@ def add(x, y):
8477
from opentelemetry.semconv.trace import SpanAttributes
8578
from opentelemetry.trace.status import Status, StatusCode
8679

80+
ExceptionWithTraceback = utils.import_exception_with_traceback()
81+
8782
logger = logging.getLogger(__name__)
8883

8984
# Task operations

instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import logging
1616

1717
from celery import registry # pylint: disable=no-name-in-module
18+
from billiard import VERSION
1819

1920
from opentelemetry.semconv.trace import SpanAttributes
2021

@@ -47,6 +48,15 @@
4748
)
4849

4950

51+
def import_exception_with_traceback():
52+
if VERSION >= (4, 0, 1):
53+
from billiard.einfo import ExceptionWithTraceback
54+
55+
return ExceptionWithTraceback
56+
57+
return None
58+
59+
5060
# pylint:disable=too-many-branches
5161
def set_attributes_from_context(span, context):
5262
"""Helper to extract meta values from a Celery Context"""

instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,15 @@ class Config:
2424
app.config_from_object(Config)
2525

2626

27+
class CustomError(Exception):
28+
pass
29+
30+
2731
@app.task
2832
def task_add(num_a, num_b):
2933
return num_a + num_b
34+
35+
36+
@app.task
37+
def task_raises():
38+
raise CustomError("The task failed!")

instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
from opentelemetry.instrumentation.celery import CeleryInstrumentor
1919
from opentelemetry.semconv.trace import SpanAttributes
2020
from opentelemetry.test.test_base import TestBase
21-
from opentelemetry.trace import SpanKind
21+
from opentelemetry.trace import SpanKind, StatusCode
2222

23-
from .celery_test_tasks import app, task_add
23+
from .celery_test_tasks import app, task_add, task_raises
2424

2525

2626
class TestCeleryInstrumentation(TestBase):
@@ -66,6 +66,10 @@ def test_task(self):
6666
},
6767
)
6868

69+
self.assertEqual(consumer.status.status_code, StatusCode.UNSET)
70+
71+
self.assertEqual(0, len(consumer.events))
72+
6973
self.assertEqual(
7074
producer.name, "apply_async/tests.celery_test_tasks.task_add"
7175
)
@@ -84,6 +88,70 @@ def test_task(self):
8488
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
8589
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)
8690

91+
def test_task_raises(self):
92+
CeleryInstrumentor().instrument()
93+
94+
result = task_raises.delay()
95+
96+
timeout = time.time() + 60 * 1 # 1 minutes from now
97+
while not result.ready():
98+
if time.time() > timeout:
99+
break
100+
time.sleep(0.05)
101+
102+
spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
103+
self.assertEqual(len(spans), 2)
104+
105+
consumer, producer = spans
106+
107+
self.assertEqual(
108+
consumer.name, "run/tests.celery_test_tasks.task_raises"
109+
)
110+
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
111+
self.assertSpanHasAttributes(
112+
consumer,
113+
{
114+
"celery.action": "run",
115+
"celery.state": "FAILURE",
116+
SpanAttributes.MESSAGING_DESTINATION: "celery",
117+
"celery.task_name": "tests.celery_test_tasks.task_raises",
118+
},
119+
)
120+
121+
self.assertEqual(consumer.status.status_code, StatusCode.ERROR)
122+
123+
self.assertEqual(1, len(consumer.events))
124+
event = consumer.events[0]
125+
126+
self.assertIn(SpanAttributes.EXCEPTION_STACKTRACE, event.attributes)
127+
128+
self.assertEqual(
129+
event.attributes[SpanAttributes.EXCEPTION_TYPE], "CustomError"
130+
)
131+
132+
self.assertEqual(
133+
event.attributes[SpanAttributes.EXCEPTION_MESSAGE],
134+
"The task failed!",
135+
)
136+
137+
self.assertEqual(
138+
producer.name, "apply_async/tests.celery_test_tasks.task_raises"
139+
)
140+
self.assertEqual(producer.kind, SpanKind.PRODUCER)
141+
self.assertSpanHasAttributes(
142+
producer,
143+
{
144+
"celery.action": "apply_async",
145+
"celery.task_name": "tests.celery_test_tasks.task_raises",
146+
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
147+
SpanAttributes.MESSAGING_DESTINATION: "celery",
148+
},
149+
)
150+
151+
self.assertNotEqual(consumer.parent, producer.context)
152+
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
153+
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)
154+
87155
def test_uninstrument(self):
88156
CeleryInstrumentor().instrument()
89157
CeleryInstrumentor().uninstrument()

0 commit comments

Comments
 (0)