Skip to content

Commit a02eb9c

Browse files
feat(celery): Set "messaging.system" on span
Set the "messaging.system" data on the "queue.process" span in the Celery integration. The messaging.system span data attribute should be set to the Celery broker being used, e.g. "amqp" for RabbitMQ, "redis" for Redis, and "sqs" for Amazon SQS. Also, add tests for this feature. ref #2951
1 parent 68332d8 commit a02eb9c

File tree

3 files changed

+29
-1
lines changed

3 files changed

+29
-1
lines changed

sentry_sdk/consts.py

+5
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,11 @@ class SPANDATA:
280280
Number of retries/attempts to process a message.
281281
"""
282282

283+
MESSAGING_SYSTEM = "messaging.system"
284+
"""
285+
The messaging system's name, e.g. `kafka`, `aws_sqs`
286+
"""
287+
283288
SERVER_ADDRESS = "server.address"
284289
"""
285290
Name of the database host.

sentry_sdk/integrations/celery/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,11 @@ def _inner(*args, **kwargs):
362362
span.set_data(
363363
SPANDATA.MESSAGING_MESSAGE_RETRY_COUNT, task.request.retries
364364
)
365+
with capture_internal_exceptions():
366+
span.set_data(
367+
SPANDATA.MESSAGING_SYSTEM,
368+
task.app.connection().transport.driver_type,
369+
)
365370

366371
return f(*args, **kwargs)
367372
except Exception:

tests/integrations/celery/test_celery.py

+19-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def init_celery(sentry_init, request):
2828
def inner(propagate_traces=True, backend="always_eager", **kwargs):
2929
sentry_init(
3030
integrations=[CeleryIntegration(propagate_traces=propagate_traces)],
31-
**kwargs
31+
**kwargs,
3232
)
3333
celery = Celery(__name__)
3434

@@ -704,3 +704,21 @@ def task(): ...
704704
(event,) = events
705705
(span,) = event["spans"]
706706
assert span["data"]["messaging.message.retry.count"] == 3
707+
708+
709+
@pytest.mark.parametrize("system", ("redis", "amqp"))
710+
def test_messaging_system(system, init_celery, capture_events):
711+
celery = init_celery(enable_tracing=True)
712+
events = capture_events()
713+
714+
# Does not need to be a real URL, since we use always eager
715+
celery.conf.broker_url = f"{system}://example.com" # noqa: E231
716+
717+
@celery.task()
718+
def task(): ...
719+
720+
task.apply_async()
721+
722+
(event,) = events
723+
(span,) = event["spans"]
724+
assert span["data"]["messaging.system"] == system

0 commit comments

Comments
 (0)