@@ -69,15 +69,30 @@ def __init__(
69
69
resume_stream = self .resume_stream ,
70
70
)
71
71
72
+ def speed_report (self , start , rows_num , now = None ):
73
+ # time to calc stat
74
+ if now is None :
75
+ now = time .time ()
76
+ window_size = now - start
77
+ rows_per_sec = rows_num / window_size
78
+ logging .info (
79
+ 'rows_per_sec:%f for last %d rows %f sec' ,
80
+ rows_per_sec ,
81
+ rows_num ,
82
+ window_size
83
+ )
84
+
72
85
def read (self ):
73
86
start_timestamp = int (time .time ())
74
87
# fetch events
75
88
try :
76
- prev_stat_time = time .time ()
77
- rows_num = 0
78
-
79
89
while True :
80
90
logging .debug ('Check events in binlog stream' )
91
+
92
+ start = time .time ()
93
+ rows_num = 0
94
+
95
+ # fetch available events from MySQL
81
96
for mysql_event in self .binlog_stream :
82
97
if isinstance (mysql_event , WriteRowsEvent ):
83
98
if self .subscribers ('WriteRowsEvent' ):
@@ -100,22 +115,22 @@ def read(self):
100
115
event .table = mysql_event .table
101
116
event .row = row ['values' ]
102
117
self .notify ('WriteRowsEvent.EachRow' , event = event )
118
+
119
+ if rows_num % 100000 == 0 :
120
+ # speed report each N rows
121
+ self .speed_report (start , rows_num )
103
122
else :
104
123
# skip non-insert events
105
124
pass
106
125
107
- now = time .time ()
108
- if now > prev_stat_time + 60 :
109
- # time to calc stat
110
- window_size = now - prev_stat_time
111
- rows_per_sec = rows_num / window_size
112
- logging .info (
113
- 'rows_per_sec:%f for last %f sec' ,
114
- rows_per_sec ,
115
- window_size
116
- )
117
- prev_stat_time = now
118
- rows_num = 0
126
+ # all events fetched (or none of them available)
127
+
128
+ if rows_num > 0 :
129
+ # we have some rows processed
130
+ now = time .time ()
131
+ if now > start + 60 :
132
+ # and processing was long enough
133
+ self .speed_report (start , rows_num , now )
119
134
120
135
if not self .blocking :
121
136
break # while True
0 commit comments