Skip to content

Feature/update delete with inserts #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[flake8]
ignore =
; except
E722,
; inline regex
W605,
; long lines
E501,
; too complex
C901
max-complexity = 10
max-line-length = 120
application-import-names = flake8
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ _build

# Tinibird
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😒

bl-*
out-*
out-*
.e
23 changes: 23 additions & 0 deletions clickhouse_mysql/clioptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ class CLIOptions(Options):
#
# general app section
#

'tb_host': 'https://ui.tinybird.co',
'tb_token': None,

'config_file': '/etc/clickhouse-mysql/clickhouse-mysql.conf',
'log_file': None,
'log_level': None,
Expand Down Expand Up @@ -171,6 +175,20 @@ def options(self):
#
# general app section
#
argparser.add_argument(
'--tb-host',
type=str,
default=self.default_options['tb_host'],
help='Tinybird host'
)

argparser.add_argument(
'--tb-token',
type=str,
default=self.default_options['tb_token'],
help='Tinybird host'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
help='Tinybird host'
help='Tinybird token'

)

argparser.add_argument(
'--config-file',
type=str,
Expand Down Expand Up @@ -508,6 +526,11 @@ def options(self):
#
# general app section
#

'tb_host': args.tb_host,
'tb_token': args.tb_token,


'config_file': args.config_file,
'log_file': args.log_file,
'log_level': args.log_level,
Expand Down
23 changes: 17 additions & 6 deletions clickhouse_mysql/config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import os
from clickhouse_mysql.reader.mysqlreader import MySQLReader
from clickhouse_mysql.reader.csvreader import CSVReader

from clickhouse_mysql.writer.chwriter import CHWriter
from clickhouse_mysql.writer.csvwriter import CSVWriter
from clickhouse_mysql.writer.chcsvwriter import CHCSVWriter
from clickhouse_mysql.writer.tbcsvwriter import TBCSVWriter
from clickhouse_mysql.writer.poolwriter import PoolWriter
from clickhouse_mysql.writer.processwriter import ProcessWriter
from clickhouse_mysql.objectbuilder import ObjectBuilder
Expand Down Expand Up @@ -39,7 +41,7 @@ def __init__(self):

log_file = None
log_pos = None
if self.options['binlog_position_file'] and self.options.get_bool('src_resume'):
if self.options['binlog_position_file'] and self.options.get_bool('src_resume') and os.path.exists(self.options['binlog_position_file']):
try:
with open(self.options['binlog_position_file'], 'r') as f:
position = f.read()
Expand All @@ -50,17 +52,22 @@ def __init__(self):
log_file,
log_pos
))
except:
except Exception as e:
log_file = None
log_pos = None
print("can't read binlog position from file {}".format(
logging.exception(e)
logging.info("can't read binlog position from file {}".format(
self.options['binlog_position_file'],
))
# build application config out of aggregated options
self.config = {
#
#
#
'tinybird': {
'host': self.options['tb_host'],
'token': self.options['tb_token'],
},
'app': {
'config_file': self.options['config_file'],
'log_file': self.options['log_file'],
Expand Down Expand Up @@ -359,8 +366,12 @@ def writer_builder_csvpool(self):
'dst_table': self.config['writer']['file']['dst_table'],
'dst_table_prefix': self.config['writer']['file']['dst_table_prefix'],
'next_writer_builder': ObjectBuilder(
class_name=CHCSVWriter,
constructor_params=self.config['writer']['clickhouse']
class_name=TBCSVWriter,
constructor_params={
'tb_host': self.config['tinybird']['host'],
'tb_token': self.config['tinybird']['token'],
'dst_table': self.config['writer']['clickhouse']['dst_table']
}
),
'converter_builder': self.converter_builder(CONVERTER_CSV),
})
Expand Down
6 changes: 5 additions & 1 deletion clickhouse_mysql/event/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ def __next__(self):

if self.pymysqlreplication_event is not None:
# in native replication event actual data are in row['values'] dict item
return item['values']
if 'after_values' in item:
return item['after_values']
else:
return item['values']

else:
# local-kept data
return item
Expand Down
5 changes: 5 additions & 0 deletions clickhouse_mysql/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import signal
import sys
import multiprocessing as mp
import logging
Expand Down Expand Up @@ -145,6 +146,10 @@ def run(self):
reader=self.config.reader(),
writer=self.config.writer(),
)

signal.signal(signal.SIGINT, pumper.exit_gracefully)
signal.signal(signal.SIGTERM, pumper.exit_gracefully)

pumper.run()

except Exception as ex:
Expand Down
14 changes: 11 additions & 3 deletions clickhouse_mysql/pumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
# -*- coding: utf-8 -*-


from clickhouse_mysql.reader.reader import Reader
from clickhouse_mysql.writer.writer import Writer
import signal


class Pumper(object):
"""
Pump data - read data from reader and push into writer
"""

reader = None
writer = None
reader: Reader = None
writer: Writer = None

def __init__(self, reader=None, writer=None):
self.reader = reader
Expand All @@ -21,7 +26,7 @@ def __init__(self, reader=None, writer=None):
'UpdateRowsEvent': self.update_rows_event,
'DeleteRowsEvent': self.delete_rows_event,
# 'WriteRowsEvent.EachRow': self.write_rows_event_each_row,
'ReaderIdleEvent': self.reader_idle_event,
# 'ReaderIdleEvent': self.reader_idle_event,
})

