@@ -39,8 +39,27 @@ def setup_sentry(transport=None):
39
39
)
40
40
41
41
42
+ def read_error_from_log (job_id ):
43
+ log_dir = "/tmp/ray/session_latest/logs/"
44
+ log_file = [
45
+ f
46
+ for f in os .listdir (log_dir )
47
+ if "worker" in f and job_id in f and f .endswith (".out" )
48
+ ][0 ]
49
+ with open (os .path .join (log_dir , log_file ), "r" ) as file :
50
+ lines = file .readlines ()
51
+
52
+ try :
53
+ # parse error object from log line
54
+ error = json .loads (lines [4 ][:- 1 ])
55
+ except IndexError :
56
+ error = None
57
+
58
+ return error
59
+
60
+
42
61
@pytest .mark .forked
43
- def test_ray_tracing ():
62
+ def test_tracing_in_ray_tasks ():
44
63
setup_sentry ()
45
64
46
65
ray .init (
@@ -50,6 +69,7 @@ def test_ray_tracing():
50
69
}
51
70
)
52
71
72
+ # Setup ray task
53
73
@ray .remote
54
74
def example_task ():
55
75
with sentry_sdk .start_span (op = "task" , name = "example task step" ):
@@ -62,63 +82,42 @@ def example_task():
62
82
63
83
client_envelope = sentry_sdk .get_client ().transport .envelopes [0 ]
64
84
client_transaction = client_envelope .get_transaction_event ()
85
+ assert client_transaction ["transaction" ] == "ray test transaction"
86
+ assert client_transaction ["transaction_info" ] == {"source" : "custom" }
87
+
65
88
worker_envelope = worker_envelopes [0 ]
66
89
worker_transaction = worker_envelope .get_transaction_event ()
67
-
68
90
assert (
69
- client_transaction [ "contexts" ][ "trace" ][ "trace_id " ]
70
- == client_transaction [ "contexts" ][ "trace" ][ "trace_id" ]
91
+ worker_transaction [ "transaction " ]
92
+ == "tests.integrations.ray.test_ray.test_tracing_in_ray_tasks.<locals>.example_task"
71
93
)
94
+ assert worker_transaction ["transaction_info" ] == {"source" : "task" }
72
95
73
- for span in client_transaction ["spans" ]:
74
- assert (
75
- span ["trace_id" ]
76
- == client_transaction ["contexts" ]["trace" ]["trace_id" ]
77
- == client_transaction ["contexts" ]["trace" ]["trace_id" ]
78
- )
79
-
80
- for span in worker_transaction ["spans" ]:
81
- assert (
82
- span ["trace_id" ]
83
- == client_transaction ["contexts" ]["trace" ]["trace_id" ]
84
- == client_transaction ["contexts" ]["trace" ]["trace_id" ]
85
- )
86
-
87
-
88
- @pytest .mark .forked
89
- def test_ray_spans ():
90
- setup_sentry ()
91
-
92
- ray .init (
93
- runtime_env = {
94
- "worker_process_setup_hook" : setup_sentry ,
95
- "working_dir" : "./" ,
96
- }
96
+ (span ,) = client_transaction ["spans" ]
97
+ assert span ["op" ] == "queue.submit.ray"
98
+ assert span ["origin" ] == "auto.queue.ray"
99
+ assert (
100
+ span ["description" ]
101
+ == "tests.integrations.ray.test_ray.test_tracing_in_ray_tasks.<locals>.example_task"
97
102
)
103
+ assert span ["parent_span_id" ] == client_transaction ["contexts" ]["trace" ]["span_id" ]
104
+ assert span ["trace_id" ] == client_transaction ["contexts" ]["trace" ]["trace_id" ]
98
105
99
- @ray .remote
100
- def example_task ():
101
- return sentry_sdk .get_client ().transport .envelopes
106
+ (span ,) = worker_transaction ["spans" ]
107
+ assert span ["op" ] == "task"
108
+ assert span ["origin" ] == "manual"
109
+ assert span ["description" ] == "example task step"
110
+ assert span ["parent_span_id" ] == worker_transaction ["contexts" ]["trace" ]["span_id" ]
111
+ assert span ["trace_id" ] == worker_transaction ["contexts" ]["trace" ]["trace_id" ]
102
112
103
- with sentry_sdk .start_transaction (op = "task" , name = "ray test transaction" ):
104
- worker_envelopes = ray .get (example_task .remote ())
105
-
106
- client_envelope = sentry_sdk .get_client ().transport .envelopes [0 ]
107
- client_transaction = client_envelope .get_transaction_event ()
108
- worker_envelope = worker_envelopes [0 ]
109
- worker_transaction = worker_envelope .get_transaction_event ()
110
-
111
- for span in client_transaction ["spans" ]:
112
- assert span ["op" ] == "queue.submit.ray"
113
- assert span ["origin" ] == "auto.queue.ray"
114
-
115
- for span in worker_transaction ["spans" ]:
116
- assert span ["op" ] == "queue.task.ray"
117
- assert span ["origin" ] == "auto.queue.ray"
113
+ assert (
114
+ client_transaction ["contexts" ]["trace" ]["trace_id" ]
115
+ == worker_transaction ["contexts" ]["trace" ]["trace_id" ]
116
+ )
118
117
119
118
120
119
@pytest .mark .forked
121
- def test_ray_errors ():
120
+ def test_errors_in_ray_tasks ():
122
121
setup_sentry_with_logging_transport ()
123
122
124
123
ray .init (
@@ -128,6 +127,7 @@ def test_ray_errors():
128
127
}
129
128
)
130
129
130
+ # Setup ray task
131
131
@ray .remote
132
132
def example_task ():
133
133
1 / 0
@@ -138,30 +138,19 @@ def example_task():
138
138
ray .get (future )
139
139
140
140
job_id = future .job_id ().hex ()
141
-
142
- # Read the worker log output containing the error
143
- log_dir = "/tmp/ray/session_latest/logs/"
144
- log_file = [
145
- f
146
- for f in os .listdir (log_dir )
147
- if "worker" in f and job_id in f and f .endswith (".out" )
148
- ][0 ]
149
- with open (os .path .join (log_dir , log_file ), "r" ) as file :
150
- lines = file .readlines ()
151
- # parse error object from log line
152
- error = json .loads (lines [4 ][:- 1 ])
141
+ error = read_error_from_log (job_id )
153
142
154
143
assert error ["level" ] == "error"
155
144
assert (
156
145
error ["transaction" ]
157
- == "tests.integrations.ray.test_ray.test_ray_errors .<locals>.example_task"
158
- ) # its in the worker, not the client thus not "ray test transaction"
146
+ == "tests.integrations.ray.test_ray.test_errors_in_ray_tasks .<locals>.example_task"
147
+ )
159
148
assert error ["exception" ]["values" ][0 ]["mechanism" ]["type" ] == "ray"
160
149
assert not error ["exception" ]["values" ][0 ]["mechanism" ]["handled" ]
161
150
162
151
163
152
@pytest .mark .forked
164
- def test_ray_actor ():
153
+ def test_tracing_in_ray_actors ():
165
154
setup_sentry ()
166
155
167
156
ray .init (
@@ -171,13 +160,14 @@ def test_ray_actor():
171
160
}
172
161
)
173
162
163
+ # Setup ray actor
174
164
@ray .remote
175
165
class Counter :
176
166
def __init__ (self ):
177
167
self .n = 0
178
168
179
169
def increment (self ):
180
- with sentry_sdk .start_span (op = "task" , name = "example task step " ):
170
+ with sentry_sdk .start_span (op = "task" , name = "example actor execution " ):
181
171
self .n += 1
182
172
183
173
return sentry_sdk .get_client ().transport .envelopes
@@ -186,20 +176,47 @@ def increment(self):
186
176
counter = Counter .remote ()
187
177
worker_envelopes = ray .get (counter .increment .remote ())
188
178
189
- # Currently no transactions/spans are captured in actors
190
- assert worker_envelopes == []
191
-
192
179
client_envelope = sentry_sdk .get_client ().transport .envelopes [0 ]
193
180
client_transaction = client_envelope .get_transaction_event ()
194
181
195
- assert (
196
- client_transaction ["contexts" ]["trace" ]["trace_id" ]
197
- == client_transaction ["contexts" ]["trace" ]["trace_id" ]
182
+ # Spans for submitting the actor task are not created (actors are not supported yet)
183
+ assert client_transaction ["spans" ] == []
184
+
185
+ # Transaction are not yet created when executing ray actors (actors are not supported yet)
186
+ assert worker_envelopes == []
187
+
188
+
189
+ @pytest .mark .forked
190
+ def test_errors_in_ray_actors ():
191
+ setup_sentry_with_logging_transport ()
192
+
193
+ ray .init (
194
+ runtime_env = {
195
+ "worker_process_setup_hook" : setup_sentry_with_logging_transport ,
196
+ "working_dir" : "./" ,
197
+ }
198
198
)
199
199
200
- for span in client_transaction ["spans" ]:
201
- assert (
202
- span ["trace_id" ]
203
- == client_transaction ["contexts" ]["trace" ]["trace_id" ]
204
- == client_transaction ["contexts" ]["trace" ]["trace_id" ]
205
- )
200
+ # Setup ray actor
201
+ @ray .remote
202
+ class Counter :
203
+ def __init__ (self ):
204
+ self .n = 0
205
+
206
+ def increment (self ):
207
+ with sentry_sdk .start_span (op = "task" , name = "example actor execution" ):
208
+ 1 / 0
209
+
210
+ return sentry_sdk .get_client ().transport .envelopes
211
+
212
+ with sentry_sdk .start_transaction (op = "task" , name = "ray test transaction" ):
213
+ with pytest .raises (ZeroDivisionError ):
214
+ counter = Counter .remote ()
215
+ future = counter .increment .remote ()
216
+ ray .get (future )
217
+
218
+ job_id = future .job_id ().hex ()
219
+ error = read_error_from_log (job_id )
220
+
221
+ # We do not capture errors in ray actors yet
222
+ assert error is None
0 commit comments