Skip to content

data migrator - initial commit #50

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion clickhouse-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ if sys.version_info[0] < 3:
raise "Must be using Python 3"



class Main(Daemon):

config = None
Expand Down Expand Up @@ -43,6 +42,11 @@ class Main(Daemon):
elif self.config.is_table_templates_json():
print(json.dumps(self.config.table_builder().templates(self.config.is_table_templates_json())))

elif self.config.is_table_migrate():
migrator = self.config.table_migrator()
migrator.chwriter = self.config.writer()
migrator.migrate()

else:
pumper = Pumper(
reader=self.config.reader(),
Expand Down
44 changes: 36 additions & 8 deletions src/cliopts.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ def config():
action='store_true',
help='Prepare table templates as JSON.'
)

argparser.add_argument(
'--table-migrate',
action='store_true',
help='Migrate table(s).'
)

argparser.add_argument(
'--src-server-id',
Expand Down Expand Up @@ -282,6 +286,7 @@ def config():
'daemon': args.daemon,
'table-templates': args.table_templates,
'table-templates-json': args.table_templates_json,
'table-migrate': args.table_migrate,
'pid_file': args.pid_file,
'mempool': args.mempool or args.csvpool, # csvpool assumes mempool to be enabled
'mempool-max-events-num': args.mempool_max_events_num,
Expand All @@ -298,13 +303,36 @@ def config():
},
},

