Skip to content

Commit a269786

Browse files
authored
Merge pull request #50 from sunsingerus/master
data migrator - initial commit
2 parents cee0e14 + 85a787b commit a269786

8 files changed

+304
-159
lines changed

clickhouse-mysql

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ if sys.version_info[0] < 3:
1515
raise "Must be using Python 3"
1616

1717

18-
1918
class Main(Daemon):
2019

2120
config = None
@@ -43,6 +42,11 @@ class Main(Daemon):
4342
elif self.config.is_table_templates_json():
4443
print(json.dumps(self.config.table_builder().templates(self.config.is_table_templates_json())))
4544

45+
elif self.config.is_table_migrate():
46+
migrator = self.config.table_migrator()
47+
migrator.chwriter = self.config.writer()
48+
migrator.migrate()
49+
4650
else:
4751
pumper = Pumper(
4852
reader=self.config.reader(),

src/cliopts.py

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,11 @@ def config():
156156
action='store_true',
157157
help='Prepare table templates as JSON.'
158158
)
159-
159+
argparser.add_argument(
160+
'--table-migrate',
161+
action='store_true',
162+
help='Migrate table(s).'
163+
)
160164

161165
argparser.add_argument(
162166
'--src-server-id',
@@ -282,6 +286,7 @@ def config():
282286
'daemon': args.daemon,
283287
'table-templates': args.table_templates,
284288
'table-templates-json': args.table_templates_json,
289+
'table-migrate': args.table_migrate,
285290
'pid_file': args.pid_file,
286291
'mempool': args.mempool or args.csvpool, # csvpool assumes mempool to be enabled
287292
'mempool-max-events-num': args.mempool_max_events_num,
@@ -298,13 +303,36 @@ def config():
298303
},
299304
},
300305

