diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..420a518 --- /dev/null +++ b/.flake8 @@ -0,0 +1,13 @@ +[flake8] +ignore = + ; except + E722, + ; inline regex + W605, + ; long lines + E501, + ; too complex + C901 +max-complexity = 10 +max-line-length = 120 +application-import-names = flake8 \ No newline at end of file diff --git a/.gitignore b/.gitignore index c44086a..1b8fd9a 100644 --- a/.gitignore +++ b/.gitignore @@ -46,4 +46,5 @@ _build # Tinibird bl-* -out-* \ No newline at end of file +out-* +.e diff --git a/clickhouse_mysql/clioptions.py b/clickhouse_mysql/clioptions.py index 4be23a2..c46897e 100644 --- a/clickhouse_mysql/clioptions.py +++ b/clickhouse_mysql/clioptions.py @@ -93,6 +93,10 @@ class CLIOptions(Options): # # general app section # + + 'tb_host': 'https://ui.tinybird.co', + 'tb_token': None, + 'config_file': '/etc/clickhouse-mysql/clickhouse-mysql.conf', 'log_file': None, 'log_level': None, @@ -171,6 +175,20 @@ def options(self): # # general app section # + argparser.add_argument( + '--tb-host', + type=str, + default=self.default_options['tb_host'], + help='Tinybird host' + ) + + argparser.add_argument( + '--tb-token', + type=str, + default=self.default_options['tb_token'], + help='Tinybird host' + ) + argparser.add_argument( '--config-file', type=str, @@ -508,6 +526,11 @@ def options(self): # # general app section # + + 'tb_host': args.tb_host, + 'tb_token': args.tb_token, + + 'config_file': args.config_file, 'log_file': args.log_file, 'log_level': args.log_level, diff --git a/clickhouse_mysql/config.py b/clickhouse_mysql/config.py index e4551c8..217aa0b 100644 --- a/clickhouse_mysql/config.py +++ b/clickhouse_mysql/config.py @@ -1,12 +1,14 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import logging +import os from clickhouse_mysql.reader.mysqlreader import MySQLReader from clickhouse_mysql.reader.csvreader import CSVReader from clickhouse_mysql.writer.chwriter import CHWriter from clickhouse_mysql.writer.csvwriter import CSVWriter -from clickhouse_mysql.writer.chcsvwriter import CHCSVWriter +from clickhouse_mysql.writer.tbcsvwriter import TBCSVWriter from clickhouse_mysql.writer.poolwriter import PoolWriter from clickhouse_mysql.writer.processwriter import ProcessWriter from clickhouse_mysql.objectbuilder import ObjectBuilder @@ -39,7 +41,7 @@ def __init__(self): log_file = None log_pos = None - if self.options['binlog_position_file'] and self.options.get_bool('src_resume'): + if self.options['binlog_position_file'] and self.options.get_bool('src_resume') and os.path.exists(self.options['binlog_position_file']): try: with open(self.options['binlog_position_file'], 'r') as f: position = f.read() @@ -50,10 +52,11 @@ def __init__(self): log_file, log_pos )) - except: + except Exception as e: log_file = None log_pos = None - print("can't read binlog position from file {}".format( + logging.exception(e) + logging.info("can't read binlog position from file {}".format( self.options['binlog_position_file'], )) # build application config out of aggregated options @@ -61,6 +64,10 @@ def __init__(self): # # # + 'tinybird': { + 'host': self.options['tb_host'], + 'token': self.options['tb_token'], + }, 'app': { 'config_file': self.options['config_file'], 'log_file': self.options['log_file'], @@ -359,8 +366,12 @@ def writer_builder_csvpool(self): 'dst_table': self.config['writer']['file']['dst_table'], 'dst_table_prefix': self.config['writer']['file']['dst_table_prefix'], 'next_writer_builder': ObjectBuilder( - class_name=CHCSVWriter, - 
constructor_params=self.config['writer']['clickhouse'] + class_name=TBCSVWriter, + constructor_params={ + 'tb_host': self.config['tinybird']['host'], + 'tb_token': self.config['tinybird']['token'], + 'dst_table': self.config['writer']['clickhouse']['dst_table'] + } ), 'converter_builder': self.converter_builder(CONVERTER_CSV), }) diff --git a/clickhouse_mysql/event/event.py b/clickhouse_mysql/event/event.py index e018e57..e38f80b 100644 --- a/clickhouse_mysql/event/event.py +++ b/clickhouse_mysql/event/event.py @@ -64,7 +64,11 @@ def __next__(self): if self.pymysqlreplication_event is not None: # in native replication event actual data are in row['values'] dict item - return item['values'] + if 'after_values' in item: + return item['after_values'] + else: + return item['values'] + else: # local-kept data return item diff --git a/clickhouse_mysql/main.py b/clickhouse_mysql/main.py index 662fd4d..d751573 100644 --- a/clickhouse_mysql/main.py +++ b/clickhouse_mysql/main.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import signal import sys import multiprocessing as mp import logging @@ -145,6 +146,10 @@ def run(self): reader=self.config.reader(), writer=self.config.writer(), ) + + signal.signal(signal.SIGINT, pumper.exit_gracefully) + signal.signal(signal.SIGTERM, pumper.exit_gracefully) + pumper.run() except Exception as ex: diff --git a/clickhouse_mysql/pumper.py b/clickhouse_mysql/pumper.py index 0b5d0c3..245f9c2 100644 --- a/clickhouse_mysql/pumper.py +++ b/clickhouse_mysql/pumper.py @@ -2,13 +2,18 @@ # -*- coding: utf-8 -*- +from clickhouse_mysql.reader.reader import Reader +from clickhouse_mysql.writer.writer import Writer +import signal + + class Pumper(object): """ Pump data - read data from reader and push into writer """ - reader = None - writer = None + reader: Reader = None + writer: Writer = None def __init__(self, reader=None, writer=None): self.reader = reader @@ -21,7 +26,7 @@ def __init__(self, reader=None, writer=None): 'UpdateRowsEvent': self.update_rows_event, 'DeleteRowsEvent': self.delete_rows_event, # 'WriteRowsEvent.EachRow': self.write_rows_event_each_row, - 'ReaderIdleEvent': self.reader_idle_event, + # 'ReaderIdleEvent': self.reader_idle_event, }) def run(self): @@ -61,6 +66,9 @@ def update_rows_event(self, event=None): """ self.writer.update(event) + def exit_gracefully(self, sig, frame): + self.reader.close() + if __name__ == '__main__': print("pumper") diff --git a/clickhouse_mysql/reader/mysqlreader.py b/clickhouse_mysql/reader/mysqlreader.py index 659ab77..5cb6c5c 100644 --- a/clickhouse_mysql/reader/mysqlreader.py +++ b/clickhouse_mysql/reader/mysqlreader.py @@ -12,7 +12,6 @@ from clickhouse_mysql.event.event import Event from clickhouse_mysql.tableprocessor import TableProcessor from clickhouse_mysql.util import Util -from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent class MySQLReader(Reader): @@ -29,9 +28,10 @@ class MySQLReader(Reader): resume_stream = None binlog_stream = None nice_pause = 0 + exit_gracefully = False write_rows_event_num = 0 - write_rows_event_each_row_num = 0; + write_rows_event_each_row_num = 0 binlog_position_file = None @@ -316,13 +316,13 @@ def process_update_rows_event(self, mysql_event): return # statistics - #self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows)) + self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows)) if self.subscribers('UpdateRowsEvent'): # dispatch event to subscribers # statistics - 
#self.stat_write_rows_event_all_rows(mysql_event=mysql_event) + # self.stat_write_rows_event_all_rows(mysql_event=mysql_event) # dispatch Event event = Event() @@ -330,7 +330,7 @@ def process_update_rows_event(self, mysql_event): event.table = mysql_event.table event.pymysqlreplication_event = mysql_event - #self.process_first_event(event=event) + self.process_first_event(event=event) self.notify('UpdateRowsEvent', event=event) # self.stat_write_rows_event_finalyse() @@ -355,13 +355,13 @@ def process_delete_rows_event(self, mysql_event): return # statistics - #self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows)) + # self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows)) if self.subscribers('DeleteRowsEvent'): # dispatch event to subscribers # statistics - #self.stat_write_rows_event_all_rows(mysql_event=mysql_event) + # self.stat_write_rows_event_all_rows(mysql_event=mysql_event) # dispatch Event event = Event() @@ -389,7 +389,7 @@ def read(self): # fetch events try: - while True: + while not self.exit_gracefully: logging.debug('Check events in binlog stream') self.init_fetch_loop() @@ -403,8 +403,9 @@ def read(self): # fetch available events from MySQL for mysql_event in self.binlog_stream: - # new event has come - # check what to do with it + + if self.exit_gracefully: + break logging.debug( 'Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos)) @@ -420,23 +421,19 @@ def read(self): # skip other unhandled events pass - # after event processed, we need to handle current binlog position - self.process_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos) + # after event processed, we need to handle current binlog position + self.process_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos) - except KeyboardInterrupt: - # pass SIGINT further - logging.info("SIGINT received. Pass it further.") - raise except Exception as ex: if self.blocking: # we'd like to continue waiting for data # report and continue cycle logging.warning("Got an exception, skip it in blocking mode") - logging.warning(ex) + logging.exception(ex) else: # do not continue, report error and exit logging.critical("Got an exception, abort it in non-blocking mode") - logging.critical(ex) + logging.exception(ex) sys.exit(1) # all events fetched (or none of them available) @@ -453,18 +450,16 @@ def read(self): time.sleep(self.nice_pause) self.notify('ReaderIdleEvent') - - except KeyboardInterrupt: - logging.info("SIGINT received. 
Time to exit.") except Exception as ex: logging.warning("Got an exception, handle it") - logging.warning(ex) + logging.exception(ex) try: self.binlog_stream.close() + logging.info("Stop reading from MySQL") except Exception as ex: logging.warning("Unable to close binlog stream correctly") - logging.warning(ex) + logging.exception(ex) end_timestamp = int(time.time()) @@ -472,6 +467,11 @@ def read(self): logging.info('end %d', end_timestamp) logging.info('len %d', end_timestamp - self.start_timestamp) + def close(self): + self.exit_gracefully = True + self.nice_pause = 0 + logging.info("MySQL should stop in the next loop") + if __name__ == '__main__': connection_settings = { diff --git a/clickhouse_mysql/reader/reader.py b/clickhouse_mysql/reader/reader.py index c4f5246..107d04a 100644 --- a/clickhouse_mysql/reader/reader.py +++ b/clickhouse_mysql/reader/reader.py @@ -33,3 +33,6 @@ def __init__(self, converter=None, callbacks={}): def read(self): pass + + def close(self): + pass diff --git a/clickhouse_mysql/writer/chcsvwriter.py b/clickhouse_mysql/writer/chcsvwriter.py deleted file mode 100644 index 88571c3..0000000 --- a/clickhouse_mysql/writer/chcsvwriter.py +++ /dev/null @@ -1,222 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import os -import logging -import shlex - -from clickhouse_mysql.writer.writer import Writer -from clickhouse_mysql.tableprocessor import TableProcessor - - -class CHCSVWriter(Writer): - """Write into ClickHouse via CSV file and clickhouse-client tool""" - - dst_schema = None - dst_table = None - dst_distribute = None - - host = None - port = None - user = None - password = None - - def __init__( - self, - connection_settings, - dst_schema=None, - dst_table=None, - dst_table_prefix=None, - dst_distribute=False, - ): - if dst_distribute and dst_schema is not None: - dst_schema += "_all" - if dst_distribute and dst_table is not None: - dst_table += "_all" - logging.info( - "CHCSWriter() connection_settings={} dst_schema={} dst_table={}".format(connection_settings, dst_schema, - dst_table)) - self.host = connection_settings['host'] - self.port = connection_settings['port'] - self.user = connection_settings['user'] - self.password = connection_settings['password'] - self.dst_schema = dst_schema - self.dst_table = dst_table - self.dst_table_prefix = dst_table_prefix - self.dst_distribute = dst_distribute - - def insert(self, event_or_events=None): - # event_or_events = [ - # event: { - # row: {'id': 3, 'a': 3} - # }, - # event: { - # row: {'id': 3, 'a': 3} - # }, - # ] - - events = self.listify(event_or_events) - if len(events) < 1: - logging.warning('No events to insert. 
class: %s', __class__) - return - - # assume we have at least one Event - - logging.debug('class:%s insert %d rows', __class__, len(events)) - - for event in events: - schema = self.dst_schema if self.dst_schema else event.schema - table = None - if self.dst_distribute: - table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) - else: - table = self.dst_table if self.dst_table else event.table - if self.dst_schema: - table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) - - sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format( - schema, - table, - ', '.join(map(lambda column: '`%s`' % column, event.fieldnames)), - ) - - choptions = "" - if self.host: - choptions += " --host=" + shlex.quote(self.host) - if self.port: - choptions += " --port=" + str(self.port) - if self.user: - choptions += " --user=" + shlex.quote(self.user) - if self.password: - choptions += " --password=" + shlex.quote(self.password) - bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( - event.filename, - choptions, - sql, - ) - - logging.info('starting clickhouse-client process') - logging.debug('starting %s', bash) - os.system(bash) - - pass - - def deleteRow(self, event_or_events=None): - # event_or_events = [ - # event: { - # row: {'id': 3, 'a': 3} - # }, - # event: { - # row: {'id': 3, 'a': 3} - # }, - # ] - - events = self.listify(event_or_events) - if len(events) < 1: - logging.warning('No events to delete. class: %s', __class__) - return - - # assume we have at least one Event - - logging.debug('class:%s delete %d rows', __class__, len(events)) - - for event in events: - schema = self.dst_schema if self.dst_schema else event.schema - table = None - if self.dst_distribute: - table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) - else: - table = self.dst_table if self.dst_table else event.table - if self.dst_schema: - table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) - - sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2} = {3} '.format( - schema, - table, - ' AND '.join(map(lambda column: '`%s`' % column, event.fieldnames)), - ) - - choptions = "" - if self.host: - choptions += " --host=" + shlex.quote(self.host) - if self.port: - choptions += " --port=" + str(self.port) - if self.user: - choptions += " --user=" + shlex.quote(self.user) - if self.password: - choptions += " --password=" + shlex.quote(self.password) - bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( - event.filename, - choptions, - sql, - ) - - logging.info('starting clickhouse-client process for delete operation') - logging.debug('starting %s', bash) - os.system(bash) - - pass - - def update(self, event_or_events=None): - # event_or_events = [ - # event: { - # row: {'id': 3, 'a': 3} - # }, - # event: { - # row: {'id': 3, 'a': 3} - # }, - # ] - - logging.info('starting clickhouse-client process for update operation') - - events = self.listify(event_or_events) - if len(events) < 1: - logging.warning('No events to update. 
class: %s', __class__) - return - - # assume we have at least one Event - - logging.debug('class:%s update %d rows', __class__, len(events)) - - for event in events: - schema = self.dst_schema if self.dst_schema else event.schema - table = None - if self.dst_distribute: - table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) - else: - table = self.dst_table if self.dst_table else event.table - if self.dst_schema: - table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) - - sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format( - schema, - table, - ', '.join(map(lambda column: '`%s`' % column, event.fieldnames)), - ) - - sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {3}'.format( - schema, - table, - ', '.join(map(lambda column, value: '`%s`=`%s' % column, event.fieldnames, event.fieldnames)) - ) - - choptions = "" - if self.host: - choptions += " --host=" + shlex.quote(self.host) - if self.port: - choptions += " --port=" + str(self.port) - if self.user: - choptions += " --user=" + shlex.quote(self.user) - if self.password: - choptions += " --password=" + shlex.quote(self.password) - bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( - event.filename, - choptions, - sql, - ) - - logging.info('starting clickhouse-client process') - logging.debug('starting %s', bash) - os.system(bash) - - pass diff --git a/clickhouse_mysql/writer/chwriter.py b/clickhouse_mysql/writer/chwriter.py index 6cca8ef..c43ec42 100644 --- a/clickhouse_mysql/writer/chwriter.py +++ b/clickhouse_mysql/writer/chwriter.py @@ -68,15 +68,25 @@ def insert(self, event_or_events=None): event_converted = None for event in events: if not event.verify: - logging.warning('Event verification failed. Skip one event. Event: %s Class: %s', event.meta(), __class__) - continue # for event + logging.warning( + 'Event verification failed. Skip one event. 
Event: %s Class: %s', event.meta(), __class__) + continue # for event event_converted = self.convert(event) for row in event_converted: + # These columns are added to identify the last change (tb_upd) and the kind of operation performed + # 0 - INSERT, 1 - UPDATE, 2 - DELETE + row['tb_upd'] = datetime.datetime.now() + row['operation'] = 0 + for key in row.keys(): - # we need to convert Decimal value to str value for suitable for table structure - if type(row[key]) == Decimal: + # we need to convert Decimal or timedelta value to str value for suitable for table structure + if type(row[key]) in [Decimal, datetime.timedelta]: row[key] = str(row[key]) + + # These columns are added to identify the last change (tb_upd) and when a row is deleted (1) + # row['tb_upd'] = datetime.datetime.now() + # row['operation'] = 0 rows.append(row) logging.debug('class:%s insert %d row(s)', __class__, len(rows)) @@ -86,13 +96,16 @@ schema = self.dst_schema if self.dst_schema else event_converted.schema table = None if self.dst_distribute: - table = TableProcessor.create_distributed_table_name(db=event_converted.schema, table=event_converted.table) + table = TableProcessor.create_distributed_table_name( + db=event_converted.schema, table=event_converted.table) else: table = self.dst_table if self.dst_table else event_converted.table if self.dst_schema: - table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + table = TableProcessor.create_migrated_table_name( + prefix=self.dst_table_prefix, table=table) - logging.debug("schema={} table={} self.dst_schema={} self.dst_table={}".format(schema, table, self.dst_schema, self.dst_table)) + logging.debug("schema={} table={} self.dst_schema={} self.dst_table={}".format( + schema, table, self.dst_schema, self.dst_table)) # and INSERT converted rows @@ -103,6 +116,7 @@ table, ', '.join(map(lambda column: '`%s`' % column, rows[0].keys())) ) + logging.debug(f"CHWRITER QUERY INSERT: {sql}") self.client.execute(sql, rows) except Exception as ex: logging.critical('QUERY FAILED') @@ -138,7 +152,6 @@ def delete_row(self, event_or_events): rows = [] event_converted = None - pk = None for event in events: if not event.verify: logging.warning('Event verification failed. Skip one event. 
Event: %s Class: %s', event.meta(), @@ -146,12 +159,20 @@ def delete_row(self, event_or_events): continue # for event event_converted = self.convert(event) - pk = event_converted.pymysqlreplication_event.primary_key for row in event_converted: + # These columns are added to identify the last change (tb_upd) and the kind of operation performed + # 0 - INSERT, 1 - UPDATE, 2 - DELETE + row['tb_upd'] = datetime.datetime.now() + row['operation'] = 2 + for key in row.keys(): - # we need to convert Decimal value to str value for suitable for table structure - if type(row[key]) == Decimal: + # we need to convert Decimal or timedelta value to str value for suitable for table structure + if type(row[key]) in [Decimal, datetime.timedelta]: row[key] = str(row[key]) + + # These columns are added to identify the last change (tb_upd) and when a row is deleted (1) + # row['tb_upd'] = datetime.datetime.now() + # row['operation'] = 2 rows.append(row) logging.debug('class:%s delete %d row(s)', __class__, len(rows)) @@ -161,37 +182,45 @@ def delete_row(self, event_or_events): schema = self.dst_schema if self.dst_schema else event_converted.schema table = None if self.dst_distribute: - table = TableProcessor.create_distributed_table_name(db=event_converted.schema, table=event_converted.table) + table = TableProcessor.create_distributed_table_name( + db=event_converted.schema, table=event_converted.table) else: table = self.dst_table if self.dst_table else event_converted.table if self.dst_schema: - table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + table = TableProcessor.create_migrated_table_name( + prefix=self.dst_table_prefix, table=table) logging.debug("schema={} table={} self.dst_schema={} self.dst_table={}".format(schema, table, self.dst_schema, self.dst_table)) # and DELETE converted rows - sql = '' - # try: - # sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2} = {3} '.format( - # schema, - # table, - # ' AND '.join(map(lambda column: '`%s`' % column, event.fieldnames)), - # ) - # self.client.execute(sql, rows) + # These columns are added to identify the last change (tb_upd) and the kind of operation performed + # 0 - INSERT, 1 - UPDATE, 2 - DELETE + rows[0]['tb_upd'] = datetime.datetime.now() + rows[0]['operation'] = 2 sql = '' try: - sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2}'.format( + sql = 'INSERT INTO `{0}`.`{1}` ({2}) VALUES'.format( schema, table, - ' and '.join(filter(None, map( - lambda column, value: "" if column != pk else self.get_data_format(column, value), - row.keys(), row.values()))) + ', '.join(map(lambda column: '`%s`' % column, rows[0].keys())) ) + logging.debug(f"CHWRITER QUERY DELETE: {sql}") + self.client.execute(sql, rows) - self.client.execute(sql) + # sql = '' + # try: + # sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2}'.format( + # schema, + # table, + # ' and '.join(filter(None, map( + # lambda column, value: "" if column != pk else self.get_data_format(column, value), + # row.keys(), row.values()))) + # ) + # + # self.client.execute(sql) except Exception as ex: logging.critical('QUERY FAILED') @@ -204,6 +233,7 @@ def delete_row(self, event_or_events): """ Get string format pattern for update and delete operations """ + def get_data_format(self, column, value): t = type(value) if t == str: @@ -245,7 +275,6 @@ def update(self, event_or_events): rows = [] event_converted = None - pk = None for event in events: if not event.verify: logging.warning('Event verification failed. Skip one event. 
Event: %s Class: %s', event.meta(), @@ -253,18 +282,18 @@ def update(self, event_or_events): continue # for event event_converted = self.convert(event) - pk = [event_converted.pymysqlreplication_event.primary_key] - if event_converted.table == 'assets': - pk.append('name') - pk.append('title_id') - pk.append('company_id') - pk.append('asset_type_enumeration_entry_id') for row in event_converted.pymysqlreplication_event.rows: + for key in row['after_values'].keys(): - # we need to convert Decimal value to str value for suitable for table structure - if type(row['after_values'][key]) == Decimal: - row['after_values'][key] = str(row['after_values'][key]) - rows.append(row) + # we need to convert Decimal or timedelta value to str value for suitable for table structure + if type(row['after_values'][key]) in [Decimal, datetime.timedelta]: + row['after_values'][key] = str( + row['after_values'][key]) + + # These columns are added to identify the last change (tb_upd) and when a row is deleted (1) + row['after_values']['tb_upd'] = datetime.datetime.now() + row['after_values']['operation'] = 1 + rows.append(row['after_values']) logging.debug('class:%s update %d row(s)', __class__, len(rows)) @@ -273,55 +302,42 @@ def update(self, event_or_events): schema = self.dst_schema if self.dst_schema else event_converted.schema table = None if self.dst_distribute: - table = TableProcessor.create_distributed_table_name(db=event_converted.schema, table=event_converted.table) + table = TableProcessor.create_distributed_table_name( + db=event_converted.schema, table=event_converted.table) else: table = self.dst_table if self.dst_table else event_converted.table if self.dst_schema: - table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + table = TableProcessor.create_migrated_table_name( + prefix=self.dst_table_prefix, table=table) logging.debug("schema={} table={} self.dst_schema={} self.dst_table={}".format(schema, table, self.dst_schema, self.dst_table)) # and UPDATE converted rows - # improve performance updating just those fields which have actually changed - updated_values = dict(set(row['after_values'].items()).difference(set(row['before_values'].items()))) - + + # These columns are added to identify the last change (tb_upd) and when a row is deleted (1) + rows[0]['tb_upd'] = datetime.datetime.now() + rows[0]['operation'] = 1 + sql = '' try: - # sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2}, `tb_upd`={3} where {4}'.format( - # schema, - # table, - # ', '.join(filter(None, map(lambda column, value: "" if column in pk or value is None else self.get_data_format(column, value), row['after_values'].keys(), row['after_values'].values()))), - # "'%s'" % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - # ' and '.join(filter(None, map( - # lambda column, value: "" if column not in pk or value is None else self.get_data_format(column, value), - # row['before_values'].keys(), row['before_values'].values()))) - # ) - - sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {2}, `tb_upd`={3} where {4}'.format( + sql = 'INSERT INTO `{0}`.`{1}` ({2}) VALUES'.format( schema, table, - ', '.join(filter(None, map(lambda column, value: "" if column in pk or value is None else self.get_data_format(column, value), updated_values.keys(), updated_values.values()))), - "'%s'" % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - ' and '.join(filter(None, map( - lambda column, value: "" if column not in pk or value is None else self.get_data_format(column, value), - row['before_values'].keys(), 
row['before_values'].values()))) + ', '.join(map(lambda column: '`%s`' % column, rows[0].keys())) ) - - logging.debug("SQL UPDATE: \n\n " + sql + "\n\n") - - self.client.execute(sql) + logging.debug(f"CHWRITER QUERY UPDATE: {sql}") + self.client.execute(sql, rows) except Exception as ex: logging.critical('QUERY FAILED') logging.critical('ex={}'.format(ex)) logging.critical('sql={}'.format(sql)) + logging.critical('data={}'.format(rows)) # sys.exit(0) # all DONE - - if __name__ == '__main__': connection_settings = { 'host': '192.168.74.230', diff --git a/clickhouse_mysql/writer/csvwriter.py b/clickhouse_mysql/writer/csvwriter.py index 18cfda6..b9cf762 100644 --- a/clickhouse_mysql/writer/csvwriter.py +++ b/clickhouse_mysql/writer/csvwriter.py @@ -11,6 +11,10 @@ from clickhouse_mysql.writer.writer import Writer from clickhouse_mysql.event.event import Event +import datetime + +from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent + class CSVWriter(Writer): """Write CSV files""" @@ -89,6 +93,7 @@ def open(self): # open file for write-at-the-end mode self.file = open(self.path, 'a+') + def insert(self, event_or_events): # event_or_events = [ # event: { @@ -118,13 +123,20 @@ def insert(self, event_or_events): logging.warning('Event verification failed. Skip insert(). Event: %s Class: %s', event.meta(), __class__) return - self.fieldnames = sorted(self.convert(copy.copy(event.first_row())).keys()) + event_converted = self.convert(event) + rows = event_converted.pymysqlreplication_event.rows + headers = list(rows[0]['values'].keys()) + headers.append('operation') + headers.append('tb_upd') + + # self.fieldnames = sorted(self.convert(copy.copy(event.first_row())).keys()) + self.fieldnames = headers if self.dst_schema is None: self.dst_schema = event.schema if self.dst_table is None: self.dst_table = event.table - self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames) + self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_ALL) if not self.header_written: self.writer.writeheader() @@ -132,21 +144,148 @@ def insert(self, event_or_events): if not event.verify: logging.warning('Event verification failed. Skip one event. Event: %s Class: %s', event.meta(), __class__) continue # for event - for row in event: - self.writer.writerow(self.convert(row)) + self.generate_row(event) + + def delete_row(self, event_or_events): + + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] - def deleteRow(self, event_or_events): - """ - TODO - """ logging.debug("Delete CSV Writer") + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to delete. class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s delete %d events', __class__, len(events)) + + if not self.opened(): + self.open() + + if not self.writer: + # pick any event from the list + event = events[0] + if not event.verify: + logging.warning('Event verification failed. Skip insert(). 
Event: %s Class: %s', event.meta(), __class__) + return + + event_converted = self.convert(event) + rows = event_converted.pymysqlreplication_event.rows + headers = list(rows[0]['values'].keys()) + headers.append('operation') + headers.append('tb_upd') + + self.fieldnames = headers + if self.dst_schema is None: + self.dst_schema = event.schema + if self.dst_table is None: + self.dst_table = event.table + + self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_ALL) + if not self.header_written: + self.writer.writeheader() + + for event in events: + if not event.verify: + logging.warning('Event verification failed. Skip one event. Event: %s Class: %s', event.meta(), __class__) + continue # for event + self.generate_row(event) + + + def update(self, event_or_events): - """ - TODO - """ + + # event_or_events = [ + # event: { + # row: { + # 'before_values': {'id': 3, 'a': 3}, + # 'after_values': {'id': 3, 'a': 2} + # } + # }, + # event: { + # row: { + # 'before_values': {'id': 2, 'a': 3}, + # 'after_values': {'id': 2, 'a': 2} + # } + # }, + # ] + logging.debug("Update CSV Writer") + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to update. class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s updated %d events', __class__, len(events)) + + if not self.opened(): + self.open() + + if not self.writer: + # pick any event from the list + event = events[0] + if not event.verify: + logging.warning('Event verification failed. Skip insert(). Event: %s Class: %s', event.meta(), __class__) + return + + event_converted = self.convert(event) + rows = event_converted.pymysqlreplication_event.rows + headers = list(rows[0]['after_values'].keys()) + headers.append('operation') + headers.append('tb_upd') + + # self.fieldnames = sorted(headers) + self.fieldnames = headers + if self.dst_schema is None: + self.dst_schema = event.schema + if self.dst_table is None: + self.dst_table = event.table + + self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames, quoting=csv.QUOTE_ALL) + if not self.header_written: + self.writer.writeheader() + + for event in events: + if not event.verify: + logging.warning('Event verification failed. Skip one event. Event: %s Class: %s', event.meta(), __class__) + continue # for event + + event_converted = self.convert(event) + self.generate_row(event_converted) + + + def generate_row(self, event): + """ When using mempool or csvpool events are cached so you can receive different kind of events in the same list. 
These events should be handled in a different way """ + + if isinstance(event.pymysqlreplication_event, WriteRowsEvent): + for row in event: + row['tb_upd'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + row['operation'] = 0 + self.writer.writerow(self.convert(row)) + elif isinstance(event.pymysqlreplication_event, DeleteRowsEvent): + for row in event: + row['tb_upd'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + row['operation'] = 2 + self.writer.writerow(self.convert(row)) + elif isinstance(event.pymysqlreplication_event, UpdateRowsEvent): + for row in event.pymysqlreplication_event.rows: + row['after_values']['tb_upd'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + row['after_values']['operation'] = 1 + self.writer.writerow(self.convert(row['after_values'])) + + def push(self): if not self.next_writer_builder or not self.fieldnames: return diff --git a/clickhouse_mysql/writer/poolwriter.py b/clickhouse_mysql/writer/poolwriter.py index 129f05a..303ed84 100644 --- a/clickhouse_mysql/writer/poolwriter.py +++ b/clickhouse_mysql/writer/poolwriter.py @@ -37,21 +37,26 @@ def insert(self, event_or_events): logging.debug('class:%s insert', __class__) self.pool.insert(event_or_events) - + # TODO delete if delete_row works def delete(self, event_or_events): """Insert delete data into Pool""" logging.debug('class:%s delete', __class__) self.pool.insert(event_or_events) + def delete_row(self, event_or_events): + """Insert delete data into Pool""" + logging.debug('class:%s delete', __class__) + self.pool.insert(event_or_events) + def update(self, event_or_events): """Insert update data into Pool""" logging.debug('class:%s update', __class__) self.pool.insert(event_or_events) - def flush(self): self.pool.flush() + if __name__ == '__main__': path = 'file.csv' diff --git a/clickhouse_mysql/writer/processwriter.py b/clickhouse_mysql/writer/processwriter.py index 8177345..b3584f2 100644 --- a/clickhouse_mysql/writer/processwriter.py +++ b/clickhouse_mysql/writer/processwriter.py @@ -40,22 +40,22 @@ def processDelete(self, event_or_events=None): logging.debug('class:%s process()', __class__) writer = self.next_writer_builder.get() - writer.deleteRow(event_or_events) + writer.delete_row(event_or_events) writer.close() writer.push() writer.destroy() - logging.debug('class:%s process() done', __class__) + logging.debug('class:%s processDelete() done', __class__) def processUpdate(self, event_or_events=None): """Separate process body to be run""" logging.debug('class:%s process()', __class__) writer = self.next_writer_builder.get() - writer.delete(event_or_events) + writer.update(event_or_events) writer.close() writer.push() writer.destroy() - logging.debug('class:%s process() done', __class__) + logging.debug('class:%s processUpdate() done', __class__) def insert(self, event_or_events=None): # event_or_events = [ diff --git a/clickhouse_mysql/writer/tbcsvwriter.py b/clickhouse_mysql/writer/tbcsvwriter.py new file mode 100644 index 0000000..9684f25 --- /dev/null +++ b/clickhouse_mysql/writer/tbcsvwriter.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +import time + +from clickhouse_mysql.writer.writer import Writer + +import requests +from requests_toolbelt.multipart.encoder import MultipartEncoder +import json + + +class TBCSVWriter(Writer): + """Write into Tinybird via CSV file""" + + dst_schema = None + dst_table = None + dst_distribute = None + + tb_host = None + tb_token = None + + def __init__( + self, + tb_host, + tb_token, + 
dst_schema=None, + dst_table=None, + dst_table_prefix=None, + dst_distribute=False, + ): + # if dst_distribute and dst_schema is not None: + # dst_schema += "_all" + # if dst_distribute and dst_table is not None: + # dst_table += "_all" + # logging.info( + # "CHCSWriter() connection_settings={} dst_schema={} dst_table={}".format(connection_settings, dst_schema, + # dst_table)) + self.tb_host = tb_host + self.tb_token = tb_token + + if self.tb_host is None or self.tb_token is None: + logging.critical( + f" Host: {self.tb_host} or token {self.tb_token} is missing") + return None + + self.dst_schema = dst_schema + self.dst_table = dst_table + self.dst_table_prefix = dst_table_prefix + self.dst_distribute = dst_distribute + + def uploadCSV(self, table, filename, tries=1): + limit_of_retries = 3 + params = { + 'name': table, + 'mode': 'append' + } + + try: + with open(filename, 'rb') as f: + m = MultipartEncoder(fields={'csv': ('csv', f, 'text/csv')}) + url = f"{self.tb_host}/v0/datasources" + + response = requests.post( + url, + data=m, + headers={ + 'Authorization': 'Bearer ' + self.tb_token, + 'Content-Type': m.content_type + }, + params=params, + verify=False) + + # logging.debug(response.text) + logging.info(response.json()) + if response.status_code == 200: + json_object = json.loads(response.content) + logging.debug(f"Import id: {json_object['import_id']}") + elif response.status_code == 429: + retry_after = int(response.headers['Retry-After']) + tries + logging.error( + f"Too many requests retrying in {retry_after} seconds to upload {filename } to {table}") + time.sleep(retry_after) + self.uploadCSV(table, filename, tries + 1) + else: + # In case of error let's retry only + logging.exception(response.json()) + time.sleep(tries) + logging.info(f"Retrying { tries } of { limit_of_retries }") + if tries > limit_of_retries: + return + self.uploadCSV(table, filename, tries + 1) + except Exception as e: + logging.exception(e) + # We wait tries^2 sec to try again + time.sleep(tries * tries) + logging.info(f"Retrying { tries } of { limit_of_retries }") + if tries > limit_of_retries: + return + self.uploadCSV(table, filename, tries + 1) + + def insert(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + events = self.listify(event_or_events) + if len(events) < 1: + logging.warning('No events to insert. class: %s', __class__) + return + + # assume we have at least one Event + + logging.debug('class:%s insert %d rows', __class__, len(events)) + + for event in events: + #schema = self.dst_schema if self.dst_schema else event.schema + table = self.dst_table if self.dst_table else event.table + self.uploadCSV(table, event.filename) + + pass + + def deleteRow(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + # events = self.listify(event_or_events) + # if len(events) < 1: + # logging.warning('No events to delete. 
class: %s', __class__) + # return + + # # assume we have at least one Event + + # logging.debug('class:%s delete %d rows', __class__, len(events)) + + # for event in events: + # schema = self.dst_schema if self.dst_schema else event.schema + # table = None + # if self.dst_distribute: + # table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) + # else: + # table = self.dst_table if self.dst_table else event.table + # if self.dst_schema: + # table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + + # sql = 'ALTER TABLE `{0}`.`{1}` DELETE WHERE {2} = {3} '.format( + # schema, + # table, + # ' AND '.join(map(lambda column: '`%s`' % column, event.fieldnames)), + # ) + + # choptions = "" + # if self.host: + # choptions += " --host=" + shlex.quote(self.host) + # if self.port: + # choptions += " --port=" + str(self.port) + # if self.user: + # choptions += " --user=" + shlex.quote(self.user) + # if self.password: + # choptions += " --password=" + shlex.quote(self.password) + # bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( + # event.filename, + # choptions, + # sql, + # ) + + # logging.info('starting clickhouse-client process for delete operation') + # logging.debug('starting %s', bash) + # os.system(bash) + + logging.debug("CHCSVWriter: delete row") + pass + + def update(self, event_or_events=None): + # event_or_events = [ + # event: { + # row: {'id': 3, 'a': 3} + # }, + # event: { + # row: {'id': 3, 'a': 3} + # }, + # ] + + # logging.info('starting clickhouse-client process for update operation') + + # events = self.listify(event_or_events) + # if len(events) < 1: + # logging.warning('No events to update. class: %s', __class__) + # return + + # # assume we have at least one Event + + # logging.debug('class:%s update %d rows', __class__, len(events)) + + # for event in events: + # schema = self.dst_schema if self.dst_schema else event.schema + # table = None + # if self.dst_distribute: + # table = TableProcessor.create_distributed_table_name(db=event.schema, table=event.table) + # else: + # table = self.dst_table if self.dst_table else event.table + # if self.dst_schema: + # table = TableProcessor.create_migrated_table_name(prefix=self.dst_table_prefix, table=table) + + # sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format( + # schema, + # table, + # ', '.join(map(lambda column: '`%s`' % column, event.fieldnames)), + # ) + + # sql = 'ALTER TABLE `{0}`.`{1}` UPDATE {3}'.format( + # schema, + # table, + # ', '.join(map(lambda column, value: '`%s`=`%s' % column, event.fieldnames, event.fieldnames)) + # ) + + # choptions = "" + # if self.host: + # choptions += " --host=" + shlex.quote(self.host) + # if self.port: + # choptions += " --port=" + str(self.port) + # if self.user: + # choptions += " --user=" + shlex.quote(self.user) + # if self.password: + # choptions += " --password=" + shlex.quote(self.password) + # bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( + # event.filename, + # choptions, + # sql, + # ) + + # logging.info('starting clickhouse-client process') + # logging.debug('starting %s', bash) + # os.system(bash) + + logging.debug("CHCSVWriter: delete row") + + pass diff --git a/init/dump-tables.sh b/init/dump-tables.sh deleted file mode 100755 index 25eb02b..0000000 --- a/init/dump-tables.sh +++ /dev/null @@ -1,132 +0,0 @@ -#!/bin/bash - -if [ "$#" -ne 1 ]; then - echo "Usage: $0 " - exit -1 -fi - -DUMP_PATH=$1 - -source tb_tables.config - -########### -### titles 
-########### - -echo "Dumping titles" -mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction titles > $DUMP_PATH/titles.sql - -echo "use $TB_DATABASE;" > $DUMP_PATH/titles-insert-tb.sql -cat $DUMP_PATH/titles.sql | grep "INSERT INTO" >> $DUMP_PATH/titles-insert-tb.sql -sed -i 's/INSERT INTO `titles` VALUES/INSERT INTO `t_8a192b9c7ece4572a5a2fc9858e26d5c` (`id`, `name`, `licensor_id`, `created_at`, `updated_at`, `company_id`, `series_id`, `external_id`, `poster_file_name`, `poster_content_type`, `poster_file_size`, `poster_updated_at`, `episode_number`, `dirty_episode_number`, `rights_count`, `blackouts_count`, `denied_rights_count`, `images_count`, `cover_image_id`, `title_type`, `metadata_updated_at`, `promoted_content_id`, `promoted_content_type`, `soft_destroyed`, `credits_count`, `translated_attributes`, `rules_count`, `discarded`, `episode_reference_id`, `brand_id`) VALUES/g' $DUMP_PATH/titles-insert-tb.sql - -echo "Truncate titles table" -echo "truncate $TB_DATABASE.$TITLES_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn - -echo "Loading titles into CH" -cat $DUMP_PATH/titles-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn -echo "Titles loaded" - -read -p "Press enter to continue" - -########### -### assets -########### - -echo "Dumping assets" -mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction assets > $DUMP_PATH/assets.sql - -echo "use $TB_DATABASE;" > $DUMP_PATH/assets-insert-tb.sql -cat $DUMP_PATH/assets.sql | grep "INSERT INTO" >> $DUMP_PATH/assets-insert-tb.sql -sed -i 's/INSERT INTO `assets` VALUES/INSERT INTO `t_4c03fdeb4e3e4db784ead40b06ec8617` (`id`, `name`, `title_id`, `created_at`, `updated_at`, `description`, `runtime_in_milliseconds`, `metadata_updated_at`, `company_id`, `asset_type_enumeration_entry_id`, `external_id`) VALUES/g' $DUMP_PATH/assets-insert-tb.sql - -echo "Truncate assets table" -echo "truncate $TB_DATABASE.$ASSETS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn - -echo "Loading assets into CH" -cat $DUMP_PATH/assets-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn -echo "Assets loaded" - -read -p "Press enter to continue" - -####################### -### Collection-entries -####################### - -echo "Dumping collection-entries" -mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction collection_entries > $DUMP_PATH/collections.sql - -echo "use $TB_DATABASE;" > $DUMP_PATH/collections-insert-tb.sql -cat $DUMP_PATH/collections.sql | grep "INSERT INTO" >> $DUMP_PATH/collections-insert-tb.sql -sed -i 's/INSERT INTO `collection_entries` VALUES/INSERT INTO `t_3dd7b323438943c687bd4e13a0e181a1` (`collection_id`, `title_id`, `id`, `position`) VALUES/g' $DUMP_PATH/collections-insert-tb.sql - -echo "Truncate collections table" -echo "truncate $TB_DATABASE.$COLLECTIONS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn - -echo "Loading collection-entries into CH" -cat $DUMP_PATH/collections-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn -echo "Collection-entries loaded" - -read -p "Press enter to continue" - -############## -### Features -############## - -echo "Dumping features" 
-mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction features > $DUMP_PATH/features.sql - -echo "use $TB_DATABASE;" > $DUMP_PATH/features-insert-tb.sql -read -p "Press enter to continue use" -cat $DUMP_PATH/features.sql | grep "INSERT INTO" >> $DUMP_PATH/features-insert-tb.sql -read -p "Press enter to continue insert" -sed -i 's/INSERT INTO `features` VALUES/INSERT INTO `t_23f41723e0eb480088cbb1c8f890a38c` (`id`, `name`, `enabled`, `company_id`, `created_at`, `updated_at`) VALUES/g' $DUMP_PATH/features-insert-tb.sql -read -p "Press enter to continue sed" -echo "Truncate features table" -echo "truncate $TB_DATABASE.$FEATURES_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn - -echo "Loading features into CH" -cat $DUMP_PATH/features-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn -echo "Features loaded" - -read -p "Press enter to continue" - -############## -### Platforms -############## - -echo "Dumping platforms" -mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction platforms > $DUMP_PATH/platforms.sql - -echo "use $TB_DATABASE;" > $DUMP_PATH/platforms-insert-tb.sql -cat $DUMP_PATH/platforms.sql | grep "INSERT INTO" >> $DUMP_PATH/platforms-insert-tb.sql -sed -i 's/INSERT INTO `platforms` VALUES/INSERT INTO `t_83f598dc74254de68216a7c7735caffb` (`id`, `company_id`, `name`, `created_at`, `updated_at`, `sequence_service_titles_url`, `_deprecated_sequence_template_name`, `_deprecated_owned`, `sequence_template_url`, `metadata_constant_name`, `outlet_id`, `automatic_publication_enabled`, `metadata_updated_at`, `granted_categories`, `external_id`, `timezone`) VALUES/g' $DUMP_PATH/platforms-insert-tb.sql - -echo "Truncate platforms table" -echo "truncate $TB_DATABASE.$PLATFORMS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn - -echo "Loading platforms into CH" -cat $DUMP_PATH/platforms-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn -echo "Platforms loaded" - -read -p "Press enter to continue" - -################# -### Schedulings -################# - -echo "Dumping schedulings" -mysqldump --host=127.0.0.1 --port=3307 --user=tinybird --password=goo7eu9AeS3i --single-transaction --quick movida_preproduction schedulings > $DUMP_PATH/schedulings.sql - -echo "use $TB_DATABASE;" > $DUMP_PATH/schedulings-insert-tb.sql -cat $DUMP_PATH/schedulings.sql | grep "INSERT INTO" >> $DUMP_PATH/schedulings-insert-tb.sql -sed -i 's/INSERT INTO `schedulings` VALUES/INSERT INTO `t_b5e541d4e73d4301ba736c427bd667c5` (`id`, `title_id`, `put_up`, `take_down`, `created_at`, `updated_at`, `cleared`, `platform_id`, `rule_id`, `workflow_offset`, `sequence_asset_url`, `sequence_asset_name`, `workflow_sent`, `status`, `asset_id`, `rule_asset_id`, `title_group_id`, `workflow_web_url`, `_deprecated_publication_status`, `published_at`, `_prev_put_up`, `_prev_take_down`, `_pending_simulation`, `workflow_template_url`, `original_draft_scheduling_id`, `playlist_id`, `updating_playlist`, `workflow_job_url`, `workflow_status`, `conflict_types`, `metadata_updated_at`, `company_id`, `cached_title_episode_number`, `metadata_status`, `publication_status`, `publication_status_updated_at`, `metadata_status_updated_at`, `external_id`, `disabled_at`, `scheduling_type`, `overridden_rule_attributes`, `update_in_progress`, 
`metadata_error_digest`) VALUES/g' $DUMP_PATH/schedulings-insert-tb.sql - -echo "Truncate schedulings table" -echo "truncate $TB_DATABASE.$SCHEDULINGS_TABLE" | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn - -echo "Loading schedulings into CH" -cat $DUMP_PATH/schedulings-insert-tb.sql | ~/tinybird/bin/ch/ch-20.7.2.30/ClickHouse/build/programs/clickhouse-client -mn -echo "Schedulings loaded" - -echo "Process finished!" \ No newline at end of file diff --git a/init/first-processing.sh b/init/first-processing.sh deleted file mode 100755 index 7daa44c..0000000 --- a/init/first-processing.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash - -if [ "$#" -ne 1 ]; then - echo "Usage: $0 " - exit -1 -fi - -echo "Generate binlog timelog" -./run-listener.sh -./stop-listeners.sh - -echo "Generating dumps and loading data ..." -./dump-tables.sh $1 - -echo "Starting listeners" -./run-listener.sh - -echo "Done!" \ No newline at end of file diff --git a/init/run-listeners.sh b/init/run-listeners.sh deleted file mode 100755 index 21a7502..0000000 --- a/init/run-listeners.sh +++ /dev/null @@ -1,130 +0,0 @@ -#!/bin/bash - -LOG_LEVEL=debug - -SOURCE_HOST=127.0.0.1 -SOURCE_PORT=3307 -DESTINATION_HOST=127.0.0.1 -SOURCE_USER=tinybird -SOURCE_PASSWD=goo7eu9AeS3i - -PID_LOG_FILE=/tmp/listeners-pid.log - -source tb_tables.config - -############################################################ -# Run a process to synchronize MySQL table using binlog. -# -# $1 --> Source schema -# $2 --> Source table -# $3 --> Destination schema -# $4 --> Destination table -# $5 --> Server id -# $6 --> Log file -# $7 --> Binlog position file -# -############################################################# -function run_listener() { - - (clickhouse-mysql --src-server-id=$5 --src-wait --src-resume --binlog-position-file $7 --nice-pause=1 --src-host=$SOURCE_HOST --src-port=$SOURCE_PORT --src-user=$SOURCE_USER --src-password=$SOURCE_PASSWD --src-schemas=$1 --src-tables=$2 --dst-host=$DESTINATION_HOST --dst-schema=$3 --dst-table=$4 --log-level=$LOG_LEVEL --pump-data 2>> $6)& - -} - -function run_schedulings() { - if [ $binlog == "true" ]; then - rm "bl-pos-collections" - fi - - run_listener "movida_preproduction" "schedulings" "$TB_DATABASE" "$SCHEDULINGS_TABLE" "91" "out-schedulings.log" "bl-pos-schedulings" - echo $! > $PID_LOG_FILE - -} - -function run_platforms() { - if [ $binlog == "true" ]; then - rm "bl-pos-collections" - fi - - run_listener "movida_preproduction" "platforms" "$TB_DATABASE" "$PLATFORMS_TABLE" "92" "out-platforms.log" "bl-pos-platforms" - echo $! >> $PID_LOG_FILE - -} - -function run_titles() { - if [ $binlog == "true" ]; then - rm "bl-pos-collections" - fi - - run_listener "movida_preproduction" "titles" "$TB_DATABASE" "$TITLES_TABLE" "93" "out-titles.log" "bl-pos-titles" - echo $! >> $PID_LOG_FILE -} - -function run_assets() { - if [ $binlog == "true" ]; then - rm "bl-pos-collections" - fi - - run_listener "movida_preproduction" "assets" "$TB_DATABASE" "$ASSETS_TABLE" "94" "out-assets.log" "bl-pos-assets" - echo $! >> $PID_LOG_FILE -} - -function run_features() { - if [ $binlog == "true" ]; then - rm "bl-pos-collections" - fi - - run_listener "movida_preproduction" "features" "$TB_DATABASE" "$FEATURES_TABLE" "95" "out-features.log" "bl-pos-features" - echo $! 
>> $PID_LOG_FILE -} - -function run_collections() { - if [ $binlog == "true" ]; then - rm "bl-pos-collections" - fi - - run_listener "movida_preproduction" "collection_entries" "$TB_DATABASE" "$COLLECTIONS_TABLE" "96" "out-collections.log" "bl-pos-collections" - echo $! >> $PID_LOG_FILE -} - -function usage { - echo "usage: $0 -d datasource [-b clean_binlog]" - echo " -d datasource datasource to syn. Use all for synchronizing all available datasources." - echo " - all" - echo " - schedulings" - echo " - platforms" - echo " - titles" - echo " - assets" - echo " - features" - echo " - collections" - echo " -b clean_binlog clean binlog before running (true | false) False by default" - exit -1 -} - -datasource="NONE" -while getopts d:b: flag -do - case "${flag}" in - d) datasource=${OPTARG};; - b) binlog=${OPTARG};; - esac -done - -case "${datasource}" in - NONE) usage;; - all) run_schedulings binlog - run_platforms binlog - run_titles binlog - run_assets binlog - run_features binlog - run_collections binlog - ;; - schedulings) run_schedulings binlog;; - platforms) run_platforms binlog;; - titles) run_titles binlog;; - assets) run_assets binlog;; - features) run_features binlog;; - collections) run_collections binlog;; - *) usage;; -esac - -echo "PID processes in $PID_LOG_FILE" \ No newline at end of file diff --git a/init/stop-listeners.sh b/init/stop-listeners.sh deleted file mode 100755 index 582e97c..0000000 --- a/init/stop-listeners.sh +++ /dev/null @@ -1,23 +0,0 @@ -#!/bin/bash - -PID_LOG_FILE=/tmp/listeners-pid.log - -count_processes() { - echo `ps aux | grep clickhouse-mysql-data-reader | wc -l` -} - -total_before=$(count_processes) - -while IFS= read -r line -do - echo "$line" - kill $line -done < "$PID_LOG_FILE" - -total_after=$(count_processes) - -procs=`echo "$total_after - 1" | bc` - -if [ $total_after -gt 1 ]; then - echo "You still have $procs processes running" -fi \ No newline at end of file diff --git a/init/tb_tables.config b/init/tb_tables.config deleted file mode 100644 index be59079..0000000 --- a/init/tb_tables.config +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -TB_DATABASE='d_073c5e' -TITLES_TABLE='t_8a192b9c7ece4572a5a2fc9858e26d5c' -ASSETS_TABLE='t_4c03fdeb4e3e4db784ead40b06ec8617' -COLLECTIONS_TABLE='t_3dd7b323438943c687bd4e13a0e181a1' -FEATURES_TABLE='t_23f41723e0eb480088cbb1c8f890a38c' -PLATFORMS_TABLE='t_83f598dc74254de68216a7c7735caffb' -SCHEDULINGS_TABLE='t_b5e541d4e73d4301ba736c427bd667c5' \ No newline at end of file diff --git a/notes.txt b/notes.txt new file mode 100644 index 0000000..20da4c9 --- /dev/null +++ b/notes.txt @@ -0,0 +1,3 @@ +# Add delete field + +awk -F"," 'BEGIN { OFS = "," } {$45="0"; print}' test.csv > test-out.csv \ No newline at end of file diff --git a/setup.py b/setup.py index f5be528..a8b39e2 100644 --- a/setup.py +++ b/setup.py @@ -79,6 +79,8 @@ 'clickhouse-driver', 'configobj', 'setuptools', + 'requests_toolbelt', + 'requests' ], # cross-platform support for pip to create the appropriate form of executable
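
The upload path introduced in TBCSVWriter.uploadCSV() boils down to one HTTP call against the Tinybird Data Sources API. The standalone sketch below mirrors that flow outside the replicator: same /v0/datasources endpoint, mode=append, multipart CSV field and Retry-After back-off as in the writer. It is an illustration, not part of the patch; the host, token, Data Source name and file name are placeholders.

#!/usr/bin/env python
# Minimal sketch of the append call performed by TBCSVWriter.uploadCSV().
# TB_HOST, TB_TOKEN, the Data Source name and the CSV path are placeholders.
import time
import requests

TB_HOST = 'https://ui.tinybird.co'   # same default as --tb-host
TB_TOKEN = '<append-enabled token>'

def append_csv(table, filename, tries=1, max_tries=3):
    """Append one CSV file to a Tinybird Data Source, backing off on HTTP 429."""
    with open(filename, 'rb') as f:
        response = requests.post(
            f'{TB_HOST}/v0/datasources',
            params={'name': table, 'mode': 'append'},
            headers={'Authorization': f'Bearer {TB_TOKEN}'},
            files={'csv': ('csv', f, 'text/csv')},  # multipart upload, as in the writer
        )
    if response.status_code == 200:
        return response.json().get('import_id')
    if response.status_code == 429 and tries < max_tries:
        # Tinybird tells us how long to wait before the next attempt
        time.sleep(int(response.headers.get('Retry-After', tries)) + tries)
        return append_csv(table, filename, tries + 1, max_tries)
    response.raise_for_status()

if __name__ == '__main__':
    print(append_csv('my_datasource', 'out.csv'))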
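
The writers in this patch tag every appended row with two extra columns, tb_upd (time of the change) and operation (0 = INSERT, 1 = UPDATE, 2 = DELETE), so updates and deletes become plain appends. How the destination collapses that history is outside this patch; the toy function below only illustrates the intended reading of the flags, assuming rows carry a primary-key column named id.

# Illustration only: rebuild the current state from append-only rows tagged with
# 'operation' (0=INSERT, 1=UPDATE, 2=DELETE) and 'tb_upd' (time of the change).
def latest_state(rows, key='id'):
    newest = {}
    for row in sorted(rows, key=lambda r: r['tb_upd']):
        newest[row[key]] = row  # the row with the latest tb_upd wins
    return {k: r for k, r in newest.items() if r['operation'] != 2}  # drop deleted keys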