18
18
from io import BytesIO
19
19
from os import environ
20
20
from typing import Dict , Optional
21
- from time import sleep
22
21
23
22
import requests
24
23
25
- from opentelemetry .exporter .otlp .proto .common ._internal import (
26
- _create_exp_backoff_generator ,
24
+ from opentelemetry .exporter .otlp .proto .common .exporter import (
25
+ RetryableExportError ,
26
+ RetryingExporter ,
27
27
)
28
28
from opentelemetry .exporter .otlp .proto .common .trace_encoder import (
29
29
encode_spans ,
@@ -121,6 +121,7 @@ def __init__(
121
121
{"Content-Encoding" : self ._compression .value }
122
122
)
123
123
self ._shutdown = False
124
+ self ._exporter = RetryingExporter (self ._export , SpanExportResult )
124
125
125
126
def _export (self , serialized_data : bytes ):
126
127
data = serialized_data
@@ -132,14 +133,31 @@ def _export(self, serialized_data: bytes):
132
133
elif self ._compression == Compression .Deflate :
133
134
data = zlib .compress (serialized_data )
134
135
135
- return self ._session .post (
136
+ resp = self ._session .post (
136
137
url = self ._endpoint ,
137
138
data = data ,
138
139
verify = self ._certificate_file ,
139
140
timeout = self ._timeout ,
140
141
cert = self ._client_cert ,
141
142
)
142
143
144
+ if resp .ok :
145
+ return SpanExportResult .SUCCESS
146
+
147
+ if self ._retryable (resp ):
148
+ _logger .warning (
149
+ "Transient error %s encountered while exporting span batch" ,
150
+ resp .reason ,
151
+ )
152
+ raise RetryableExportError (None )
153
+
154
+ _logger .error (
155
+ "Failed to export batch code: %s, reason: %s" ,
156
+ resp .status_code ,
157
+ resp .text ,
158
+ )
159
+ return SpanExportResult .FAILURE
160
+
143
161
@staticmethod
144
162
def _retryable (resp : requests .Response ) -> bool :
145
163
if resp .status_code == 408 :
@@ -151,34 +169,6 @@ def _retryable(resp: requests.Response) -> bool:
151
169
def _serialize_spans (self , spans ):
152
170
return encode_spans (spans ).SerializePartialToString ()
153
171
154
- def _export_serialized_spans (self , serialized_data ):
155
- for delay in _create_exp_backoff_generator (
156
- max_value = self ._MAX_RETRY_TIMEOUT
157
- ):
158
- if delay == self ._MAX_RETRY_TIMEOUT :
159
- return SpanExportResult .FAILURE
160
-
161
- resp = self ._export (serialized_data )
162
- # pylint: disable=no-else-return
163
- if resp .ok :
164
- return SpanExportResult .SUCCESS
165
- elif self ._retryable (resp ):
166
- _logger .warning (
167
- "Transient error %s encountered while exporting span batch, retrying in %ss." ,
168
- resp .reason ,
169
- delay ,
170
- )
171
- sleep (delay )
172
- continue
173
- else :
174
- _logger .error (
175
- "Failed to export batch code: %s, reason: %s" ,
176
- resp .status_code ,
177
- resp .text ,
178
- )
179
- return SpanExportResult .FAILURE
180
- return SpanExportResult .FAILURE
181
-
182
172
def export (self , spans ) -> SpanExportResult :
183
173
# After the call to Shutdown subsequent calls to Export are
184
174
# not allowed and should return a Failure result.
@@ -187,8 +177,7 @@ def export(self, spans) -> SpanExportResult:
187
177
return SpanExportResult .FAILURE
188
178
189
179
serialized_data = self ._serialize_spans (spans )
190
-
191
- return self ._export_serialized_spans (serialized_data )
180
+ return self ._exporter .export_with_retry (serialized_data )
192
181
193
182
def shutdown (self ):
194
183
if self ._shutdown :
0 commit comments