Skip to content

Commit 296cd25

Browse files
committed
--table-templates and --table-templates-json CLI options
1 parent a39c550 commit 296cd25

File tree

4 files changed

+208
-73
lines changed

4 files changed

+208
-73
lines changed

main.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import multiprocessing as mp
1010
import logging
1111
import pprint
12+
import json
1213

1314
# Refuse to start on anything older than Python 3.
if sys.version_info[0] < 3:
    # Exceptions must derive from BaseException: raising a plain string would
    # fail with a TypeError and hide the actual message.
    raise RuntimeError("Must be using Python 3")
@@ -34,10 +35,14 @@ def __init__(self):
3435

3536
def run(self):
3637
if self.config.is_table_templates():
37-
templates = self.config.table_builder().templates()
38+
templates = self.config.table_builder().templates(self.config.is_table_templates_json())
3839
for db in templates:
3940
for table in templates[db]:
40-
print(db, ':', table, ':', templates[db][table])
41+
print(templates[db][table])
42+
43+
elif self.config.is_table_templates_json():
44+
print(json.dumps(self.config.table_builder().templates(self.config.is_table_templates_json())))
45+
4146
else:
4247
pumper = Pumper(
4348
reader=self.config.reader(),

src/cliopts.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,12 @@ def config():
151151
action='store_true',
152152
help='Prepare table templates.'
153153
)
154+
argparser.add_argument(
155+
'--table-templates-json',
156+
action='store_true',
157+
help='Prepare table templates as JSON.'
158+
)
159+
154160