'tablebuilder-config': {
'host': args.src_host,
'port': args.src_port,
'user': args.src_user,
'password': args.src_password,
'dbs': [x for x in args.src_only_schemas.split(',') if x] if args.src_only_schemas else None,
'tables': [x for x in args.src_only_tables.split(',') if x] if args.src_only_tables else None,
'table-builder-config': {
'mysql': {
'host': args.src_host,
'port': args.src_port,
'user': args.src_user,
'password': args.src_password,
'dbs': [x for x in args.src_only_schemas.split(',') if x] if args.src_only_schemas else None,
'tables': [x for x in args.src_only_tables.split(',') if x] if args.src_only_tables else None,
},
},

'table-migrator-config': {
'mysql': {
'host': args.src_host,
'port': args.src_port,
'user': args.src_user,
'password': args.src_password,
'dbs': [x for x in args.src_only_schemas.split(',') if x] if args.src_only_schemas else None,
'tables': [x for x in args.src_only_tables.split(',') if x] if args.src_only_tables else None,
},
'clickhouse': {
'connection_settings': {
'host': args.dst_host,
'port': args.dst_port,
'user': args.dst_user,
'password': args.dst_password,
},
'dst_schema': args.dst_schema,
'dst_table': args.dst_table,
},
},

'reader-config': {
Expand Down
9 changes: 8 additions & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .converter.csvwriteconverter import CSVWriteConverter
from .converter.chwriteconverter import CHWriteConverter
from .tablebuilder import TableBuilder
from .tablemigrator import TableMigrator

class Config(object):

Expand Down Expand Up @@ -53,7 +54,13 @@ def is_table_templates_json(self):
return self.config['app-config']['table-templates-json']

def table_builder(self):
return TableBuilder(**self.config['tablebuilder-config'])
return TableBuilder(**self.config['table-builder-config']['mysql'])

def is_table_migrate(self):
return self.config['app-config']['table-migrate']

def table_migrator(self):
return TableMigrator(**self.config['table-builder-config']['mysql'])

def reader(self):
if self.config['reader-config']['file']['csv_file_path']:
Expand Down
148 changes: 8 additions & 140 deletions src/tablebuilder.py
Original file line number Diff line number Diff line change
@@ -1,127 +1,10 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import MySQLdb
from .tableprocessor import TableProcessor


class TableBuilder(object):

connection = None
cursor = None

host = None
port = None
user = None
password = None
dbs = None
tables = None

def __init__(
self,
host=None,
port=None,
user=None,
password=None,
dbs=None,
tables=None
):
self.host = host
self.port = port
self.user = user
self.password = password
self.dbs = [] if dbs is None else dbs
self.tables = [] if tables is None else tables

def dbs_tables_lists(self):
"""
:return:
{
'db1' : ['table1', 'table2', 'table3']
'db2' : ['table1', 'table2', 'table3']
}
"""

if len(self.dbs) == 0:
# no dbs specified
# means we have to have
# at least 1 full table specified

if len(self.tables) == 0:
# nothing specified
return None

# verify that all tables have full name specified
for table in self.tables:
db, table = self.parse_full_table_name(table)
if db is None:
# short table name found
return None

dbs = {}
for table in self.tables:
db, table = self.parse_full_table_name(table)
if db not in dbs:
dbs[db] = set()
dbs[db].add(table)

return dbs

elif len(self.dbs) == 1:
# one db specified

# verify that none table specified at all
if len(self.tables) == 0:
return {
self.dbs[0]: self.tables_list(self.dbs[0])
}

# OR all tables have short name specification
# meaning they all belong to this table
for table in self.tables:
db, table = self.parse_full_table_name(table)
if db is not None:
# long table name found
return None

return {
self.dbs[0]: self.tables
}

else:
# multiple dbs specified
# verify that no tables specified
if len(self.tables) > 0:
return None

dbs = {}
for db in self.dbs:
dbs[db] = self.tables_list(db)

return dbs

return None

def tables_list(self, db):
"""
:param db:
:return: ['table1', 'table2', etc]
"""
self.connection = MySQLdb.connect(
host=self.host,
user=self.user,
passwd=self.password,
db=db,
)
self.cursor = self.connection.cursor()
self.cursor.execute("USE " + db)
tables = []
self.cursor.execute("SHOW TABLES") # execute 'SHOW TABLES' (but data is not returned)
for (table_name,) in self.cursor:
tables.append(table_name)

return tables

class TableBuilder(TableProcessor):

def templates(self, json=False):
"""
Expand Down Expand Up @@ -198,13 +81,7 @@ def create_table_columns_description(self, db=None, table=None, ):
columns_description = []

# issue 'DESCRIBE table' statement
self.connection = MySQLdb.connect(
host=self.host,
user=self.user,
passwd=self.password,
db=db,
)
self.cursor = self.connection.cursor()
self.connect(db=db)
self.cursor.execute("DESC {0}".format(self.create_full_table_name(db=db, table=table)))
for (_field, _type, _null, _key, _default, _extra,) in self.cursor:
# Field | Type | Null | Key | Default | Extra
Expand All @@ -224,21 +101,12 @@ def create_table_columns_description(self, db=None, table=None, ):

return columns_description

def create_full_table_name(self, db=None, table=None):
# `db`.`table` or just `table`
return '`{0}`.`{1}`'.format(db, table) if db else '`{0}`'.format(table)

def parse_full_table_name(self, full_name):
db, dot, name = full_name.partition('.')
if not dot:
name = db
db = None

return db if db is None else db.strip('`'), name.strip('`')


def is_field_nullable(self, nullable):
# Deal with NULLs
"""
Chack whether `nullable` can be interpreted as True
:param nullable: bool, string
:return: bool
"""
if isinstance(nullable, bool):
# for bool - simple statement
return nullable
Expand Down
53 changes: 53 additions & 0 deletions src/tablemigrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import MySQLdb
from MySQLdb.cursors import SSDictCursor
from .tableprocessor import TableProcessor


class TableMigrator(TableProcessor):

cursorclass = SSDictCursor
chwriter = None

def migrate(self):
dbs = self.dbs_tables_lists()

if dbs is None:
return None

for db in dbs:
for table in dbs[db]:
self.migrate_table(db=db, table=table)

def migrate_table(self, db=None, table=None, ):
self.connect(db=db)
self.cursor.execute("SELECT * FROM {0}".format(self.create_full_table_name(db=db, table=table)))
cnt = 0;
while True:
rows = self.cursor.fetchmany(10000)
if not rows:
break
self.chwriter.dst_schema = db
self.chwriter.dst_table = table
self.chwriter.insert(rows)
cnt += len(rows)

return cnt


if __name__ == '__main__':
tb = TableBuilder(
host='127.0.0.1',
user='reader',
password='qwerty',
dbs=['db'],
# tables='datatypes, enum_datatypes, json_datatypes',
tables=['datatypes', 'enum_datatypes', 'json_datatypes'],
)
templates = tb.templates()
for db in templates:
for table in templates[db]:
print(table, '=', templates[db][table])
Loading