Skip to content

Commit 6a2b00d

Browse files
authored
Merge pull request #34 from sunsingerus/master
enhancements
2 parents f064beb + 607ef88 commit 6a2b00d

File tree

5 files changed

+122
-49
lines changed

5 files changed

+122
-49
lines changed

README.md

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -861,4 +861,25 @@ for file in $(ls *.csv|sort|head -n 100); do
861861
rm -f ontime.csv
862862
i=$((i+1))
863863
done
864+
865+
#!/bin/bash
866+
files_to_import_num=3
867+
i=1
868+
for file in $(ls /mnt/nas/work/ontime/*.csv|sort|head -n $files_to_import_num); do
869+
echo "$i. Prepare $file"
870+
rm -f ontime
871+
ln -s $file ontime
872+
echo "$i. Import $file"
873+
time mysqlimport \
874+
--ignore-lines=1 \
875+
--fields-terminated-by=, \
876+
--fields-enclosed-by=\" \
877+
--local \
878+
-u root \
879+
airline ontime
880+
rm -f ontime
881+
i=$((i+1))
882+
done
883+
884+
864885
```

run_ontime.sh

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,22 @@
22

33
sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"
44

5-
python3.6 main.py ${*:1} \
5+
PYTHON=python3.6
6+
PYTHON=/home/user/pypy3.5-5.9-beta-linux_x86_64-portable/bin/pypy
7+
8+
$PYTHON main.py ${*:1} \
69
--src-resume \
710
--src-wait \
811
--nice-pause=1 \
912
--log-level=info \
13+
--log-file=ontime.log \
1014
--src-host=127.0.0.1 \
1115
--src-user=root \
1216
--dst-host=127.0.0.1 \
1317
--csvpool \
1418
--csvpool-file-path-prefix=qwe_ \
1519
--mempool-max-flush-interval=60 \
16-
--mempool-max-events-num=100000
20+
--mempool-max-events-num=1000
1721

1822
# --mempool
1923
# --mempool-max-events-num=3

src/pool/bbpool.py

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ class BBPool(Pool):
3131
# 'key.2': UNIX TIMESTAMP
3232
}
3333

34-
buckets_count = 0
35-
items_count = 0;
34+
buckets_num_total = 0
35+
items_num_total = 0;
3636

3737
prev_time = None
3838
prev_buckets_count = 0
@@ -98,7 +98,7 @@ def flush(self, key=None):
9898
def rotate_belt(self, belt_index, flush=False):
9999
"""Try to rotate belt"""
100100

101-
now = int(time.time())
101+
now = time.time()
102102

103103
if flush:
104104
# explicit flush requested
@@ -129,26 +129,26 @@ def rotate_belt(self, belt_index, flush=False):
129129
# too many buckets on the belt
130130
# time to rotate belt and flush the most-right-bucket
131131

132-
buckets_num = len(self.belts[belt_index])
133-
last_bucket_size = len(self.belts[belt_index][buckets_num-1])
134-
135-
self.buckets_count += 1
136-
self.items_count += last_bucket_size
137-
138-
logging.info('rot now:%d bktcnt:%d bktcontentcnt: %d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d',
139-
now,
140-
self.buckets_count,
141-
self.items_count,
142-
str(belt_index),
143-
rotate_reason,
144-
buckets_num,
145-
last_bucket_size,
146-
len(self.belts)
147-
)
132+
buckets_on_belt_num = len(self.belts[belt_index])
133+
most_right_bucket_size = len(self.belts[belt_index][buckets_on_belt_num-1])
134+
135+
self.buckets_num_total += 1
136+
self.items_num_total += most_right_bucket_size
137+
138+
logging.info('rot now:%f bktttl:%d bktitemsttl: %d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d',
139+
now,
140+
self.buckets_num_total,
141+
self.items_num_total,
142+
str(belt_index),
143+
rotate_reason,
144+
buckets_on_belt_num,
145+
most_right_bucket_size,
146+
len(self.belts),
147+
)
148148

149149
# time to flush data for specified key
150-
self.writer_builder.param('csv_file_path_suffix_parts', [str(now), str(self.buckets_count)])
151-
writer = self.writer_builder.get()
150+
self.writer_builder.param('csv_file_path_suffix_parts', [str(int(now)), str(self.buckets_num_total)])
151+
writer = self.writer_builder.new()
152152
writer.insert(self.belts[belt_index].pop())
153153
writer.close()
154154
writer.push()
@@ -159,18 +159,21 @@ def rotate_belt(self, belt_index, flush=False):
159159
# have previous time - meaning this is at least second rotate
160160
# can calculate belt speed
161161
window_size = now - self.prev_time
162-
buckets_per_sec = (self.buckets_count - self.prev_buckets_count)/window_size
163-
items_per_sec = (self.items_count - self.prev_items_count) / window_size
164-
logging.info(
165-
'buckets_per_sec:%f items_per_sec:%f for last %d sec',
166-
buckets_per_sec,
167-
items_per_sec,
168-
window_size
169-
)
162+
if window_size > 0:
163+
buckets_per_sec = (self.buckets_num_total - self.prev_buckets_count) / window_size
164+
items_per_sec = (self.items_num_total - self.prev_items_count) / window_size
165+
logging.info(
166+
'PERF - buckets_per_sec:%f items_per_sec:%f for last %d sec',
167+
buckets_per_sec,
168+
items_per_sec,
169+
window_size
170+
)
171+
else:
172+
logging.info("PERF - buckets window size=0 can not calc performance for this window")
170173

171174
self.prev_time = now
172-
self.prev_buckets_count = self.buckets_count
173-
self.prev_items_count = self.items_count
175+
self.prev_buckets_count = self.buckets_num_total
176+
self.prev_items_count = self.items_num_total
174177

175178
# belt rotated
176179
return True

src/reader/mysqlreader.py

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,21 +69,57 @@ def __init__(
6969
resume_stream=self.resume_stream,
7070
)
7171

72+
def performance_report(self, start, rows_num, rows_num_per_event_min=None, rows_num_per_event_max=None, now=None):
73+
# time to calc stat
74+
75+
if now is None:
76+
now = time.time()
77+
78+
window_size = now - start
79+
if window_size > 0:
80+
rows_per_sec = rows_num / window_size
81+
logging.info(
82+
'PERF - rows_per_sec:%f rows_per_event_min: %d rows_per_event_max: %d for last %d rows %f sec',
83+
rows_per_sec,
84+
rows_num_per_event_min if rows_num_per_event_min is not None else -1,
85+
rows_num_per_event_max if rows_num_per_event_max is not None else -1,
86+
rows_num,
87+
window_size,
88+
)
89+
else:
90+
logging.info("PERF - rows window size=0 can not calc performance for this window")
91+
92+
7293
def read(self):
7394
start_timestamp = int(time.time())
7495
# fetch events
7596
try:
76-
prev_stat_time = time.time()
77-
rows_num = 0
78-
7997
while True:
8098
logging.debug('Check events in binlog stream')
99+
100+
start = time.time()
101+
rows_num = 0
102+
rows_num_since_interim_performance_report = 0
103+
rows_num_per_event = 0
104+
rows_num_per_event_min = None
105+
rows_num_per_event_max = None
106+
107+
108+
# fetch available events from MySQL
81109
for mysql_event in self.binlog_stream:
82110
if isinstance(mysql_event, WriteRowsEvent):
111+
112+
rows_num_per_event = len(mysql_event.rows)
113+
if (rows_num_per_event_min is None) or (rows_num_per_event < rows_num_per_event_min):
114+
rows_num_per_event_min = rows_num_per_event
115+
if (rows_num_per_event_max is None) or (rows_num_per_event > rows_num_per_event_max):
116+
rows_num_per_event_max = rows_num_per_event
117+
83118
if self.subscribers('WriteRowsEvent'):
84119
self.write_rows_event_num += 1
85120
logging.debug('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows))
86121
rows_num += len(mysql_event.rows)
122+
rows_num_since_interim_performance_report += len(mysql_event.rows)
87123
event = Event()
88124
event.schema = mysql_event.schema
89125
event.table = mysql_event.table
@@ -95,27 +131,36 @@ def read(self):
95131
logging.debug('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num)
96132
for row in mysql_event.rows:
97133
rows_num += 1
134+
rows_num_since_interim_performance_report += 1
98135
event = Event()
99136
event.schema = mysql_event.schema
100137
event.table = mysql_event.table
101138
event.row = row['values']
102139
self.notify('WriteRowsEvent.EachRow', event=event)
140+
141+
if rows_num_since_interim_performance_report >= 100000:
142+
# speed report each N rows
143+
self.performance_report(
144+
start=start,
145+
rows_num=rows_num,
146+
rows_num_per_event_min=rows_num_per_event_min,
147+
rows_num_per_event_max=rows_num_per_event_max,
148+
)
149+
rows_num_since_interim_performance_report = 0
150+
rows_num_per_event_min = None
151+
rows_num_per_event_max = None
103152
else:
104153
# skip non-insert events
105154
pass
106155

107-
now = time.time()
108-
if now > prev_stat_time + 60:
109-
# time to calc stat
110-
window_size = now - prev_stat_time
111-
rows_per_sec = rows_num / window_size
112-
logging.info(
113-
'rows_per_sec:%f for last %f sec',
114-
rows_per_sec,
115-
window_size
116-
)
117-
prev_stat_time = now
118-
rows_num = 0
156+
# all events fetched (or none of them available)
157+
158+
if rows_num > 0:
159+
# we have some rows processed
160+
now = time.time()
161+
if now > start + 60:
162+
# and processing was long enough
163+
self.performance_report(start, rows_num, now)
119164

120165
if not self.blocking:
121166
break # while True

src/writer/chcsvwriter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def insert(self, event_or_events=None):
5858
sql,
5959
)
6060

61-
# print('running:', bash)
61+
logging.info('starting %s', bash)
6262
os.system(bash)
6363

6464
pass

0 commit comments

Comments
 (0)