Skip to content

Commit c3c2415

Browse files
committed
export and shutdown timeouts for all OTLP exporters
Fixes open-telemetry#2284, open-telemetry#2663, open-telemetry#3309
1 parent 216411f commit c3c2415

File tree

17 files changed

+995
-584
lines changed

17 files changed

+995
-584
lines changed

exporter/opentelemetry-exporter-otlp-proto-common/README.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ OpenTelemetry Protobuf Encoding
66
.. |pypi| image:: https://badge.fury.io/py/opentelemetry-exporter-otlp-proto-common.svg
77
:target: https://pypi.org/project/opentelemetry-exporter-otlp-proto-common/
88

9-
This library is provided as a convenience to encode to Protobuf. Currently used by:
9+
10+
This library provides the shared exporter interface as well as convenience modules to encode to Protobuf. Currently used by:
1011

1112
* opentelemetry-exporter-otlp-proto-grpc
1213
* opentelemetry-exporter-otlp-proto-http
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
import math
2+
import threading
3+
from itertools import count
4+
from logging import getLogger
5+
from os import environ
6+
from random import uniform
7+
from time import time
8+
from typing import (
9+
Generic,
10+
Iterator,
11+
Optional,
12+
Protocol,
13+
Type,
14+
TypeVar,
15+
)
16+
17+
from opentelemetry.sdk.environment_variables import (
18+
OTEL_EXPORTER_OTLP_TIMEOUT,
19+
)
20+
21+
logger = getLogger(__name__)
22+
23+
_DEFAULT_EXPORT_TIMEOUT_S = 10
24+
25+
ExportResultT = TypeVar("ExportResultT", covariant=True)
26+
27+
28+
class _ExportProtocol(Protocol[ExportResultT]):
29+
def __call__(self, timeout_s: float, *args, **kwargs) -> ExportResultT:
30+
...
31+
32+
33+
class RetryableExportError(Exception):
34+
def __init__(self, retry_delay_s: Optional[float] = None):
35+
super().__init__()
36+
37+
self.retry_delay_s = retry_delay_s
38+
39+
40+
class RetryingExporter(Generic[ExportResultT]):
41+
def __init__(
42+
self,
43+
export_function: _ExportProtocol[ExportResultT],
44+
result_type: Type[ExportResultT],
45+
timeout_s: Optional[float] = None,
46+
):
47+
"""OTLP exporter helper class.
48+
49+
Encapsulates timeout behavior for shutdown and export tasks.
50+
51+
Accepts a callable `export_function` of the form
52+
53+
def export_function(
54+
timeout_s: float,
55+
*args,
56+
**kwargs
57+
) -> result_type:
58+
....
59+
60+
that either returns the appropriate export result, or raises a
61+
RetryableExportError exception if the encountered error should
62+
be retried.
63+
64+
Args:
65+
export_function: A callable handling a single export attempt to
66+
be used by export_with_retry()
67+
result_type: Enum-like type defining SUCCESS and FAILURE values
68+
returned by export.
69+
timeout_s: Optional timeout for exports in seconds. Set to smaller
70+
of provided arg and value in OTEL_EXPORTER_OTLP_TIMEOUT. Defaults
71+
to constant if both are unset.
72+
"""
73+
self._result_type = result_type
74+
self._export_function = export_function
75+
if timeout_s:
76+
# If the user provided a timeout, don't use the default as a lower
77+
# bound.
78+
self._timeout_s = min(
79+
timeout_s,
80+
float(environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, math.inf)),
81+
)
82+
else:
83+
self._timeout_s = float(
84+
environ.get(
85+
OTEL_EXPORTER_OTLP_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_S
86+
)
87+
)
88+
89+
self._shutdown_event = threading.Event()
90+
self._export_lock = threading.Lock()
91+
92+
def shutdown(self, timeout_millis: float = 30_000):
93+
if self._shutdown_event.is_set():
94+
logger.warning("Exporter already shutdown, ignoring call")
95+
return
96+
locked = self._export_lock.acquire(timeout=timeout_millis * 1e-3)
97+
self._shutdown_event.set()
98+
if locked:
99+
self._export_lock.release()
100+
101+
def export_with_retry(
102+
self,
103+
timeout_s: float,
104+
*args,
105+
**kwargs,
106+
) -> ExportResultT:
107+
"""Exports data with handling of retryable errors.
108+
109+
Calls the export_function provided at initialization with the following
110+
signature:
111+
112+
export_function(*args, timeout_s=remaining_time, **kwargs)
113+
114+
where `remaining_time` is updated with each retry, and *args and
115+
**kwargs are forwarded as-is.
116+
117+
Retries will be attempted using exponential backoff with full jitter.
118+
If retry_delay_s is specified in the raised error, a retry attempt will
119+
not occur before that delay. If a retry after that delay is
120+
not possible, will immediately abort without retrying.
121+
122+
Will reattempt the export until timeout has passed, at which point
123+
the export will be abandoned and a failure will be returned.
124+
A pending shutdown timing out will also cause retries to time out.
125+
126+
Note: Can block longer than timeout if export_function is blocking.
127+
Ensure export_function blocks minimally and does not attempt
128+
retries.
129+
130+
Args:
131+
timeout_s: Timeout in seconds. No more reattempts will occur after
132+
this time.
133+
*args: Variable length argument list forwarded to underlying export
134+
**kwargs: Arbitrary keyword arguments forwarded to underlying export
135+
136+
"""
137+
# After the call to shutdown, subsequent calls to Export are
138+
# not allowed and should return a Failure result.
139+
if self._shutdown_event.is_set():
140+
logger.warning("Exporter already shutdown, ignoring batch")
141+
return self._result_type.FAILURE
142+
# If negative timeout passed (from e.g. external batch deadline)
143+
# fail immediately
144+
if timeout_s <= 0:
145+
logger.warning("Export deadline passed, ignoring data")
146+
return self._result_type.FAILURE
147+
148+
# Use the lowest of the possible timeouts
149+
timeout_s = (
150+
min(timeout_s, self._timeout_s)
151+
if timeout_s is not None
152+
else self._timeout_s
153+
)
154+
deadline_s = time() + timeout_s
155+
# We acquire a lock to prevent shutdown from interrupting us
156+
try:
157+
if not self._export_lock.acquire(timeout=timeout_s):
158+
logger.warning(
159+
"Exporter failed to acquire lock before timeout"
160+
)
161+
return self._result_type.FAILURE
162+
# _create_exp_backoff_with_jitter returns a generator that yields random delay
163+
# values whose upper bounds grow exponentially. The upper bound will cap at max
164+
# value (never wait more than 64 seconds at once)
165+
max_value = 64
166+
for delay_s in _create_exp_backoff_with_jitter_generator(
167+
max_value=max_value
168+
):
169+
remaining_time_s = deadline_s - time()
170+
171+
if remaining_time_s < 1e-09:
172+
# Timed out
173+
return self._result_type.FAILURE
174+
175+
if self._shutdown_event.is_set():
176+
logger.warning(
177+
"Export cancelled due to shutdown timing out",
178+
)
179+
return self._result_type.FAILURE
180+
181+
try:
182+
return self._export_function(
183+
remaining_time_s,
184+
*args,
185+
**kwargs,
186+
)
187+
except RetryableExportError as err:
188+
time_remaining_s = deadline_s - time()
189+
delay_s = min(time_remaining_s, delay_s)
190+
if err.retry_delay_s is not None:
191+
if err.retry_delay_s > time_remaining_s:
192+
# We should not retry before the requested interval, so
193+
# we must fail out prematurely.
194+
return self._result_type.FAILURE
195+
delay_s = max(err.retry_delay_s, delay_s)
196+
logger.warning(
197+
"Retrying in %ss",
198+
delay_s,
199+
)
200+
self._shutdown_event.wait(delay_s)
201+
finally:
202+
self._export_lock.release()
203+
204+
return self._result_type.FAILURE
205+
206+
207+
def _create_exp_backoff_generator(max_value: int = 0) -> Iterator[int]:
208+
"""
209+
Generates an infinite sequence of exponential backoff values. The sequence starts
210+
from 1 (2^0) and doubles each time (2^1, 2^2, 2^3, ...). If a max_value is specified
211+
and non-zero, the generated values will not exceed this maximum, capping at max_value
212+
instead of growing indefinitely.
213+
214+
Parameters:
215+
- max_value (int, optional): The maximum value to yield. If 0 or not provided, the
216+
sequence grows without bound.
217+
218+
Returns:
219+
Iterator[int]: An iterator that yields the exponential backoff values, either uncapped or
220+
capped at max_value.
221+
222+
Example:
223+
```
224+
gen = _create_exp_backoff_generator(max_value=10)
225+
for _ in range(5):
226+
print(next(gen))
227+
```
228+
This will print:
229+
1
230+
2
231+
4
232+
8
233+
10
234+
235+
Note: this functionality used to be handled by the 'backoff' package.
236+
"""
237+
for i in count(0):
238+
out = 2**i
239+
yield min(out, max_value) if max_value else out
240+
241+
242+
def _create_exp_backoff_with_jitter_generator(
243+
max_value: int = 0,
244+
) -> Iterator[float]:
245+
"""
246+
Generates an infinite sequence of exponential backoff values with jitter using the
247+
FullJitter approach. For each element "n" in the exponential backoff series created
248+
by _create_exp_backoff(max_value), yields a random number in the half-open range [0,n).
249+
250+
This algorithm is originally documented at
251+
https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
252+
253+
Parameters:
254+
- max_value (int, optional): The maximum value to yield. If 0 or not provided, the
255+
sequence grows without bound.
256+
257+
Returns:
258+
Iterator[int]: An iterator that yields the exponential backoff values, either uncapped or
259+
capped at max_value.
260+
261+
Example:
262+
```
263+
import random
264+
random.seed(20240220)
265+
gen = _create_exp_backoff_with_jitter_generator(max_value=10)
266+
for _ in range(5):
267+
print(next(gen))
268+
```
269+
This will print:
270+
0.1341603010697452
271+
0.34773275270578097
272+
3.6022913287022913
273+
6.663388602254524
274+
10
275+
276+
"""
277+
for i in _create_exp_backoff_generator(max_value):
278+
yield uniform(0, i)

0 commit comments

Comments
 (0)