@@ -91,6 +91,7 @@ def read(self):
91
91
92
92
start = time .time ()
93
93
rows_num = 0
94
+ rows_num_since_interim_speed_report = 0
94
95
95
96
# fetch available events from MySQL
96
97
for mysql_event in self .binlog_stream :
@@ -99,6 +100,7 @@ def read(self):
99
100
self .write_rows_event_num += 1
100
101
logging .debug ('WriteRowsEvent #%d rows: %d' , self .write_rows_event_num , len (mysql_event .rows ))
101
102
rows_num += len (mysql_event .rows )
103
+ rows_num_since_interim_speed_report += len (mysql_event .rows )
102
104
event = Event ()
103
105
event .schema = mysql_event .schema
104
106
event .table = mysql_event .table
@@ -110,15 +112,17 @@ def read(self):
110
112
logging .debug ('WriteRowsEvent.EachRow #%d' , self .write_rows_event_each_row_num )
111
113
for row in mysql_event .rows :
112
114
rows_num += 1
115
+ rows_num_since_interim_speed_report += 1
113
116
event = Event ()
114
117
event .schema = mysql_event .schema
115
118
event .table = mysql_event .table
116
119
event .row = row ['values' ]
117
120
self .notify ('WriteRowsEvent.EachRow' , event = event )
118
121
119
- if rows_num % 100000 == 0 :
122
+ if rows_num_since_interim_speed_report >= 100000 :
120
123
# speed report each N rows
121
124
self .speed_report (start , rows_num )
125
+ rows_num_since_interim_speed_report = 0
122
126
else :
123
127
# skip non-insert events
124
128
pass
0 commit comments