from clickhouse_mysql.event.event import Event
from clickhouse_mysql.tableprocessor import TableProcessor
from clickhouse_mysql.util import Util
- # from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
+ from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent


class MySQLReader(Reader):
@@ -56,13 +56,15 @@ def __init__(
        self.server_id = server_id
        self.log_file = log_file
        self.log_pos = log_pos
-         self.schemas = None if not TableProcessor.extract_dbs(schemas, Util.join_lists(tables, tables_prefixes)) else TableProcessor.extract_dbs(schemas, Util.join_lists(tables, tables_prefixes))
+         self.schemas = None if not TableProcessor.extract_dbs(schemas, Util.join_lists(tables,
+                                                                                        tables_prefixes)) else TableProcessor.extract_dbs(
+             schemas, Util.join_lists(tables, tables_prefixes))
        self.tables = None if tables is None else TableProcessor.extract_tables(tables)
        self.tables_prefixes = None if tables_prefixes is None else TableProcessor.extract_tables(tables_prefixes)
        self.blocking = blocking
        self.resume_stream = resume_stream
        self.nice_pause = nice_pause
-         self.binlog_position_file = binlog_position_file
+         self.binlog_position_file = binlog_position_file

        logging.info("raw dbs list. len()=%d", 0 if schemas is None else len(schemas))
        if schemas is not None:
@@ -86,7 +88,8 @@ def __init__(
        if tables_prefixes is not None:
            for table in tables_prefixes:
                logging.info(table)
-         logging.info("normalised tables-prefixes list. len()=%d", 0 if self.tables_prefixes is None else len(self.tables_prefixes))
+         logging.info("normalised tables-prefixes list. len()=%d",
+                      0 if self.tables_prefixes is None else len(self.tables_prefixes))
        if self.tables_prefixes is not None:
            for table in self.tables_prefixes:
                logging.info(table)
@@ -101,21 +104,21 @@ def __init__(
            # we are interested in reading CH-repeatable events only
            only_events=[
                # Possible events
-                 #BeginLoadQueryEvent,
+                 # BeginLoadQueryEvent,
                DeleteRowsEvent,
-                 #ExecuteLoadQueryEvent,
-                 #FormatDescriptionEvent,
-                 #GtidEvent,
-                 #HeartbeatLogEvent,
-                 #IntvarEvent
-                 #NotImplementedEvent,
-                 #QueryEvent,
-                 #RotateEvent,
-                 #StopEvent,
-                 #TableMapEvent,
+                 # ExecuteLoadQueryEvent,
+                 # FormatDescriptionEvent,
+                 # GtidEvent,
+                 # HeartbeatLogEvent,
+                 # IntvarEvent
+                 # NotImplementedEvent,
+                 # QueryEvent,
+                 # RotateEvent,
+                 # StopEvent,
+                 # TableMapEvent,
                UpdateRowsEvent,
                WriteRowsEvent,
-                 #XidEvent,
+                 # XidEvent,
            ],
            only_schemas=self.schemas,
            # in case we have any prefixes - this means we need to listen to all tables within specified schemas
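For reference (not part of the commit), the classes named in this whitelist and the ones left commented out live in two different pymysqlreplication modules, which is also why the top-of-file import uncommented in the first hunk targets pymysqlreplication.event. A short illustrative import block:

# Row-level events the reader actually consumes:
from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent, TableMapEvent
# Control/metadata events that remain excluded from the whitelist:
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent, GtidEvent, XidEvent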
@@ -245,6 +248,9 @@ def process_write_rows_event(self, mysql_event):
        :param mysql_event: WriteRowsEvent instance
        :return:
        """
+
+         logging.debug("Received insert event for table: " + mysql_event.table)
+
        if self.tables_prefixes:
            # we have prefixes specified
            # need to find whether current event is produced by table in 'looking-into-tables' list
@@ -294,10 +300,81 @@ def process_write_rows_event(self, mysql_event):
        self.stat_write_rows_event_finalyse()

    def process_update_rows_event(self, mysql_event):
-         logging.info("Skip update rows")
+
+         logging.debug("Received update event for table: " + mysql_event.table + " Schema: " + mysql_event.schema)
+
+         # for row in mysql_event.rows:
+         #     for key in row['before_values']:
+         #         logging.debug("\t *%s:%s=>%s" % (key, row["before_values"][key], row["after_values"][key]))
+
+         if self.tables_prefixes:
+             # we have prefixes specified
+             # need to find whether current event is produced by table in 'looking-into-tables' list
+             if not self.is_table_listened(mysql_event.table):
+                 # this table is not listened
+                 # processing is over - just skip event
+                 return
+
+         # statistics
+         #self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
+
+         if self.subscribers('UpdateRowsEvent'):
+             # dispatch event to subscribers
+
+             # statistics
+             #self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
+
+             # dispatch Event
+             event = Event()
+             event.schema = mysql_event.schema
+             event.table = mysql_event.table
+             event.pymysqlreplication_event = mysql_event
+
+             #self.process_first_event(event=event)
+             self.notify('UpdateRowsEvent', event=event)
+
+         # self.stat_write_rows_event_finalyse()
+
+         # logging.info("Skip update rows")

    def process_delete_rows_event(self, mysql_event):
-         logging.info("Skip delete rows")
+         logging.debug("Received delete event for table: " + mysql_event.table)
+
+         """
+         for row in mysql_event.rows:
+             for key in row['values']:
+                 logging.debug("\t *", key, ":", row["values"][key])
+         """
+
+         if self.tables_prefixes:
+             # we have prefixes specified
+             # need to find whether current event is produced by table in 'looking-into-tables' list
+             if not self.is_table_listened(mysql_event.table):
+                 # this table is not listened
+                 # processing is over - just skip event
+                 return
+
+         # statistics
+         #self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
+
+         if self.subscribers('DeleteRowsEvent'):
+             # dispatch event to subscribers
+
+             # statistics
+             #self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
+
+             # dispatch Event
+             event = Event()
+             event.schema = mysql_event.schema
+             event.table = mysql_event.table
+             event.pymysqlreplication_event = mysql_event
+
+             self.process_first_event(event=event)
+             self.notify('DeleteRowsEvent', event=event)
+
+         # self.stat_write_rows_event_finalyse()
+
+         # logging.info("Skip delete rows")

    def process_binlog_position(self, file, pos):
        if self.binlog_position_file:
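The two new handlers above only wrap the raw pymysqlreplication event into the project's Event object and hand it to notify(); what a subscriber then does with it is outside this diff. As a minimal sketch (not part of the commit, function names are hypothetical), a callback registered for these notifications could unpack rows relying only on the pymysqlreplication row layout, 'before_values'/'after_values' for updates and 'values' for deletes:

import logging

def on_update_rows(event=None):
    # event.pymysqlreplication_event is the raw UpdateRowsEvent attached above
    for row in event.pymysqlreplication_event.rows:
        before, after = row['before_values'], row['after_values']
        changed = {k: after[k] for k in after if before.get(k) != after[k]}
        logging.debug("update %s.%s changed=%s", event.schema, event.table, changed)

def on_delete_rows(event=None):
    for row in event.pymysqlreplication_event.rows:
        logging.debug("delete from %s.%s values=%s", event.schema, event.table, row['values'])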
@@ -321,14 +398,16 @@ def read(self):
        self.stat_init_fetch_loop()

        try:
-             logging.debug('Pre-start binlog position: ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos) if self.binlog_stream.log_pos is not None else "undef")
+             logging.debug('Pre-start binlog position: ' + self.binlog_stream.log_file + ":" + str(
+                 self.binlog_stream.log_pos) if self.binlog_stream.log_pos is not None else "undef")

            # fetch available events from MySQL
            for mysql_event in self.binlog_stream:
                # new event has come
                # check what to do with it

-                 logging.debug('Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos))
+                 logging.debug(
+                     'Got Event ' + self.binlog_stream.log_file + ":" + str(self.binlog_stream.log_pos))

                # process event based on its type
                if isinstance(mysql_event, WriteRowsEvent):
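The 'Pre-start binlog position' and 'Got Event' messages reformatted in this hunk expose the log_file:log_pos pair that process_binlog_position() persists to binlog_position_file. As a standalone sketch (connection settings, file name and position are placeholders, not taken from this repository), resuming a pymysqlreplication stream from such a saved position looks roughly like this:

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent

# Placeholder connection settings and a previously saved binlog position.
stream = BinLogStreamReader(
    connection_settings={'host': '127.0.0.1', 'port': 3306, 'user': 'reader', 'passwd': 'secret'},
    server_id=42,                        # must be unique among replicas/readers
    log_file='mysql-bin.000123',         # e.g. read back from binlog_position_file
    log_pos=4,
    resume_stream=True,                  # continue from log_file:log_pos instead of the earliest event
    only_events=[DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent],
    blocking=False,
)
for ev in stream:
    # same dispatch-by-type idea as MySQLReader.read()
    if isinstance(ev, WriteRowsEvent):
        print('insert into', ev.table, len(ev.rows), 'rows')
stream.close()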
@@ -393,6 +472,7 @@ def read(self):
        logging.info('end %d', end_timestamp)
        logging.info('len %d', end_timestamp - self.start_timestamp)

+

if __name__ == '__main__':
    connection_settings = {
        'host': '127.0.0.1',