Feature/update delete with inserts #2

Merged
merged 18 commits into from
Jan 16, 2023

@ygnuss ygnuss commented Jun 8, 2021

No description provided.

@alexon1234 alexon1234 force-pushed the feature/update-delete-with-inserts branch from 39769d0 to 06a19d0 Compare August 10, 2021 16:59
@alexon1234 alexon1234 requested a review from alrocar September 13, 2021 10:19
@alexon1234 alexon1234 changed the title [WIP] Feature/update delete with inserts Feature/update delete with inserts Sep 13, 2021
@alexon1234

  • Added support for updates and deletes as inserts, using the operation field to identify the type of operation
  • Added a graceful exit so that a CSV is never created but left unpublished when the service is restarted
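
A minimal sketch of the graceful-exit idea, assuming the service runs a polling loop and is stopped with SIGTERM or SIGINT. `GracefulExit`, `run_loop` and `flush` are hypothetical names for illustration, not the ones used in this PR.

```python
import signal


class GracefulExit:
    """Records a stop request instead of letting the process die mid-write."""

    def __init__(self):
        self.stop_requested = False

    def install(self):
        # Must be called from the main thread; signal.signal() raises otherwise.
        signal.signal(signal.SIGTERM, self.request_stop)
        signal.signal(signal.SIGINT, self.request_stop)

    def request_stop(self, signum=None, frame=None):
        self.stop_requested = True


def run_loop(batches, flush, exit_flag):
    """Flush batches one by one and stop only between batches.

    `flush` stands in for the close/push/destroy sequence, so a CSV that
    has been created is always published before the loop exits.
    """
    flushed = 0
    for batch in batches:
        if exit_flag.stop_requested:
            break
        flush(batch)
        flushed += 1
    return flushed
```

The stop flag is only checked between batches, so a restart can delay shutdown by one flush but cannot interrupt one.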

@@ -46,4 +46,5 @@ _build

# Tinibird

😒

@alrocar alrocar left a comment

I lack some context, so I didn't go into the details of what's going on in the code.


event_converted = self.convert(event)
for row in event_converted:
# These columns are added to identify the last change (tb_upd) and the kind of operation performed
# 0 - INSERT, 1 - UPDATE, 2 - DELETE

We could have these as constants.
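
One possible shape for those constants, as a sketch only. The `Operation` enum and the `tag_row` helper are hypothetical, and the `operation` column name is assumed from the PR description rather than taken from the code.

```python
from enum import IntEnum


class Operation(IntEnum):
    """Values from the code comment: 0 - INSERT, 1 - UPDATE, 2 - DELETE."""

    INSERT = 0
    UPDATE = 1
    DELETE = 2


def tag_row(row, operation):
    """Return a copy of the row with the operation column set."""
    tagged = dict(row)
    tagged["operation"] = int(operation)  # assumed column name
    return tagged
```

An `IntEnum` keeps the values usable as plain integers when the row is written out, while call sites read `Operation.DELETE` instead of a bare `2`.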

# we need to convert Decimal value to str value for suitable for table structure
if type(row[key]) == Decimal:
# we need to convert Decimal or timedelta value to str value for suitable for table structure
if type(row[key]) == [Decimal, datetime.timedelta]:

Should this be in?

Suggested change
if type(row[key]) == [Decimal, datetime.timedelta]:
if type(row[key]) in [Decimal, datetime.timedelta]:
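
To illustrate why the suggestion matters: comparing a type to a list with `==` is always false, so the conversion branch would never run. The snippet below is a standalone sketch; `to_csv_value` is a hypothetical helper, and `isinstance` is shown as an alternative that is not part of the suggested change.

```python
import datetime
from decimal import Decimal

value = Decimal("1.50")

# A type object is never equal to a list, so this is always False.
broken = type(value) == [Decimal, datetime.timedelta]

# Membership test: True for exact Decimal or timedelta instances.
fixed = type(value) in [Decimal, datetime.timedelta]

# isinstance() also matches subclasses and takes a tuple of types.
also_ok = isinstance(value, (Decimal, datetime.timedelta))


def to_csv_value(value):
    """Stringify Decimal and timedelta values; leave everything else alone."""
    if isinstance(value, (Decimal, datetime.timedelta)):
        return str(value)
    return value
```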

updated_values = dict(set(row['after_values'].items()).difference(set(row['before_values'].items())))


# These columns are added to identify the last change (tb_upd) and when a row is deleted (1)

Is this comment right? Should it say "and when a row is deleted (1)", or should it be "when a row is updated (1)"?


event_converted = self.convert(event)
for row in event_converted:
# These columns are added to identify the last change (tb_upd) and the kind of operation performed
# 0 - INSERT, 1 - UPDATE, 2 - DELETE
row['tb_upd'] = datetime.datetime.now()

I see that in other places we do row['tb_upd'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

Would it make sense to do the same here (and in other places) as well?
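
A small sketch of the difference, using a fixed timestamp so the result is reproducible. `TB_UPD_FORMAT` and `format_tb_upd` are hypothetical names; only the format string comes from the comment above.

```python
import datetime

TB_UPD_FORMAT = "%Y-%m-%d %H:%M:%S"  # format quoted in the review comment


def format_tb_upd(moment):
    """Render a datetime as a second-precision string."""
    return moment.strftime(TB_UPD_FORMAT)


moment = datetime.datetime(2021, 9, 13, 10, 19, 0, 123456)
raw = str(moment)                  # "2021-09-13 10:19:00.123456"
formatted = format_tb_upd(moment)  # "2021-09-13 10:19:00"
```

Routing every `tb_upd` assignment through one helper would keep the column format identical across the insert, update and delete paths.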

writer.close()
writer.push()
writer.destroy()
logging.debug('class:%s process() done', __class__)
logging.debug('class:%s processDelete() done', __class__)

Not sure if this and the method below could be snake_case as well.

# logging.debug('starting %s', bash)
# os.system(bash)

logging.debug("CHCSVWriter: delete row")

TBCSVWriter?

@@ -79,6 +79,8 @@
'clickhouse-driver',
'configobj',
'setuptools',
'requests_toolbelt',
'requests'

As a rule of thumb, I'd pin the version of each dependency to avoid deployments running different versions of the same libraries.
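
As a rough illustration, a check like the following could flag unpinned entries in `install_requires`. The `unpinned` helper is hypothetical, and `X.Y.Z` is a placeholder rather than a real version number.

```python
def unpinned(requirements):
    """Return the requirement strings that lack an exact '==' pin."""
    return [req for req in requirements if "==" not in req]


# 'X.Y.Z' is a placeholder for illustration, not a recommended version.
install_requires = [
    "requests==X.Y.Z",
    "requests_toolbelt",
]

missing_pins = unpinned(install_requires)  # ['requests_toolbelt']
```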

'--tb-token',
type=str,
default=self.default_options['tb_token'],
help='Tinybird host'

Suggested change
help='Tinybird host'
help='Tinybird token'

@@ -86,13 +96,16 @@ def insert(self, event_or_events=None):
schema = self.dst_schema if self.dst_schema else event_converted.schema
table = None
if self.dst_distribute:
table = TableProcessor.create_distributed_table_name(db=event_converted.schema, table=event_converted.table)
table = TableProcessor.create_distributed_table_name(
db=event_converted.schema, table=event_converted.table)

Maybe the naming comes from the library itself, but I find referring to the database as a schema misleading.

# In case of error let's retry only
logging.exception(response.json())
time.sleep(tries)
logging.info(f"Retrying { tries } of { limit_of_retries }")

I'd move this log line after the condition.
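
A sketch of what that ordering could look like, assuming the condition in question is the check against `limit_of_retries` (the condition itself is not visible in the quoted diff). `push_with_retries` and `send` are hypothetical names.

```python
import logging
import time


def push_with_retries(send, limit_of_retries=3, sleep=time.sleep):
    """Call send() until it returns True or the retry limit is reached."""
    tries = 0
    while True:
        if send():
            return True
        tries += 1
        if tries > limit_of_retries:
            logging.error("Giving up after %d retries", limit_of_retries)
            return False
        # Logged only once we know another attempt will actually happen.
        logging.info("Retrying %d of %d", tries, limit_of_retries)
        sleep(tries)  # linear backoff, as in the snippet above
```

With the log placed after the limit check, the last failed attempt no longer prints a "Retrying" message for a retry that never takes place.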

@alexon1234 alexon1234 merged commit 3a45bb5 into master Jan 16, 2023
alexon1234 added a commit that referenced this pull request Jan 16, 2023
3 participants