From c7d1ceb45dd45b0332d09fbd33b96e776785cce9 Mon Sep 17 00:00:00 2001
From: sunsingerus
Date: Wed, 15 Nov 2017 11:18:50 +0300
Subject: [PATCH 1/8] minor md formatting

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index f54aa16..d0e5319 100644
--- a/README.md
+++ b/README.md
@@ -185,7 +185,7 @@ binlog-format = row #Very important if you want to receive write, update and
 ## MySQL Test Tables

 We have to separate test table into several ones because of this error, produced by MySQL:
-```bash
+```text
 ERROR 1118 (42000): Row size too large. The maximum row size for the used table type, not counting BLOBs, is 65535. This includes storage overhead, check the manual. You have to change some columns to TEXT or BLOBs
 ```

From 25e7e2eb5046463fd386f6b49d2f334db8a55998 Mon Sep 17 00:00:00 2001
From: sunsingerus
Date: Wed, 15 Nov 2017 16:43:19 +0300
Subject: [PATCH 2/8] rotating refactoring

---
 README.md                 |  1 +
 main.py                   | 17 ++++++++-----
 run.sh                    |  2 ++
 src/cliopts.py            | 36 ++++++++++++++++++++++++++-
 src/config.py             |  6 +++++
 src/observable.py         |  3 ++-
 src/pool/bbpool.py        | 52 +++++++++++++++++++++++++--------------
 src/reader/mysqlreader.py |  3 +++
 8 files changed, 93 insertions(+), 27 deletions(-)

diff --git a/README.md b/README.md
index d0e5319..5da5e9c 100644
--- a/README.md
+++ b/README.md
@@ -727,6 +727,7 @@ CREATE TABLE IF NOT EXISTS `airline`.`ontime` (

 ### ClickHouse Table
 ```sql
+CREATE DATABASE IF NOT EXISTS `airline`;
 CREATE TABLE IF NOT EXISTS `airline`.`ontime` (
     `Year` UInt16,
     `Quarter` UInt8,
diff --git a/main.py b/main.py
index 77d6190..100507c 100755
--- a/main.py
+++ b/main.py
@@ -7,12 +7,14 @@
 import sys

 import multiprocessing as mp
-
+import logging
+import pprint

 if sys.version_info[0] < 3:
     raise "Must be using Python 3"

+
 class Main(Daemon):

     config = None
@@ -20,11 +22,14 @@ class Main(Daemon):
     def __init__(self):
         mp.set_start_method('forkserver')
         self.config = CLIOpts.config()
-        super().__init__(pidfile=self.config.pid_file())

-        print('---')
-        print(self.config)
-        print('---')
+        logging.basicConfig(
+            filename=self.config.log_file(),
+            level=self.config.log_level(),
+            format='%(asctime)s - %(levelname)s - %(message)s'
+        )
+        super().__init__(pidfile=self.config.pid_file())
+        logging.debug(pprint.pformat(self.config.config))

     def run(self):
         pumper = Pumper(
@@ -36,7 +41,7 @@ def run(self):
     def start(self):
         if self.config.is_daemon():
             if not super().start():
-                print("Error going background. The process already running?")
+                logging.error("Error going background. The process already running?")
The process already running?") else: self.run() diff --git a/run.sh b/run.sh index 97fbe28..c4ce0c4 100755 --- a/run.sh +++ b/run.sh @@ -1,5 +1,7 @@ #!/bin/bash +sudo echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse + python3 main.py \ --src-resume \ --src-wait \ diff --git a/src/cliopts.py b/src/cliopts.py index 561ca2b..b643f79 100644 --- a/src/cliopts.py +++ b/src/cliopts.py @@ -3,6 +3,7 @@ import argparse from .config import Config +import logging class CLIOpts(object): @@ -41,6 +42,25 @@ def join(lists_to_join): else: return None + @staticmethod + def log_level_from_string(log_level_string): + level = log_level_string.upper() + + if level == 'CRITICAL': + return logging.CRITICAL + elif level == 'ERROR': + return logging.ERROR + elif level == 'WARNING': + return logging.WARNING + elif level == 'INFO': + return logging.INFO + elif level == 'DEBUG': + return logging.DEBUG + elif level == 'NOTSET': + return logging.NOTSET + else: + return logging.NOTSET + @staticmethod def config(): """Parse application's CLI options into options dictionary @@ -55,9 +75,21 @@ def config(): argparser.add_argument( '--config-file', type=str, - default='', + default=None, help='Path to config file. Default - not specified' ) + argparser.add_argument( + '--log-file', + type=str, + default=None, + help='Path to log file. Default - not specified' + ) + argparser.add_argument( + '--log-level', + type=str, + default="NOTSET", + help='Log Level. Default - NOTSET' + ) argparser.add_argument( '--dry', action='store_true', @@ -226,6 +258,8 @@ def config(): return Config ({ 'app-config': { 'config-file': args.config_file, + 'log-file': args.log_file, + 'log-level': CLIOpts.log_level_from_string(args.log_level), 'dry': args.dry, 'daemon': args.daemon, 'pid_file': args.pid_file, diff --git a/src/config.py b/src/config.py index 06e19c9..1a256ba 100644 --- a/src/config.py +++ b/src/config.py @@ -28,6 +28,12 @@ def __str__(self): def __getitem__(self, item): return self.config[item] + def log_file(self): + return self.config['app-config']['log-file'] + + def log_level(self): + return self.config['app-config']['log-level'] + def pid_file(self): return self.config['app-config']['pid_file'] diff --git a/src/observable.py b/src/observable.py index d8b9495..e92b415 100644 --- a/src/observable.py +++ b/src/observable.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import logging class Observable(object): @@ -25,4 +26,4 @@ def notify(self, event_name, **attrs): callback(**attrs) def subscribers(self, event_name): - return event_name in self.event_handlers and (len(self.event_handlers[event_name]) > 0) \ No newline at end of file + return event_name in self.event_handlers and (len(self.event_handlers[event_name]) > 0) diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py index 76a8b94..34580cd 100644 --- a/src/pool/bbpool.py +++ b/src/pool/bbpool.py @@ -5,6 +5,7 @@ from .pool import Pool from ..objectbuilder import ObjectBuilder +import logging # Buckets Belts' Index Generator @@ -20,14 +21,14 @@ class BBPool(Pool): # buckets on the belts belts = { -# pour data into 0-index bucket -# 'key.1': [[item,], [item, item, item,], [item, item, item,]] -# 'key.2': [[item,], [item, item, item,], [item, item, item,]] + # pour data into 0-index bucket + # 'key.1': [[item,], [item, item, item,], [item, item, item,]] + # 'key.2': [[item,], [item, item, item,], [item, item, item,]] } belts_rotated_at = { -# 'key.1': UNIX TIMESTAMP -# 'key.2': UNIX TIMESTAMP + # 'key.1': UNIX TIMESTAMP + # 'key.2': UNIX TIMESTAMP } buckets_count = 0 @@ 
@@ -49,11 +50,14 @@ def __init__(
         )

     def create_belt(self, belt_index):
-        # create belt with one empty bucket
+        """create belt with one empty bucket"""
+
         self.belts[belt_index] = [[]]
         self.belts_rotated_at[belt_index] = int(time.time())

     def insert(self, item):
+        """Insert item into pool"""
+
         # which belt we'll insert item?
         belt_index = self.key_generator.generate(item)

@@ -64,12 +68,12 @@ def insert(self, item):
         # append item to the 0-indexed bucket of the specified belt
         self.belts[belt_index][0].append(item)

-        # may be bucket is already full
-        if len(self.belts[belt_index][0]) >= self.max_bucket_size:
-            # bucket full, rotate the belt
-            self.rotate_belt(belt_index)
+        # try to rotate belt - maybe it already should be rotated
+        self.rotate_belt(belt_index)

     def flush(self, key=None):
+        """Flush all buckets from the belt and delete the belt itself"""
+
         belt_index = key
         empty_belts_indexes = []

@@ -87,25 +91,27 @@ def flush(self, key=None):
             self.belts_rotated_at.pop(b_index)

     def rotate_belt(self, belt_index, flush=False):
+        """Try to rotate belt"""
+
         now = int(time.time())

-        need_rotation = True if flush else False
-        rotate_reason = "FLUSH"
-        if len(self.belts[belt_index][0]) >= self.max_bucket_size:
+        if flush:
+            # explicit flush requested
+            rotate_reason = "FLUSH"
+
+        elif len(self.belts[belt_index][0]) >= self.max_bucket_size:
             # 0-index bucket is full
-            need_rotation = True
             rotate_reason = "SIZE"

         elif now >= self.belts_rotated_at[belt_index] + self.max_interval_between_rotations:
             # time interval reached
-            need_rotation = True
             rotate_reason = "TIME"

-        if not need_rotation:
-            # belt not rotated
+        else:
+            # no need to rotate belt
             return False

-        # belts needs rotation
+        # belt(s) needs rotation

         # insert empty bucket into the beginning of the belt
         self.belts[belt_index].insert(0, [])
@@ -121,7 +127,15 @@ def rotate_belt(self, belt_index, flush=False):
             buckets_num = len(self.belts[belt_index])
             last_bucket_size = len(self.belts[belt_index][buckets_num-1])

-            print('rotating belt. now:', now, 'bucket number:', self.buckets_count, 'index:', belt_index, 'reason:', rotate_reason, 'buckets on belt:', buckets_num, 'last bucket size:', last_bucket_size, 'belts count:', len(self.belts))
+            logging.info('rbelt. now:%d bucket:%d index:%d reason:%s bucketsonbelt:%d lastbucketsize:%d beltsnum:%d',
+                now,
+                self.buckets_count,
+                belt_index,
+                rotate_reason,
+                buckets_num,
+                last_bucket_size,
+                len(self.belts)
+            )

             # time to flush data for specified key
             self.writer_builder.param('csv_file_path_suffix_parts', [str(now), str(self.buckets_count)])
diff --git a/src/reader/mysqlreader.py b/src/reader/mysqlreader.py
index ba13fef..a5bb692 100644
--- a/src/reader/mysqlreader.py
+++ b/src/reader/mysqlreader.py
@@ -7,6 +7,7 @@
 from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
 #from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
 import time
+import logging


 class MySQLReader(Reader):
@@ -70,6 +71,7 @@ def read(self):
                 for mysql_event in self.binlog_stream:
                     if isinstance(mysql_event, WriteRowsEvent):
                         if self.subscribers('WriteRowsEvent'):
+                            logging.info('WriteRowsEvent rows: %d', len(mysql_event.rows))
                             event = Event()
                             event.schema = mysql_event.schema
                             event.table = mysql_event.table
@@ -79,6 +81,7 @@ def read(self):
                             self.notify('WriteRowsEvent', event=event)

                         if self.subscribers('WriteRowsEvent.EachRow'):
+                            logging.info('firing WriteRowsEvent.EachRow')
                             for row in mysql_event.rows:
                                 event = Event()
                                 event.schema = mysql_event.schema

From 4457dc139a97da0fc822802a40e65c5152c65b69 Mon Sep 17 00:00:00 2001
From: sunsingerus
Date: Wed, 15 Nov 2017 19:28:37 +0300
Subject: [PATCH 3/8] run_* scripts

---
 run.sh => run_datatypes.sh |  2 +-
 run_ontime.sh              | 22 ++++++++++++++++++++++
 src/pool/bbpool.py         |  2 +-
 3 files changed, 24 insertions(+), 2 deletions(-)
 rename run.sh => run_datatypes.sh (92%)
 create mode 100755 run_ontime.sh

diff --git a/run.sh b/run_datatypes.sh
similarity index 92%
rename from run.sh
rename to run_datatypes.sh
index c4ce0c4..2675ff4 100755
--- a/run.sh
+++ b/run_datatypes.sh
@@ -1,6 +1,6 @@
 #!/bin/bash

-sudo echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse
+sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"

 python3 main.py \
     --src-resume \
diff --git a/run_ontime.sh b/run_ontime.sh
new file mode 100755
index 0000000..9626f37
--- /dev/null
+++ b/run_ontime.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"
+
+python3 main.py \
+    --src-resume \
+    --src-wait \
+    --src-host=127.0.0.1 \
+    --src-user=root \
+    --dst-host=127.0.0.1 \
+    --csvpool \
+    --csvpool-file-path-prefix=qwe_ \
+    --mempool-max-flush-interval=300 \
+    --mempool-max-events-num=100000
+
+# --mempool
+# --mempool-max-events-num=3
+# --mempool-max-flush-interval=30
+# --dst-file=dst.csv
+# --dst-schema=db
+# --dst-table=datatypes
+# --csvpool-keep-files
diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py
index 34580cd..0469a0b 100644
--- a/src/pool/bbpool.py
+++ b/src/pool/bbpool.py
@@ -108,7 +108,7 @@ def rotate_belt(self, belt_index, flush=False):
             rotate_reason = "TIME"

         else:
-            # no need to rotate belt
+            # no need to rotate belt - belt not rotated
             return False

         # belt(s) needs rotation

From 0def9b134050e53953bd4f9840d68fbb1a58bfcd Mon Sep 17 00:00:00 2001
From: sunsingerus
Date: Wed, 15 Nov 2017 19:50:01 +0300
Subject: [PATCH 4/8] nice pause param

---
 run_ontime.sh             |  5 +++--
 src/cliopts.py            |  8 ++++++++
 src/config.py             |  3 +++
 src/pool/bbpool.py        |  4 ++--
 src/reader/mysqlreader.py | 16 ++++++++++++++--
 5 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/run_ontime.sh b/run_ontime.sh
index 9626f37..76e3c29 100755
--- a/run_ontime.sh
+++ b/run_ontime.sh
@@ -2,15 +2,16 @@

 sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"

-python3 main.py \
+python3.6 main.py \
     --src-resume \
     --src-wait \
+    --nice-pause=1 \
     --src-host=127.0.0.1 \
     --src-user=root \
     --dst-host=127.0.0.1 \
     --csvpool \
     --csvpool-file-path-prefix=qwe_ \
-    --mempool-max-flush-interval=300 \
+    --mempool-max-flush-interval=10 \
     --mempool-max-events-num=100000

 # --mempool
diff --git a/src/cliopts.py b/src/cliopts.py
index b643f79..7237146 100644
--- a/src/cliopts.py
+++ b/src/cliopts.py
@@ -90,6 +90,12 @@ def config():
             default="NOTSET",
             help='Log Level. Default - NOTSET'
         )
+        argparser.add_argument(
+            '--nice-pause',
+            type=int,
+            default=None,
+            help='Make nice pause (in seconds) between attempts to read binlog stream'
+        )
         argparser.add_argument(
             '--dry',
             action='store_true',
@@ -291,9 +297,11 @@ def config():
                 'only_tables': [x for x in args.src_only_tables.split(',') if x] if args.src_only_tables else None,
                 'blocking': args.src_wait,
                 'resume_stream': args.src_resume,
+                'nice_pause': 0 if args.nice_pause is None else args.nice_pause,
             },
             'file': {
                 'csv_file_path': args.src_file,
+                'nice_pause': 0 if args.nice_pause is None else args.nice_pause,
             },
         },
diff --git a/src/config.py b/src/config.py
index 1a256ba..85bc2cb 100644
--- a/src/config.py
+++ b/src/config.py
@@ -34,6 +34,9 @@ def log_file(self):
     def log_level(self):
         return self.config['app-config']['log-level']

+    def nice_pause(self):
+        return self.config['app-config']['nice-pause']
+
     def pid_file(self):
         return self.config['app-config']['pid_file']

diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py
index 0469a0b..bb9d948 100644
--- a/src/pool/bbpool.py
+++ b/src/pool/bbpool.py
@@ -127,10 +127,10 @@ def rotate_belt(self, belt_index, flush=False):
             buckets_num = len(self.belts[belt_index])
             last_bucket_size = len(self.belts[belt_index][buckets_num-1])

-            logging.info('rbelt. now:%d bucket:%d index:%d reason:%s bucketsonbelt:%d lastbucketsize:%d beltsnum:%d',
+            logging.info('rbelt. now:%d bucket:%d index:%s reason:%s bucketsonbelt:%d lastbucketsize:%d beltsnum:%d',
                 now,
                 self.buckets_count,
-                belt_index,
+                str(belt_index),
                 rotate_reason,
                 buckets_num,
                 last_bucket_size,
                 len(self.belts)
diff --git a/src/reader/mysqlreader.py b/src/reader/mysqlreader.py
index a5bb692..bdd7c5e 100644
--- a/src/reader/mysqlreader.py
+++ b/src/reader/mysqlreader.py
@@ -20,6 +20,10 @@ class MySQLReader(Reader):
     blocking = None
     resume_stream = None
     binlog_stream = None
+    nice_pause = 0
+
+    write_rows_event_num = 0
+    write_rows_event_each_row_num = 0

     def __init__(
             self,
@@ -31,6 +35,7 @@ def __init__(
             only_tables=None,
             blocking=None,
             resume_stream=None,
+            nice_pause=None,
             callbacks={},
     ):
         super().__init__(callbacks=callbacks)
@@ -43,6 +48,7 @@ def __init__(
         self.only_tables = only_tables
         self.blocking = blocking
         self.resume_stream = resume_stream
+        self.nice_pause = nice_pause
         self.binlog_stream = BinLogStreamReader(
             # MySQL server - data source
             connection_settings=self.connection_settings,
@@ -68,10 +74,12 @@ def read(self):
         # fetch events
         try:
             while True:
+                logging.info('Check events in binlog stream')
                 for mysql_event in self.binlog_stream:
                     if isinstance(mysql_event, WriteRowsEvent):
                         if self.subscribers('WriteRowsEvent'):
-                            logging.info('WriteRowsEvent rows: %d', len(mysql_event.rows))
+                            self.write_rows_event_num += 1
+                            logging.info('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows))
                             event = Event()
                             event.schema = mysql_event.schema
                             event.table = mysql_event.table
@@ -79,6 +87,8 @@ def read(self):
                             self.notify('WriteRowsEvent', event=event)

                         if self.subscribers('WriteRowsEvent.EachRow'):
-                            logging.info('firing WriteRowsEvent.EachRow')
+                            self.write_rows_event_each_row_num += 1
+                            logging.info('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num)
                             for row in mysql_event.rows:
                                 event = Event()
                                 event.schema = mysql_event.schema
@@ -96,6 +105,9 @@ def read(self):
                         # skip non-insert events
                         pass

                 if not self.blocking:
                     break

                 # blocking
+                if self.nice_pause > 0:
+                    time.sleep(self.nice_pause)
+
                 self.notify('ReaderIdleEvent')

             except KeyboardInterrupt:

From 7656988ac0082b04d348fdd2435b59db2cdbee23 Mon Sep 17 00:00:00 2001
From: sunsingerus
Date: Wed, 15 Nov 2017 20:10:50 +0300
Subject: [PATCH 5/8] writers path logging

---
 run_ontime.sh               | 2 +-
 src/writer/chcsvwriter.py   | 3 +++
 src/writer/chwriter.py      | 3 +++
 src/writer/csvwriter.py     | 3 +++
 src/writer/poolwriter.py    | 2 ++
 src/writer/processwriter.py | 2 ++
 6 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/run_ontime.sh b/run_ontime.sh
index 76e3c29..292e442 100755
--- a/run_ontime.sh
+++ b/run_ontime.sh
@@ -11,7 +11,7 @@ python3.6 main.py \
     --dst-host=127.0.0.1 \
     --csvpool \
     --csvpool-file-path-prefix=qwe_ \
-    --mempool-max-flush-interval=10 \
+    --mempool-max-flush-interval=60 \
     --mempool-max-events-num=100000

 # --mempool
diff --git a/src/writer/chcsvwriter.py b/src/writer/chcsvwriter.py
index fd1900d..d9390fb 100644
--- a/src/writer/chcsvwriter.py
+++ b/src/writer/chcsvwriter.py
@@ -5,6 +5,7 @@

 import os
 import time
+import logging


 class CHCSVWriter(Writer):
@@ -33,6 +34,8 @@ def insert(self, event_or_events=None):
         if len(events) < 1:
             return

+        logging.debug('class:%s insert %d rows', __class__, len(events))
+
         for event in events:
             sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format(
                 event.schema,
diff --git a/src/writer/chwriter.py b/src/writer/chwriter.py
index fb18f07..c68a98e 100644
--- a/src/writer/chwriter.py
+++ b/src/writer/chwriter.py
@@ -3,6 +3,7 @@

 from clickhouse_driver.client import Client
 from .writer import Writer
+import logging


 class CHWriter(Writer):
@@ -39,6 +40,8 @@ def insert(self, event_or_events=None):
         if len(events) < 1:
             return

+        logging.debug('class:%s insert %d rows', __class__, len(events))
+
         values = []
         event_converted = None
         for event in events:
diff --git a/src/writer/csvwriter.py b/src/writer/csvwriter.py
index 9d4d392..7190b2e 100644
--- a/src/writer/csvwriter.py
+++ b/src/writer/csvwriter.py
@@ -5,6 +5,7 @@
 from ..event.event import Event
 import csv
 import os.path
+import logging


 class CSVWriter(Writer):
@@ -72,6 +73,8 @@ def insert(self, event_or_events):
         if len(events) < 1:
             return

+        logging.debug('class:%s insert %d rows', __class__, len(events))
+
         if not self.opened():
             self.open()
diff --git a/src/writer/poolwriter.py b/src/writer/poolwriter.py
index 84413bb..be6a662 100644
--- a/src/writer/poolwriter.py
+++ b/src/writer/poolwriter.py
@@ -4,6 +4,7 @@
 from .writer import Writer
 from ..event.event import Event
 from ..pool.bbpool import BBPool
+import logging


 class PoolWriter(Writer):
@@ -29,6 +30,7 @@ def __init__(
         )

     def insert(self, event_or_events):
+        logging.debug('class:%s insert', __class__)
         self.pool.insert(event_or_events)

     def flush(self):
diff --git a/src/writer/processwriter.py b/src/writer/processwriter.py
index 93db354..7f6d6da 100644
--- a/src/writer/processwriter.py
+++ b/src/writer/processwriter.py
@@ -3,6 +3,7 @@

 from .writer import Writer
 import multiprocessing as mp
+import logging


 class ProcessWriter(Writer):
@@ -38,6 +39,7 @@ def insert(self, event_or_events=None):
         # row: {'id': 3, 'a': 3}
         # },
         # ]
+        logging.debug('class:%s insert', __class__)
         process = mp.Process(target=self.process, args=(event_or_events,))
         #print('Start Process')
         process.start()

From 8223694e70e5a27603760f77024dd0b7af010acf Mon Sep 17 00:00:00 2001
From: sunsingerus
Date: Wed, 15 Nov 2017 20:35:06 +0300
Subject: [PATCH 6/8] speed calc

---
 src/pool/bbpool.py          | 30 ++++++++++++++++++++++++++++--
 src/writer/processwriter.py |  7 ++++---
 2 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py
index bb9d948..11e6abf 100644
--- a/src/pool/bbpool.py
+++ b/src/pool/bbpool.py
@@ -32,6 +32,11 @@ class BBPool(Pool):
     }

     buckets_count = 0
+    buckets_content_count = 0
+
+    prev_time = None
+    prev_buckets_count = 0
+    prev_buckets_content_count = 0

     def __init__(
             self,
@@ -123,13 +128,17 @@ def rotate_belt(self, belt_index, flush=False):
         while len(self.belts[belt_index]) > buckets_num_left_on_belt:
             # too many buckets on the belt
             # time to rotate belt and flush the most-right-bucket
-            self.buckets_count += 1
             buckets_num = len(self.belts[belt_index])
             last_bucket_size = len(self.belts[belt_index][buckets_num-1])

-            logging.info('rbelt. now:%d bucket:%d index:%s reason:%s bucketsonbelt:%d lastbucketsize:%d beltsnum:%d',
+
+            self.buckets_count += 1
+            self.buckets_content_count += last_bucket_size
+
+            logging.info('rot now:%d bktcnt:%d bktcontentcnt: %d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d',
                 now,
                 self.buckets_count,
+                self.buckets_content_count,
                 str(belt_index),
                 rotate_reason,
                 buckets_num,
@@ -146,5 +155,22 @@ def rotate_belt(self, belt_index, flush=False):
             writer.destroy()
             del writer

+        if self.prev_time is not None:
+            # have previous time - meaning this is at least second rotate
+            # can calculate belt speed
+            window_size = now - self.prev_time
+            buckets_per_sec = (self.buckets_count - self.prev_buckets_count)/window_size
+            buckets_content_per_sec = (self.buckets_content_count - self.prev_buckets_content_count)/window_size
+            logging.info(
+                'buckets per sec:%f buckets content per sec:%f for last %d sec',
+                buckets_per_sec,
+                buckets_content_per_sec,
+                window_size
+            )
+
+        self.prev_time = now
+        self.prev_buckets_count = self.buckets_count
+        self.prev_buckets_content_count = self.buckets_content_count
+
         # belt rotated
         return True
diff --git a/src/writer/processwriter.py b/src/writer/processwriter.py
index 7f6d6da..9ab3d23 100644
--- a/src/writer/processwriter.py
+++ b/src/writer/processwriter.py
@@ -41,11 +41,12 @@ def insert(self, event_or_events=None):
         # ]
         logging.debug('class:%s insert', __class__)
         process = mp.Process(target=self.process, args=(event_or_events,))
-        #print('Start Process')
+
+        logging.debug('class:%s insert - starting process', __class__)
         process.start()
-        #print('Join Process')
+
         #process.join()
-        #print('Done Process')
+        logging.debug('class:%s insert - done process', __class__)
         pass

     def flush(self):

From f1adf352325a69b7ca04cbef7ff3daa1d8e6d52e Mon Sep 17 00:00:00 2001
From: sunsingerus
Date: Wed, 15 Nov 2017 21:04:26 +0300
Subject: [PATCH 7/8] minor

---
 main.py                     | 2 +-
 src/writer/processwriter.py | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/main.py b/main.py
index 100507c..318143a 100755
--- a/main.py
+++ b/main.py
@@ -20,7 +20,6 @@ class Main(Daemon):
     config = None

     def __init__(self):
-        mp.set_start_method('forkserver')
         self.config = CLIOpts.config()

         logging.basicConfig(
@@ -30,6 +29,7 @@ def __init__(self):
         )
         super().__init__(pidfile=self.config.pid_file())
         logging.debug(pprint.pformat(self.config.config))
+        mp.set_start_method('forkserver')

     def run(self):
         pumper = Pumper(
diff --git a/src/writer/processwriter.py b/src/writer/processwriter.py
index 9ab3d23..2926e76 100644
--- a/src/writer/processwriter.py
+++ b/src/writer/processwriter.py
@@ -24,11 +24,13 @@ def open(self):
         pass

     def process(self, event_or_events=None):
+        logging.debug('class:%s starting process', __class__)
         writer = self.next_writer_builder.get()
         writer.insert(event_or_events)
         writer.close()
         writer.push()
         writer.destroy()
+        logging.debug('class:%s ending process', __class__)

     def insert(self, event_or_events=None):
         # event_or_events = [

From 23196b1e78a665addb64f2f4e7fb753884add903 Mon Sep 17 00:00:00 2001
From: sunsingerus
Date: Wed, 15 Nov 2017 21:55:32 +0300
Subject: [PATCH 8/8] avg speed in buckets/items/rows

---
 main.py                     |  2 +-
 src/pool/bbpool.py          | 32 ++++++++++++++++----------------
 src/reader/mysqlreader.py   | 26 ++++++++++++++++++++++----
 src/writer/csvwriter.py     |  2 +-
 src/writer/processwriter.py |  8 ++++----
 5 files changed, 44 insertions(+), 26 deletions(-)

diff --git a/main.py b/main.py
index 318143a..a6f929c 100755
--- a/main.py
+++ b/main.py
@@ -29,7 +29,7 @@ def __init__(self):
         )
         super().__init__(pidfile=self.config.pid_file())
         logging.debug(pprint.pformat(self.config.config))
-        mp.set_start_method('forkserver')
+#        mp.set_start_method('forkserver')

     def run(self):
         pumper = Pumper(
diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py
index 11e6abf..82cda55 100644
--- a/src/pool/bbpool.py
+++ b/src/pool/bbpool.py
@@ -32,11 +32,11 @@ class BBPool(Pool):
     }

     buckets_count = 0
-    buckets_content_count = 0
+    items_count = 0

     prev_time = None
     prev_buckets_count = 0
-    prev_buckets_content_count = 0
+    prev_items_count = 0

     def __init__(
             self,
@@ -133,18 +133,18 @@ def rotate_belt(self, belt_index, flush=False):
             last_bucket_size = len(self.belts[belt_index][buckets_num-1])

             self.buckets_count += 1
-            self.buckets_content_count += last_bucket_size
+            self.items_count += last_bucket_size

             logging.info('rot now:%d bktcnt:%d bktcontentcnt: %d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d',
-                now,
-                self.buckets_count,
-                self.buckets_content_count,
-                str(belt_index),
-                rotate_reason,
-                buckets_num,
-                last_bucket_size,
-                len(self.belts)
-            )
+                         now,
+                         self.buckets_count,
+                         self.items_count,
+                         str(belt_index),
+                         rotate_reason,
+                         buckets_num,
+                         last_bucket_size,
+                         len(self.belts)
+                         )

             # time to flush data for specified key
             self.writer_builder.param('csv_file_path_suffix_parts', [str(now), str(self.buckets_count)])
@@ -160,17 +160,17 @@ def rotate_belt(self, belt_index, flush=False):
             # can calculate belt speed
             window_size = now - self.prev_time
             buckets_per_sec = (self.buckets_count - self.prev_buckets_count)/window_size
-            buckets_content_per_sec = (self.buckets_content_count - self.prev_buckets_content_count)/window_size
+            items_per_sec = (self.items_count - self.prev_items_count) / window_size
             logging.info(
-                'buckets per sec:%f buckets content per sec:%f for last %d sec',
+                'buckets_per_sec:%f items_per_sec:%f for last %d sec',
                 buckets_per_sec,
-                buckets_content_per_sec,
+                items_per_sec,
                 window_size
             )

         self.prev_time = now
         self.prev_buckets_count = self.buckets_count
-        self.prev_buckets_content_count = self.buckets_content_count
+        self.prev_items_count = self.items_count

         # belt rotated
         return True
diff --git a/src/reader/mysqlreader.py b/src/reader/mysqlreader.py
index bdd7c5e..ebb607a 100644
--- a/src/reader/mysqlreader.py
+++ b/src/reader/mysqlreader.py
@@ -73,13 +73,17 @@ def read(self):
         start_timestamp = int(time.time())
         # fetch events
         try:
+            prev_stat_time = time.time()
+            rows_num = 0
+
             while True:
-                logging.info('Check events in binlog stream')
+                logging.debug('Check events in binlog stream')
                 for mysql_event in self.binlog_stream:
                     if isinstance(mysql_event, WriteRowsEvent):
                         if self.subscribers('WriteRowsEvent'):
                             self.write_rows_event_num += 1
-                            logging.info('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows))
+                            logging.debug('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows))
+                            rows_num += len(mysql_event.rows)
                             event = Event()
                             event.schema = mysql_event.schema
                             event.table = mysql_event.table
@@ -90,8 +94,9 @@ def read(self):

                         if self.subscribers('WriteRowsEvent.EachRow'):
                             self.write_rows_event_each_row_num += 1
-                            logging.info('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num)
+                            logging.debug('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num)
                             for row in mysql_event.rows:
+                                rows_num += 1
                                 event = Event()
                                 event.schema = mysql_event.schema
                                 event.table = mysql_event.table
@@ -101,8 +106,21 @@ def read(self):
                         # skip non-insert events
                         pass

+                now = time.time()
+                if now > prev_stat_time + 60:
+                    # time to calc stat
+                    window_size = now - prev_stat_time
+                    rows_per_sec = rows_num / window_size
+                    logging.info(
+                        'rows_per_sec:%f for last %f sec',
+                        rows_per_sec,
+                        window_size
+                    )
+                    prev_stat_time = now
+                    rows_num = 0
+
                 if not self.blocking:
-                    break
+                    break # while True

                 # blocking
                 if self.nice_pause > 0:
                     time.sleep(self.nice_pause)
diff --git a/src/writer/csvwriter.py b/src/writer/csvwriter.py
index 7190b2e..6e70f56 100644
--- a/src/writer/csvwriter.py
+++ b/src/writer/csvwriter.py
@@ -73,7 +73,7 @@ def insert(self, event_or_events):
         if len(events) < 1:
             return

-        logging.debug('class:%s insert %d rows', __class__, len(events))
+        logging.debug('class:%s insert %d events', __class__, len(events))

         if not self.opened():
             self.open()
diff --git a/src/writer/processwriter.py b/src/writer/processwriter.py
index 2926e76..6f67df6 100644
--- a/src/writer/processwriter.py
+++ b/src/writer/processwriter.py
@@ -24,13 +24,13 @@ def open(self):
         pass

     def process(self, event_or_events=None):
-        logging.debug('class:%s starting process', __class__)
+        logging.debug('class:%s process()', __class__)
         writer = self.next_writer_builder.get()
         writer.insert(event_or_events)
         writer.close()
         writer.push()
         writer.destroy()
-        logging.debug('class:%s ending process', __class__)
+        logging.debug('class:%s process() done', __class__)

     def insert(self, event_or_events=None):
         # event_or_events = [
@@ -44,11 +44,11 @@ def insert(self, event_or_events=None):
         logging.debug('class:%s insert', __class__)
         process = mp.Process(target=self.process, args=(event_or_events,))

-        logging.debug('class:%s insert - starting process', __class__)
+        logging.debug('class:%s insert.process.start()', __class__)
         process.start()

         #process.join()
-        logging.debug('class:%s insert - done process', __class__)
+        logging.debug('class:%s insert done', __class__)
         pass

     def flush(self):
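
Appendix (editorial illustration, not part of the patch series above). The speed reporting introduced in patches 6 and 8 is, in every place, the same windowed-rate calculation: remember the counter value and the timestamp from the previous report, then divide the deltas by the elapsed window. Below is a minimal standalone Python sketch of that calculation; the RateMeter name and the demo numbers are illustrative and are not taken from this repository.

import logging
import time


class RateMeter:
    """Windowed rate calculation: report how much a counter grew per second since the last report."""

    def __init__(self):
        self.prev_time = None   # timestamp of the previous report, None until the first report
        self.prev_count = 0     # counter value at the previous report
        self.count = 0          # current counter value (rows, items, buckets, ...)

    def add(self, n=1):
        self.count += n

    def report(self, name):
        now = time.time()
        if self.prev_time is not None:
            # at least the second report - the window is defined, so a rate can be calculated
            window_size = now - self.prev_time
            per_sec = (self.count - self.prev_count) / window_size
            logging.info('%s_per_sec:%f for last %f sec', name, per_sec, window_size)
        # remember the current point as the start of the next window
        self.prev_time = now
        self.prev_count = self.count


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    meter = RateMeter()
    meter.report('rows')      # first call only remembers the starting point
    for _ in range(3):
        meter.add(100000)     # pretend 100000 rows were read from the binlog
        time.sleep(1)
        meter.report('rows')  # prints rows_per_sec over the last ~1 second window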