12
12
from clickhouse_mysql .event .event import Event
13
13
from clickhouse_mysql .tableprocessor import TableProcessor
14
14
from clickhouse_mysql .util import Util
15
- from pymysqlreplication .event import QueryEvent , RotateEvent , FormatDescriptionEvent
16
15
17
16
18
17
class MySQLReader (Reader ):
@@ -29,9 +28,10 @@ class MySQLReader(Reader):
29
28
resume_stream = None
30
29
binlog_stream = None
31
30
nice_pause = 0
31
+ exit_gracefully = False
32
32
33
33
write_rows_event_num = 0
34
- write_rows_event_each_row_num = 0 ;
34
+ write_rows_event_each_row_num = 0
35
35
36
36
binlog_position_file = None
37
37
@@ -316,21 +316,21 @@ def process_update_rows_event(self, mysql_event):
316
316
return
317
317
318
318
# statistics
319
- # self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
319
+ self .stat_write_rows_event_calc_rows_num_min_max (rows_num_per_event = len (mysql_event .rows ))
320
320
321
321
if self .subscribers ('UpdateRowsEvent' ):
322
322
# dispatch event to subscribers
323
323
324
324
# statistics
325
- #self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
325
+ # self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
326
326
327
327
# dispatch Event
328
328
event = Event ()
329
329
event .schema = mysql_event .schema
330
330
event .table = mysql_event .table
331
331
event .pymysqlreplication_event = mysql_event
332
332
333
- # self.process_first_event(event=event)
333
+ self .process_first_event (event = event )
334
334
self .notify ('UpdateRowsEvent' , event = event )
335
335
336
336
# self.stat_write_rows_event_finalyse()
@@ -355,13 +355,13 @@ def process_delete_rows_event(self, mysql_event):
355
355
return
356
356
357
357
# statistics
358
- #self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
358
+ # self.stat_write_rows_event_calc_rows_num_min_max(rows_num_per_event=len(mysql_event.rows))
359
359
360
360
if self .subscribers ('DeleteRowsEvent' ):
361
361
# dispatch event to subscribers
362
362
363
363
# statistics
364
- #self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
364
+ # self.stat_write_rows_event_all_rows(mysql_event=mysql_event)
365
365
366
366
# dispatch Event
367
367
event = Event ()
@@ -389,7 +389,7 @@ def read(self):
389
389
390
390
# fetch events
391
391
try :
392
- while True :
392
+ while not self . exit_gracefully :
393
393
logging .debug ('Check events in binlog stream' )
394
394
395
395
self .init_fetch_loop ()
@@ -403,8 +403,9 @@ def read(self):
403
403
404
404
# fetch available events from MySQL
405
405
for mysql_event in self .binlog_stream :
406
- # new event has come
407
- # check what to do with it
406
+
407
+ if self .exit_gracefully :
408
+ break
408
409
409
410
logging .debug (
410
411
'Got Event ' + self .binlog_stream .log_file + ":" + str (self .binlog_stream .log_pos ))
@@ -420,23 +421,19 @@ def read(self):
420
421
# skip other unhandled events
421
422
pass
422
423
423
- # after event processed, we need to handle current binlog position
424
- self .process_binlog_position (self .binlog_stream .log_file , self .binlog_stream .log_pos )
424
+ # after event processed, we need to handle current binlog position
425
+ self .process_binlog_position (self .binlog_stream .log_file , self .binlog_stream .log_pos )
425
426
426
- except KeyboardInterrupt :
427
- # pass SIGINT further
428
- logging .info ("SIGINT received. Pass it further." )
429
- raise
430
427
except Exception as ex :
431
428
if self .blocking :
432
429
# we'd like to continue waiting for data
433
430
# report and continue cycle
434
431
logging .warning ("Got an exception, skip it in blocking mode" )
435
- logging .warning (ex )
432
+ logging .exception (ex )
436
433
else :
437
434
# do not continue, report error and exit
438
435
logging .critical ("Got an exception, abort it in non-blocking mode" )
439
- logging .critical (ex )
436
+ logging .exception (ex )
440
437
sys .exit (1 )
441
438
442
439
# all events fetched (or none of them available)
@@ -453,25 +450,28 @@ def read(self):
453
450
time .sleep (self .nice_pause )
454
451
455
452
self .notify ('ReaderIdleEvent' )
456
-
457
- except KeyboardInterrupt :
458
- logging .info ("SIGINT received. Time to exit." )
459
453
except Exception as ex :
460
454
logging .warning ("Got an exception, handle it" )
461
- logging .warning (ex )
455
+ logging .exception (ex )
462
456
463
457
try :
464
458
self .binlog_stream .close ()
459
+ logging .info ("Stop reading from MySQL" )
465
460
except Exception as ex :
466
461
logging .warning ("Unable to close binlog stream correctly" )
467
- logging .warning (ex )
462
+ logging .exception (ex )
468
463
469
464
end_timestamp = int (time .time ())
470
465
471
466
logging .info ('start %d' , self .start_timestamp )
472
467
logging .info ('end %d' , end_timestamp )
473
468
logging .info ('len %d' , end_timestamp - self .start_timestamp )
474
469
470
+ def close (self ):
471
+ self .exit_gracefully = True
472
+ self .nice_pause = 0
473
+ logging .info ("MySQL should stop in the next loop" )
474
+
475
475
476
476
if __name__ == '__main__' :
477
477
connection_settings = {
0 commit comments