|
| 1 | +import json |
| 2 | + |
| 3 | +import sentry_sdk |
| 4 | +from sentry_sdk.integrations import Integration |
| 5 | +from sentry_sdk._types import TYPE_CHECKING |
| 6 | +from sentry_sdk.integrations._wsgi_common import request_body_within_bounds |
| 7 | +from sentry_sdk.utils import ( |
| 8 | + AnnotatedValue, |
| 9 | + capture_internal_exceptions, |
| 10 | + event_from_exception, |
| 11 | +) |
| 12 | + |
| 13 | +from dramatiq.broker import Broker # type: ignore |
| 14 | +from dramatiq.message import Message # type: ignore |
| 15 | +from dramatiq.middleware import Middleware, default_middleware # type: ignore |
| 16 | +from dramatiq.errors import Retry # type: ignore |
| 17 | + |
| 18 | +if TYPE_CHECKING: |
| 19 | + from typing import Any, Callable, Dict, Optional, Union |
| 20 | + from sentry_sdk._types import Event, Hint |
| 21 | + |
| 22 | + |
| 23 | +class DramatiqIntegration(Integration): |
| 24 | + """ |
| 25 | + Dramatiq integration for Sentry |
| 26 | +
|
| 27 | + Please make sure that you call `sentry_sdk.init` *before* initializing |
| 28 | + your broker, as it monkey patches `Broker.__init__`. |
| 29 | +
|
| 30 | + This integration was originally developed and maintained |
| 31 | + by https://github.com/jacobsvante and later donated to the Sentry |
| 32 | + project. |
| 33 | + """ |
| 34 | + |
| 35 | + identifier = "dramatiq" |
| 36 | + |
| 37 | + @staticmethod |
| 38 | + def setup_once(): |
| 39 | + # type: () -> None |
| 40 | + _patch_dramatiq_broker() |
| 41 | + |
| 42 | + |
| 43 | +def _patch_dramatiq_broker(): |
| 44 | + # type: () -> None |
| 45 | + original_broker__init__ = Broker.__init__ |
| 46 | + |
| 47 | + def sentry_patched_broker__init__(self, *args, **kw): |
| 48 | + # type: (Broker, *Any, **Any) -> None |
| 49 | + integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) |
| 50 | + |
| 51 | + try: |
| 52 | + middleware = kw.pop("middleware") |
| 53 | + except KeyError: |
| 54 | + # Unfortunately Broker and StubBroker allows middleware to be |
| 55 | + # passed in as positional arguments, whilst RabbitmqBroker and |
| 56 | + # RedisBroker does not. |
| 57 | + if len(args) == 1: |
| 58 | + middleware = args[0] |
| 59 | + args = [] # type: ignore |
| 60 | + else: |
| 61 | + middleware = None |
| 62 | + |
| 63 | + if middleware is None: |
| 64 | + middleware = list(m() for m in default_middleware) |
| 65 | + else: |
| 66 | + middleware = list(middleware) |
| 67 | + |
| 68 | + if integration is not None: |
| 69 | + middleware = [m for m in middleware if not isinstance(m, SentryMiddleware)] |
| 70 | + middleware.insert(0, SentryMiddleware()) |
| 71 | + |
| 72 | + kw["middleware"] = middleware |
| 73 | + original_broker__init__(self, *args, **kw) |
| 74 | + |
| 75 | + Broker.__init__ = sentry_patched_broker__init__ |
| 76 | + |
| 77 | + |
| 78 | +class SentryMiddleware(Middleware): # type: ignore[misc] |
| 79 | + """ |
| 80 | + A Dramatiq middleware that automatically captures and sends |
| 81 | + exceptions to Sentry. |
| 82 | +
|
| 83 | + This is automatically added to every instantiated broker via the |
| 84 | + DramatiqIntegration. |
| 85 | + """ |
| 86 | + |
| 87 | + def before_process_message(self, broker, message): |
| 88 | + # type: (Broker, Message) -> None |
| 89 | + integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) |
| 90 | + if integration is None: |
| 91 | + return |
| 92 | + |
| 93 | + message._scope_manager = sentry_sdk.new_scope() |
| 94 | + message._scope_manager.__enter__() |
| 95 | + |
| 96 | + scope = sentry_sdk.get_current_scope() |
| 97 | + scope.transaction = message.actor_name |
| 98 | + scope.set_extra("dramatiq_message_id", message.message_id) |
| 99 | + scope.add_event_processor(_make_message_event_processor(message, integration)) |
| 100 | + |
| 101 | + def after_process_message(self, broker, message, *, result=None, exception=None): |
| 102 | + # type: (Broker, Message, Any, Optional[Any], Optional[Exception]) -> None |
| 103 | + integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) |
| 104 | + if integration is None: |
| 105 | + return |
| 106 | + |
| 107 | + actor = broker.get_actor(message.actor_name) |
| 108 | + throws = message.options.get("throws") or actor.options.get("throws") |
| 109 | + |
| 110 | + try: |
| 111 | + if ( |
| 112 | + exception is not None |
| 113 | + and not (throws and isinstance(exception, throws)) |
| 114 | + and not isinstance(exception, Retry) |
| 115 | + ): |
| 116 | + event, hint = event_from_exception( |
| 117 | + exception, |
| 118 | + client_options=sentry_sdk.get_client().options, |
| 119 | + mechanism={ |
| 120 | + "type": DramatiqIntegration.identifier, |
| 121 | + "handled": False, |
| 122 | + }, |
| 123 | + ) |
| 124 | + sentry_sdk.capture_event(event, hint=hint) |
| 125 | + finally: |
| 126 | + message._scope_manager.__exit__(None, None, None) |
| 127 | + |
| 128 | + |
| 129 | +def _make_message_event_processor(message, integration): |
| 130 | + # type: (Message, DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]] |
| 131 | + |
| 132 | + def inner(event, hint): |
| 133 | + # type: (Event, Hint) -> Optional[Event] |
| 134 | + with capture_internal_exceptions(): |
| 135 | + DramatiqMessageExtractor(message).extract_into_event(event) |
| 136 | + |
| 137 | + return event |
| 138 | + |
| 139 | + return inner |
| 140 | + |
| 141 | + |
| 142 | +class DramatiqMessageExtractor(object): |
| 143 | + def __init__(self, message): |
| 144 | + # type: (Message) -> None |
| 145 | + self.message_data = dict(message.asdict()) |
| 146 | + |
| 147 | + def content_length(self): |
| 148 | + # type: () -> int |
| 149 | + return len(json.dumps(self.message_data)) |
| 150 | + |
| 151 | + def extract_into_event(self, event): |
| 152 | + # type: (Event) -> None |
| 153 | + client = sentry_sdk.get_client() |
| 154 | + if not client.is_active(): |
| 155 | + return |
| 156 | + |
| 157 | + contexts = event.setdefault("contexts", {}) |
| 158 | + request_info = contexts.setdefault("dramatiq", {}) |
| 159 | + request_info["type"] = "dramatiq" |
| 160 | + |
| 161 | + data = None # type: Optional[Union[AnnotatedValue, Dict[str, Any]]] |
| 162 | + if not request_body_within_bounds(client, self.content_length()): |
| 163 | + data = AnnotatedValue.removed_because_over_size_limit() |
| 164 | + else: |
| 165 | + data = self.message_data |
| 166 | + |
| 167 | + request_info["data"] = data |
0 commit comments