diff --git a/README.md b/README.md index 5da5e9c..8b57571 100644 --- a/README.md +++ b/README.md @@ -861,4 +861,25 @@ for file in $(ls *.csv|sort|head -n 100); do rm -f ontime.csv i=$((i+1)) done + +#!/bin/bash +files_to_import_num=3 +i=1 +for file in $(ls /mnt/nas/work/ontime/*.csv|sort|head -n $files_to_import_num); do + echo "$i. Prepare $file" + rm -f ontime + ln -s $file ontime + echo "$i. Import $file" + time mysqlimport \ + --ignore-lines=1 \ + --fields-terminated-by=, \ + --fields-enclosed-by=\" \ + --local \ + -u root \ + airline ontime + rm -f ontime + i=$((i+1)) +done + + ``` diff --git a/run_ontime.sh b/run_ontime.sh index 520ad4a..837ed97 100755 --- a/run_ontime.sh +++ b/run_ontime.sh @@ -2,18 +2,22 @@ sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse" -python3.6 main.py ${*:1} \ +PYTHON=python3.6 +PYTHON=/home/user/pypy3.5-5.9-beta-linux_x86_64-portable/bin/pypy + +$PYTHON main.py ${*:1} \ --src-resume \ --src-wait \ --nice-pause=1 \ --log-level=info \ + --log-file=ontime.log \ --src-host=127.0.0.1 \ --src-user=root \ --dst-host=127.0.0.1 \ --csvpool \ --csvpool-file-path-prefix=qwe_ \ --mempool-max-flush-interval=60 \ - --mempool-max-events-num=100000 + --mempool-max-events-num=1000 # --mempool # --mempool-max-events-num=3 diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py index 82cda55..e628ce5 100644 --- a/src/pool/bbpool.py +++ b/src/pool/bbpool.py @@ -31,8 +31,8 @@ class BBPool(Pool): # 'key.2': UNIX TIMESTAMP } - buckets_count = 0 - items_count = 0; + buckets_num_total = 0 + items_num_total = 0; prev_time = None prev_buckets_count = 0 @@ -98,7 +98,7 @@ def flush(self, key=None): def rotate_belt(self, belt_index, flush=False): """Try to rotate belt""" - now = int(time.time()) + now = time.time() if flush: # explicit flush requested @@ -129,26 +129,26 @@ def rotate_belt(self, belt_index, flush=False): # too many buckets on the belt # time to rotate belt and flush the most-right-bucket - buckets_num = len(self.belts[belt_index]) - last_bucket_size = len(self.belts[belt_index][buckets_num-1]) - - self.buckets_count += 1 - self.items_count += last_bucket_size - - logging.info('rot now:%d bktcnt:%d bktcontentcnt: %d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d', - now, - self.buckets_count, - self.items_count, - str(belt_index), - rotate_reason, - buckets_num, - last_bucket_size, - len(self.belts) - ) + buckets_on_belt_num = len(self.belts[belt_index]) + most_right_bucket_size = len(self.belts[belt_index][buckets_on_belt_num-1]) + + self.buckets_num_total += 1 + self.items_num_total += most_right_bucket_size + + logging.info('rot now:%f bktttl:%d bktitemsttl: %d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d', + now, + self.buckets_num_total, + self.items_num_total, + str(belt_index), + rotate_reason, + buckets_on_belt_num, + most_right_bucket_size, + len(self.belts), + ) # time to flush data for specified key - self.writer_builder.param('csv_file_path_suffix_parts', [str(now), str(self.buckets_count)]) - writer = self.writer_builder.get() + self.writer_builder.param('csv_file_path_suffix_parts', [str(int(now)), str(self.buckets_num_total)]) + writer = self.writer_builder.new() writer.insert(self.belts[belt_index].pop()) writer.close() writer.push() @@ -159,18 +159,21 @@ def rotate_belt(self, belt_index, flush=False): # have previous time - meaning this is at least second rotate # can calculate belt speed window_size = now - self.prev_time - buckets_per_sec = (self.buckets_count - self.prev_buckets_count)/window_size - items_per_sec = (self.items_count - self.prev_items_count) / window_size - logging.info( - 'buckets_per_sec:%f items_per_sec:%f for last %d sec', - buckets_per_sec, - items_per_sec, - window_size - ) + if window_size > 0: + buckets_per_sec = (self.buckets_num_total - self.prev_buckets_count) / window_size + items_per_sec = (self.items_num_total - self.prev_items_count) / window_size + logging.info( + 'PERF - buckets_per_sec:%f items_per_sec:%f for last %d sec', + buckets_per_sec, + items_per_sec, + window_size + ) + else: + logging.info("PERF - buckets window size=0 can not calc performance for this window") self.prev_time = now - self.prev_buckets_count = self.buckets_count - self.prev_items_count = self.items_count + self.prev_buckets_count = self.buckets_num_total + self.prev_items_count = self.items_num_total # belt rotated return True diff --git a/src/reader/mysqlreader.py b/src/reader/mysqlreader.py index f5589c5..4f2ff4d 100644 --- a/src/reader/mysqlreader.py +++ b/src/reader/mysqlreader.py @@ -69,21 +69,57 @@ def __init__( resume_stream=self.resume_stream, ) + def performance_report(self, start, rows_num, rows_num_per_event_min=None, rows_num_per_event_max=None, now=None): + # time to calc stat + + if now is None: + now = time.time() + + window_size = now - start + if window_size > 0: + rows_per_sec = rows_num / window_size + logging.info( + 'PERF - rows_per_sec:%f rows_per_event_min: %d rows_per_event_max: %d for last %d rows %f sec', + rows_per_sec, + rows_num_per_event_min if rows_num_per_event_min is not None else -1, + rows_num_per_event_max if rows_num_per_event_max is not None else -1, + rows_num, + window_size, + ) + else: + logging.info("PERF - rows window size=0 can not calc performance for this window") + + def read(self): start_timestamp = int(time.time()) # fetch events try: - prev_stat_time = time.time() - rows_num = 0 - while True: logging.debug('Check events in binlog stream') + + start = time.time() + rows_num = 0 + rows_num_since_interim_performance_report = 0 + rows_num_per_event = 0 + rows_num_per_event_min = None + rows_num_per_event_max = None + + + # fetch available events from MySQL for mysql_event in self.binlog_stream: if isinstance(mysql_event, WriteRowsEvent): + + rows_num_per_event = len(mysql_event.rows) + if (rows_num_per_event_min is None) or (rows_num_per_event < rows_num_per_event_min): + rows_num_per_event_min = rows_num_per_event + if (rows_num_per_event_max is None) or (rows_num_per_event > rows_num_per_event_max): + rows_num_per_event_max = rows_num_per_event + if self.subscribers('WriteRowsEvent'): self.write_rows_event_num += 1 logging.debug('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows)) rows_num += len(mysql_event.rows) + rows_num_since_interim_performance_report += len(mysql_event.rows) event = Event() event.schema = mysql_event.schema event.table = mysql_event.table @@ -95,27 +131,36 @@ def read(self): logging.debug('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num) for row in mysql_event.rows: rows_num += 1 + rows_num_since_interim_performance_report += 1 event = Event() event.schema = mysql_event.schema event.table = mysql_event.table event.row = row['values'] self.notify('WriteRowsEvent.EachRow', event=event) + + if rows_num_since_interim_performance_report >= 100000: + # speed report each N rows + self.performance_report( + start=start, + rows_num=rows_num, + rows_num_per_event_min=rows_num_per_event_min, + rows_num_per_event_max=rows_num_per_event_max, + ) + rows_num_since_interim_performance_report = 0 + rows_num_per_event_min = None + rows_num_per_event_max = None else: # skip non-insert events pass - now = time.time() - if now > prev_stat_time + 60: - # time to calc stat - window_size = now - prev_stat_time - rows_per_sec = rows_num / window_size - logging.info( - 'rows_per_sec:%f for last %f sec', - rows_per_sec, - window_size - ) - prev_stat_time = now - rows_num = 0 + # all events fetched (or none of them available) + + if rows_num > 0: + # we have some rows processed + now = time.time() + if now > start + 60: + # and processing was long enough + self.performance_report(start, rows_num, now) if not self.blocking: break # while True diff --git a/src/writer/chcsvwriter.py b/src/writer/chcsvwriter.py index d9390fb..9ca27d7 100644 --- a/src/writer/chcsvwriter.py +++ b/src/writer/chcsvwriter.py @@ -58,7 +58,7 @@ def insert(self, event_or_events=None): sql, ) -# print('running:', bash) + logging.info('starting %s', bash) os.system(bash) pass