Skip to content

Commit c254ba4

Browse files
feat(ourlogs): Add a class which batches groups of logs together. (#4229)
Currently, sentry logs create a new envelope per-log, which is inefficient. This changes the behavior to batch a large chunk of logs to be sent all at once. Fixes #4155 Fixes #4225 Fixes #4152 --------- Co-authored-by: Anton Pirker <[email protected]>
1 parent 438ee01 commit c254ba4

File tree

8 files changed

+397
-184
lines changed

8 files changed

+397
-184
lines changed

sentry_sdk/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
"start_transaction",
4646
"trace",
4747
"monitor",
48-
"_experimental_logger",
48+
"logger",
4949
]
5050

5151
# Initialize the debug support after everything is loaded

sentry_sdk/_log_batcher.py

+142
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import os
2+
import random
3+
import threading
4+
from datetime import datetime, timezone
5+
from typing import Optional, List, Callable, TYPE_CHECKING, Any
6+
7+
from sentry_sdk.utils import format_timestamp, safe_repr
8+
from sentry_sdk.envelope import Envelope
9+
10+
if TYPE_CHECKING:
11+
from sentry_sdk._types import Log
12+
13+
14+
class LogBatcher:
15+
MAX_LOGS_BEFORE_FLUSH = 100
16+
FLUSH_WAIT_TIME = 5.0
17+
18+
def __init__(
19+
self,
20+
capture_func, # type: Callable[[Envelope], None]
21+
):
22+
# type: (...) -> None
23+
self._log_buffer = [] # type: List[Log]
24+
self._capture_func = capture_func
25+
self._running = True
26+
self._lock = threading.Lock()
27+
28+
self._flush_event = threading.Event() # type: threading.Event
29+
30+
self._flusher = None # type: Optional[threading.Thread]
31+
self._flusher_pid = None # type: Optional[int]
32+
33+
def _ensure_thread(self):
34+
# type: (...) -> bool
35+
"""For forking processes we might need to restart this thread.
36+
This ensures that our process actually has that thread running.
37+
"""
38+
if not self._running:
39+
return False
40+
41+
pid = os.getpid()
42+
if self._flusher_pid == pid:
43+
return True
44+
45+
with self._lock:
46+
# Recheck to make sure another thread didn't get here and start the
47+
# the flusher in the meantime
48+
if self._flusher_pid == pid:
49+
return True
50+
51+
self._flusher_pid = pid
52+
53+
self._flusher = threading.Thread(target=self._flush_loop)
54+
self._flusher.daemon = True
55+
56+
try:
57+
self._flusher.start()
58+
except RuntimeError:
59+
# Unfortunately at this point the interpreter is in a state that no
60+
# longer allows us to spawn a thread and we have to bail.
61+
self._running = False
62+
return False
63+
64+
return True
65+
66+
def _flush_loop(self):
67+
# type: (...) -> None
68+
while self._running:
69+
self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
70+
self._flush_event.clear()
71+
self._flush()
72+
73+
def add(
74+
self,
75+
log, # type: Log
76+
):
77+
# type: (...) -> None
78+
if not self._ensure_thread() or self._flusher is None:
79+
return None
80+
81+
with self._lock:
82+
self._log_buffer.append(log)
83+
if len(self._log_buffer) >= self.MAX_LOGS_BEFORE_FLUSH:
84+
self._flush_event.set()
85+
86+
def kill(self):
87+
# type: (...) -> None
88+
if self._flusher is None:
89+
return
90+
91+
self._running = False
92+
self._flush_event.set()
93+
self._flusher = None
94+
95+
def flush(self):
96+
# type: (...) -> None
97+
self._flush()
98+
99+
@staticmethod
100+
def _log_to_otel(log):
101+
# type: (Log) -> Any
102+
def format_attribute(key, val):
103+
# type: (str, int | float | str | bool) -> Any
104+
if isinstance(val, bool):
105+
return {"key": key, "value": {"boolValue": val}}
106+
if isinstance(val, int):
107+
return {"key": key, "value": {"intValue": str(val)}}
108+
if isinstance(val, float):
109+
return {"key": key, "value": {"doubleValue": val}}
110+
if isinstance(val, str):
111+
return {"key": key, "value": {"stringValue": val}}
112+
return {"key": key, "value": {"stringValue": safe_repr(val)}}
113+
114+
otel_log = {
115+
"severityText": log["severity_text"],
116+
"severityNumber": log["severity_number"],
117+
"body": {"stringValue": log["body"]},
118+
"timeUnixNano": str(log["time_unix_nano"]),
119+
"attributes": [
120+
format_attribute(k, v) for (k, v) in log["attributes"].items()
121+
],
122+
}
123+
124+
if "trace_id" in log:
125+
otel_log["traceId"] = log["trace_id"]
126+
127+
return otel_log
128+
129+
def _flush(self):
130+
# type: (...) -> Optional[Envelope]
131+
132+
envelope = Envelope(
133+
headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
134+
)
135+
with self._lock:
136+
for log in self._log_buffer:
137+
envelope.add_log(self._log_to_otel(log))
138+
self._log_buffer.clear()
139+
if envelope.items:
140+
self._capture_func(envelope)
141+
return envelope
142+
return None

