-
Notifications
You must be signed in to change notification settings - Fork 0
Feature/update delete with inserts #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
39769d0
to
06a19d0
Compare
|
@@ -46,4 +46,5 @@ _build | |||
|
|||
# Tinibird |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😒
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lack some context, so didn't go into the details of what's going on in the code.
|
||
event_converted = self.convert(event) | ||
for row in event_converted: | ||
# These columns are added to identify the last change (tb_upd) and the kind of operation performed | ||
# 0 - INSERT, 1 - UPDATE, 2 - DELETE |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could have these as constants.
# we need to convert Decimal value to str value for suitable for table structure | ||
if type(row[key]) == Decimal: | ||
# we need to convert Decimal or timedelta value to str value for suitable for table structure | ||
if type(row[key]) == [Decimal, datetime.timedelta]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be in
?
if type(row[key]) == [Decimal, datetime.timedelta]: | |
if type(row[key]) in [Decimal, datetime.timedelta]: |
updated_values = dict(set(row['after_values'].items()).difference(set(row['before_values'].items()))) | ||
|
||
|
||
# These columns are added to identify the last change (tb_upd) and when a row is deleted (1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this comment right? and when a row is deleted (1)
or it should be when a row is updated (1)
?
|
||
event_converted = self.convert(event) | ||
for row in event_converted: | ||
# These columns are added to identify the last change (tb_upd) and the kind of operation performed | ||
# 0 - INSERT, 1 - UPDATE, 2 - DELETE | ||
row['tb_upd'] = datetime.datetime.now() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see in other places we do this row['tb_upd'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
would it make sense here (and in other places) as well?
writer.close() | ||
writer.push() | ||
writer.destroy() | ||
logging.debug('class:%s process() done', __class__) | ||
logging.debug('class:%s processDelete() done', __class__) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if this and the method below could be snake case as well
# logging.debug('starting %s', bash) | ||
# os.system(bash) | ||
|
||
logging.debug("CHCSVWriter: delete row") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TBCSVWriter?
@@ -79,6 +79,8 @@ | |||
'clickhouse-driver', | |||
'configobj', | |||
'setuptools', | |||
'requests_toolbelt', | |||
'requests' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a rule of thumb I'd force the version of each dependency to avoid having deployments with different versions of the same libraries.
'--tb-token', | ||
type=str, | ||
default=self.default_options['tb_token'], | ||
help='Tinybird host' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
help='Tinybird host' | |
help='Tinybird token' |
@@ -86,13 +96,16 @@ def insert(self, event_or_events=None): | |||
schema = self.dst_schema if self.dst_schema else event_converted.schema | |||
table = None | |||
if self.dst_distribute: | |||
table = TableProcessor.create_distributed_table_name(db=event_converted.schema, table=event_converted.table) | |||
table = TableProcessor.create_distributed_table_name( | |||
db=event_converted.schema, table=event_converted.table) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe the naming comes from the library itself, but I find referring to the database as schema
misleading.
# In case of error let's retry only | ||
logging.exception(response.json()) | ||
time.sleep(tries) | ||
logging.info(f"Retrying { tries } of { limit_of_retries }") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd move this log after the condition
Feature/update delete with inserts
No description provided.