Skip to content

Commit 72ac630

Browse files
feat(celery): Send queue name to Sentry
Send the queue name to Sentry for Celery tasks using the default exchange. The queue name is sent as span data with the key `messaging.destination.name` within a new span op named "queue.process". Also, add tests for the new behavior. Ref GH-2961
1 parent cc11c0f commit 72ac630

File tree

3 files changed

+83
-5
lines changed

3 files changed

+83
-5
lines changed

sentry_sdk/consts.py

+7
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,12 @@ class SPANDATA:
264264
Example: 418
265265
"""
266266

267+
MESSAGING_DESTINATION_NAME = "messaging.destination.name"
268+
"""
269+
The destination name where the message is being consumed from,
270+
e.g. the queue name or topic.
271+
"""
272+
267273
SERVER_ADDRESS = "server.address"
268274
"""
269275
Name of the database host.
@@ -366,6 +372,7 @@ class OP:
366372
LANGCHAIN_TOOL = "ai.tool.langchain"
367373
LANGCHAIN_AGENT = "ai.agent.langchain"
368374
LANGCHAIN_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.langchain"
375+
QUEUE_PROCESS = "queue.process"
369376
QUEUE_SUBMIT_ARQ = "queue.submit.arq"
370377
QUEUE_TASK_ARQ = "queue.task.arq"
371378
QUEUE_SUBMIT_CELERY = "queue.submit.celery"

sentry_sdk/integrations/celery/__init__.py

+22-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import sentry_sdk
55
from sentry_sdk import isolation_scope
66
from sentry_sdk.api import continue_trace
7-
from sentry_sdk.consts import OP
7+
from sentry_sdk.consts import OP, SPANDATA
88
from sentry_sdk.integrations import Integration, DidNotEnable
99
from sentry_sdk.integrations.celery.beat import (
1010
_patch_beat_apply_entry,
@@ -325,20 +325,38 @@ def _inner(*args, **kwargs):
325325
return _inner # type: ignore
326326

327327

328+
def _set_messaging_destination_name(task, span):
329+
# type: (Any, Span) -> None
330+
"""Set "messaging.destination.name" tag for span"""
331+
with capture_internal_exceptions():
332+
delivery_info = task.request.delivery_info
333+
routing_key = delivery_info.get("routing_key")
334+
if delivery_info.get("exchange") == "" and routing_key is not None:
335+
# Empty exchange indicates the default exchange, meaning the tasks
336+
# are sent to the queue with the same name as the routing key.
337+
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)
338+
339+
328340
def _wrap_task_call(task, f):
329341
# type: (Any, F) -> F
330342

331343
# Need to wrap task call because the exception is caught before we get to
332344
# see it. Also celery's reported stacktrace is untrustworthy.
333345

334346
# functools.wraps is important here because celery-once looks at this
335-
# method's name.
347+
# method's name. @ensure_integration_enabled internally calls functools.wraps,
348+
# but if we ever remove the @ensure_integration_enabled decorator, we need
349+
# to add @functools.wraps(f) here.
336350
# https://github.com/getsentry/sentry-python/issues/421
337-
@wraps(f)
351+
@ensure_integration_enabled(CeleryIntegration, f)
338352
def _inner(*args, **kwargs):
339353
# type: (*Any, **Any) -> Any
340354
try:
341-
return f(*args, **kwargs)
355+
with sentry_sdk.start_span(
356+
op=OP.QUEUE_PROCESS, description=task.name
357+
) as span:
358+
_set_messaging_destination_name(task, span)
359+
return f(*args, **kwargs)
342360
except Exception:
343361
exc_info = sys.exc_info()
344362
with capture_internal_exceptions():

tests/integrations/celery/test_celery.py

+54-1
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,17 @@ def dummy_task(x, y):
209209
else:
210210
assert execution_event["contexts"]["trace"]["status"] == "ok"
211211

212-
assert execution_event["spans"] == []
212+
assert len(execution_event["spans"]) == 1
213+
assert (
214+
execution_event["spans"][0].items()
215+
>= {
216+
"trace_id": str(transaction.trace_id),
217+
"same_process_as_parent": True,
218+
"op": "queue.process",
219+
"description": "dummy_task",
220+
"data": ApproxDict(),
221+
}.items()
222+
)
213223
assert submission_event["spans"] == [
214224
{
215225
"data": ApproxDict(),
@@ -606,3 +616,46 @@ def example_task():
606616
pytest.fail("Calling `apply_async` without arguments raised a TypeError")
607617

608618
assert result.get() == "success"
619+
620+
621+
@pytest.mark.parametrize("routing_key", ("celery", "custom"))
622+
@mock.patch("celery.app.task.Task.request")
623+
def test_messaging_destination_name_default_exchange(
624+
mock_request, routing_key, init_celery, capture_events
625+
):
626+
celery_app = init_celery(enable_tracing=True)
627+
events = capture_events()
628+
mock_request.delivery_info = {"routing_key": routing_key, "exchange": ""}
629+
630+
@celery_app.task()
631+
def task(): ...
632+
633+
task.apply_async()
634+
635+
(event,) = events
636+
(span,) = event["spans"]
637+
assert span["data"]["messaging.destination.name"] == routing_key
638+
639+
640+
@mock.patch("celery.app.task.Task.request")
641+
def test_messaging_destination_name_nondefault_exchange(
642+
mock_request, init_celery, capture_events
643+
):
644+
"""
645+
Currently, we only capture the routing key as the messaging.destination.name when
646+
we are using the default exchange (""). This is because the default exchange ensures
647+
that the routing key is the queue name. Other exchanges may not guarantee this
648+
behavior.
649+
"""
650+
celery_app = init_celery(enable_tracing=True)
651+
events = capture_events()
652+
mock_request.delivery_info = {"routing_key": "celery", "exchange": "custom"}
653+
654+
@celery_app.task()
655+
def task(): ...
656+
657+
task.apply_async()
658+
659+
(event,) = events
660+
(span,) = event["spans"]
661+
assert "messaging.destination.name" not in span["data"]

0 commit comments

Comments
 (0)