#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import logging
import sys
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
from clickhouse_mysql.reader.reader import Reader
from clickhouse_mysql.event.event import Event
from clickhouse_mysql.tableprocessor import TableProcessor
from clickhouse_mysql.util import Util
class MySQLReader(Reader):
"""Read data from MySQL as replication ls"""
connection_settings = None
server_id = None
log_file = None
log_pos = None
schemas = None
tables = None
tables_prefixes = None
blocking = None
resume_stream = None
binlog_stream = None
nice_pause = 0
exit_gracefully = False
write_rows_event_num = 0
write_rows_event_each_row_num = 0
binlog_position_file = None
def __init__(
self,
connection_settings,
server_id,
log_file=None,
log_pos=None,
schemas=None,
tables=None,
tables_prefixes=None,
blocking=None,
resume_stream=None,
nice_pause=None,
binlog_position_file=None,
            callbacks=None,  # avoid a mutable default argument
    ):
        super().__init__(callbacks=callbacks if callbacks is not None else {})
self.connection_settings = connection_settings
self.server_id = server_id
self.log_file = log_file
self.log_pos = log_pos
        dbs = TableProcessor.extract_dbs(schemas, Util.join_lists(tables, tables_prefixes))
        self.schemas = dbs if dbs else None
self.tables = None if tables is None else TableProcessor.extract_tables(tables)
self.tables_prefixes = None if tables_prefixes is None else TableProcessor.extract_tables(tables_prefixes)
self.blocking = blocking
self.resume_stream = resume_stream
self.nice_pause = nice_pause
self.binlog_position_file = binlog_position_file
logging.info("raw dbs list. len()=%d", 0 if schemas is None else len(schemas))
if schemas is not None:
for schema in schemas:
logging.info(schema)
logging.info("normalised dbs list. len()=%d", 0 if self.schemas is None else len(self.schemas))
if self.schemas is not None:
for schema in self.schemas:
logging.info(schema)
logging.info("raw tables list. len()=%d", 0 if tables is None else len(tables))
if tables is not None:
for table in tables:
logging.info(table)
logging.info("normalised tables list. len()=%d", 0 if self.tables is None else len(self.tables))
if self.tables is not None:
for table in self.tables:
logging.info(table)
logging.info("raw tables-prefixes list. len()=%d", 0 if tables_prefixes is None else len(tables_prefixes))
if tables_prefixes is not None:
for table in tables_prefixes:
logging.info(table)
logging.info("normalised tables-prefixes list. len()=%d",
0 if self.tables_prefixes is None else len(self.tables_prefixes))
if self.tables_prefixes is not None:
for table in self.tables_prefixes:
logging.info(table)
if not isinstance(self.server_id, int):
raise Exception("Please specify server_id of src server as int. Ex.: --src-server-id=1")
self.binlog_stream = BinLogStreamReader(
# MySQL server - data source
connection_settings=self.connection_settings,
server_id=self.server_id,
# we are interested in reading CH-repeatable events only
only_events=[
# Possible events
# BeginLoadQueryEvent,
DeleteRowsEvent,
# ExecuteLoadQueryEvent,
# FormatDescriptionEvent,
# GtidEvent,
# HeartbeatLogEvent,
# IntvarEvent
# NotImplementedEvent,
# QueryEvent,
# RotateEvent,
# StopEvent,
# TableMapEvent,
UpdateRowsEvent,
WriteRowsEvent,
# XidEvent,
],
only_schemas=self.schemas,
# in case we have any prefixes - this means we need to listen to all tables within specified schemas
only_tables=self.tables if not self.tables_prefixes else None,
log_file=self.log_file,
log_pos=self.log_pos,
freeze_schema=True, # If true do not support ALTER TABLE. It's faster.
blocking=False,
resume_stream=self.resume_stream,
)
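
        # Note: pymysqlreplication expects connection_settings in PyMySQL form and a
        # server_id that is unique among all replicas/readers attached to the source
        # server. Illustrative example (placeholder values):
        #     connection_settings = {'host': '127.0.0.1', 'port': 3306, 'user': 'reader', 'passwd': 'qwerty'}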
def performance_report(self, start, rows_num, rows_num_per_event_min=None, rows_num_per_event_max=None, now=None):
# log performance report
if now is None:
now = time.time()
window_size = now - start
if window_size > 0:
rows_per_sec = rows_num / window_size
logging.info(
'PERF - %f rows/sec, min(rows/event)=%d max(rows/event)=%d for last %d rows %f sec',
rows_per_sec,
rows_num_per_event_min if rows_num_per_event_min is not None else -1,
rows_num_per_event_max if rows_num_per_event_max is not None else -1,
rows_num,
window_size,
)
else:
logging.info("PERF - can not calc performance for time size=0")
def is_table_listened(self, table):
"""
Check whether table name in either directly listed in tables or starts with prefix listed in tables_prefixes
:param table: table name
:return: bool is table listened
"""
# check direct table name match
if self.tables:
if table in self.tables:
return True
# check prefixes
if self.tables_prefixes:
for prefix in self.tables_prefixes:
if table.startswith(prefix):
# table name starts with prefix list
return True
return False
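
    # Example (hypothetical configuration): with tables={'users'} and
    # tables_prefixes={'log_'}, is_table_listened('users') returns True,
    # is_table_listened('log_2020_01') returns True (prefix match), and
    # is_table_listened('orders') returns False.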
first_rows_passed = []
start_timestamp = 0
start = 0
rows_num = 0
rows_num_since_interim_performance_report = 0
rows_num_per_event_min = None
rows_num_per_event_max = None
def init_read_events(self):
self.start_timestamp = int(time.time())
self.first_rows_passed = []
def init_fetch_loop(self):
self.start = time.time()
def stat_init_fetch_loop(self):
self.rows_num = 0
self.rows_num_since_interim_performance_report = 0
self.rows_num_per_event_min = None
self.rows_num_per_event_max = None
def stat_close_fetch_loop(self):
if self.rows_num > 0:
# we have some rows processed
now = time.time()
if now > self.start + 60:
# and processing was long enough
                # pass now by keyword - positionally it would bind to rows_num_per_event_min
                self.performance_report(self.start, self.rows_num, now=now)
def stat_write_rows_event_calc_rows_num_min_max(self, rows_num_per_event):
# populate min value
if (self.rows_num_per_event_min is None) or (rows_num_per_event < self.rows_num_per_event_min):
self.rows_num_per_event_min = rows_num_per_event
# populate max value
if (self.rows_num_per_event_max is None) or (rows_num_per_event > self.rows_num_per_event_max):
self.rows_num_per_event_max = rows_num_per_event
def stat_write_rows_event_all_rows(self, mysql_event):
self.write_rows_event_num += 1
self.rows_num += len(mysql_event.rows)
self.rows_num_since_interim_performance_report += len(mysql_event.rows)
logging.debug('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows))
def stat_write_rows_event_each_row(self):
self.write_rows_event_each_row_num += 1
logging.debug('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num)
def stat_write_rows_event_each_row_for_each_row(self):
self.rows_num += 1
self.rows_num_since_interim_performance_report += 1
    def stat_write_rows_event_finalise(self):
if self.rows_num_since_interim_performance_report >= 100000:
# speed report each N rows
self.performance_report(
start=self.start,
rows_num=self.rows_num,
rows_num_per_event_min=self.rows_num_per_event_min,
rows_num_per_event_max=self.rows_num_per_event_max,
)
self.rows_num_since_interim_performance_report = 0
self.rows_num_per_event_min = None
self.rows_num_per_event_max = None
    def process_first_event(self, event):
        table_key = "{}.{}".format(event.schema, event.table)
        if table_key not in self.first_rows_passed:
            Util.log_row(event.first_row(), "first row in replication " + table_key)
            self.first_rows_passed.append(table_key)
            logging.info(self.first_rows_passed)
def process_write_rows_event(self, mysql_event):
"""
Process specific MySQL event - WriteRowsEvent
:param mysql_event: WriteRowsEvent instance
:return:
"""
logging.debug("Received insert event for table: " + mysql_event.table)
if self.tables_prefixes:
# we have prefixes specified
# need to find whether current event is produced by table in 'looking-into-tables' list
if not self.is_table_listened(mysql_event.table):
# this table is not listened
# processing is over - just skip event
return
# statistics
self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
if self.subscribers('WriteRowsEvent'):
# dispatch event to subscribers
# statistics
self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
# dispatch Event
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
event.pymysqlreplication_event = mysql_event
self.process_first_event(event=event)
self.notify('WriteRowsEvent', event=event)
if self.subscribers('WriteRowsEvent.EachRow'):
# dispatch event to subscribers
# statistics
self.stat_write_rows_event_each_row()
# dispatch Event per each row
for row in mysql_event.rows:
# statistics
self.stat_write_rows_event_each_row_for_each_row()
# dispatch Event
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
                event.row = row['values']  # pymysqlreplication presents each row as a dict with a 'values' mapping
self.process_first_event(event=event)
self.notify('WriteRowsEvent.EachRow', event=event)
        self.stat_write_rows_event_finalise()
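
    # Note: two subscription granularities are dispatched above - a single
    # 'WriteRowsEvent' carrying the whole pymysqlreplication event, and one
    # 'WriteRowsEvent.EachRow' per row, with event.row set to that row's
    # 'values' dict (illustrative shape: {'id': 1, 'name': 'abc'}).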
def process_update_rows_event(self, mysql_event):
logging.debug("Received update event for table: " + mysql_event.table + " Schema: " + mysql_event.schema)
# for row in mysql_event.rows:
# for key in row['before_values']:
# logging.debug("\t *%s:%s=>%s" % (key, row["before_values"][key], row["after_values"][key]))
if self.tables_prefixes:
# we have prefixes specified
# need to find whether current event is produced by table in 'looking-into-tables' list
if not self.is_table_listened(mysql_event.table):
# this table is not listened
# processing is over - just skip event
return
# statistics
self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
if self.subscribers('UpdateRowsEvent'):
# dispatch event to subscribers
# statistics
# self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
# dispatch Event
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
event.pymysqlreplication_event = mysql_event
self.process_first_event(event=event)
self.notify('UpdateRowsEvent', event=event)
# self.stat_write_rows_event_finalyse()
# logging.info("Skip update rows")
def process_delete_rows_event(self, mysql_event):
logging.debug("Received delete event for table: " + mysql_event.table)
"""
for row in mysql_event.rows:
for key in row['values']:
logging.debug("\t *", key, ":", row["values"][key])
"""
if self.tables_prefixes:
# we have prefixes specified
# need to find whether current event is produced by table in 'looking-into-tables' list
if not self.is_table_listened(mysql_event.table):
# this table is not listened
# processing is over - just skip event
return
# statistics
# self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
if self.subscribers('DeleteRowsEvent'):
# dispatch event to subscribers
# statistics
# self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
# dispatch Event
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
event.pymysqlreplication_event = mysql_event
self.process_first_event(event=event)
self.notify('DeleteRowsEvent', event=event)
# self.stat_write_rows_event_finalyse()
# logging.info("Skip delete rows")
def process_binlog_position(self, file, pos):
if self.binlog_position_file:
with open(self.binlog_position_file, "w") as f:
f.write("{}:{}".format(file, pos))
logging.debug("Next event binlog pos: {}.{}".format(file, pos))
def read(self):
# main function - read data from source
self.init_read_events()
# fetch events
try:
while not self.exit_gracefully:
logging.debug('Check events in binlog stream')
self.init_fetch_loop()
# statistics
self.stat_init_fetch_loop()
try:
                    logging.debug(
                        'Pre-start binlog position: %s:%s',
                        self.binlog_stream.log_file,
                        self.binlog_stream.log_pos if self.binlog_stream.log_pos is not None else "undef")
# fetch available events from MySQL
for mysql_event in self.binlog_stream:
if self.exit_gracefully:
break
logging.debug(
'Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos))
# process event based on its type
if isinstance(mysql_event, WriteRowsEvent):
self.process_write_rows_event(mysql_event)
elif isinstance(mysql_event, DeleteRowsEvent):
self.process_delete_rows_event(mysql_event)
elif isinstance(mysql_event, UpdateRowsEvent):
self.process_update_rows_event(mysql_event)
else:
# skip other unhandled events
pass
# after event processed, we need to handle current binlog position
self.process_binlog_position(self.binlog_stream.log_file, self.binlog_stream.log_pos)
except Exception as ex:
if self.blocking:
# we'd like to continue waiting for data
# report and continue cycle
logging.warning("Got an exception, skip it in blocking mode")
logging.exception(ex)
else:
# do not continue, report error and exit
logging.critical("Got an exception, abort it in non-blocking mode")
logging.exception(ex)
sys.exit(1)
# all events fetched (or none of them available)
# statistics
self.stat_close_fetch_loop()
if not self.blocking:
# do not wait for more data - all done
                    break  # leave the fetch loop - non-blocking mode does a single pass
# blocking - wait for more data
if self.nice_pause > 0:
time.sleep(self.nice_pause)
self.notify('ReaderIdleEvent')
except Exception as ex:
logging.warning("Got an exception, handle it")
logging.exception(ex)
try:
self.binlog_stream.close()
logging.info("Stop reading from MySQL")
except Exception as ex:
logging.warning("Unable to close binlog stream correctly")
logging.exception(ex)
end_timestamp = int(time.time())
        logging.info('start timestamp %d', self.start_timestamp)
        logging.info('end timestamp %d', end_timestamp)
        logging.info('duration %d sec', end_timestamp - self.start_timestamp)
def close(self):
self.exit_gracefully = True
self.nice_pause = 0
logging.info("MySQL should stop in the next loop")
if __name__ == '__main__':
connection_settings = {
'host': '127.0.0.1',
'port': 3306,
'user': 'reader',
'passwd': 'qwerty',
}
server_id = 1
    reader = MySQLReader(
connection_settings=connection_settings,
server_id=server_id,
)
reader.read()
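
    # A sketch of attaching a subscriber (assumption: the Reader base class
    # dispatches callbacks keyed by event name, matching the notify() calls above):
    #
    #     def on_write_rows(event=None):
    #         print(event.schema, event.table, event.row)
    #
    #     reader = MySQLReader(
    #         connection_settings=connection_settings,
    #         server_id=server_id,
    #         callbacks={'WriteRowsEvent.EachRow': on_write_rows},
    #     )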