Skip to content

Commit bd19a3d

Browse files
committed
export and shutdown timeouts for all OTLP exporters
Fixes open-telemetry#2284, open-telemetry#2663, open-telemetry#3309
1 parent 216411f commit bd19a3d

File tree

19 files changed

+1140
-628
lines changed

19 files changed

+1140
-628
lines changed

exporter/opentelemetry-exporter-otlp-proto-common/README.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ OpenTelemetry Protobuf Encoding
66
.. |pypi| image:: https://badge.fury.io/py/opentelemetry-exporter-otlp-proto-common.svg
77
:target: https://pypi.org/project/opentelemetry-exporter-otlp-proto-common/
88

9-
This library is provided as a convenience to encode to Protobuf. Currently used by:
9+
10+
This library provides the shared exporter interface as well as convenience modules to encode to Protobuf. Currently used by:
1011

1112
* opentelemetry-exporter-otlp-proto-grpc
1213
* opentelemetry-exporter-otlp-proto-http

exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import logging
1717
from collections.abc import Sequence
18-
from itertools import count
1918
from typing import (
2019
Any,
2120
Mapping,
@@ -24,7 +23,6 @@
2423
Callable,
2524
TypeVar,
2625
Dict,
27-
Iterator,
2826
)
2927

3028
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
@@ -138,38 +136,3 @@ def _get_resource_data(
138136
)
139137
)
140138
return resource_data
141-
142-
143-
def _create_exp_backoff_generator(max_value: int = 0) -> Iterator[int]:
144-
"""
145-
Generates an infinite sequence of exponential backoff values. The sequence starts
146-
from 1 (2^0) and doubles each time (2^1, 2^2, 2^3, ...). If a max_value is specified
147-
and non-zero, the generated values will not exceed this maximum, capping at max_value
148-
instead of growing indefinitely.
149-
150-
Parameters:
151-
- max_value (int, optional): The maximum value to yield. If 0 or not provided, the
152-
sequence grows without bound.
153-
154-
Returns:
155-
Iterator[int]: An iterator that yields the exponential backoff values, either uncapped or
156-
capped at max_value.
157-
158-
Example:
159-
```
160-
gen = _create_exp_backoff_generator(max_value=10)
161-
for _ in range(5):
162-
print(next(gen))
163-
```
164-
This will print:
165-
1
166-
2
167-
4
168-
8
169-
10
170-
171-
Note: this functionality used to be handled by the 'backoff' package.
172-
"""
173-
for i in count(0):
174-
out = 2**i
175-
yield min(out, max_value) if max_value else out
Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
import math
2+
import threading
3+
from itertools import count
4+
from logging import getLogger
5+
from os import environ
6+
from random import uniform
7+
from time import time
8+
from typing import (
9+
Generic,
10+
Iterator,
11+
Optional,
12+
Protocol,
13+
Type,
14+
TypeVar,
15+
)
16+
17+
from opentelemetry.sdk.environment_variables import (
18+
OTEL_EXPORTER_OTLP_TIMEOUT,
19+
)
20+
21+
logger = getLogger(__name__)
22+
23+
_DEFAULT_EXPORT_TIMEOUT_S = 10
24+
25+
ExportResultT = TypeVar("ExportResultT", covariant=True)
26+
27+
28+
class _ExportProtocol(Protocol[ExportResultT]):
    """Structural type of a callable that performs one export attempt.

    The callable receives the time budget for the attempt (in seconds) as
    its first positional argument, followed by whatever positional and
    keyword arguments the caller forwarded, and returns the export result.
    """

    def __call__(
        self,
        timeout_s: float,
        *args,
        **kwargs,
    ) -> ExportResultT: ...
31+
32+
33+
class RetryableExportError(Exception):
    """Raised by an export function when the failed attempt may be retried.

    Args:
        retry_delay_s: Optional minimum delay, in seconds, that should pass
            before the next attempt. ``None`` means no specific delay was
            requested.
    """

    def __init__(self, retry_delay_s: Optional[float] = None):
        # No message is forwarded to the base class; the only payload is the
        # requested delay, exposed as an attribute.
        super().__init__()
        self.retry_delay_s = retry_delay_s