155161
argparser.add_argument(
156162
'--src-server-id',
@@ -275,6 +281,7 @@ def config():
275281
'dry': args.dry,
276282
'daemon': args.daemon,
277283
'table-templates': args.table_templates,
284+
'table-templates-json': args.table_templates_json,
278285
'pid_file': args.pid_file,
279286
'mempool': args.mempool or args.csvpool, # csvpool assumes mempool to be enabled
280287
'mempool-max-events-num': args.mempool_max_events_num,

src/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ def is_pool(self):
4949
def is_table_templates(self):
    """Return the 'table-templates' value from the 'app-config' section of the config."""
    app_config = self.config['app-config']
    return app_config['table-templates']
5151

52+
def is_table_templates_json(self):
    """Return the 'table-templates-json' value from the 'app-config' section of the config."""
    app_config = self.config['app-config']
    return app_config['table-templates-json']
54+
5255
def table_builder(self):
    """Create a TableBuilder, passing the 'tablebuilder-config' section as keyword arguments."""
    builder_options = self.config['tablebuilder-config']
    return TableBuilder(**builder_options)
5457

src/tablebuilder.py

Lines changed: 191 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -17,114 +17,240 @@ class TableBuilder(object):
1717
dbs = None
1818
tables = None
1919

20-
def __init__(
        self,
        host=None,
        port=None,
        user=None,
        password=None,
        dbs=None,
        tables=None
):
    """
    Remember MySQL connection settings and the dbs/tables to work on.

    :param host: MySQL host
    :param port: MySQL port
    :param user: MySQL user
    :param password: MySQL password
    :param dbs: list of db names, None is stored as an empty list
    :param tables: list of table names, None is stored as an empty list
    """
    self.host, self.port = host, port
    self.user, self.password = user, password
    # always keep lists, so callers can rely on len() and iteration
    self.dbs = dbs if dbs is not None else []
    self.tables = tables if tables is not None else []
2735

28-
def templates(self):
36+
def dbs_tables_lists(self):
    """
    Resolve self.dbs and self.tables into a per-db listing of table names.

    Accepted combinations:
      * no dbs       - every table has to be given by its full name (db.table)
      * one db       - either no tables at all (all tables of the db are taken
                       from tables_list()) or short table names only
      * several dbs  - no tables may be given, all tables of each db are taken
                       from tables_list()

    Table names are returned without surrounding backticks, in the order they
    were specified, each name once per db.

    :return: None for any other combination, otherwise
    {
        'db1' : ['table1', 'table2', 'table3'],
        'db2' : ['table1', 'table2', 'table3'],
    }
    """
    if not self.dbs:
        # no dbs specified
        # means we have to have
        # at least 1 full table specified
        if not self.tables:
            # nothing specified
            return None

        dbs = {}
        for full_name in self.tables:
            db, table = self.parse_full_table_name(full_name)
            if db is None:
                # short table name found - can't tell which db it belongs to
                return None
            names = dbs.setdefault(db, [])
            # lists (not sets) keep the specified order and match other branches
            if table not in names:
                names.append(table)
        return dbs

    if len(self.dbs) == 1:
        # one db specified
        only_db = self.dbs[0]

        if not self.tables:
            # no tables specified at all - take all tables of the db
            return {only_db: self.tables_list(only_db)}

        # OR all tables have short name specification
        # meaning they all belong to this db
        tables = []
        for name in self.tables:
            db, table = self.parse_full_table_name(name)
            if db is not None:
                # long table name found
                return None
            # use the parsed name, so optional backticks do not leak further
            tables.append(table)
        return {only_db: tables}

    # multiple dbs specified
    # verify that no tables specified
    if self.tables:
        return None

    return {db: self.tables_list(db) for db in self.dbs}
104+
105+
def tables_list(self, db):
    """
    List all tables of the specified MySQL db.

    A dedicated connection is opened for the query and closed before returning.

    :param db: string name of the MySQL db
    :return: ['table1', 'table2', etc]
    """
    connect_args = {
        'host': self.host,
        'user': self.user,
        'passwd': self.password,
        'db': db,
    }
    if self.port:
        # honour the configured port instead of silently using the default one
        connect_args['port'] = int(self.port)

    connection = MySQLdb.connect(**connect_args)
    try:
        cursor = connection.cursor()
        try:
            # db is already selected by connect(db=...), no "USE db" required
            cursor.execute("SHOW TABLES")
            return [table_name for (table_name,) in cursor]
        finally:
            cursor.close()
    finally:
        # do not leak a connection per call
        connection.close()
124+
60125

61-
# in case to tables specified - list all tables of the DB specified
62-
if db is not None and self.tables is None:
63-
self.cursor.execute("USE " + db)
64-
self.tables = []
65-
self.cursor.execute("SHOW TABLES") # execute 'SHOW TABLES' (but data is not returned)
66-
for (table_name,) in self.cursor:
67-
self.tables.append(table_name)
68-
69-
# create dict of table templates
70-
for table in self.tables:
71-
if not db in res:
72-
res[db] = {}
73-
res[db][table] = self.create_table_template(table, db)
74-
75-
# {
76-
# 'db': {
77-
# 'table1': 'CREATE TABLE(...)...',
78-
# 'table2': 'CREATE TABLE(...)...',
79-
# }
80-
# }
81-
return res
82-
83-
def create_table_template(self, table_name, db=None):
126+
def templates(self, json=False):
    """
    Create descriptions for the MySQL tables selected by dbs_tables_lists().

    :param json: bool passed as is to create_table_description() for each table
    :return: None in case dbs_tables_lists() returns None, otherwise dict
    {
        'db1': {
            'table1': <result of create_table_description()>,
            'table2': <result of create_table_description()>,
        }
    }
    """
    tables_by_db = self.dbs_tables_lists()
    if tables_by_db is None:
        return None

    return {
        db: {
            table: self.create_table_description(db=db, table=table, json=json)
            for table in tables
        }
        for db, tables in tables_by_db.items()
    }
149+
150+
def create_table_description(self, db=None, table=None, json=False):
    """
    Describe one table either as SQL template only or as template plus columns.

    :param db: string - name of the DB in MySQL
    :param table: string - name of the table in MySQL
    :param json: bool - return dict {"template": ..., "fields": ...} instead of the SQL template string
    :return: string returned by create_table_sql_template() or, when json is set, dict with
        that string under "template" and the result of create_table_columns_description() under "fields"
    """
    fields = self.create_table_columns_description(db=db, table=table)
    template = self.create_table_sql_template(db=db, table=table, columns_descrption=fields)

    if not json:
        return template

    return {
        "template": template,
        "fields": fields,
    }
160+
161+
def create_table_sql_template(self, db=None, table=None, columns_descrption=None):
    """
    Produce template for CH's
    CREATE TABLE(
        ...
        columns specification
        ...
    ) ENGINE = MergeTree(<PRIMARY_DATE_FIELD>, (<COMMA_SEPARATED_INDEX_FIELDS_LIST>), 8192)
    for specified MySQL's table

    :param table: string - name of the table in MySQL which will be used as a base for CH's CREATE TABLE template
    :param db: string - name of the DB in MySQL
    :param columns_descrption: list of dicts, each having at least 'field' and 'clickhouse_type' keys.
        None is treated as an empty list
    :return: string - almost-ready-to-use CREATE TABLE statement
    """
    indent = "    "

    # one ready-to-sql line per column, Ex.: `integer_1` Nullable(Int32)
    ch_columns = [
        '{0}`{1}` {2}'.format(indent, column['field'], column['clickhouse_type'])
        for column in (columns_descrption or [])
    ]

    return (
        "CREATE TABLE {0} (\n"
        "{1}\n"
        ") ENGINE = MergeTree(<PRIMARY_DATE_FIELD>, (<COMMA_SEPARATED_INDEX_FIELDS_LIST>), 8192)\n"
    ).format(
        self.create_full_table_name(db=db, table=table),
        ",\n".join(ch_columns)
    )
187+
188+
def create_table_columns_description(self, db=None, table=None, ):
    """
    Describe columns of the specified MySQL table, based on 'DESC table' statement.

    A dedicated connection is opened for the query and closed before returning.

    :param db: string - name of the DB in MySQL
    :param table: string - name of the table in MySQL
    :return: list of table columns specifications
        [{  'field': 'f1',
            'mysql_type': 'int',
            'clickhouse_type': 'UInt32'
            'nullable': True,
            'key': 'PRI',
            'default': 'CURRENT TIMESTAMP',
            'extra': 'on update CURRENT_TIMESTAMP',
        }, {}, {}]
    """
    connect_args = {
        'host': self.host,
        'user': self.user,
        'passwd': self.password,
        'db': db,
    }
    if self.port:
        # honour the configured port instead of silently using the default one
        connect_args['port'] = int(self.port)

    connection = MySQLdb.connect(**connect_args)
    try:
        cursor = connection.cursor()
        try:
            # issue 'DESCRIBE table' statement
            cursor.execute("DESC {0}".format(self.create_full_table_name(db=db, table=table)))

            columns_description = []
            for (_field, _type, _null, _key, _default, _extra,) in cursor:
                # Field | Type | Null | Key | Default | Extra
                nullable = self.is_field_nullable(_null)
                columns_description.append({
                    'field': _field,
                    'mysql_type': _type,
                    # Ex.: Nullable(Int32), Nullable(UInt32)
                    'clickhouse_type': self.map_type(mysql_type=_type, nullable=nullable),
                    'nullable': nullable,
                    'key': _key,
                    'default': _default,
                    'extra': _extra,
                })
            return columns_description
        finally:
            cursor.close()
    finally:
        # this method runs once per table - do not leak a connection each time
        connection.close()
226+
227+
def create_full_table_name(self, db=None, table=None):
    """
    Build backtick-quoted table name.

    :param db: string - name of the DB, may be omitted
    :param table: string - name of the table
    :return: string `db`.`table` or just `table` in case db is empty
    """
    parts = [db, table] if db else [table]
    return '.'.join('`{0}`'.format(part) for part in parts)
112230

113-
sql = """
114-
CREATE TABLE {0} (
115-
{1}
116-
) ENGINE = MergeTree(_SPECIFY_DateField_HERE, (SPECIFY_INDEX_FIELD1, SPECIFY_INDEX_FIELD2, ...etc...), 8192)
117-
""".format(
118-
name,
119-
",\n ".join(ch_columns)
120-
)
121-
return sql
231+
def parse_full_table_name(self, full_name):
    """
    Split table name into db and table parts at the first dot.

    :param full_name: string - either full (db.table) or short (table) name, backticks around parts are stripped
    :return: tuple (db, table), db is None in case there is no dot in full_name
    """
    head, separator, tail = full_name.partition('.')
    if separator:
        return head.strip('`'), tail.strip('`')

    # no dot found - the whole string is a short table name
    return None, head.strip('`')
238+
239+
240+
def is_field_nullable(self, nullable):
    """
    Normalize the 'Null' value of a column description to bool.

    :param nullable: bool|string|bytes True or case-insensitive 'yes' mean the field is nullable
    :return: bool - always a real bool, anything unrecognized is reported as not nullable
    """
    if isinstance(nullable, bool):
        # for bool - simple statement
        return nullable

    if isinstance(nullable, bytes):
        # the same text may arrive undecoded
        nullable = nullable.decode('ascii', 'replace')

    if isinstance(nullable, str):
        # also accept case-insensitive string 'yes'
        return nullable.upper() == "YES"

    # neither bool nor text - do not fall through with implicit None
    return False
122248

123249
def map_type(self, mysql_type, nullable=False):
124250
"""
125251
Map MySQL type (as a string from DESC table statement) to CH type (as string)
126252
:param mysql_type: string MySQL type (from DESC statement). Ex.: 'INT(10) UNSIGNED', 'BOOLEAN'
127-
:param nullable: bool|string True|'yes' is this field nullable
253+
:param nullable: bool is this field nullable
128254
:return: string CH's type specification directly usable in CREATE TABLE statement. Ex.:
129255
Nullable(Int32)
130256
Nullable(UInt32)
@@ -209,14 +335,8 @@ def map_type(self, mysql_type, nullable=False):
209335
ch_type = 'UNKNOWN'
210336

211337
# Deal with NULLs
212-
if isinstance(nullable, bool):
213-
# for bool - simple statement
214-
if nullable:
215-
ch_type = 'Nullable(' + ch_type + ')'
216-
elif isinstance(nullable, str):
217-
# also accept case-insencitive string 'yes'
218-
if nullable.upper() == "YES":
219-
ch_type = 'Nullable(' + ch_type + ')'
338+
if nullable:
339+
ch_type = 'Nullable(' + ch_type + ')'
220340

221341
return ch_type
222342

0 commit comments

Comments
 (0)