Skip to content

Commit 6395d42

Browse files
committed
Extract retry mechanic from OTLPExporterMixin
This is the first change in a chain of commits to rework the retry mechanic. It is based on the work of open-telemetry#3764 and is essentially an attempt to land the changes proposed by that monolithic commit step by step. The plan is roughly to proceed in these steps: * Extract retry mechanic from gRPC exporters * Consolidate the HTTP exporter retry implementation with the gRPC one * Pipe timeout through RetryingExporter * Make the exporter lock protect the whole export instead of just a single iteration * Make timeout a float instead of an int * Add back-off with jitter It is quite likely that the plan will change along the way.
1 parent 19e0a09 commit 6395d42

File tree

9 files changed

+204
-155
lines changed

9 files changed

+204
-155
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# Copyright The OpenTelemetry Authors
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
import threading
15+
from logging import getLogger
16+
from time import sleep
17+
from typing import Callable, Generic, Type, TypeVar, Optional
18+
19+
from ._internal import _create_exp_backoff_generator
20+
21+
ExportResultT = TypeVar("ExportResultT", covariant=True)
22+
ExportPayloadT = TypeVar("ExportPayloadT", covariant=True)
23+
24+
logger = getLogger(__name__)
25+
26+
27+
class RetryableExportError(Exception):
    """Raised by an export function to signal a transient failure.

    :class:`RetryingExporter` catches this exception and retries the
    export after a delay.

    Args:
        retry_delay_sec: Server-suggested delay before the next attempt.
            May be fractional seconds (e.g. derived from a gRPC
            ``RetryInfo`` ``seconds + nanos / 1e9``), or ``None`` to let
            the caller fall back to its own back-off schedule.
    """

    def __init__(self, retry_delay_sec: Optional[float]):
        super().__init__()
        self.retry_delay_sec = retry_delay_sec