def run(self):
Expand Down Expand Up @@ -61,6 +66,9 @@ def update_rows_event(self, event=None):
"""
self.writer.update(event)

def exit_gracefully(self, sig, frame):
self.reader.close()


if __name__ == '__main__':
print("pumper")
46 changes: 23 additions & 23 deletions clickhouse_mysql/reader/mysqlreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from clickhouse_mysql.event.event import Event
from clickhouse_mysql.tableprocessor import TableProcessor
from clickhouse_mysql.util import Util
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent


class MySQLReader(Reader):
Expand All @@ -29,9 +28,10 @@ class MySQLReader(Reader):
resume_stream = None
binlog_stream = None
nice_pause = 0
exit_gracefully = False

write_rows_event_num = 0
write_rows_event_each_row_num = 0;
write_rows_event_each_row_num = 0

binlog_position_file = None

Expand Down Expand Up @@ -316,21 +316,21 @@ def process_update_rows_event(self, mysql_event):
return

# statistics
#self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))

if self.subscribers('UpdateRowsEvent'):
# dispatch event to subscribers

# statistics
#self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
# self.stat_write_rows_event_all_rows(mysql_event=mysql_event)

# dispatch Event
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
event.pymysqlreplication_event = mysql_event

#self.process_first_event(event=event)
self.process_first_event(event=event)
self.notify('UpdateRowsEvent', event=event)

# self.stat_write_rows_event_finalyse()
Expand All @@ -355,13 +355,13 @@ def process_delete_rows_event(self, mysql_event):
return

# statistics
#self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
# self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))

if self.subscribers('DeleteRowsEvent'):
# dispatch event to subscribers

# statistics
#self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
# self.stat_write_rows_event_all_rows(mysql_event=mysql_event)

# dispatch Event
event = Event()
Expand Down Expand Up @@ -389,7 +389,7 @@ def read(self):

# fetch events
try:
while True:
while not self.exit_gracefully:
logging.debug('Check events in binlog stream')

self.init_fetch_loop()
Expand All @@ -403,8 +403,9 @@ def read(self):

# fetch available events from MySQL
for mysql_event in self.binlog_stream:
# new event has come
# check what to do with it

if self.exit_gracefully:
break

logging.debug(
'Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos))
Expand All @@ -420,23 +421,19 @@ def read(self):
# skip other unhandled events
pass

# after event processed, we need to handle current binlog position
self.process_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos)
# after event processed, we need to handle current binlog position
self.process_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos)

except KeyboardInterrupt:
# pass SIGINT further
logging.info("SIGINT received. Pass it further.")
raise
except Exception as ex:
if self.blocking:
# we'd like to continue waiting for data
# report and continue cycle
logging.warning("Got an exception, skip it in blocking mode")
logging.warning(ex)
logging.exception(ex)
else:
# do not continue, report error and exit
logging.critical("Got an exception, abort it in non-blocking mode")
logging.critical(ex)
logging.exception(ex)
sys.exit(1)

# all events fetched (or none of them available)
Expand All @@ -453,25 +450,28 @@ def read(self):
time.sleep(self.nice_pause)

self.notify('ReaderIdleEvent')

except KeyboardInterrupt:
logging.info("SIGINT received. Time to exit.")
except Exception as ex:
logging.warning("Got an exception, handle it")
logging.warning(ex)
logging.exception(ex)

try:
self.binlog_stream.close()
logging.info("Stop reading from MySQL")
except Exception as ex:
logging.warning("Unable to close binlog stream correctly")
logging.warning(ex)
logging.exception(ex)

end_timestamp = int(time.time())

logging.info('start %d', self.start_timestamp)
logging.info('end %d', end_timestamp)
logging.info('len %d', end_timestamp - self.start_timestamp)

def close(self):
self.exit_gracefully = True
self.nice_pause = 0
logging.info("MySQL should stop in the next loop")


if __name__ == '__main__':
connection_settings = {
Expand Down
3 changes: 3 additions & 0 deletions clickhouse_mysql/reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,6 @@ def __init__(self, converter=None, callbacks={}):

def read(self):
pass

def close(self):
pass
Loading