Skip to content

Commit 6675cd1

Browse files
committed
min and max rows per event
1 parent 3afa7a8 commit 6675cd1

File tree

2 files changed

+37
-12
lines changed

2 files changed

+37
-12
lines changed

run_ontime.sh

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"
44

5-
python3.6 main.py ${*:1} \
5+
PYTHON=python3.6
6+
PYTHON=/home/user/pypy3.5-5.9-beta-linux_x86_64-portable/bin/pypy
7+
8+
$PYTHON main.py ${*:1} \
69
--src-resume \
710
--src-wait \
811
--nice-pause=1 \
@@ -13,7 +16,7 @@ python3.6 main.py ${*:1} \
1316
--csvpool \
1417
--csvpool-file-path-prefix=qwe_ \
1518
--mempool-max-flush-interval=60 \
16-
--mempool-max-events-num=100000
19+
--mempool-max-events-num=1000
1720

1821
# --mempool
1922
# --mempool-max-events-num=3

src/reader/mysqlreader.py

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,17 +69,21 @@ def __init__(
6969
resume_stream=self.resume_stream,
7070
)
7171

72-
def speed_report(self, start, rows_num, now=None):
72+
def performance_report(self, start, rows_num, rows_per_event_min=-1, rows_per_event_max=-1, now=None):
7373
# time to calc stat
74+
7475
if now is None:
7576
now = time.time()
77+
7678
window_size = now - start
7779
rows_per_sec = rows_num / window_size
7880
logging.info(
79-
'rows_per_sec:%f for last %d rows %f sec',
81+
'rows_per_sec:%f rows_per_event_min: %d rows_per_event_max: %d for last %d rows %f sec',
8082
rows_per_sec,
83+
rows_per_event_min,
84+
rows_per_event_max,
8185
rows_num,
82-
window_size
86+
window_size,
8387
)
8488

8589
def read(self):
@@ -91,16 +95,27 @@ def read(self):
9195

9296
start = time.time()
9397
rows_num = 0
94-
rows_num_since_interim_speed_report = 0
98+
rows_num_since_interim_performance_report = 0
99+
rows_per_event = 0
100+
rows_per_event_min = 0
101+
rows_per_event_max = 0
102+
95103

96104
# fetch available events from MySQL
97105
for mysql_event in self.binlog_stream:
98106
if isinstance(mysql_event, WriteRowsEvent):
107+
108+
rows_per_event = len(mysql_event.rows)
109+
if rows_per_event < rows_per_event_min:
110+
rows_per_event_min = rows_per_event
111+
if rows_per_event > rows_per_event_max:
112+
rows_per_event_max = rows_per_event
113+
99114
if self.subscribers('WriteRowsEvent'):
100115
self.write_rows_event_num += 1
101116
logging.debug('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows))
102117
rows_num += len(mysql_event.rows)
103-
rows_num_since_interim_speed_report += len(mysql_event.rows)
118+
rows_num_since_interim_performance_report += len(mysql_event.rows)
104119
event = Event()
105120
event.schema = mysql_event.schema
106121
event.table = mysql_event.table
@@ -112,17 +127,24 @@ def read(self):
112127
logging.debug('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num)
113128
for row in mysql_event.rows:
114129
rows_num += 1
115-
rows_num_since_interim_speed_report += 1
130+
rows_num_since_interim_performance_report += 1
116131
event = Event()
117132
event.schema = mysql_event.schema
118133
event.table = mysql_event.table
119134
event.row = row['values']
120135
self.notify('WriteRowsEvent.EachRow', event=event)
121136

122-
if rows_num_since_interim_speed_report >= 100000:
137+
if rows_num_since_interim_performance_report >= 100000:
123138
# speed report each N rows
124-
self.speed_report(start, rows_num)
125-
rows_num_since_interim_speed_report = 0
139+
self.performance_report(
140+
start=start,
141+
rows_num=rows_num,
142+
rows_per_event_min=rows_per_event_min,
143+
rows_per_event_max=rows_per_event_max,
144+
)
145+
rows_num_since_interim_performance_report = 0
146+
rows_per_event_min = 0
147+
rows_per_event_max = 0
126148
else:
127149
# skip non-insert events
128150
pass
@@ -134,7 +156,7 @@ def read(self):
134156
now = time.time()
135157
if now > start + 60:
136158
# and processing was long enough
137-
self.speed_report(start, rows_num, now)
159+
self.performance_report(start, rows_num, now)
138160

139161
if not self.blocking:
140162
break # while True

0 commit comments

Comments
 (0)