
Commit 12be12d

Merge pull request #25 from tinybirdco/fix_log
Improve retries
2 parents 441833d + d1dbe75 commit 12be12d

File tree

1 file changed (+27 −11 lines)

clickhouse_mysql/writer/tbcsvwriter.py

Lines changed: 27 additions & 11 deletions
@@ -57,7 +57,7 @@ def __init__(
 
 
     def uploadCSV(self, table, filename, tries=1):
-        limit_of_retries = 3
+        limit_of_retries = 9
         self.not_uploaded = False
         params = {
             'name': table,
@@ -66,7 +66,6 @@ def uploadCSV(self, table, filename, tries=1):
         }
 
         try:
-
            with open(filename, 'rb') as f:
                m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')})
                url = f"{self.tb_host}/v0/datasources"
@@ -81,34 +80,51 @@ def uploadCSV(self, table, filename, tries=1):
                                         params=params,
                                         verify=False)
 
-                logging.info(response.content)  # this is ugly, but we need to check what is in the response for some detected errors
+                logging.debug(response.content)
                 if response.status_code == 200:
                     json_object = json.loads(response.content)
                     logging.debug(f"Import id: {json_object['import_id']}")
+                    return
+
                 elif response.status_code == 429:
+                    # retry with the recommended value
                     retry_after = int(response.headers['Retry-After']) + tries
                     logging.error(f"Too many requests retrying in {retry_after} seconds to upload {filename} to {table}")
                     time.sleep(retry_after)
                     self.uploadCSV(table, filename, tries + 1)
-                else:
-                    # In case of error let's retry only `limit_of_retries` times
-                    logging.exception(response.content)
+                elif response.status_code >= 500:
+                    # In case of server error let's retry `limit_of_retries` times, but exponentially
+                    logging.error(response.content)
                     if tries > limit_of_retries:
+                        logging.debug(f'Limit of retries reached for {filename}')
                         self.not_uploaded = True
                         return
-                    logging.info(f"Retrying {filename} when status {response.status_code}, try {tries} of {limit_of_retries}")
-                    time.sleep(tries)
+
+                    retry_after = 2 ** tries
+                    logging.info(f"Retrying {filename} when status {response.status_code}, waiting {retry_after}, try {tries} of {limit_of_retries}")
+                    time.sleep(retry_after)
                     self.uploadCSV(table, filename, tries + 1)
+                else:
+                    # In case of other client errors (400, 404, etc...) don't retry
+                    logging.error(response.content)
+                    logging.error(f"Not retrying {filename} when status {response.status_code}")
+                    self.not_uploaded = True
+                    return
+
         except Exception as e:
+            # As we don't know what went wrong... let's retry `limit_of_retries` times, but exponentially
            logging.exception(e)
            if tries > limit_of_retries:
+               logging.debug(f'Limit of retries reached for {filename}')
               self.not_uploaded = True
               return
-           # We wait tries^2 sec to try again
-           logging.info(f"Retrying {filename} when exception: try {tries} of {limit_of_retries}")
-           time.sleep(tries * tries)
+
+           retry_after = 2 ** tries
+           logging.info(f"Retrying {filename} when exception, waiting {retry_after} try {tries} of {limit_of_retries}")
+           time.sleep(retry_after)
            self.uploadCSV(table, filename, tries + 1)
 
+
    def insert(self, event_or_events=None):
        # event_or_events = [
        #   event: {
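In short, the retry policy after this change: a 200 response returns immediately, a 429 sleeps for the server's Retry-After value plus the current try count, 5xx responses and unexpected exceptions back off exponentially (2 ** tries seconds) for up to limit_of_retries = 9 attempts, and other client errors fail fast without retrying. A minimal, illustrative sketch of the wait schedule this implies (not part of the commit; only limit_of_retries and the 2 ** tries backoff come from the diff above, the rest is hypothetical):

    # Illustrative sketch only: reproduces the wait schedule implied by uploadCSV.
    limit_of_retries = 9

    # 5xx responses and unexpected exceptions: exponential backoff per try.
    backoff = [2 ** tries for tries in range(1, limit_of_retries + 1)]
    print(backoff)       # [2, 4, 8, 16, 32, 64, 128, 256, 512]
    print(sum(backoff))  # 1022 -> worst case ~17 minutes of sleeping before giving up

    # 429 responses: honour the Retry-After header, plus the current try count.
    retry_after_header = 5             # hypothetical value returned by the server
    tries = 3                          # hypothetical current attempt
    print(retry_after_header + tries)  # 8 seconds before the next attempt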
