Skip to content

Commit 1743942

Browse files
committed
fix tables migration
1 parent e54734f commit 1743942

File tree

11 files changed

+260
-113
lines changed

11 files changed

+260
-113
lines changed

clickhouse_mysql/clioptions.py

Lines changed: 60 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,25 @@ def join_lists_into_dict(lists_to_join):
1616
[['a=b', 'c=d'], ['e=f', 'z=x'], ]
1717
1818
:return: None or dictionary
19-
{'a': 'b', 'c': 'd', 'e': 'f', 'z': 'x'}
19+
{'a': 'b', 'c': 'd', 'e': 'f', 'z': 'x'}
2020
2121
"""
2222

23+
# lists_to_join must be a list
2324
if not isinstance(lists_to_join, list):
2425
return None
2526

2627
res = {}
27-
for lst in lists_to_join:
28-
# lst = ['a=b', 'c=d']
29-
for column_value_pair in lst:
30-
# column_value_value = 'a=b'
31-
column, value = column_value_pair.split('=', 2)
32-
res[column] = value
33-
3428
# res = dict {
35-
# 'col1': 'value1',
36-
# 'col2': 'value2',
29+
# 'name1': 'value1',
30+
# 'name2': 'value2',
3731
# }
32+
for _list in lists_to_join:
33+
# _list = ['a=b', 'c=d']
34+
for name_value_pair in _list:
35+
# name_value_pair contains 'a=b'
36+
name, value = name_value_pair.split('=', 2)
37+
res[name] = value
3838

3939
# return with sanity check
4040
if len(res) > 0:
@@ -52,6 +52,7 @@ def join_lists(lists_to_join):
5252
['a', 'b', 'c', 'd', 'e', 'f']
5353
"""
5454

55+
# lists_to_join must be a list
5556
if not isinstance(lists_to_join, list):
5657
return None
5758

@@ -145,6 +146,7 @@ class CLIOptions(Options):
145146
'dst_distribute': False,
146147
'dst_cluster': None,
147148
'dst_table': None,
149+
'dst_table_prefix': None,
148150
'dst_create_table': False,
149151

