Skip to content

Commit 17a6cf0

Browse files
glowskir, antonpirker, sentrivana
authored
feat: Add ray integration support (#2400) (#2444)
Adds basic instrumentation for the Ray framework (https://www.ray.io/). Closes #2400. ---- Co-authored-by: Anton Pirker <[email protected]> Co-authored-by: Ivana Kellyer <[email protected]>
1 parent 4858996 commit 17a6cf0

File tree

7 files changed

+374
-0
lines changed

7 files changed

+374
-0
lines changed

.github/workflows/test-integrations-data-processing.yml

+8
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ jobs:
6767
run: |
6868
set -x # print commands that are executed
6969
./scripts/runtox.sh "py${{ matrix.python-version }}-huey-latest"
70+
- name: Test ray latest
71+
run: |
72+
set -x # print commands that are executed
73+
./scripts/runtox.sh "py${{ matrix.python-version }}-ray-latest"
7074
- name: Test rq latest
7175
run: |
7276
set -x # print commands that are executed
@@ -139,6 +143,10 @@ jobs:
139143
run: |
140144
set -x # print commands that are executed
141145
./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-huey"
146+
- name: Test ray pinned
147+
run: |
148+
set -x # print commands that are executed
149+
./scripts/runtox.sh --exclude-latest "py${{ matrix.python-version }}-ray"
142150
- name: Test rq pinned
143151
run: |
144152
set -x # print commands that are executed

scripts/split-tox-gh-actions/split-tox-gh-actions.py

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
"celery",
8383
"dramatiq",
8484
"huey",
85+
"ray",
8586
"rq",
8687
"spark",
8788
],

sentry_sdk/consts.py

+2
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,8 @@ class OP:
465465
QUEUE_TASK_RQ = "queue.task.rq"
466466
QUEUE_SUBMIT_HUEY = "queue.submit.huey"
467467
QUEUE_TASK_HUEY = "queue.task.huey"
468+
QUEUE_SUBMIT_RAY = "queue.submit.ray"
469+
QUEUE_TASK_RAY = "queue.task.ray"
468470
SUBPROCESS = "subprocess"
469471
SUBPROCESS_WAIT = "subprocess.wait"
470472
SUBPROCESS_COMMUNICATE = "subprocess.communicate"

sentry_sdk/integrations/ray.py

+146
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
import inspect
2+
import sys
3+
4+
import sentry_sdk
5+
from sentry_sdk.consts import OP, SPANSTATUS
6+
from sentry_sdk.integrations import DidNotEnable, Integration
7+
from sentry_sdk.tracing import TRANSACTION_SOURCE_TASK
8+
from sentry_sdk.utils import (
9+
event_from_exception,
10+
logger,
11+
package_version,
12+
qualname_from_function,
13+
reraise,
14+
)
15+
16+
try:
17+
import ray # type: ignore[import-not-found]
18+
except ImportError:
19+
raise DidNotEnable("Ray not installed.")
20+
import functools
21+
22+
from typing import TYPE_CHECKING
23+
24+
if TYPE_CHECKING:
25+
from collections.abc import Callable
26+
from typing import Any, Optional
27+
from sentry_sdk.utils import ExcInfo
28+
29+
30+
def _check_sentry_initialized():
    # type: () -> None
    """
    Emit a debug log entry when the current process has no active Sentry client.

    Called at the start of every wrapped Ray task; it never raises and does
    not initialize anything itself.
    """
    if not sentry_sdk.get_client().is_active():
        logger.debug(
            "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded."
        )
38+
39+
40+
def _patch_ray_remote():
    # type: () -> None
    """
    Replace ``ray.remote`` with a wrapper that instruments Ray tasks.

    The replacement supports both call styles of ``ray.remote``:

    * ``@ray.remote`` / ``ray.remote(func)`` -- the target is passed directly.
    * ``@ray.remote(**options)`` -- no target is passed; a decorator is
      returned that instruments the target it later receives.

    Classes (Ray Actors) are handed to the original ``ray.remote`` untouched.
    Functions are wrapped so that:

    * on the worker, the task body runs inside a transaction continued from
      the ``_tracing`` headers supplied by the submitting process;
    * on the client, the returned handle's ``.remote`` opens a
      ``queue.submit.ray`` span and forwards the current trace propagation
      headers as the ``_tracing`` keyword argument.
    """
    old_remote = ray.remote

    @functools.wraps(old_remote)
    def new_remote(f=None, *args, **kwargs):
        # type: (Optional[Callable[..., Any]], *Any, **Any) -> Callable[..., Any]

        def _register(target):
            # type: (Any) -> Any
            # Forward to the original ray.remote using the same call style the
            # caller used. Keyword options must be unpacked with ``**``;
            # unpacking the dict with ``*`` would pass its keys positionally.
            if f is None:
                return old_remote(*args, **kwargs)(target)
            return old_remote(target, *args, **kwargs)

        def _instrument(user_f):
            # type: (Any) -> Any
            if inspect.isclass(user_f):
                # Ray Actors
                # (https://docs.ray.io/en/latest/ray-core/actors.html)
                # are not supported
                # (Only Ray Tasks are supported)
                return _register(user_f)

            def _f(*f_args, _tracing=None, **f_kwargs):
                # type: (Any, Optional[dict[str, Any]], Any) -> Any
                """
                Ray Worker
                """
                _check_sentry_initialized()

                transaction = sentry_sdk.continue_trace(
                    _tracing or {},
                    op=OP.QUEUE_TASK_RAY,
                    name=qualname_from_function(user_f),
                    origin=RayIntegration.origin,
                    source=TRANSACTION_SOURCE_TASK,
                )

                with sentry_sdk.start_transaction(transaction) as transaction:
                    try:
                        result = user_f(*f_args, **f_kwargs)
                        transaction.set_status(SPANSTATUS.OK)
                    except Exception:
                        transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
                        exc_info = sys.exc_info()
                        _capture_exception(exc_info)
                        reraise(*exc_info)

                    return result

            rv = _register(_f)
            old_remote_method = rv.remote

            def _remote_method_with_header_propagation(*r_args, **r_kwargs):
                # type: (*Any, **Any) -> Any
                """
                Ray Client
                """
                with sentry_sdk.start_span(
                    op=OP.QUEUE_SUBMIT_RAY,
                    description=qualname_from_function(user_f),
                    origin=RayIntegration.origin,
                ) as span:
                    # Snapshot the propagation headers so the worker can
                    # continue this trace via the ``_tracing`` keyword.
                    tracing = {
                        k: v
                        for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers()
                    }
                    try:
                        result = old_remote_method(
                            *r_args, **r_kwargs, _tracing=tracing
                        )
                        span.set_status(SPANSTATUS.OK)
                    except Exception:
                        span.set_status(SPANSTATUS.INTERNAL_ERROR)
                        exc_info = sys.exc_info()
                        _capture_exception(exc_info)
                        reraise(*exc_info)

                    return result

            rv.remote = _remote_method_with_header_propagation

            return rv

        if f is None:
            # ``@ray.remote(**options)``: the target arrives in a second call.
            return _instrument
        return _instrument(f)

    ray.remote = new_remote
114+
115+
116+
def _capture_exception(exc_info, **kwargs):
    # type: (ExcInfo, **Any) -> None
    """
    Build a Sentry event from ``exc_info`` and send it via ``capture_event``.

    The event's mechanism is marked as unhandled and typed with the
    integration identifier. Extra keyword arguments are accepted but unused.
    """
    options = sentry_sdk.get_client().options
    mechanism = {
        "handled": False,
        "type": RayIntegration.identifier,
    }

    event, hint = event_from_exception(
        exc_info,
        client_options=options,
        mechanism=mechanism,
    )
    sentry_sdk.capture_event(event, hint=hint)
129+
130+
131+
class RayIntegration(Integration):
    """Sentry integration that patches ``ray.remote`` to trace Ray tasks."""

    identifier = "ray"
    origin = f"auto.queue.{identifier}"

    @staticmethod
    def setup_once():
        # type: () -> None
        """
        Check the installed Ray version and install the ``ray.remote`` patch.

        Raises ``DidNotEnable`` when the version cannot be determined or is
        older than 2.7.0.
        """
        ray_version = package_version("ray")

        if ray_version is None:
            raise DidNotEnable("Unparsable ray version: {}".format(ray_version))

        minimum_supported = (2, 7, 0)
        if ray_version < minimum_supported:
            raise DidNotEnable("Ray 2.7.0 or newer required")

        _patch_ray_remote()

tests/integrations/ray/__init__.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import pytest
2+
3+
pytest.importorskip("ray")

tests/integrations/ray/test_ray.py

+205
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
import json
2+
import os
3+
import pytest
4+
5+
import ray
6+
7+
import sentry_sdk
8+
from sentry_sdk.envelope import Envelope
9+
from sentry_sdk.integrations.ray import RayIntegration
10+
from tests.conftest import TestTransport
11+
12+
13+
class RayTestTransport(TestTransport):
    """Test transport that keeps every captured envelope in ``self.envelopes``."""

    def __init__(self):
        # The list is created before delegating to the parent initializer,
        # matching the original ordering.
        self.envelopes = []
        super().__init__()

    def capture_envelope(self, envelope: Envelope) -> None:
        """Record ``envelope`` instead of sending it anywhere."""
        self.envelopes.append(envelope)
20+
21+
22+
class RayLoggingTransport(TestTransport):
    """Test transport that prints each serialized envelope to stdout."""

    def capture_envelope(self, envelope: Envelope) -> None:
        """Serialize ``envelope`` and print it as UTF-8 text."""
        serialized = envelope.serialize()
        print(serialized.decode("utf-8", "replace"))
28+
29+
30+
def setup_sentry_with_logging_transport():
    """Initialize Sentry for the Ray tests with a transport that prints envelopes."""
    logging_transport = RayLoggingTransport()
    setup_sentry(transport=logging_transport)
32+
33+
34+
def setup_sentry(transport=None):
    """
    Initialize Sentry with the Ray integration and full trace sampling.

    When ``transport`` is None, a fresh ``RayTestTransport`` is used.
    """
    if transport is None:
        transport = RayTestTransport()

    sentry_sdk.init(
        integrations=[RayIntegration()],
        transport=transport,
        traces_sample_rate=1.0,
    )
40+
41+
42+
@pytest.mark.forked
def test_ray_tracing():
    """
    The worker transaction of a Ray task must share the client's trace id.

    Every span in both the client and the worker transaction is also
    expected to carry that same trace id.
    """
    setup_sentry()

    ray.init(
        runtime_env={
            "worker_process_setup_hook": setup_sentry,
            "working_dir": "./",
        }
    )

    @ray.remote
    def example_task():
        with sentry_sdk.start_span(op="task", description="example task step"):
            ...

        # Hand the worker's captured envelopes back to the client process.
        return sentry_sdk.get_client().transport.envelopes

    with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
        worker_envelopes = ray.get(example_task.remote())

    client_envelope = sentry_sdk.get_client().transport.envelopes[0]
    client_transaction = client_envelope.get_transaction_event()
    worker_envelope = worker_envelopes[0]
    worker_transaction = worker_envelope.get_transaction_event()

    client_trace_id = client_transaction["contexts"]["trace"]["trace_id"]
    worker_trace_id = worker_transaction["contexts"]["trace"]["trace_id"]

    # Compare client against worker; comparing the client value with itself
    # would pass even when trace propagation is broken.
    assert client_trace_id == worker_trace_id

    for span in client_transaction["spans"]:
        assert span["trace_id"] == client_trace_id == worker_trace_id

    for span in worker_transaction["spans"]:
        assert span["trace_id"] == client_trace_id == worker_trace_id
86+
87+
88+
@pytest.mark.forked
def test_ray_spans():
    """
    Check the ``op`` and ``origin`` of spans recorded by the Ray integration.

    Client-side spans are expected to be ``queue.submit.ray`` and
    worker-side spans ``queue.task.ray``, all with origin ``auto.queue.ray``.
    """
    setup_sentry()

    runtime_env = {
        "worker_process_setup_hook": setup_sentry,
        "working_dir": "./",
    }
    ray.init(runtime_env=runtime_env)

    @ray.remote
    def example_task():
        return sentry_sdk.get_client().transport.envelopes

    with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
        worker_envelopes = ray.get(example_task.remote())

    client_transaction = (
        sentry_sdk.get_client().transport.envelopes[0].get_transaction_event()
    )
    worker_transaction = worker_envelopes[0].get_transaction_event()

    # Each transaction is paired with the span op expected on that side.
    expectations = (
        (client_transaction, "queue.submit.ray"),
        (worker_transaction, "queue.task.ray"),
    )
    for transaction_event, expected_op in expectations:
        for span in transaction_event["spans"]:
            assert span["op"] == expected_op
            assert span["origin"] == "auto.queue.ray"
118+
119+
120+
@pytest.mark.forked
def test_ray_errors():
    """
    An exception raised inside a Ray task is reported from the worker.

    The worker uses a transport that prints envelopes, so the error event is
    read back from the worker's ``.out`` log file and inspected.
    """
    setup_sentry_with_logging_transport()

    runtime_env = {
        "worker_process_setup_hook": setup_sentry_with_logging_transport,
        "working_dir": "./",
    }
    ray.init(runtime_env=runtime_env)

    @ray.remote
    def example_task():
        1 / 0

    with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
        with pytest.raises(ZeroDivisionError):
            future = example_task.remote()
            ray.get(future)

    job_id = future.job_id().hex()

    # Read the worker log output containing the error
    log_dir = "/tmp/ray/session_latest/logs/"
    candidates = [
        name
        for name in os.listdir(log_dir)
        if "worker" in name and job_id in name and name.endswith(".out")
    ]
    log_file = candidates[0]
    with open(os.path.join(log_dir, log_file), "r") as file:
        lines = file.readlines()
        # parse error object from log line (fifth line, trailing newline removed)
        error = json.loads(lines[4][:-1])

    mechanism = error["exception"]["values"][0]["mechanism"]

    assert error["level"] == "error"
    # It's in the worker, not the client, thus not "ray test transaction".
    assert (
        error["transaction"]
        == "tests.integrations.ray.test_ray.test_ray_errors.<locals>.example_task"
    )
    assert mechanism["type"] == "ray"
    assert not mechanism["handled"]
161+
162+
163+
@pytest.mark.forked
def test_ray_actor():
    """
    Ray Actors are not instrumented: the worker captures no envelopes.

    The client transaction is still recorded, and each of its spans must
    carry the client transaction's trace id.
    """
    setup_sentry()

    ray.init(
        runtime_env={
            "worker_process_setup_hook": setup_sentry,
            "working_dir": "./",
        }
    )

    @ray.remote
    class Counter(object):
        def __init__(self):
            self.n = 0

        def increment(self):
            with sentry_sdk.start_span(op="task", description="example task step"):
                self.n += 1

            return sentry_sdk.get_client().transport.envelopes

    with sentry_sdk.start_transaction(op="task", name="ray test transaction"):
        counter = Counter.remote()
        worker_envelopes = ray.get(counter.increment.remote())

    # Currently no transactions/spans are captured in actors
    assert worker_envelopes == []

    client_envelope = sentry_sdk.get_client().transport.envelopes[0]
    client_transaction = client_envelope.get_transaction_event()

    # There is no worker transaction to compare against here, so only the
    # client's spans are checked against the client trace id. The previous
    # comparison of the trace id with itself could never fail and is dropped.
    client_trace_id = client_transaction["contexts"]["trace"]["trace_id"]

    for span in client_transaction["spans"]:
        assert span["trace_id"] == client_trace_id

0 commit comments

Comments
 (0)