31+
32+
33+
class RetryingExporter(Generic[ExportResultT]):
    """Wraps a single-attempt export function with retry and shutdown logic.

    ``export_function`` is invoked once per attempt; if it raises
    :class:`RetryableExportError` the attempt is retried with exponential
    back-off, honoring a server-supplied delay when one is present on the
    exception.

    Args:
        export_function: Callable performing one export attempt. Must
            return a member of ``result`` on completion, or raise
            :class:`RetryableExportError` on a transient failure.
        result: Result type providing ``SUCCESS`` and ``FAILURE`` members
            used as return values (e.g. ``SpanExportResult``).
    """

    def __init__(
        self,
        export_function: Callable[[ExportPayloadT], ExportResultT],
        result: Type[ExportResultT],
    ):
        self._export_function = export_function
        self._result = result

        self._shutdown = False
        # Serializes individual export attempts and lets shutdown() wait
        # for an in-flight attempt to finish.
        self._export_lock = threading.Lock()

    def shutdown(self, timeout_millis: float = 30_000) -> None:
        """Mark the exporter as shut down, waiting for any in-flight export.

        Args:
            timeout_millis: Maximum time to wait for the current export
                attempt to release the export lock.
        """
        # wait for the last export if any
        locked = self._export_lock.acquire(  # pylint: disable=consider-using-with
            timeout=timeout_millis / 1e3
        )
        self._shutdown = True
        # acquire() returns False when the timeout expires; releasing an
        # unacquired Lock raises RuntimeError, so only release on success.
        if locked:
            self._export_lock.release()
        else:
            logger.warning(
                "Timed out waiting for the in-flight export to finish"
            )

    def export_with_retry(self, payload: ExportPayloadT) -> ExportResultT:
        """Export ``payload``, retrying transient failures with back-off.

        Returns ``result.FAILURE`` immediately if the exporter has been
        shut down, and once the back-off schedule is exhausted.
        """
        # After the call to shutdown, subsequent calls to Export are
        # not allowed and should return a Failure result.
        if self._shutdown:
            logger.warning("Exporter already shutdown, ignoring batch")
            return self._result.FAILURE

        max_value = 64
        # expo returns a generator that yields delay values which grow
        # exponentially. Once delay is greater than max_value, the yielded
        # value will remain constant.
        for delay in _create_exp_backoff_generator(max_value=max_value):
            if delay == max_value or self._shutdown:
                return self._result.FAILURE

            with self._export_lock:
                try:
                    return self._export_function(payload)
                except RetryableExportError as exc:
                    # Prefer the server-suggested delay over our own
                    # back-off value when one was provided.
                    delay_sec = (
                        exc.retry_delay_sec
                        if exc.retry_delay_sec is not None
                        else delay
                    )
                    logger.warning("Retrying in %ss", delay_sec)
                    sleep(delay_sec)

        return self._result.FAILURE

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def _translate_data(
110110
return encode_logs(data)
111111

112112
def export(self, batch: Sequence[LogData]) -> LogExportResult:
113-
return self._export(batch)
113+
return self._exporter.export_with_retry(batch)
114114

115115
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
116116
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 67 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,15 @@
1414

1515
"""OTLP Exporter"""
1616

17-
import threading
1817
from abc import ABC, abstractmethod
19-
from collections.abc import Sequence # noqa: F401
2018
from logging import getLogger
2119
from os import environ
22-
from time import sleep
23-
from typing import ( # noqa: F401
24-
Any,
25-
Callable,
26-
Dict,
27-
Generic,
28-
List,
29-
Optional,
30-
Tuple,
31-
Union,
32-
)
20+
from typing import Any, Callable, Dict, Generic, List, Optional # noqa: F401
3321
from typing import Sequence as TypingSequence
34-
from typing import TypeVar
22+
from typing import Tuple, TypeVar, Union # noqa: F401
3523
from urllib.parse import urlparse
3624

3725
from deprecated import deprecated
38-
39-
from opentelemetry.exporter.otlp.proto.common._internal import (
40-
_get_resource_data,
41-
_create_exp_backoff_generator,
42-
)
4326
from google.rpc.error_details_pb2 import RetryInfo
4427
from grpc import (
4528
ChannelCredentials,
@@ -51,9 +34,14 @@
5134
ssl_channel_credentials,
5235
)
5336

54-
from opentelemetry.exporter.otlp.proto.grpc import (
55-
_OTLP_GRPC_HEADERS,
37+
from opentelemetry.exporter.otlp.proto.common._internal import (
38+
_get_resource_data,
39+
)
40+
from opentelemetry.exporter.otlp.proto.common.exporter import (
41+
RetryableExportError,
42+
RetryingExporter,
5643
)
44+
from opentelemetry.exporter.otlp.proto.grpc import _OTLP_GRPC_HEADERS
5745
from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401
5846
AnyValue,
5947
ArrayValue,
@@ -260,8 +248,8 @@ def __init__(
260248
)
261249
)
262250

263-
self._export_lock = threading.Lock()
264251
self._shutdown = False
252+
self._exporter = RetryingExporter(self._export, self._result)
265253

266254
@abstractmethod
267255
def _translate_data(
@@ -272,98 +260,72 @@ def _translate_data(
272260
def _export(
273261
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
274262
) -> ExportResultT:
275-
# After the call to shutdown, subsequent calls to Export are
276-
# not allowed and should return a Failure result.
277-
if self._shutdown:
278-
logger.warning("Exporter already shutdown, ignoring batch")
279-
return self._result.FAILURE
263+
try:
264+
self._client.Export(
265+
request=self._translate_data(data),
266+
metadata=self._headers,
267+
timeout=self._timeout,
268+
)
280269

281-
# FIXME remove this check if the export type for traces
282-
# gets updated to a class that represents the proto
283-
# TracesData and use the code below instead.
284-
# logger.warning(
285-
# "Transient error %s encountered while exporting %s, retrying in %ss.",
286-
# error.code(),
287-
# data.__class__.__name__,
288-
# delay,
289-
# )
290-
max_value = 64
291-
# expo returns a generator that yields delay values which grow
292-
# exponentially. Once delay is greater than max_value, the yielded
293-
# value will remain constant.
294-
for delay in _create_exp_backoff_generator(max_value=max_value):
295-
if delay == max_value or self._shutdown:
296-
return self._result.FAILURE
297-
298-
with self._export_lock:
299-
try:
300-
self._client.Export(
301-
request=self._translate_data(data),
302-
metadata=self._headers,
303-
timeout=self._timeout,
270+
return self._result.SUCCESS
271+
272+
except RpcError as error:
273+
if error.code() in [
274+
StatusCode.CANCELLED,
275+
StatusCode.DEADLINE_EXCEEDED,
276+
StatusCode.RESOURCE_EXHAUSTED,
277+
StatusCode.ABORTED,
278+
StatusCode.OUT_OF_RANGE,
279+
StatusCode.UNAVAILABLE,
280+
StatusCode.DATA_LOSS,
281+
]:
282+
retry_info_bin = dict(error.trailing_metadata()).get(
283+
"google.rpc.retryinfo-bin"
284+
)
285+
if retry_info_bin is not None:
286+
retry_info = RetryInfo()
287+
retry_info.ParseFromString(retry_info_bin)
288+
delay = (
289+
retry_info.retry_delay.seconds
290+
+ retry_info.retry_delay.nanos / 1.0e9
304291
)
292+
else:
293+
delay = None
294+
295+
logger.warning(
296+
(
297+
"Transient error %s encountered while exporting "
298+
"%s to %s"
299+
),
300+
error.code(),
301+
self._exporting,
302+
self._endpoint,
303+
)
304+
raise RetryableExportError(delay)
305+
306+
logger.error(
307+
"Failed to export %s to %s, error code: %s",
308+
self._exporting,
309+
self._endpoint,
310+
error.code(),
311+
exc_info=error.code() == StatusCode.UNKNOWN,
312+
)
305313

306-
return self._result.SUCCESS
307-
308-
except RpcError as error:
309-
310-
if error.code() in [
311-
StatusCode.CANCELLED,
312-
StatusCode.DEADLINE_EXCEEDED,
313-
StatusCode.RESOURCE_EXHAUSTED,
314-
StatusCode.ABORTED,
315-
StatusCode.OUT_OF_RANGE,
316-
StatusCode.UNAVAILABLE,
317-
StatusCode.DATA_LOSS,
318-
]:
319-
320-
retry_info_bin = dict(error.trailing_metadata()).get(
321-
"google.rpc.retryinfo-bin"
322-
)
323-
if retry_info_bin is not None:
324-
retry_info = RetryInfo()
325-
retry_info.ParseFromString(retry_info_bin)
326-
delay = (
327-
retry_info.retry_delay.seconds
328-
+ retry_info.retry_delay.nanos / 1.0e9
329-
)
330-
331-
logger.warning(
332-
(
333-
"Transient error %s encountered while exporting "
334-
"%s to %s, retrying in %ss."
335-
),
336-
error.code(),
337-
self._exporting,
338-
self._endpoint,
339-
delay,
340-
)
341-
sleep(delay)
342-
continue
343-
else:
344-
logger.error(
345-
"Failed to export %s to %s, error code: %s",
346-
self._exporting,
347-
self._endpoint,
348-
error.code(),
349-
exc_info=error.code() == StatusCode.UNKNOWN,
350-
)
351-
352-
if error.code() == StatusCode.OK:
353-
return self._result.SUCCESS
354-
355-
return self._result.FAILURE
356-
357-
return self._result.FAILURE
314+
if error.code() == StatusCode.OK:
315+
return self._result.SUCCESS
316+
317+
return self._result.FAILURE
318+
319+
@abstractmethod
320+
def export(self, data) -> ExportResultT:
321+
pass
358322

359323
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
360324
if self._shutdown:
361325
logger.warning("Exporter already shutdown, ignoring call")
362326
return
363-
# wait for the last export if any
364-
self._export_lock.acquire(timeout=timeout_millis / 1e3)
327+
self._exporter.shutdown(timeout_millis=timeout_millis)
365328
self._shutdown = True
366-
self._export_lock.release()
367329

368330
@property
369331
@abstractmethod

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,14 @@ def export(
162162
) -> MetricExportResult:
163163
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
164164
if self._max_export_batch_size is None:
165-
return self._export(data=metrics_data)
165+
return self._exporter.export_with_retry(metrics_data)
166166

167167
export_result = MetricExportResult.SUCCESS
168168

169169
for split_metrics_data in self._split_metrics_data(metrics_data):
170-
split_export_result = self._export(data=split_metrics_data)
170+
split_export_result = self._exporter.export_with_retry(
171+
split_metrics_data
172+
)
171173

172174
if split_export_result is MetricExportResult.FAILURE:
173175
export_result = MetricExportResult.FAILURE

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,17 @@
1515

1616
import logging
1717
from os import environ
18-
from typing import Dict, Optional, Sequence, Tuple, Union
19-
from typing import Sequence as TypingSequence
20-
18+
from typing import Dict, Optional
19+
from typing import Sequence
20+
from typing import Tuple, Union
2121

2222
from grpc import ChannelCredentials, Compression
2323

24-
from opentelemetry.exporter.otlp.proto.common.trace_encoder import (
25-
encode_spans,
26-
)
27-
from opentelemetry.exporter.otlp.proto.grpc.exporter import (
24+
from opentelemetry.exporter.otlp.proto.common.trace_encoder import encode_spans
25+
from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401
2826
OTLPExporterMixin,
2927
_get_credentials,
3028
environ_to_compression,
31-
)
32-
from opentelemetry.exporter.otlp.proto.grpc.exporter import ( # noqa: F401
3329
get_resource_data,
3430
)
3531
from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import (
@@ -41,12 +37,14 @@
4137
from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401
4238
InstrumentationScope,
4339
)
44-
from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F401
45-
ScopeSpans,
40+
from opentelemetry.proto.trace.v1.trace_pb2 import Status # noqa: F401
41+
from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F40
4642
ResourceSpans,
43+
ScopeSpans,
44+
)
45+
from opentelemetry.proto.trace.v1.trace_pb2 import ( # noqa: F401
4746
Span as CollectorSpan,
4847
)
49-
from opentelemetry.proto.trace.v1.trace_pb2 import Status # noqa: F401
5048
from opentelemetry.sdk.environment_variables import (
5149
OTEL_EXPORTER_OTLP_TRACES_CLIENT_CERTIFICATE,
5250
OTEL_EXPORTER_OTLP_TRACES_CLIENT_KEY,
@@ -91,12 +89,11 @@ def __init__(
9189
insecure: Optional[bool] = None,
9290
credentials: Optional[ChannelCredentials] = None,
9391
headers: Optional[
94-
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
92+
Union[Sequence[Tuple[str, str]], Dict[str, str], str]
9593
] = None,
9694
timeout: Optional[int] = None,
9795
compression: Optional[Compression] = None,
9896
):
99-
10097
if insecure is None:
10198
insecure = environ.get(OTEL_EXPORTER_OTLP_TRACES_INSECURE)
10299
if insecure is not None:
@@ -143,7 +140,7 @@ def _translate_data(
143140
return encode_spans(data)
144141

145142
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
146-
return self._export(spans)
143+
return self._exporter.export_with_retry(spans)
147144

148145
def shutdown(self) -> None:
149146
OTLPExporterMixin.shutdown(self)

0 commit comments

Comments
 (0)