38+
39+
40+
class RetryingExporter(Generic[ExportResultT]):
    """OTLP exporter helper that wraps a single-attempt export function with
    retry, timeout and shutdown handling.

    An internal lock is held for the duration of each export so that
    ``shutdown()`` can wait for an in-flight export, and an event is used to
    signal shutdown to a retry loop that is currently sleeping.
    """

    def __init__(
        self,
        export_function: _ExportProtocol[ExportResultT],
        result_type: Type[ExportResultT],
        timeout_s: Optional[float] = None,
    ):
        """Create the helper.

        Accepts a callable `export_function` of the form

            def export_function(
                timeout_s: float,
                *args,
                **kwargs
            ) -> result_type:
                ....

        that either returns the appropriate export result, or raises a
        RetryableExportError exception if the encountered error should
        be retried.

        Args:
            export_function: A callable handling a single export attempt to
                be used by export_with_retry()
            result_type: Enum-like type defining SUCCESS and FAILURE values
                returned by export.
            timeout_s: Optional timeout for exports in seconds. Set to smaller
                of provided arg and value in OTEL_EXPORTER_OTLP_TIMEOUT.
                Defaults to a module constant if both are unset. A falsy
                value (``None`` or ``0``) is treated as "not provided".

        Raises:
            ValueError: If OTEL_EXPORTER_OTLP_TIMEOUT is set to a value that
                cannot be converted to ``float``.
        """
        self._result_type = result_type
        self._export_function = export_function
        if timeout_s:
            # If the user provided a timeout, don't use the default as a lower
            # bound; only the environment variable (if set) can lower it.
            self._timeout_s = min(
                timeout_s,
                float(environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, math.inf)),
            )
        else:
            self._timeout_s = float(
                environ.get(
                    OTEL_EXPORTER_OTLP_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_S
                )
            )

        self._shutdown_event = threading.Event()
        self._export_lock = threading.Lock()

    def shutdown(self, timeout_millis: float = 30_000):
        """Mark the exporter as shut down.

        Waits up to `timeout_millis` for an in-flight export to release the
        export lock. The shutdown event is set whether or not the lock was
        obtained, so an export that is still retrying is cancelled at its
        next check of the event.

        Args:
            timeout_millis: Maximum time to wait for a running export, in
                milliseconds. Negative values are treated as zero.
        """
        if self._shutdown_event.is_set():
            logger.warning("Exporter already shutdown, ignoring call")
            return
        # Lock.acquire() treats -1 as "wait forever" and rejects any other
        # negative timeout, so clamp to a non-negative wait.
        wait_s = max(timeout_millis, 0) * 1e-3
        locked = self._export_lock.acquire(timeout=wait_s)
        try:
            self._shutdown_event.set()
        finally:
            if locked:
                # Only release if we were the one who acquired the lock
                self._export_lock.release()

    def export_with_retry(
        self,
        timeout_s: Optional[float],
        *args,
        **kwargs,
    ) -> ExportResultT:
        """Exports data with handling of retryable errors.

        Calls the export_function provided at initialization with the
        following signature:

            export_function(remaining_time, *args, **kwargs)

        where `remaining_time` (seconds, passed positionally) is updated with
        each retry, and *args and **kwargs are forwarded as-is.

        Retries will be attempted using exponential backoff with full jitter.
        If retry_delay_s is specified in the raised error, a retry attempt will
        not occur before that delay. If a retry after that delay is
        not possible, will immediately abort without retrying.

        Will reattempt the export until timeout has passed, at which point
        the export will be abandoned and a failure will be returned.
        A pending shutdown timing out will also cause retries to time out.

        Note: Can block longer than timeout if export_function is blocking.
            Ensure export_function blocks minimally and does not attempt
            retries.

        Args:
            timeout_s: Timeout in seconds. No more reattempts will occur after
                this time. ``None`` means "use the exporter's configured
                timeout"; otherwise the smaller of the two is used.
            *args: Variable length argument list forwarded to underlying export
            **kwargs: Arbitrary keyword arguments forwarded to underlying export

        Returns:
            The value returned by export_function on success, or
            ``result_type.FAILURE`` if the exporter is shut down, the deadline
            passes, or the export lock cannot be obtained in time.
        """
        # After the call to shutdown, subsequent calls to Export are
        # not allowed and should return a Failure result.
        if self._shutdown_event.is_set():
            logger.warning("Exporter already shutdown, ignoring batch")
            return self._result_type.FAILURE
        # If a non-positive timeout is passed (from e.g. an external batch
        # deadline) fail immediately. None must be excluded first because it
        # cannot be compared with a number.
        if timeout_s is not None and timeout_s <= 0:
            logger.warning("Export deadline passed, ignoring data")
            return self._result_type.FAILURE

        # Use the lowest of the possible timeouts
        if timeout_s is None:
            timeout_s = self._timeout_s
        else:
            timeout_s = min(timeout_s, self._timeout_s)
        deadline_s = time() + timeout_s

        # We acquire a lock to prevent shutdown from interrupting us. The
        # acquire happens outside the try/finally: a threading.Lock may be
        # released by any thread, so releasing after a failed acquire would
        # unlock whichever export currently holds it.
        if not self._export_lock.acquire(timeout=timeout_s):
            logger.warning("Exporter failed to acquire lock before timeout")
            return self._result_type.FAILURE
        try:
            # _create_exp_backoff_with_jitter_generator returns a generator
            # that yields random delay values whose upper bounds grow
            # exponentially. The upper bound will cap at max value (never
            # wait more than 64 seconds at once)
            max_value = 64
            for delay_s in _create_exp_backoff_with_jitter_generator(
                max_value=max_value
            ):
                remaining_time_s = deadline_s - time()

                if remaining_time_s < 1e-09:
                    # Timed out
                    return self._result_type.FAILURE

                if self._shutdown_event.is_set():
                    logger.warning(
                        "Export cancelled due to shutdown timing out",
                    )
                    return self._result_type.FAILURE

                try:
                    return self._export_function(
                        remaining_time_s,
                        *args,
                        **kwargs,
                    )
                except RetryableExportError as err:
                    # The attempt itself consumed time, so re-measure.
                    time_remaining_s = deadline_s - time()
                    delay_s = min(time_remaining_s, delay_s)
                    if err.retry_delay_s is not None:
                        if err.retry_delay_s > time_remaining_s:
                            # We should not retry before the requested
                            # interval, so we must fail out prematurely.
                            return self._result_type.FAILURE
                        delay_s = max(err.retry_delay_s, delay_s)
                    logger.warning(
                        "Retrying in %ss",
                        delay_s,
                    )
                    # Sleep on the event so that shutdown() wakes us early.
                    self._shutdown_event.wait(delay_s)
        finally:
            self._export_lock.release()

        # Not reached in practice (the backoff generator is infinite); kept
        # so every code path returns a result.
        return self._result_type.FAILURE