301-
'tablebuilder-config': {
302-
'host': args.src_host,
303-
'port': args.src_port,
304-
'user': args.src_user,
305-
'password': args.src_password,
306-
'dbs': [x for x in args.src_only_schemas.split(',') if x] if args.src_only_schemas else None,
307-
'tables': [x for x in args.src_only_tables.split(',') if x] if args.src_only_tables else None,
306+
'table-builder-config': {
307+
'mysql': {
308+
'host': args.src_host,
309+
'port': args.src_port,
310+
'user': args.src_user,
311+
'password': args.src_password,
312+
'dbs': [x for x in args.src_only_schemas.split(',') if x] if args.src_only_schemas else None,
313+
'tables': [x for x in args.src_only_tables.split(',') if x] if args.src_only_tables else None,
314+
},
315+
},
316+
317+
'table-migrator-config': {
318+
'mysql': {
319+
'host': args.src_host,
320+
'port': args.src_port,
321+
'user': args.src_user,
322+
'password': args.src_password,
323+
'dbs': [x for x in args.src_only_schemas.split(',') if x] if args.src_only_schemas else None,
324+
'tables': [x for x in args.src_only_tables.split(',') if x] if args.src_only_tables else None,
325+
},
326+
'clickhouse': {
327+
'connection_settings': {
328+
'host': args.dst_host,
329+
'port': args.dst_port,
330+
'user': args.dst_user,
331+
'password': args.dst_password,
332+
},
333+
'dst_schema': args.dst_schema,
334+
'dst_table': args.dst_table,
335+
},
308336
},
309337

310338
'reader-config': {

src/config.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from .converter.csvwriteconverter import CSVWriteConverter
1515
from .converter.chwriteconverter import CHWriteConverter
1616
from .tablebuilder import TableBuilder
17+
from .tablemigrator import TableMigrator
1718

1819
class Config(object):
1920

@@ -53,7 +54,13 @@ def is_table_templates_json(self):
5354
return self.config['app-config']['table-templates-json']
5455

5556
def table_builder(self):
56-
return TableBuilder(**self.config['tablebuilder-config'])
57+
return TableBuilder(**self.config['table-builder-config']['mysql'])
58+
59+
def is_table_migrate(self):
60+
return self.config['app-config']['table-migrate']
61+
62+
def table_migrator(self):
63+
return TableMigrator(**self.config['table-builder-config']['mysql'])
5764

5865
def reader(self):
5966
if self.config['reader-config']['file']['csv_file_path']:

src/tablebuilder.py

Lines changed: 8 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -1,127 +1,10 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33

4-
import logging
5-
import MySQLdb
4+
from .tableprocessor import TableProcessor
65

76

8-
class TableBuilder(object):
9-
10-
connection = None
11-
cursor = None
12-
13-
host = None
14-
port = None
15-
user = None
16-
password = None
17-
dbs = None
18-
tables = None
19-
20-
def __init__(
21-
self,
22-
host=None,
23-
port=None,
24-
user=None,
25-
password=None,
26-
dbs=None,
27-
tables=None
28-
):
29-
self.host = host
30-
self.port = port
31-
self.user = user
32-
self.password = password
33-
self.dbs = [] if dbs is None else dbs
34-
self.tables = [] if tables is None else tables
35-
36-
def dbs_tables_lists(self):
37-
"""
38-
:return:
39-
{
40-
'db1' : ['table1', 'table2', 'table3']
41-
'db2' : ['table1', 'table2', 'table3']
42-
}
43-
"""
44-
45-
if len(self.dbs) == 0:
46-
# no dbs specified
47-
# means we have to have
48-
# at least 1 full table specified
49-
50-
if len(self.tables) == 0:
51-
# nothing specified
52-
return None
53-
54-
# verify that all tables have full name specified
55-
for table in self.tables:
56-
db, table = self.parse_full_table_name(table)
57-
if db is None:
58-
# short table name found
59-
return None
60-
61-
dbs = {}
62-
for table in self.tables:
63-
db, table = self.parse_full_table_name(table)
64-
if db not in dbs:
65-
dbs[db] = set()
66-
dbs[db].add(table)
67-
68-
return dbs
69-
70-
elif len(self.dbs) == 1:
71-
# one db specified
72-
73-
# verify that none table specified at all
74-
if len(self.tables) == 0:
75-
return {
76-
self.dbs[0]: self.tables_list(self.dbs[0])
77-
}
78-
79-
# OR all tables have short name specification
80-
# meaning they all belong to this table
81-
for table in self.tables:
82-
db, table = self.parse_full_table_name(table)
83-
if db is not None:
84-
# long table name found
85-
return None
86-
87-
return {
88-
self.dbs[0]: self.tables
89-
}
90-
91-
else:
92-
# multiple dbs specified
93-
# verify that no tables specified
94-
if len(self.tables) > 0:
95-
return None
96-
97-
dbs = {}
98-
for db in self.dbs:
99-
dbs[db] = self.tables_list(db)
100-
101-
return dbs
102-
103-
return None
104-
105-
def tables_list(self, db):
106-
"""
107-
:param db:
108-
:return: ['table1', 'table2', etc]
109-
"""
110-
self.connection = MySQLdb.connect(
111-
host=self.host,
112-
user=self.user,
113-
passwd=self.password,
114-
db=db,
115-
)
116-
self.cursor = self.connection.cursor()
117-
self.cursor.execute("USE " + db)
118-
tables = []
119-
self.cursor.execute("SHOW TABLES") # execute 'SHOW TABLES' (but data is not returned)
120-
for (table_name,) in self.cursor:
121-
tables.append(table_name)
122-
123-
return tables
124-
7+
class TableBuilder(TableProcessor):
1258

1269
def templates(self, json=False):
12710
"""
@@ -198,13 +81,7 @@ def create_table_columns_description(self, db=None, table=None, ):
19881
columns_description = []
19982

20083
# issue 'DESCRIBE table' statement
201-
self.connection = MySQLdb.connect(
202-
host=self.host,
203-
user=self.user,
204-
passwd=self.password,
205-
db=db,
206-
)
207-
self.cursor = self.connection.cursor()
84+
self.connect(db=db)
20885
self.cursor.execute("DESC {0}".format(self.create_full_table_name(db=db, table=table)))
20986
for (_field, _type, _null, _key, _default, _extra,) in self.cursor:
21087
# Field | Type | Null | Key | Default | Extra
@@ -224,21 +101,12 @@ def create_table_columns_description(self, db=None, table=None, ):
224101

225102
return columns_description
226103

227-
def create_full_table_name(self, db=None, table=None):
228-
# `db`.`table` or just `table`
229-
return '`{0}`.`{1}`'.format(db, table) if db else '`{0}`'.format(table)
230-
231-
def parse_full_table_name(self, full_name):
232-
db, dot, name = full_name.partition('.')
233-
if not dot:
234-
name = db
235-
db = None
236-
237-
return db if db is None else db.strip('`'), name.strip('`')
238-
239-
240104
def is_field_nullable(self, nullable):
241-
# Deal with NULLs
105+
"""
106+
Chack whether `nullable` can be interpreted as True
107+
:param nullable: bool, string
108+
:return: bool
109+
"""
242110
if isinstance(nullable, bool):
243111
# for bool - simple statement
244112
return nullable

src/tablemigrator.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
import logging
5+
import MySQLdb
6+
from MySQLdb.cursors import SSDictCursor
7+
from .tableprocessor import TableProcessor
8+
9+
10+
class TableMigrator(TableProcessor):
11+
12+
cursorclass = SSDictCursor
13+
chwriter = None
14+
15+
def migrate(self):
16+
dbs = self.dbs_tables_lists()
17+
18+
if dbs is None:
19+
return None
20+
21+
for db in dbs:
22+
for table in dbs[db]:
23+
self.migrate_table(db=db, table=table)
24+
25+
def migrate_table(self, db=None, table=None, ):
26+
self.connect(db=db)
27+
self.cursor.execute("SELECT * FROM {0}".format(self.create_full_table_name(db=db, table=table)))
28+
cnt = 0;
29+
while True:
30+
rows = self.cursor.fetchmany(10000)
31+
if not rows:
32+
break
33+
self.chwriter.dst_schema = db
34+
self.chwriter.dst_table = table
35+
self.chwriter.insert(rows)
36+
cnt += len(rows)
37+
38+
return cnt
39+
40+
41+
if __name__ == '__main__':
42+
tb = TableBuilder(
43+
host='127.0.0.1',
44+
user='reader',
45+
password='qwerty',
46+
dbs=['db'],
47+
# tables='datatypes, enum_datatypes, json_datatypes',
48+
tables=['datatypes', 'enum_datatypes', 'json_datatypes'],
49+
)
50+
templates = tb.templates()
51+
for db in templates:
52+
for table in templates[db]:
53+
print(table, '=', templates[db][table])

0 commit comments

Comments
 (0)