sentry_sdk/client.py

+19-43
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import os
32
import uuid
43
import random
@@ -64,6 +63,7 @@
6463
from sentry_sdk.session import Session
6564
from sentry_sdk.spotlight import SpotlightClient
6665
from sentry_sdk.transport import Transport
66+
from sentry_sdk._log_batcher import LogBatcher
6767

6868
I = TypeVar("I", bound=Integration) # noqa: E741
6969

@@ -177,6 +177,7 @@ def __init__(self, options=None):
177177
self.transport = None # type: Optional[Transport]
178178
self.monitor = None # type: Optional[Monitor]
179179
self.metrics_aggregator = None # type: Optional[MetricsAggregator]
180+
self.log_batcher = None # type: Optional[LogBatcher]
180181

181182
def __getstate__(self, *args, **kwargs):
182183
# type: (*Any, **Any) -> Any
@@ -374,6 +375,12 @@ def _capture_envelope(envelope):
374375
"Metrics not supported on Python 3.6 and lower with gevent."
375376
)
376377

378+
self.log_batcher = None
379+
if experiments.get("enable_logs", False):
380+
from sentry_sdk._log_batcher import LogBatcher
381+
382+
self.log_batcher = LogBatcher(capture_func=_capture_envelope)
383+
377384
max_request_body_size = ("always", "never", "small", "medium")
378385
if self.options["max_request_body_size"] not in max_request_body_size:
379386
raise ValueError(
@@ -450,6 +457,7 @@ def _capture_envelope(envelope):
450457
if (
451458
self.monitor
452459
or self.metrics_aggregator
460+
or self.log_batcher
453461
or has_profiling_enabled(self.options)
454462
or isinstance(self.transport, BaseHttpTransport)
455463
):
@@ -867,15 +875,11 @@ def capture_event(
867875

868876
def _capture_experimental_log(self, current_scope, log):
869877
# type: (Scope, Log) -> None
870-
logs_enabled = self.options["_experiments"].get("enable_sentry_logs", False)
878+
logs_enabled = self.options["_experiments"].get("enable_logs", False)
871879
if not logs_enabled:
872880
return
873881
isolation_scope = current_scope.get_isolation_scope()
874882

875-
headers = {
876-
"sent_at": format_timestamp(datetime.now(timezone.utc)),
877-
} # type: dict[str, object]
878-
879883
environment = self.options.get("environment")
880884
if environment is not None and "sentry.environment" not in log["attributes"]:
881885
log["attributes"]["sentry.environment"] = environment
@@ -903,46 +907,14 @@ def _capture_experimental_log(self, current_scope, log):
903907
f'[Sentry Logs] [{log.get("severity_text")}] {log.get("body")}'
904908
)
905909

906-
envelope = Envelope(headers=headers)
907-
908-
before_emit_log = self.options["_experiments"].get("before_emit_log")
909-
if before_emit_log is not None:
910-
log = before_emit_log(log, {})
910+
before_send_log = self.options["_experiments"].get("before_send_log")
911+
if before_send_log is not None:
912+
log = before_send_log(log, {})
911913
if log is None:
912914
return
913915

914-
def format_attribute(key, val):
915-
# type: (str, int | float | str | bool) -> Any
916-
if isinstance(val, bool):
917-
return {"key": key, "value": {"boolValue": val}}
918-
if isinstance(val, int):
919-
return {"key": key, "value": {"intValue": str(val)}}
920-
if isinstance(val, float):
921-
return {"key": key, "value": {"doubleValue": val}}
922-
if isinstance(val, str):
923-
return {"key": key, "value": {"stringValue": val}}
924-
return {"key": key, "value": {"stringValue": json.dumps(val)}}
925-
926-
otel_log = {
927-
"severityText": log["severity_text"],
928-
"severityNumber": log["severity_number"],
929-
"body": {"stringValue": log["body"]},
930-
"timeUnixNano": str(log["time_unix_nano"]),
931-
"attributes": [
932-
format_attribute(k, v) for (k, v) in log["attributes"].items()
933-
],
934-
}
935-
936-
if "trace_id" in log:
937-
otel_log["traceId"] = log["trace_id"]
938-
939-
envelope.add_log(otel_log) # TODO: batch these
940-
941-
if self.spotlight:
942-
self.spotlight.capture_envelope(envelope)
943-
944-
if self.transport is not None:
945-
self.transport.capture_envelope(envelope)
916+
if self.log_batcher:
917+
self.log_batcher.add(log)
946918

947919
def capture_session(
948920
self, session # type: Session
@@ -996,6 +968,8 @@ def close(
996968
self.session_flusher.kill()
997969
if self.metrics_aggregator is not None:
998970
self.metrics_aggregator.kill()
971+
if self.log_batcher is not None:
972+
self.log_batcher.kill()
999973
if self.monitor:
1000974
self.monitor.kill()
1001975
self.transport.kill()
@@ -1020,6 +994,8 @@ def flush(
1020994
self.session_flusher.flush()
1021995
if self.metrics_aggregator is not None:
1022996
self.metrics_aggregator.flush()
997+
if self.log_batcher is not None:
998+
self.log_batcher.flush()
1023999
self.transport.flush(timeout=timeout, callback=callback)
10241000

10251001
def __enter__(self):

sentry_sdk/consts.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class CompressionAlgo(Enum):
7878
Callable[[str, MetricValue, MeasurementUnit, MetricTags], bool]
7979
],
8080
"metric_code_locations": Optional[bool],
81-
"enable_sentry_logs": Optional[bool],
81+
"enable_logs": Optional[bool],
8282
},
8383
total=False,
8484
)

sentry_sdk/integrations/logging.py

+7-2
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ def emit(self, record):
348348
if not client.is_active():
349349
return
350350

351-
if not client.options["_experiments"].get("enable_sentry_logs", False):
351+
if not client.options["_experiments"].get("enable_logs", False):
352352
return
353353

354354
SentryLogsHandler._capture_log_from_record(client, record)
@@ -365,7 +365,12 @@ def _capture_log_from_record(client, record):
365365
if isinstance(record.args, tuple):
366366
for i, arg in enumerate(record.args):
367367
attrs[f"sentry.message.parameters.{i}"] = (
368-
arg if isinstance(arg, str) else safe_repr(arg)
368+
arg
369+
if isinstance(arg, str)
370+
or isinstance(arg, float)
371+
or isinstance(arg, int)
372+
or isinstance(arg, bool)
373+
else safe_repr(arg)
369374
)
370375
if record.lineno:
371376
attrs["code.line.number"] = record.lineno

sentry_sdk/_experimental_logger.py renamed to sentry_sdk/logger.py

+16-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import Any
55

66
from sentry_sdk import get_client, get_current_scope
7+
from sentry_sdk.utils import safe_repr
78

89

910
def _capture_log(severity_text, severity_number, template, **kwargs):
@@ -19,6 +20,20 @@ def _capture_log(severity_text, severity_number, template, **kwargs):
1920
for k, v in kwargs.items():
2021
attrs[f"sentry.message.parameters.{k}"] = v
2122

23+
attrs = {
24+
k: (
25+
v
26+
if (
27+
isinstance(v, str)
28+
or isinstance(v, int)
29+
or isinstance(v, bool)
30+
or isinstance(v, float)
31+
)
32+
else safe_repr(v)
33+
)
34+
for (k, v) in attrs.items()
35+
}
36+
2237
# noinspection PyProtectedMember
2338
client._capture_experimental_log(
2439
scope,
@@ -36,6 +51,6 @@ def _capture_log(severity_text, severity_number, template, **kwargs):
3651
trace = functools.partial(_capture_log, "trace", 1)
3752
debug = functools.partial(_capture_log, "debug", 5)
3853
info = functools.partial(_capture_log, "info", 9)
39-
warn = functools.partial(_capture_log, "warn", 13)
54+
warning = functools.partial(_capture_log, "warning", 13)
4055
error = functools.partial(_capture_log, "error", 17)
4156
fatal = functools.partial(_capture_log, "fatal", 21)

sentry_sdk/types.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from typing import TYPE_CHECKING
1212

1313
if TYPE_CHECKING:
14-
from sentry_sdk._types import Event, EventDataCategory, Hint
14+
from sentry_sdk._types import Event, EventDataCategory, Hint, Log
1515
else:
1616
from typing import Any
1717

@@ -20,5 +20,6 @@
2020
Event = Any
2121
EventDataCategory = Any
2222
Hint = Any
23+
Log = Any
2324

24-
__all__ = ("Event", "EventDataCategory", "Hint")
25+
__all__ = ("Event", "EventDataCategory", "Hint", "Log")

0 commit comments

Comments
 (0)