150152
#
@@ -191,13 +193,13 @@ def options(self):
191193
'--nice-pause',
192194
type=int,
193195
default=self.default_options['nice_pause'],
194-
help='make nice pause between attempts to read binlog stream'
196+
help='Make specified (in sec) pause between attempts to read binlog stream'
195197
)
196198
argparser.add_argument(
197199
'--dry',
198200
action='store_true',
199201
help='Dry mode - do not do anything that can harm. '
200-
'Useful for debugging.'
202+
'Useful for debugging.'
201203
)
202204
argparser.add_argument(
203205
'--daemon',
@@ -208,13 +210,13 @@ def options(self):
208210
'--pid-file',
209211
type=str,
210212
default=self.default_options['pid_file'],
211-
help='Pid file to be used by app in daemon mode'
213+
help='Pid file to be used by the app in daemon mode'
212214
)
213215
argparser.add_argument(
214216
'--binlog-position-file',
215217
type=str,
216218
default=self.default_options['binlog_position_file'],
217-
help='File to write binlog position to'
219+
help='File to write binlog position to during bin log reading and to read position from on start'
218220
)
219221
argparser.add_argument(
220222
'--mempool',
@@ -242,7 +244,8 @@ def options(self):
242244
argparser.add_argument(
243245
'--csvpool',
244246
action='store_true',
245-
help='Cache data in CSV pool files on disk. Requires memory pooling, thus enables --mempool even if it is not explicitly specified'
247+
help='Cache data in CSV pool files on disk. Requires memory pooling, '
248+
'thus enables --mempool even if it is not explicitly specified'
246249
)
247250
argparser.add_argument(
248251
'--csvpool-file-path-prefix',
@@ -278,14 +281,19 @@ def options(self):
278281
argparser.add_argument(
279282
'--migrate-table',
280283
action='store_true',
281-
help='Migrate table(s). IMPORTANT!. Target table has to be created in ClickHouse '
282-
'or it has to be created with --create-table and possibly with --with-create-database options'
283-
'See --table-template and --table-create options for additional info.'
284+
help='Migrate table(s). Copy existing data from MySQL table(s) with SELECT statement. '
285+
'Binlog is not read during this procedure - just copy data from the src table(s). '
286+
'IMPORTANT!. Target table has to be created in ClickHouse '
287+
'or it has to be created with --dst-create-table and possibly with --with-create-database options. '
288+
'See --create-table-sql-template and --create-table-sql options for additional info. '
284289
)
285290
argparser.add_argument(
286291
'--pump-data',
287292
action='store_true',
288-
help='Pump data into ClickHouse'
293+
help='Pump data from MySQL binlog into ClickHouse. Copy rows from binlog until the end of binlog reached. '
294+
'When end of binlog reached, process ends. '
295+
'Use in combination with --src-wait in case would like to continue and wait for new rows '
296+
'after end of binlog reached'
289297
)
290298
argparser.add_argument(
291299
'--install',
@@ -330,19 +338,25 @@ def options(self):
330338
'--src-schemas',
331339
type=str,
332340
default=self.default_options['src_schemas'],
333-
help='Comma-separated list of schemas to be used when reading from src. Ex.: db1,db2,db3'
341+
help='Comma-separated list of databases (a.k.a schemas) to be used when reading from src. Ex.: db1,db2,db3'
334342
)
335343
argparser.add_argument(
336344
'--src-tables',
337345
type=str,
338346
default=self.default_options['src_tables'],
339-
help='Comma-separated list of tables to be used when reading from src. Ex.: table1,table2,table3'
347+
help='Comma-separated list of tables to be used when reading from src. '
348+
'Ex.: table1,table2,table3'
349+
'Ex.: db1.table1,db2.table2,db3.table3'
350+
'Ex.: table1,db2.table2,table3'
340351
)
341352
argparser.add_argument(
342353
'--src-tables-where-clauses',
343354
type=str,
344355
default=self.default_options['src_tables_where_clauses'],
345-
help='Comma-separated list of WHERE clauses for tables to be migrated. Ex.: db1.t1="a=1 and b=2",db2.t2="c=3 and k=4". Accepts both (comma-separated) clause (useful for short clauses) or file where clause is located (useful for long clauses)'
356+
help='Comma-separated list of WHERE clauses for tables to be migrated. '
357+
'Ex.: db1.t1="a=1 and b=2",db2.t2="c=3 and k=4". '
358+
'Accepts both (comma-separated) clause (useful for short clauses) or '
359+
'file where clause is located (useful for long clauses)'
346360
)
347361
argparser.add_argument(
348362
'--src-tables-prefixes',
@@ -360,19 +374,21 @@ def options(self):
360374
argparser.add_argument(
361375
'--src-resume',
362376
action='store_true',
363-
help='Resume reading from previous position.'
377+
help='Resume reading from previous position. Previous position is read from `binlog-position-file`'
364378
)
365379
argparser.add_argument(
366380
'--src-binlog-file',
367381
type=str,
368382
default=self.default_options['src_binlog_file'],
369-
help='Binlog file to be used when reading from src. Ex.: mysql-bin.000024'
383+
help='Binlog file to be used to read from src. Related to `binlog-position-file`. '
384+
'Ex.: mysql-bin.000024'
370385
)
371386
argparser.add_argument(
372387
'--src-binlog-position',
373388
type=int,
374389
default=self.default_options['src_binlog_position'],
375-
help='Binlog position to be used when reading from src. Ex.: 5703'
390+
help='Binlog position to be used when reading from src. Related to `binlog-position-file`. '
391+
'Ex.: 5703'
376392
)
377393
argparser.add_argument(
378394
'--src-file',
@@ -418,26 +434,35 @@ def options(self):
418434
'--dst-schema',
419435
type=str,
420436
default=self.default_options['dst_schema'],
421-
help='Database/schema to be used when writing to dst. Ex.: db1'
437+
help='Database (a.k.a schema) to be used to create tables in ClickHouse. '
438+
'It overwrites source database(s) name(s), so tables in ClickHouse '
439+
'would be located in differently named db than in MySQL. '
440+
'Ex.: db1'
422441
)
423442
argparser.add_argument(
424443
'--dst-distribute',
425444
action='store_true',
426445
default=self.default_options['dst_distribute'],
427-
help='is to add distribute table'
446+
help='Whether to add distribute table'
428447
)
429448
argparser.add_argument(
430449
'--dst-cluster',
431450
type=str,
432451
default=self.default_options['dst_cluster'],
433-
help='Cluster to be used when writing to dst. Ex.: db1'
452+
help='Cluster to be used when writing to dst. Ex.: cluster1'
434453
)
435454
argparser.add_argument(
436455
'--dst-table',
437456
type=str,
438457
default=self.default_options['dst_table'],
439458
help='Table to be used when writing to dst. Ex.: table1'
440459
)
460+
argparser.add_argument(
461+
'--dst-table-prefix',
462+
type=str,
463+
default=self.default_options['dst_table_prefix'],
464+
help='Prefix to be used when creating dst table. Ex.: copy_table_'
465+
)
441466
argparser.add_argument(
442467
'--dst-create-table',
443468
action='store_true',
@@ -453,7 +478,8 @@ def options(self):
453478
nargs='*',
454479
action='append',
455480
default=self.default_options['column_default_value'],
456-
help='Set of key=value pairs for columns default values. Ex.: date_1=2000-01-01 timestamp_1=2002-01-01\ 01:02:03'
481+
help='Set of key=value pairs for columns default values. '
482+
'Ex.: date_1=2000-01-01 timestamp_1=2002-01-01\ 01:02:03'
457483
)
458484
argparser.add_argument(
459485
'--column-skip',
@@ -535,6 +561,7 @@ def options(self):
535561
'dst_distribute': args.dst_distribute,
536562
'dst_cluster': args.dst_cluster,
537563
'dst_table': args.dst_table,
564+
'dst_table_prefix': args.dst_table_prefix,
538565
'dst_create_table': args.dst_create_table,
539566

540567
#
@@ -557,8 +584,8 @@ def options(filename):
557584

558585
#
559586
def transform(section, key):
560-
newkey = key.replace('-', '_')
561-
section.rename(key, newkey)
587+
new_key = key.replace('-', '_')
588+
section.rename(key, new_key)
562589

563590
# fetch base config
564591
try:
@@ -567,7 +594,7 @@ def transform(section, key):
567594
encoding="utf-8",
568595
default_encoding="utf-8",
569596
list_values=True,
570-
create_empty=False, # create empty config file
597+
create_empty=False, # do not create an empty config file if it is missing
571598
stringify=True,
572599
raise_errors=False,
573600
file_error=False,
@@ -582,7 +609,7 @@ def transform(section, key):
582609
encoding="utf-8",
583610
default_encoding="utf-8",
584611
list_values=True,
585-
create_empty=False, # create empty config file
612+
create_empty=False, # do not create an empty config file if it is missing
586613
stringify=True,
587614
raise_errors=False,
588615
file_error=False,

clickhouse_mysql/config.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def __init__(self):
107107
'port': self.options.get_int('src_port'),
108108
'user': self.options['src_user'],
109109
'password': self.options['src_password'],
110-
'dbs': self.options.get_list('src_schemas'),
110+
'schemas': self.options.get_list('src_schemas'),
111111
'tables': self.options.get_list('src_tables'),
112112
'tables_prefixes': self.options.get_list('src_tables_prefixes'),
113113
'column_skip': self.options['column_skip']
@@ -122,6 +122,8 @@ def __init__(self):
122122
'dst_schema': self.options['dst_schema'],
123123
'dst_distribute': self.options['dst_distribute'],
124124
'dst_cluster': self.options['dst_cluster'],
125+
'dst_table': self.options['dst_table'],
126+
'dst_table_prefix': self.options['dst_table_prefix'],
125127
'dst_create_table': self.options.get_bool('dst_create_table'),
126128
},
127129
},
@@ -135,7 +137,7 @@ def __init__(self):
135137
'port': self.options.get_int('src_port'),
136138
'user': self.options['src_user'],
137139
'password': self.options['src_password'],
138-
'dbs': self.options.get_list('src_schemas'),
140+
'schemas': self.options.get_list('src_schemas'),
139141
'tables': self.options.get_list('src_tables'),
140142
'tables_prefixes': self.options.get_list('src_tables_prefixes'),
141143
'tables_where_clauses': self.options.get_list('src_tables_where_clauses'),
@@ -152,6 +154,7 @@ def __init__(self):
152154
'dst_distribute': self.options['dst_distribute'],
153155
'dst_cluster': self.options['dst_cluster'],
154156
'dst_table': self.options['dst_table'],
157+
'dst_table_prefix': self.options['dst_table_prefix'],
155158
'dst_create_table': self.options.get_bool('dst_create_table'),
156159
},
157160
},
@@ -195,16 +198,19 @@ def __init__(self):
195198
'password': self.options['dst_password'],
196199
},
197200
'dst_schema': self.options['dst_schema'],
198-
'dst_table': self.options['dst_table'],
199201
'dst_distribute': self.options['dst_distribute'],
202+
'dst_table': self.options['dst_table'],
203+
'dst_table_prefix': self.options['dst_table_prefix'],
200204
},
201205
'file': {
202206
'csv_file_path': self.options['dst_file'],
203207
'csv_file_path_prefix': self.options['csvpool_file_path_prefix'],
204208
'csv_file_path_suffix_parts': [],
205209
'csv_keep_file': self.options['csvpool_keep_files'],
206210
'dst_schema': self.options['dst_schema'],
211+
'dst_distribute': self.options['dst_distribute'],
207212
'dst_table': self.options['dst_table'],
213+
'dst_table_prefix': self.options['dst_table_prefix'],
208214
},
209215
},
210216
}
@@ -254,8 +260,10 @@ def table_sql_builder(self):
254260
port=self.config['table_builder']['mysql']['port'],
255261
user=self.config['table_builder']['mysql']['user'],
256262
password=self.config['table_builder']['mysql']['password'],
257-
dbs=self.config['table_builder']['mysql']['dbs'],
258-
schema=self.config['table_builder']['clickhouse']['dst_schema'],
263+
dbs=self.config['table_builder']['mysql']['schemas'],
264+
dst_schema=self.config['table_builder']['clickhouse']['dst_schema'],
265+
dst_table=self.config['table_builder']['clickhouse']['dst_table'],
266+
dst_table_prefix=self.config['table_builder']['clickhouse']['dst_table_prefix'],
259267
distribute=self.config['table_builder']['clickhouse']['dst_distribute'],
260268
cluster=self.config['table_builder']['clickhouse']['dst_cluster'],
261269
tables=self.config['table_builder']['mysql']['tables'],
@@ -278,8 +286,10 @@ def table_migrator(self):
278286
port=self.config['table_migrator']['mysql']['port'],
279287
user=self.config['table_migrator']['mysql']['user'],
280288
password=self.config['table_migrator']['mysql']['password'],
281-
dbs=self.config['table_migrator']['mysql']['dbs'],
282-
schema=self.config['table_migrator']['clickhouse']['dst_schema'],
289+
dbs=self.config['table_migrator']['mysql']['schemas'],
290+
dst_schema=self.config['table_migrator']['clickhouse']['dst_schema'],
291+
dst_table=self.config['table_builder']['clickhouse']['dst_table'],
292+
dst_table_prefix=self.config['table_builder']['clickhouse']['dst_table_prefix'],
283293
distribute=self.config['table_migrator']['clickhouse']['dst_distribute'],
284294
cluster=self.config['table_migrator']['clickhouse']['dst_cluster'],
285295
tables=self.config['table_migrator']['mysql']['tables'],
@@ -347,6 +357,7 @@ def writer_builder_csvpool(self):
347357
'csv_keep_file': self.config['writer']['file']['csv_keep_file'],
348358
'dst_schema': self.config['writer']['file']['dst_schema'],
349359
'dst_table': self.config['writer']['file']['dst_table'],
360+
'dst_table_prefix': self.config['writer']['file']['dst_table_prefix'],
350361
'next_writer_builder': ObjectBuilder(
351362
class_name=CHCSVWriter,
352363
constructor_params=self.config['writer']['clickhouse']
@@ -363,6 +374,7 @@ def writer_builder_csv_file(self):
363374
'csv_keep_file': self.config['writer']['file']['csv_keep_file'],
364375
'dst_schema': self.config['writer']['file']['dst_schema'],
365376
'dst_table': self.config['writer']['file']['dst_table'],
377+
'dst_table_prefix': self.config['writer']['file']['dst_table_prefix'],
366378
'next_writer_builder': None,
367379
'converter_builder': self.converter_builder(CONVERTER_CSV),
368380
})
@@ -377,6 +389,7 @@ def writer_builder_chwriter(self):
377389
},
378390
'dst_schema': self.config['writer']['clickhouse']['dst_schema'],
379391
'dst_table': self.config['writer']['clickhouse']['dst_table'],
392+
'dst_table_prefix': self.config['writer']['clickhouse']['dst_table_prefix'],
380393
'dst_distribute': self.config['writer']['clickhouse']['dst_distribute'],
381394
'next_writer_builder': None,
382395
'converter_builder': self.converter_builder(CONVERTER_CH),

clickhouse_mysql/pumper.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33

4-
import time
54

65
class Pumper(object):
76
"""
@@ -20,7 +19,7 @@ def __init__(self, reader=None, writer=None):
2019
# subscribe on reader's event notifications
2120
self.reader.subscribe({
2221
'WriteRowsEvent': self.write_rows_event,
23-
# 'WriteRowsEvent.EachRow': self.write_rows_event_each_row,
22+
# 'WriteRowsEvent.EachRow': self.write_rows_event_each_row,
2423
'ReaderIdleEvent': self.reader_idle_event,
2524
})
2625

0 commit comments

Comments
 (0)