209+
210+
211+
def _create_exp_backoff_generator(max_value: int = 0) -> Iterator[int]:
212+
"""
213+
Generates an infinite sequence of exponential backoff values. The sequence starts
214+
from 1 (2^0) and doubles each time (2^1, 2^2, 2^3, ...). If a max_value is specified
215+
and non-zero, the generated values will not exceed this maximum, capping at max_value
216+
instead of growing indefinitely.
217+
218+
Parameters:
219+
- max_value (int, optional): The maximum value to yield. If 0 or not provided, the
220+
sequence grows without bound.
221+
222+
Returns:
223+
Iterator[int]: An iterator that yields the exponential backoff values, either uncapped or
224+
capped at max_value.
225+
226+
Example:
227+
```
228+
gen = _create_exp_backoff_generator(max_value=10)
229+
for _ in range(5):
230+
print(next(gen))
231+
```
232+
This will print:
233+
1
234+
2
235+
4
236+
8
237+
10
238+
239+
Note: this functionality used to be handled by the 'backoff' package.
240+
"""
241+
for i in count(0):
242+
out = 2**i
243+
yield min(out, max_value) if max_value else out
244+
245+
246+
def _create_exp_backoff_with_jitter_generator(
    max_value: int = 0,
) -> Iterator[float]:
    """
    Generates an infinite sequence of randomized ("full jitter") backoff delays.

    For each element "n" of the series produced by
    _create_exp_backoff_generator(max_value), yields random.uniform(0, n),
    i.e. a float between 0 and n (whether n itself can be returned depends on
    floating-point rounding inside uniform()).

    This algorithm is originally documented at
    https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

    Parameters:
    - max_value (int, optional): Upper limit for the underlying exponential series,
    and therefore for every yielded delay. If 0 or not provided, the upper bound of
    the delays grows without limit.

    Returns:
    Iterator[float]: An iterator that yields the jittered backoff values.

    Example:
    ```
    import random
    random.seed(20240220)
    gen = _create_exp_backoff_with_jitter_generator(max_value=10)
    for _ in range(5):
        print(next(gen))
    ```
    This will print:
    0.1341603010697452
    0.34773275270578097
    3.6022913287022913
    6.663388602254524
    6.492676168164246

    """
    upper_bounds = _create_exp_backoff_generator(max_value)
    for upper_bound in upper_bounds:
        # One random draw per backoff step, made lazily when the value is
        # requested.
        jittered_delay = uniform(0, upper_bound)
        yield jittered_delay

exporter/opentelemetry-exporter-otlp-proto-common/tests/test_backoff.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import random
1516
from unittest import TestCase
1617

17-
from opentelemetry.exporter.otlp.proto.common._internal import (
18+
from opentelemetry.exporter.otlp.proto.common.exporter import (
1819
_create_exp_backoff_generator,
20+
_create_exp_backoff_with_jitter_generator,
1921
)
2022

2123

@@ -44,3 +46,20 @@ def test_exp_backoff_generator_with_odd_max(self):
4446
self.assertEqual(next(generator), 4)
4547
self.assertEqual(next(generator), 8)
4648
self.assertEqual(next(generator), 11)
49+
50+
51+
class TestBackoffWithJitterGenerator(TestCase):
    """Checks the jittered backoff generator against a seeded random stream."""

    def setUp(self):
        # Remember the global RNG state so seeding below does not leak into
        # other tests.
        self.initial_state = random.getstate()

    def tearDown(self):
        random.setstate(self.initial_state)

    def test_exp_backoff_with_jitter_generator(self):
        random.seed(20240220)
        jitter_delays = _create_exp_backoff_with_jitter_generator(
            max_value=10
        )
        expected_delays = (
            0.1341603010697452,
            0.34773275270578097,
            3.6022913287022913,
            6.663388602254524,
            6.492676168164246,
        )
        for expected in expected_delays:
            self.assertAlmostEqual(next(jitter_delays), expected)

0 commit comments

Comments
 (0)