enhancements #34

Merged: 8 commits, Nov 20, 2017
Changes from all commits
README.md: 21 additions & 0 deletions
@@ -861,4 +861,25 @@ for file in $(ls *.csv|sort|head -n 100); do
rm -f ontime.csv
i=$((i+1))
done

#!/bin/bash
files_to_import_num=3
i=1
for file in $(ls /mnt/nas/work/ontime/*.csv|sort|head -n $files_to_import_num); do
echo "$i. Prepare $file"
rm -f ontime
ln -s $file ontime
echo "$i. Import $file"
time mysqlimport \
--ignore-lines=1 \
--fields-terminated-by=, \
--fields-enclosed-by=\" \
--local \
-u root \
airline ontime
rm -f ontime
i=$((i+1))
done


```
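Why the symlink: mysqlimport derives the target table name from the base name of the data file (extension stripped), which is why each CSV above is first linked to a file literally named ontime, so every import lands in the airline.ontime table. A minimal single-file sketch of the same idea; the CSV path is a hypothetical example, not taken from the diff:

```bash
#!/bin/bash
# mysqlimport uses the data file's base name as the table name, so the CSV is
# linked to a plain "ontime" file before importing into the "airline" database.
csv=/mnt/nas/work/ontime/example.csv    # hypothetical example file

ln -sf "$csv" ontime
time mysqlimport \
    --ignore-lines=1 \
    --fields-terminated-by=, \
    --fields-enclosed-by=\" \
    --local \
    -u root \
    airline ontime
rm -f ontime
```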
run_ontime.sh: 6 additions & 2 deletions
@@ -2,18 +2,22 @@

sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"

-python3.6 main.py ${*:1} \
+PYTHON=python3.6
+PYTHON=/home/user/pypy3.5-5.9-beta-linux_x86_64-portable/bin/pypy
+
+$PYTHON main.py ${*:1} \
--src-resume \
--src-wait \
--nice-pause=1 \
--log-level=info \
--log-file=ontime.log \
--src-host=127.0.0.1 \
--src-user=root \
--dst-host=127.0.0.1 \
--csvpool \
--csvpool-file-path-prefix=qwe_ \
--mempool-max-flush-interval=60 \
---mempool-max-events-num=100000
+--mempool-max-events-num=1000

# --mempool
# --mempool-max-events-num=3
src/pool/bbpool.py: 34 additions & 31 deletions
@@ -31,8 +31,8 @@ class BBPool(Pool):
# 'key.2': UNIX TIMESTAMP
}

-buckets_count = 0
-items_count = 0;
+buckets_num_total = 0
+items_num_total = 0;

prev_time = None
prev_buckets_count = 0
@@ -98,7 +98,7 @@ def flush(self, key=None):
def rotate_belt(self, belt_index, flush=False):
"""Try to rotate belt"""

-now = int(time.time())
+now = time.time()

if flush:
# explicit flush requested
@@ -129,26 +129,26 @@ def rotate_belt(self, belt_index, flush=False):
# too many buckets on the belt
# time to rotate belt and flush the most-right-bucket

-buckets_num = len(self.belts[belt_index])
-last_bucket_size = len(self.belts[belt_index][buckets_num-1])
-
-self.buckets_count += 1
-self.items_count += last_bucket_size
-
-logging.info('rot now:%d bktcnt:%d bktcontentcnt: %d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d',
-now,
-self.buckets_count,
-self.items_count,
-str(belt_index),
-rotate_reason,
-buckets_num,
-last_bucket_size,
-len(self.belts)
-)
+buckets_on_belt_num = len(self.belts[belt_index])
+most_right_bucket_size = len(self.belts[belt_index][buckets_on_belt_num-1])
+
+self.buckets_num_total += 1
+self.items_num_total += most_right_bucket_size
+
+logging.info('rot now:%f bktttl:%d bktitemsttl: %d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d',
+now,
+self.buckets_num_total,
+self.items_num_total,
+str(belt_index),
+rotate_reason,
+buckets_on_belt_num,
+most_right_bucket_size,
+len(self.belts),
+)

# time to flush data for specified key
-self.writer_builder.param('csv_file_path_suffix_parts', [str(now), str(self.buckets_count)])
-writer = self.writer_builder.get()
+self.writer_builder.param('csv_file_path_suffix_parts', [str(int(now)), str(self.buckets_num_total)])
+writer = self.writer_builder.new()
writer.insert(self.belts[belt_index].pop())
writer.close()
writer.push()
@@ -159,18 +159,21 @@ def rotate_belt(self, belt_index, flush=False):
# have previous time - meaning this is at least second rotate
# can calculate belt speed
window_size = now - self.prev_time
-buckets_per_sec = (self.buckets_count - self.prev_buckets_count)/window_size
-items_per_sec = (self.items_count - self.prev_items_count) / window_size
-logging.info(
-'buckets_per_sec:%f items_per_sec:%f for last %d sec',
-buckets_per_sec,
-items_per_sec,
-window_size
-)
+if window_size > 0:
+buckets_per_sec = (self.buckets_num_total - self.prev_buckets_count) / window_size
+items_per_sec = (self.items_num_total - self.prev_items_count) / window_size
+logging.info(
+'PERF - buckets_per_sec:%f items_per_sec:%f for last %d sec',
+buckets_per_sec,
+items_per_sec,
+window_size
+)
+else:
+logging.info("PERF - buckets window size=0 can not calc performance for this window")

self.prev_time = now
-self.prev_buckets_count = self.buckets_count
-self.prev_items_count = self.items_count
+self.prev_buckets_count = self.buckets_num_total
+self.prev_items_count = self.items_num_total

# belt rotated
return True
src/reader/mysqlreader.py: 60 additions & 15 deletions
@@ -69,21 +69,57 @@ def __init__(
resume_stream=self.resume_stream,
)

+def performance_report(self, start, rows_num, rows_num_per_event_min=None, rows_num_per_event_max=None, now=None):
+# time to calc stat
+
+if now is None:
+now = time.time()
+
+window_size = now - start
+if window_size > 0:
+rows_per_sec = rows_num / window_size
+logging.info(
+'PERF - rows_per_sec:%f rows_per_event_min: %d rows_per_event_max: %d for last %d rows %f sec',
+rows_per_sec,
+rows_num_per_event_min if rows_num_per_event_min is not None else -1,
+rows_num_per_event_max if rows_num_per_event_max is not None else -1,
+rows_num,
+window_size,
+)
+else:
+logging.info("PERF - rows window size=0 can not calc performance for this window")


def read(self):
-start_timestamp = int(time.time())
# fetch events
try:
-prev_stat_time = time.time()
-rows_num = 0

while True:
logging.debug('Check events in binlog stream')

+start = time.time()
+rows_num = 0
+rows_num_since_interim_performance_report = 0
+rows_num_per_event = 0
+rows_num_per_event_min = None
+rows_num_per_event_max = None


# fetch available events from MySQL
for mysql_event in self.binlog_stream:
if isinstance(mysql_event, WriteRowsEvent):

+rows_num_per_event = len(mysql_event.rows)
+if (rows_num_per_event_min is None) or (rows_num_per_event < rows_num_per_event_min):
+rows_num_per_event_min = rows_num_per_event
+if (rows_num_per_event_max is None) or (rows_num_per_event > rows_num_per_event_max):
+rows_num_per_event_max = rows_num_per_event

if self.subscribers('WriteRowsEvent'):
self.write_rows_event_num += 1
logging.debug('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows))
rows_num += len(mysql_event.rows)
+rows_num_since_interim_performance_report += len(mysql_event.rows)
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
@@ -95,27 +131,36 @@ def read(self):
logging.debug('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num)
for row in mysql_event.rows:
rows_num += 1
+rows_num_since_interim_performance_report += 1
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
event.row = row['values']
self.notify('WriteRowsEvent.EachRow', event=event)

+if rows_num_since_interim_performance_report >= 100000:
+# speed report each N rows
+self.performance_report(
+start=start,
+rows_num=rows_num,
+rows_num_per_event_min=rows_num_per_event_min,
+rows_num_per_event_max=rows_num_per_event_max,
+)
+rows_num_since_interim_performance_report = 0
+rows_num_per_event_min = None
+rows_num_per_event_max = None
else:
# skip non-insert events
pass

-now = time.time()
-if now > prev_stat_time + 60:
-# time to calc stat
-window_size = now - prev_stat_time
-rows_per_sec = rows_num / window_size
-logging.info(
-'rows_per_sec:%f for last %f sec',
-rows_per_sec,
-window_size
-)
-prev_stat_time = now
-rows_num = 0
# all events fetched (or none of them available)

+if rows_num > 0:
+# we have some rows processed
+now = time.time()
+if now > start + 60:
+# and processing was long enough
+self.performance_report(start=start, rows_num=rows_num, now=now)

if not self.blocking:
break # while True
src/writer/chcsvwriter.py: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ def insert(self, event_or_events=None):
sql,
)

-# print('running:', bash)
+logging.info('starting %s', bash)
os.system(bash)

pass