Skip to content

Commit 4b2e891

Browse files
committed
Pipe timeout through RetryingExporter
Propagate the given timeout value from all exporters to the RetryingExporter, which will hand it over to the individual `_export` function call. Currently the same unaltered value is handed over. In following commits the Retrying exporter will update the timeout handed over to `_export` depending on the remaining time.
1 parent ea65dc2 commit 4b2e891

File tree

10 files changed

+56
-38
lines changed

10 files changed

+56
-38
lines changed

exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/exporter.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@ def __init__(self, retry_delay_sec: Optional[int]):
3333
class RetryingExporter(Generic[ExportResultT]):
3434
def __init__(
3535
self,
36-
export_function: Callable[[ExportPayloadT], ExportResultT],
36+
export_function: Callable[[ExportPayloadT, float], ExportResultT],
3737
result: Type[ExportResultT],
38+
timeout_sec: float,
3839
):
3940
self._export_function = export_function
4041
self._result = result
42+
self._timeout_sec = timeout_sec
4143

4244
self._shutdown = False
4345
self._export_lock = threading.Lock()
@@ -67,7 +69,7 @@ def export_with_retry(self, payload: ExportPayloadT) -> ExportResultT:
6769

6870
with self._export_lock:
6971
try:
70-
return self._export_function(payload)
72+
return self._export_function(payload, self._timeout_sec)
7173
except RetryableExportError as exc:
7274
delay_sec = (
7375
exc.retry_delay_sec

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def __init__(
6060
headers: Optional[
6161
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
6262
] = None,
63-
timeout: Optional[int] = None,
63+
timeout: Optional[float] = None,
6464
compression: Optional[Compression] = None,
6565
):
6666
if insecure is None:
@@ -81,7 +81,7 @@ def __init__(
8181

8282
environ_timeout = environ.get(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)
8383
environ_timeout = (
84-
int(environ_timeout) if environ_timeout is not None else None
84+
float(environ_timeout) if environ_timeout is not None else None
8585
)
8686

8787
compression = (

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ def _get_credentials(
164164
class OTLPExporterMixin(
165165
ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT]
166166
):
167-
"""OTLP span exporter
167+
"""OTLP exporter
168168
169169
Args:
170170
endpoint: OpenTelemetry Collector receiver endpoint
@@ -183,7 +183,7 @@ def __init__(
183183
headers: Optional[
184184
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
185185
] = None,
186-
timeout: Optional[int] = None,
186+
timeout: Optional[float] = None,
187187
compression: Optional[Compression] = None,
188188
):
189189
super().__init__()
@@ -220,8 +220,8 @@ def __init__(
220220
else:
221221
self._headers = tuple(self._headers) + tuple(_OTLP_GRPC_HEADERS)
222222

223-
self._timeout = timeout or int(
224-
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10)
223+
timeout_sec = timeout or float(
224+
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10.0)
225225
)
226226
self._collector_kwargs = None
227227

@@ -249,7 +249,9 @@ def __init__(
249249
)
250250

251251
self._shutdown = False
252-
self._exporter = RetryingExporter(self._export, self._result)
252+
self._exporter = RetryingExporter(
253+
self._export, self._result, timeout_sec
254+
)
253255

254256
@abstractmethod
255257
def _translate_data(
@@ -258,13 +260,15 @@ def _translate_data(
258260
pass
259261

260262
def _export(
261-
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
263+
self,
264+
data: Union[TypingSequence[ReadableSpan], MetricsData],
265+
timeout_sec: float,
262266
) -> ExportResultT:
263267
try:
264268
self._client.Export(
265269
request=self._translate_data(data),
266270
metadata=self._headers,
267-
timeout=self._timeout,
271+
timeout=timeout_sec,
268272
)
269273

270274
return self._result.SUCCESS

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def __init__(
9898
headers: Optional[
9999
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
100100
] = None,
101-
timeout: Optional[int] = None,
101+
timeout: Optional[float] = None,
102102
compression: Optional[Compression] = None,
103103
preferred_temporality: Dict[type, AggregationTemporality] = None,
104104
preferred_aggregation: Dict[type, Aggregation] = None,
@@ -123,7 +123,7 @@ def __init__(
123123

124124
environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT)
125125
environ_timeout = (
126-
int(environ_timeout) if environ_timeout is not None else None
126+
float(environ_timeout) if environ_timeout is not None else None
127127
)
128128

129129
compression = (

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def __init__(
9191
headers: Optional[
9292
Union[Sequence[Tuple[str, str]], Dict[str, str], str]
9393
] = None,
94-
timeout: Optional[int] = None,
94+
timeout: Optional[float] = None,
9595
compression: Optional[Compression] = None,
9696
):
9797
if insecure is None:
@@ -112,7 +112,7 @@ def __init__(
112112

113113
environ_timeout = environ.get(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)
114114
environ_timeout = (
115-
int(environ_timeout) if environ_timeout is not None else None
115+
float(environ_timeout) if environ_timeout is not None else None
116116
)
117117

118118
compression = (

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def __init__(
7474
client_key_file: Optional[str] = None,
7575
client_certificate_file: Optional[str] = None,
7676
headers: Optional[Dict[str, str]] = None,
77-
timeout: Optional[int] = None,
77+
timeout: Optional[float] = None,
7878
compression: Optional[Compression] = None,
7979
session: Optional[requests.Session] = None,
8080
):
@@ -109,7 +109,7 @@ def __init__(
109109
self._headers = headers or parse_env_headers(
110110
headers_string, liberal=True
111111
)
112-
self._timeout = timeout or int(
112+
timeout_sec = timeout or float(
113113
environ.get(
114114
OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
115115
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT),
@@ -124,9 +124,13 @@ def __init__(
124124
{"Content-Encoding": self._compression.value}
125125
)
126126
self._shutdown = False
127-
self._exporter = RetryingExporter(self._export, LogExportResult)
127+
self._exporter = RetryingExporter(
128+
self._export, LogExportResult, timeout_sec
129+
)
128130

129-
def _export(self, serialized_data: bytes):
131+
def _export(
132+
self, serialized_data: bytes, timeout_sec: float
133+
) -> LogExportResult:
130134
data = serialized_data
131135
if self._compression == Compression.Gzip:
132136
gzip_data = BytesIO()
@@ -140,7 +144,7 @@ def _export(self, serialized_data: bytes):
140144
url=self._endpoint,
141145
data=data,
142146
verify=self._certificate_file,
143-
timeout=self._timeout,
147+
timeout=timeout_sec,
144148
cert=self._client_cert,
145149
)
146150

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def __init__(
105105
client_key_file: Optional[str] = None,
106106
client_certificate_file: Optional[str] = None,
107107
headers: Optional[Dict[str, str]] = None,
108-
timeout: Optional[int] = None,
108+
timeout: Optional[float] = None,
109109
compression: Optional[Compression] = None,
110110
session: Optional[requests.Session] = None,
111111
preferred_temporality: Dict[type, AggregationTemporality] = None,
@@ -141,7 +141,7 @@ def __init__(
141141
self._headers = headers or parse_env_headers(
142142
headers_string, liberal=True
143143
)
144-
self._timeout = timeout or int(
144+
self._timeout = timeout or float(
145145
environ.get(
146146
OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
147147
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT),
@@ -163,9 +163,13 @@ def __init__(
163163
)
164164

165165
self._shutdown = False
166-
self._exporter = RetryingExporter(self._export, MetricExportResult)
166+
self._exporter = RetryingExporter(
167+
self._export, MetricExportResult, self._timeout
168+
)
167169

168-
def _export(self, serialized_data: bytes):
170+
def _export(
171+
self, serialized_data: bytes, timeout_sec: float
172+
) -> MetricExportResult:
169173
data = serialized_data
170174
if self._compression == Compression.Gzip:
171175
gzip_data = BytesIO()
@@ -179,7 +183,7 @@ def _export(self, serialized_data: bytes):
179183
url=self._endpoint,
180184
data=data,
181185
verify=self._certificate_file,
182-
timeout=self._timeout,
186+
timeout=timeout_sec,
183187
cert=self._client_cert,
184188
)
185189

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def __init__(
7272
client_key_file: Optional[str] = None,
7373
client_certificate_file: Optional[str] = None,
7474
headers: Optional[Dict[str, str]] = None,
75-
timeout: Optional[int] = None,
75+
timeout: Optional[float] = None,
7676
compression: Optional[Compression] = None,
7777
session: Optional[requests.Session] = None,
7878
):
@@ -106,7 +106,7 @@ def __init__(
106106
self._headers = headers or parse_env_headers(
107107
headers_string, liberal=True
108108
)
109-
self._timeout = timeout or int(
109+
timeout_sec = timeout or float(
110110
environ.get(
111111
OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
112112
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, DEFAULT_TIMEOUT),
@@ -121,9 +121,13 @@ def __init__(
121121
{"Content-Encoding": self._compression.value}
122122
)
123123
self._shutdown = False
124-
self._exporter = RetryingExporter(self._export, SpanExportResult)
124+
self._exporter = RetryingExporter(
125+
self._export, SpanExportResult, timeout_sec
126+
)
125127

126-
def _export(self, serialized_data: bytes):
128+
def _export(
129+
self, serialized_data: bytes, timeout_sec: float
130+
) -> SpanExportResult:
127131
data = serialized_data
128132
if self._compression == Compression.Gzip:
129133
gzip_data = BytesIO()
@@ -137,7 +141,7 @@ def _export(self, serialized_data: bytes):
137141
url=self._endpoint,
138142
data=data,
139143
verify=self._certificate_file,
140-
timeout=self._timeout,
144+
timeout=timeout_sec,
141145
cert=self._client_cert,
142146
)
143147

exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def test_constructor_default(self):
7777
self.assertEqual(exporter._certificate_file, True)
7878
self.assertEqual(exporter._client_certificate_file, None)
7979
self.assertEqual(exporter._client_key_file, None)
80-
self.assertEqual(exporter._timeout, DEFAULT_TIMEOUT)
80+
self.assertEqual(exporter._exporter._timeout_sec, DEFAULT_TIMEOUT)
8181
self.assertIs(exporter._compression, DEFAULT_COMPRESSION)
8282
self.assertEqual(exporter._headers, {})
8383
self.assertIsInstance(exporter._session, requests.Session)
@@ -119,7 +119,7 @@ def test_exporter_metrics_env_take_priority(self):
119119
exporter._client_certificate_file, "logs/client-cert.pem"
120120
)
121121
self.assertEqual(exporter._client_key_file, "logs/client-key.pem")
122-
self.assertEqual(exporter._timeout, 40)
122+
self.assertEqual(exporter._exporter._timeout_sec, 40)
123123
self.assertIs(exporter._compression, Compression.Deflate)
124124
self.assertEqual(
125125
exporter._headers,
@@ -160,7 +160,7 @@ def test_exporter_constructor_take_priority(self):
160160
self.assertEqual(exporter._certificate_file, "/hello.crt")
161161
self.assertEqual(exporter._client_certificate_file, "/client-cert.pem")
162162
self.assertEqual(exporter._client_key_file, "/client-key.pem")
163-
self.assertEqual(exporter._timeout, 70)
163+
self.assertEqual(exporter._exporter._timeout_sec, 70)
164164
self.assertIs(exporter._compression, Compression.NoCompression)
165165
self.assertEqual(
166166
exporter._headers,
@@ -192,7 +192,7 @@ def test_exporter_env(self):
192192
exporter._client_certificate_file, ENV_CLIENT_CERTIFICATE
193193
)
194194
self.assertEqual(exporter._client_key_file, ENV_CLIENT_KEY)
195-
self.assertEqual(exporter._timeout, int(ENV_TIMEOUT))
195+
self.assertEqual(exporter._exporter._timeout_sec, int(ENV_TIMEOUT))
196196
self.assertIs(exporter._compression, Compression.Gzip)
197197
self.assertEqual(
198198
exporter._headers, {"envheader1": "val1", "envheader2": "val2"}

exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def test_constructor_default(self):
6666
self.assertEqual(exporter._certificate_file, True)
6767
self.assertEqual(exporter._client_certificate_file, None)
6868
self.assertEqual(exporter._client_key_file, None)
69-
self.assertEqual(exporter._timeout, DEFAULT_TIMEOUT)
69+
self.assertEqual(exporter._exporter._timeout_sec, DEFAULT_TIMEOUT)
7070
self.assertIs(exporter._compression, DEFAULT_COMPRESSION)
7171
self.assertEqual(exporter._headers, {})
7272
self.assertIsInstance(exporter._session, requests.Session)
@@ -108,7 +108,7 @@ def test_exporter_traces_env_take_priority(self):
108108
exporter._client_certificate_file, "traces/client-cert.pem"
109109
)
110110
self.assertEqual(exporter._client_key_file, "traces/client-key.pem")
111-
self.assertEqual(exporter._timeout, 40)
111+
self.assertEqual(exporter._exporter._timeout_sec, 40)
112112
self.assertIs(exporter._compression, Compression.Deflate)
113113
self.assertEqual(
114114
exporter._headers,
@@ -151,7 +151,7 @@ def test_exporter_constructor_take_priority(self):
151151
exporter._client_certificate_file, "path/to/client-cert.pem"
152152
)
153153
self.assertEqual(exporter._client_key_file, "path/to/client-key.pem")
154-
self.assertEqual(exporter._timeout, 20)
154+
self.assertEqual(exporter._exporter._timeout_sec, 20)
155155
self.assertIs(exporter._compression, Compression.NoCompression)
156156
self.assertEqual(
157157
exporter._headers,
@@ -179,7 +179,7 @@ def test_exporter_env(self):
179179
exporter._client_certificate_file, OS_ENV_CLIENT_CERTIFICATE
180180
)
181181
self.assertEqual(exporter._client_key_file, OS_ENV_CLIENT_KEY)
182-
self.assertEqual(exporter._timeout, int(OS_ENV_TIMEOUT))
182+
self.assertEqual(exporter._exporter._timeout_sec, int(OS_ENV_TIMEOUT))
183183
self.assertIs(exporter._compression, Compression.Gzip)
184184
self.assertEqual(
185185
exporter._headers, {"envheader1": "val1", "envheader2": "val2"}

0 commit comments

Comments
 (0)