diff --git a/clickhouse-mysql b/clickhouse-mysql index b967d39..a7432cb 100755 --- a/clickhouse-mysql +++ b/clickhouse-mysql @@ -15,7 +15,6 @@ if sys.version_info[0] < 3: raise "Must be using Python 3" - class Main(Daemon): config = None @@ -43,6 +42,11 @@ class Main(Daemon): elif self.config.is_table_templates_json(): print(json.dumps(self.config.table_builder().templates(self.config.is_table_templates_json()))) + elif self.config.is_table_migrate(): + migrator = self.config.table_migrator() + migrator.chwriter = self.config.writer() + migrator.migrate() + else: pumper = Pumper( reader=self.config.reader(), diff --git a/src/cliopts.py b/src/cliopts.py index 5811795..658792a 100644 --- a/src/cliopts.py +++ b/src/cliopts.py @@ -156,7 +156,11 @@ def config(): action='store_true', help='Prepare table templates as JSON.' ) - + argparser.add_argument( + '--table-migrate', + action='store_true', + help='Migrate table(s).' + ) argparser.add_argument( '--src-server-id', @@ -282,6 +286,7 @@ def config(): 'daemon': args.daemon, 'table-templates': args.table_templates, 'table-templates-json': args.table_templates_json, + 'table-migrate': args.table_migrate, 'pid_file': args.pid_file, 'mempool': args.mempool or args.csvpool, # csvpool assumes mempool to be enabled 'mempool-max-events-num': args.mempool_max_events_num, @@ -298,13 +303,36 @@ def config(): }, }, - 'tablebuilder-config': { - 'host': args.src_host, - 'port': args.src_port, - 'user': args.src_user, - 'password': args.src_password, - 'dbs': [x for x in args.src_only_schemas.split(',') if x] if args.src_only_schemas else None, - 'tables': [x for x in args.src_only_tables.split(',') if x] if args.src_only_tables else None, + 'table-builder-config': { + 'mysql': { + 'host': args.src_host, + 'port': args.src_port, + 'user': args.src_user, + 'password': args.src_password, + 'dbs': [x for x in args.src_only_schemas.split(',') if x] if args.src_only_schemas else None, + 'tables': [x for x in args.src_only_tables.split(',') 
if x] if args.src_only_tables else None, + }, + }, + + 'table-migrator-config': { + 'mysql': { + 'host': args.src_host, + 'port': args.src_port, + 'user': args.src_user, + 'password': args.src_password, + 'dbs': [x for x in args.src_only_schemas.split(',') if x] if args.src_only_schemas else None, + 'tables': [x for x in args.src_only_tables.split(',') if x] if args.src_only_tables else None, + }, + 'clickhouse': { + 'connection_settings': { + 'host': args.dst_host, + 'port': args.dst_port, + 'user': args.dst_user, + 'password': args.dst_password, + }, + 'dst_schema': args.dst_schema, + 'dst_table': args.dst_table, + }, }, 'reader-config': { diff --git a/src/config.py b/src/config.py index 2a5f595..926a883 100644 --- a/src/config.py +++ b/src/config.py @@ -14,6 +14,7 @@ from .converter.csvwriteconverter import CSVWriteConverter from .converter.chwriteconverter import CHWriteConverter from .tablebuilder import TableBuilder +from .tablemigrator import TableMigrator class Config(object): @@ -53,7 +54,13 @@ def is_table_templates_json(self): return self.config['app-config']['table-templates-json'] def table_builder(self): - return TableBuilder(**self.config['tablebuilder-config']) + return TableBuilder(**self.config['table-builder-config']['mysql']) + + def is_table_migrate(self): + return self.config['app-config']['table-migrate'] + + def table_migrator(self): + return TableMigrator(**self.config['table-migrator-config']['mysql']) def reader(self): if self.config['reader-config']['file']['csv_file_path']: diff --git a/src/tablebuilder.py b/src/tablebuilder.py index f32990a..83ef55f 100644 --- a/src/tablebuilder.py +++ b/src/tablebuilder.py @@ -1,127 +1,10 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import logging -import MySQLdb +from .tableprocessor import TableProcessor -class TableBuilder(object): - - connection = None - cursor = None - - host = None - port = None - user = None - password = None - dbs = None - tables = None - - def __init__( - self, - 
host=None, - port=None, - user=None, - password=None, - dbs=None, - tables=None - ): - self.host = host - self.port = port - self.user = user - self.password = password - self.dbs = [] if dbs is None else dbs - self.tables = [] if tables is None else tables - - def dbs_tables_lists(self): - """ - :return: - { - 'db1' : ['table1', 'table2', 'table3'] - 'db2' : ['table1', 'table2', 'table3'] - } - """ - - if len(self.dbs) == 0: - # no dbs specified - # means we have to have - # at least 1 full table specified - - if len(self.tables) == 0: - # nothing specified - return None - - # verify that all tables have full name specified - for table in self.tables: - db, table = self.parse_full_table_name(table) - if db is None: - # short table name found - return None - - dbs = {} - for table in self.tables: - db, table = self.parse_full_table_name(table) - if db not in dbs: - dbs[db] = set() - dbs[db].add(table) - - return dbs - - elif len(self.dbs) == 1: - # one db specified - - # verify that none table specified at all - if len(self.tables) == 0: - return { - self.dbs[0]: self.tables_list(self.dbs[0]) - } - - # OR all tables have short name specification - # meaning they all belong to this table - for table in self.tables: - db, table = self.parse_full_table_name(table) - if db is not None: - # long table name found - return None - - return { - self.dbs[0]: self.tables - } - - else: - # multiple dbs specified - # verify that no tables specified - if len(self.tables) > 0: - return None - - dbs = {} - for db in self.dbs: - dbs[db] = self.tables_list(db) - - return dbs - - return None - - def tables_list(self, db): - """ - :param db: - :return: ['table1', 'table2', etc] - """ - self.connection = MySQLdb.connect( - host=self.host, - user=self.user, - passwd=self.password, - db=db, - ) - self.cursor = self.connection.cursor() - self.cursor.execute("USE " + db) - tables = [] - self.cursor.execute("SHOW TABLES") # execute 'SHOW TABLES' (but data is not returned) - for 
(table_name,) in self.cursor: - tables.append(table_name) - - return tables - +class TableBuilder(TableProcessor): def templates(self, json=False): """ @@ -198,13 +81,7 @@ def create_table_columns_description(self, db=None, table=None, ): columns_description = [] # issue 'DESCRIBE table' statement - self.connection = MySQLdb.connect( - host=self.host, - user=self.user, - passwd=self.password, - db=db, - ) - self.cursor = self.connection.cursor() + self.connect(db=db) self.cursor.execute("DESC {0}".format(self.create_full_table_name(db=db, table=table))) for (_field, _type, _null, _key, _default, _extra,) in self.cursor: # Field | Type | Null | Key | Default | Extra @@ -224,21 +101,12 @@ def create_table_columns_description(self, db=None, table=None, ): return columns_description - def create_full_table_name(self, db=None, table=None): - # `db`.`table` or just `table` - return '`{0}`.`{1}`'.format(db, table) if db else '`{0}`'.format(table) - - def parse_full_table_name(self, full_name): - db, dot, name = full_name.partition('.') - if not dot: - name = db - db = None - - return db if db is None else db.strip('`'), name.strip('`') - - def is_field_nullable(self, nullable): - # Deal with NULLs + """ + Check whether `nullable` can be interpreted as True + :param nullable: bool, string + :return: bool + """ if isinstance(nullable, bool): # for bool - simple statement return nullable diff --git a/src/tablemigrator.py b/src/tablemigrator.py new file mode 100644 index 0000000..c1b830b --- /dev/null +++ b/src/tablemigrator.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +import MySQLdb +from MySQLdb.cursors import SSDictCursor +from .tableprocessor import TableProcessor + + +class TableMigrator(TableProcessor): + + cursorclass = SSDictCursor + chwriter = None + + def migrate(self): + dbs = self.dbs_tables_lists() + + if dbs is None: + return None + + for db in dbs: + for table in dbs[db]: + self.migrate_table(db=db, table=table) + + def 
migrate_table(self, db=None, table=None, ): + self.connect(db=db) + self.cursor.execute("SELECT * FROM {0}".format(self.create_full_table_name(db=db, table=table))) + cnt = 0 + while True: + rows = self.cursor.fetchmany(10000) + if not rows: + break + self.chwriter.dst_schema = db + self.chwriter.dst_table = table + self.chwriter.insert(rows) + cnt += len(rows) + + return cnt + + +if __name__ == '__main__': + tm = TableMigrator( + host='127.0.0.1', + user='reader', + password='qwerty', + dbs=['db'], + # tables='datatypes, enum_datatypes, json_datatypes', + tables=['datatypes', 'enum_datatypes', 'json_datatypes'], + ) + dbs = tm.dbs_tables_lists() + for db in dbs: + for table in dbs[db]: + print(tm.create_full_table_name(db=db, table=table)) diff --git a/src/tableprocessor.py b/src/tableprocessor.py new file mode 100644 index 0000000..bac3f12 --- /dev/null +++ b/src/tableprocessor.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import logging +import MySQLdb +from MySQLdb.cursors import Cursor + + +class TableProcessor(object): + + connection = None + cursor = None + cursorclass = Cursor + + host = None + port = None + user = None + password = None + dbs = None + tables = None + + def __init__( + self, + host=None, + port=None, + user=None, + password=None, + dbs=None, + tables=None + ): + self.host = host + self.port = port + self.user = user + self.password = password + self.dbs = [] if dbs is None else dbs + self.tables = [] if tables is None else tables + + def connect(self, db): + if self.cursor: + try: + self.cursor.close() + del self.cursor + except Exception: + pass + + if self.connection: + del self.connection + + self.connection = MySQLdb.connect( + host=self.host, + user=self.user, + passwd=self.password, + db=db, + cursorclass=self.cursorclass, + ) + self.cursor = self.connection.cursor() + + def dbs_tables_lists(self): + """ + Prepare dict of databases with list of tables for each db + For convenient iteration over all tables + + 
:return: + { + 'db1' : ['table1', 'table2', 'table3'], + 'db2' : ['table1', 'table2', 'table3'], + } + """ + + if len(self.dbs) == 0: + # no dbs specified + # means need to have at least 1 full table specified as `db`.`table` + + if len(self.tables) == 0: + # nothing specified - neither db nor table + return None + + # have something specified in tables list + + # verify that all tables have full name specified as `db`.`table` + for table in self.tables: + db, table = self.parse_full_table_name(table) + if db is None: + # short table name found - not enough - need full specification + return None + + # all tables are specified in full format as `db`.`table` + + # build result dict + dbs = {} + for table in self.tables: + db, table = self.parse_full_table_name(table) + if db not in dbs: + dbs[db] = set() + dbs[db].add(table) + + return dbs + + elif len(self.dbs) == 1: + # exactly one db specified + + if len(self.tables) == 0: + # in case no table specified - means 'all tables from this DB' + # return list of tables for this db + return { + self.dbs[0]: self.tables_list(self.dbs[0]) + } + + # multiple tables specified + + # ensure all tables have short name specification + # meaning they all belong to this one specified db + for table in self.tables: + db, table = self.parse_full_table_name(table) + if db is not None: + # long table name found + return None + + return { + self.dbs[0]: self.tables + } + + else: + # multiple dbs specified + + # verify that no tables specified + if len(self.tables) > 0: + return None + + # build result dict + dbs = {} + for db in self.dbs: + dbs[db] = self.tables_list(db) + + return dbs + + return None + + def tables_list(self, db): + """ + List tables in specified DB + + :param db: database to list tables in + :return: ['table1', 'table2', ...] 
+ """ + self.connect(db=db) + self.cursor.execute("USE " + db) + self.cursor.execute("SHOW TABLES") + tables = [] + for (table_name,) in self.cursor: + tables.append(table_name) + + return tables + + def create_full_table_name(self, db=None, table=None): + """ + Create fully-specified table name as `db`.`table` or just `table` + :param db: + :param table: + :return: `db`.`table` or just `table` + """ + return '`{0}`.`{1}`'.format(db, table) if db else '`{0}`'.format(table) + + def parse_full_table_name(self, full_name): + """ + Extract db and table names from fully-specified table name. + Ex.: extract 'db', 'name' out of `db`.`name` + :param full_name: `db`.`name` + :return: (db, name, ) + """ + db, dot, name = full_name.partition('.') + if not dot: + name = db + db = None + + return None if db is None else db.strip('`'), name.strip('`') diff --git a/src/writer/chwriter.py b/src/writer/chwriter.py index cd307b3..0e54d4c 100644 --- a/src/writer/chwriter.py +++ b/src/writer/chwriter.py @@ -3,7 +3,9 @@ from clickhouse_driver.client import Client from .writer import Writer +from ..event.event import Event import logging +import sys class CHWriter(Writer): @@ -46,12 +48,16 @@ def insert(self, event_or_events=None): event_converted = None for event in events: event_converted = self.convert(event) - for row in event_converted: - rows.append(row) + if isinstance(event_converted, Event): + for row in event_converted: + rows.append(row) + else: + rows.append(event_converted) schema = self.dst_schema if self.dst_schema else event_converted.schema table = self.dst_table if self.dst_table else event_converted.table + sql = '' try: sql = 'INSERT INTO `{0}`.`{1}` ({2}) VALUES'.format( schema, @@ -59,10 +65,13 @@ def insert(self, event_or_events=None): ', '.join(map(lambda column: '`%s`' % column, rows[0].keys())) ) self.client.execute(sql, rows) - except: - print('QUERY FAILED -------------------------') - print(sql) - print(rows) + except Exception as ex: + print('QUERY 
FAILED:') + print('ex=', ex) + print('sql=', sql) + print('rows=', rows) + sys.exit(1) + if __name__ == '__main__': diff --git a/src/writer/writer.py b/src/writer/writer.py index 6d5c681..be77abe 100644 --- a/src/writer/writer.py +++ b/src/writer/writer.py @@ -28,12 +28,12 @@ def listify(self, obj_or_list): # no value - return empty list return [] - elif isinstance(obj_or_list, list): + elif isinstance(obj_or_list, (list, tuple, set)): if len(obj_or_list) < 1: - # list is empty - nothing to do + # list/set/tuple is empty - nothing to do return [] else: - # list is good + # list/set/tuple is good return obj_or_list else: