@@ -22,6 +22,8 @@ class TBCSVWriter(Writer):
22
22
tb_host = None
23
23
tb_token = None
24
24
25
+ not_uploaded = None
26
+
25
27
def __init__ (
26
28
self ,
27
29
tb_host ,
@@ -30,6 +32,7 @@ def __init__(
30
32
dst_table = None ,
31
33
dst_table_prefix = None ,
32
34
dst_distribute = False ,
35
+ not_uploaded = False ,
33
36
):
34
37
# if dst_distribute and dst_schema is not None:
35
38
# dst_schema += "_all"
@@ -44,16 +47,18 @@ def __init__(
44
47
if self .tb_host is None or self .tb_token is None :
45
48
logging .critical (
46
49
f" Host: { self .tb_host } or token { self .tb_token } is missing" )
47
- return None
50
+ return
48
51
49
52
self .dst_schema = dst_schema
50
53
self .dst_table = dst_table
51
54
self .dst_table_prefix = dst_table_prefix
52
55
self .dst_distribute = dst_distribute
56
+ self .not_uploaded = not_uploaded
53
57
54
58
55
59
def uploadCSV (self , table , filename , tries = 1 ):
56
60
limit_of_retries = 3
61
+ self .not_uploaded = False
57
62
params = {
58
63
'name' : table ,
59
64
'mode' : 'append' ,
@@ -76,32 +81,32 @@ def uploadCSV(self, table, filename, tries=1):
76
81
params = params ,
77
82
verify = False )
78
83
79
- # logging.debug(response.text)
80
- logging .info (response .json ())
84
+ logging .info (response .content ) # ugly, but we must log the raw body to diagnose certain error responses
81
85
if response .status_code == 200 :
82
86
json_object = json .loads (response .content )
83
87
logging .debug (f"Import id: { json_object ['import_id' ]} " )
84
88
elif response .status_code == 429 :
85
89
retry_after = int (response .headers ['Retry-After' ]) + tries
86
- logging .error (
87
- f"Too many requests retrying in { retry_after } seconds to upload { filename } to { table } " )
90
+ logging .error (f"Too many requests retrying in { retry_after } seconds to upload { filename } to { table } " )
88
91
time .sleep (retry_after )
89
92
self .uploadCSV (table , filename , tries + 1 )
90
93
else :
91
- # In case of error let's retry only
92
- logging .exception (response .json ())
93
- time .sleep (tries )
94
- logging .info (f"Retrying { tries } of { limit_of_retries } " )
94
+ # In case of error let's retry only `limit_of_retries` times
95
+ logging .exception (response .content )
95
96
if tries > limit_of_retries :
97
+ self .not_uploaded = True
96
98
return
99
+ logging .info (f"Retrying { filename } when status { response .status_code } , try { tries } of { limit_of_retries } " )
100
+ time .sleep (tries )
97
101
self .uploadCSV (table , filename , tries + 1 )
98
102
except Exception as e :
99
103
logging .exception (e )
100
- # We wait tries^2 sec to try again
101
- time .sleep (tries * tries )
102
- logging .info (f"Retrying { tries } of { limit_of_retries } " )
103
104
if tries > limit_of_retries :
105
+ self .not_uploaded = True
104
106
return
107
+ # We wait tries^2 sec to try again
108
+ logging .info (f"Retrying { filename } when exception: try { tries } of { limit_of_retries } " )
109
+ time .sleep (tries * tries )
105
110
self .uploadCSV (table , filename , tries + 1 )
106
111
107
112
def insert (self , event_or_events = None ):
@@ -124,7 +129,7 @@ def insert(self, event_or_events=None):
124
129
logging .debug ('class:%s insert %d rows' , __class__ , len (events ))
125
130
126
131
for event in events :
127
- #schema = self.dst_schema if self.dst_schema else event.schema
132
+ # schema = self.dst_schema if self.dst_schema else event.schema
128
133
table = self .dst_table if self .dst_table else event .table
129
134
self .uploadCSV (table , event .filename )
130
135
@@ -231,7 +236,7 @@ def update(self, event_or_events=None):
231
236
# )
232
237
233
238
# choptions = ""
234
- # if self.host:
239
# if self.host:
235
240
# choptions += " --host=" + shlex.quote(self.host)
236
241
# if self.port:
237
242
# choptions += " --port=" + str(self.port)
0 commit comments