diff --git a/README.md b/README.md
index f54aa16..5da5e9c 100644
--- a/README.md
+++ b/README.md
@@ -185,7 +185,7 @@ binlog-format = row #Very important if you want to receive write, update and
 ## MySQL Test Tables
 
 We have to separate test table into several ones because of this error, produced by MySQL:
-```bash
+```text
 ERROR 1118 (42000): Row size too large. The maximum row size for the used table type, not counting BLOBs, is 65535. This includes storage overhead, check the manual. You have to change some columns to TEXT or BLOBs
 ```
@@ -727,6 +727,7 @@ CREATE TABLE IF NOT EXISTS `airline`.`ontime` (
 ### ClickHouse Table
 ```sql
+CREATE DATABASE IF NOT EXISTS `airline`;
 CREATE TABLE IF NOT EXISTS `airline`.`ontime` (
     `Year` UInt16,
     `Quarter` UInt8,
diff --git a/main.py b/main.py
index 77d6190..a6f929c 100755
--- a/main.py
+++ b/main.py
@@ -7,24 +7,29 @@
 import sys
 import multiprocessing as mp
-
+import logging
+import pprint
 
 if sys.version_info[0] < 3:
     raise "Must be using Python 3"
 
+
 class Main(Daemon):
 
     config = None
 
     def __init__(self):
-        mp.set_start_method('forkserver')
         self.config = CLIOpts.config()
 
-        super().__init__(pidfile=self.config.pid_file())
-        print('---')
-        print(self.config)
-        print('---')
+        logging.basicConfig(
+            filename=self.config.log_file(),
+            level=self.config.log_level(),
+            format='%(asctime)s - %(levelname)s - %(message)s'
+        )
+        super().__init__(pidfile=self.config.pid_file())
+        logging.debug(pprint.pformat(self.config.config))
 
+# mp.set_start_method('forkserver')
 
     def run(self):
         pumper = Pumper(
@@ -36,7 +41,7 @@ def run(self):
     def start(self):
         if self.config.is_daemon():
             if not super().start():
-                print("Error going background. The process already running?")
+                logging.error("Error going into background. Is the process already running?")
         else:
             self.run()
 
diff --git a/run.sh b/run_datatypes.sh
similarity index 92%
rename from run.sh
rename to run_datatypes.sh
index 97fbe28..2675ff4 100755
--- a/run.sh
+++ b/run_datatypes.sh
@@ -1,5 +1,8 @@
 #!/bin/bash
 
+# allow reuse of TIME-WAIT sockets: frequent short-lived ClickHouse connections can otherwise exhaust local ports
+sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"
+
 python3 main.py \
     --src-resume \
     --src-wait \
diff --git a/run_ontime.sh b/run_ontime.sh
new file mode 100755
index 0000000..292e442
--- /dev/null
+++ b/run_ontime.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+
+# allow reuse of TIME-WAIT sockets: frequent short-lived ClickHouse connections can otherwise exhaust local ports
+sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"
+
+python3.6 main.py \
+    --src-resume \
+    --src-wait \
+    --nice-pause=1 \
+    --src-host=127.0.0.1 \
+    --src-user=root \
+    --dst-host=127.0.0.1 \
+    --csvpool \
+    --csvpool-file-path-prefix=qwe_ \
+    --mempool-max-flush-interval=60 \
+    --mempool-max-events-num=100000
+
+#    --mempool
+#    --mempool-max-events-num=3
+#    --mempool-max-flush-interval=30
+#    --dst-file=dst.csv
+#    --dst-schema=db
+#    --dst-table=datatypes
+#    --csvpool-keep-files
diff --git a/src/cliopts.py b/src/cliopts.py
index 561ca2b..7237146 100644
--- a/src/cliopts.py
+++ b/src/cliopts.py
@@ -3,6 +3,7 @@
 
 import argparse
 from .config import Config
+import logging
 
 
 class CLIOpts(object):
@@ -41,6 +42,25 @@ def join(lists_to_join):
         else:
             return None
 
+    @staticmethod
+    def log_level_from_string(log_level_string):
+        """Convert log level name (e.g. 'debug') into logging module's level constant"""
+        level = log_level_string.upper()
+
+        if level == 'CRITICAL':
+            return logging.CRITICAL
+        elif level == 'ERROR':
+            return logging.ERROR
+        elif level == 'WARNING':
+            return logging.WARNING
+        elif level == 'INFO':
+            return logging.INFO
+        elif level == 'DEBUG':
+            return logging.DEBUG
+        else:
+            # 'NOTSET' and unrecognized names
+            return logging.NOTSET
+
     @staticmethod
     def config():
         """Parse application's CLI options into options dictionary
@@ -55,9 +75,27 @@ def config():
         argparser.add_argument(
             '--config-file',
             type=str,
-            default='',
+            default=None,
             help='Path to config file. Default - not specified'
         )
+        argparser.add_argument(
+            '--log-file',
+            type=str,
+            default=None,
+            help='Path to log file. Default - not specified'
+        )
+        argparser.add_argument(
+            '--log-level',
+            type=str,
+            default="NOTSET",
+            help='Log level. Default - NOTSET'
+        )
+        argparser.add_argument(
+            '--nice-pause',
+            type=int,
+            default=None,
+            help='Pause (in seconds) between attempts to read binlog stream. Default - no pause'
+        )
         argparser.add_argument(
             '--dry',
             action='store_true',
@@ -226,6 +264,9 @@ def config():
         return Config ({
             'app-config': {
                 'config-file': args.config_file,
+                'log-file': args.log_file,
+                'log-level': CLIOpts.log_level_from_string(args.log_level),
+                'nice-pause': args.nice_pause,
                 'dry': args.dry,
                 'daemon': args.daemon,
                 'pid_file': args.pid_file,
@@ -257,9 +298,11 @@ def config():
                 'only_tables': [x for x in args.src_only_tables.split(',') if x] if args.src_only_tables else None,
                 'blocking': args.src_wait,
                 'resume_stream': args.src_resume,
+                'nice_pause': 0 if args.nice_pause is None else args.nice_pause,
             },
             'file': {
                 'csv_file_path': args.src_file,
+                'nice_pause': 0 if args.nice_pause is None else args.nice_pause,
             },
         },
diff --git a/src/config.py b/src/config.py
index 06e19c9..85bc2cb 100644
--- a/src/config.py
+++ b/src/config.py
@@ -28,6 +28,15 @@ def __str__(self):
     def __getitem__(self, item):
         return self.config[item]
 
+    def log_file(self):
+        return self.config['app-config']['log-file']
+
+    def log_level(self):
+        return self.config['app-config']['log-level']
+
+    def nice_pause(self):
+        return self.config['app-config']['nice-pause']
+
     def pid_file(self):
         return self.config['app-config']['pid_file']
 
diff --git a/src/observable.py b/src/observable.py
index d8b9495..e92b415 100644
--- a/src/observable.py
+++ b/src/observable.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
+import logging
 
 class Observable(object):
 
@@ -25,4 +26,4 @@ def notify(self, event_name, **attrs):
             callback(**attrs)
 
     def subscribers(self, event_name):
-        return event_name in self.event_handlers and (len(self.event_handlers[event_name]) > 0)
\ No newline at end of file
+        return event_name in self.event_handlers and (len(self.event_handlers[event_name]) > 0)
diff --git a/src/pool/bbpool.py b/src/pool/bbpool.py
index 76a8b94..82cda55 100644
--- a/src/pool/bbpool.py
+++ b/src/pool/bbpool.py
@@ -5,6 +5,7 @@
 
 from .pool import Pool
 from ..objectbuilder import ObjectBuilder
+import logging
 
 
 # Buckets Belts' Index Generator
@@ -20,17 +21,22 @@ class BBPool(Pool):
 
     # buckets on the belts
     belts = {
-# pour data into 0-index bucket
-# 'key.1': [[item,], [item, item, item,], [item, item, item,]]
-# 'key.2': [[item,], [item, item, item,], [item, item, item,]]
+        # pour data into 0-index bucket
+        # 'key.1': [[item,], [item, item, item,], [item, item, item,]]
+        # 'key.2': [[item,], [item, item, item,], [item, item, item,]]
     }
 
     belts_rotated_at = {
-# 'key.1': UNIX TIMESTAMP
-# 'key.2': UNIX TIMESTAMP
+        # 'key.1': UNIX TIMESTAMP
+        # 'key.2': UNIX TIMESTAMP
     }
 
     buckets_count = 0
+    items_count = 0
+
+    prev_time = None
+    prev_buckets_count = 0
+    prev_items_count = 0
 
     def __init__(
             self,
@@ -49,11 +55,14 @@ def __init__(
         )
 
     def create_belt(self, belt_index):
-        # create belt with one empty bucket
+        """Create belt with one empty bucket"""
+
         self.belts[belt_index] = [[]]
         self.belts_rotated_at[belt_index] = int(time.time())
 
     def insert(self, item):
+        """Insert item into pool"""
+
         # which belt we'll insert item?
         belt_index = self.key_generator.generate(item)
 
@@ -64,12 +73,12 @@ def insert(self, item):
         # append item to the 0-indexed bucket of the specified belt
         self.belts[belt_index][0].append(item)
 
-        # may be bucket is already full
-        if len(self.belts[belt_index][0]) >= self.max_bucket_size:
-            # bucket full, rotate the belt
-            self.rotate_belt(belt_index)
+        # try to rotate belt - maybe it should already be rotated
+        self.rotate_belt(belt_index)
 
     def flush(self, key=None):
+        """Flush all buckets from the belt and delete the belt itself"""
+
         belt_index = key
         empty_belts_indexes = []
 
@@ -87,25 +96,27 @@ def flush(self, key=None):
             self.belts_rotated_at.pop(b_index)
 
     def rotate_belt(self, belt_index, flush=False):
+        """Try to rotate belt"""
+
         now = int(time.time())
 
-        need_rotation = True if flush else False
-        rotate_reason = "FLUSH"
-        if len(self.belts[belt_index][0]) >= self.max_bucket_size:
+        if flush:
+            # explicit flush requested
+            rotate_reason = "FLUSH"
+
+        elif len(self.belts[belt_index][0]) >= self.max_bucket_size:
             # 0-index bucket is full
-            need_rotation = True
             rotate_reason = "SIZE"
 
         elif now >= self.belts_rotated_at[belt_index] + self.max_interval_between_rotations:
             # time interval reached
-            need_rotation = True
             rotate_reason = "TIME"
 
-        if not need_rotation:
-            # belt not rotated
+        else:
+            # no reason to rotate - belt stays as-is
             return False
 
-        # belts needs rotation
+        # belt needs rotation
 
         # insert empty bucket into the beginning of the belt
         self.belts[belt_index].insert(0, [])
@@ -117,11 +128,23 @@ def rotate_belt(self, belt_index, flush=False):
         while len(self.belts[belt_index]) > buckets_num_left_on_belt:
             # too many buckets on the belt
             # time to rotate belt and flush the most-right-bucket
-            self.buckets_count += 1
 
             buckets_num = len(self.belts[belt_index])
             last_bucket_size = len(self.belts[belt_index][buckets_num-1])
-            print('rotating belt. now:', now, 'bucket number:', self.buckets_count, 'index:', belt_index, 'reason:', rotate_reason, 'buckets on belt:', buckets_num, 'last bucket size:', last_bucket_size, 'belts count:', len(self.belts))
+
+            self.buckets_count += 1
+            self.items_count += last_bucket_size
+
+            logging.info('rot now:%d bktcnt:%d bktcontentcnt:%d index:%s reason:%s bktsonbelt:%d bktsize:%d beltnum:%d',
+                now,
+                self.buckets_count,
+                self.items_count,
+                str(belt_index),
+                rotate_reason,
+                buckets_num,
+                last_bucket_size,
+                len(self.belts)
+            )
 
             # time to flush data for specified key
             self.writer_builder.param('csv_file_path_suffix_parts', [str(now), str(self.buckets_count)])
@@ -132,5 +155,22 @@ def rotate_belt(self, belt_index, flush=False):
                 writer.destroy()
                 del writer
 
+        if self.prev_time is not None and now > self.prev_time:
+            # have previous time - meaning this is at least the second rotation
+            # can calculate belt speed over the elapsed window
+            window_size = now - self.prev_time
+            buckets_per_sec = (self.buckets_count - self.prev_buckets_count) / window_size
+            items_per_sec = (self.items_count - self.prev_items_count) / window_size
+            logging.info(
+                'buckets_per_sec:%f items_per_sec:%f for last %d sec',
+                buckets_per_sec,
+                items_per_sec,
+                window_size
+            )
+
+        self.prev_time = now
+        self.prev_buckets_count = self.buckets_count
+        self.prev_items_count = self.items_count
+
         # belt rotated
         return True
diff --git a/src/reader/mysqlreader.py b/src/reader/mysqlreader.py
index ba13fef..ebb607a 100644
--- a/src/reader/mysqlreader.py
+++ b/src/reader/mysqlreader.py
@@ -7,6 +7,7 @@
 from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
 #from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
 import time
+import logging
 
 
 class MySQLReader(Reader):
@@ -19,6 +20,10 @@ class MySQLReader(Reader):
     blocking = None
     resume_stream = None
     binlog_stream = None
+    nice_pause = 0
+
+    write_rows_event_num = 0
+    write_rows_event_each_row_num = 0
 
     def __init__(
             self,
@@ -30,6 +35,7 @@ def __init__(
             only_tables=None,
             blocking=None,
             resume_stream=None,
+            nice_pause=None,
             callbacks={},
     ):
         super().__init__(callbacks=callbacks)
@@ -42,6 +48,7 @@ def __init__(
         self.only_tables = only_tables
         self.blocking = blocking
         self.resume_stream = resume_stream
+        self.nice_pause = nice_pause if nice_pause is not None else 0
         self.binlog_stream = BinLogStreamReader(
             # MySQL server - data source
             connection_settings=self.connection_settings,
@@ -66,10 +73,17 @@ def read(self):
         start_timestamp = int(time.time())
         # fetch events
         try:
+            prev_stat_time = time.time()
+            rows_num = 0
+
             while True:
+                logging.debug('Check events in binlog stream')
                 for mysql_event in self.binlog_stream:
                     if isinstance(mysql_event, WriteRowsEvent):
                         if self.subscribers('WriteRowsEvent'):
+                            self.write_rows_event_num += 1
+                            logging.debug('WriteRowsEvent #%d rows: %d', self.write_rows_event_num, len(mysql_event.rows))
+                            rows_num += len(mysql_event.rows)
                             event = Event()
                             event.schema = mysql_event.schema
                             event.table = mysql_event.table
@@ -79,7 +93,10 @@ def read(self):
                             self.notify('WriteRowsEvent', event=event)
 
                         if self.subscribers('WriteRowsEvent.EachRow'):
+                            self.write_rows_event_each_row_num += 1
+                            logging.debug('WriteRowsEvent.EachRow #%d', self.write_rows_event_each_row_num)
                             for row in mysql_event.rows:
+                                rows_num += 1
                                 event = Event()
                                 event.schema = mysql_event.schema
                                 event.table = mysql_event.table
@@ -89,10 +106,26 @@ def read(self):
                         # skip non-insert events
                         pass
 
+                now = time.time()
+                if now > prev_stat_time + 60:
+                    # time to calculate stats over the elapsed window
+                    window_size = now - prev_stat_time
+                    rows_per_sec = rows_num / window_size
+                    logging.info(
+                        'rows_per_sec:%f for last %f sec',
+                        rows_per_sec,
+                        window_size
+                    )
+                    prev_stat_time = now
+                    rows_num = 0
+
                 if not self.blocking:
-                    break
+                    break  # while True
+
+                # blocking mode - pause between checks of the binlog stream
+                if self.nice_pause > 0:
+                    time.sleep(self.nice_pause)
+
                 self.notify('ReaderIdleEvent')
 
         except KeyboardInterrupt:
diff --git a/src/writer/chcsvwriter.py b/src/writer/chcsvwriter.py
index fd1900d..d9390fb 100644
--- a/src/writer/chcsvwriter.py
+++ b/src/writer/chcsvwriter.py
@@ -5,6 +5,7 @@
 
 import os
 import time
+import logging
 
 
 class CHCSVWriter(Writer):
@@ -33,6 +34,8 @@ def insert(self, event_or_events=None):
         if len(events) < 1:
             return
 
+        logging.debug('class:%s insert %d rows', __class__, len(events))
+
         for event in events:
             sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format(
                 event.schema,
diff --git a/src/writer/chwriter.py b/src/writer/chwriter.py
index fb18f07..c68a98e 100644
--- a/src/writer/chwriter.py
+++ b/src/writer/chwriter.py
@@ -3,6 +3,7 @@
 
 from clickhouse_driver.client import Client
 from .writer import Writer
+import logging
 
 
 class CHWriter(Writer):
@@ -39,6 +40,8 @@ def insert(self, event_or_events=None):
         if len(events) < 1:
             return
 
+        logging.debug('class:%s insert %d rows', __class__, len(events))
+
         values = []
         event_converted = None
         for event in events:
diff --git a/src/writer/csvwriter.py b/src/writer/csvwriter.py
index 9d4d392..6e70f56 100644
--- a/src/writer/csvwriter.py
+++ b/src/writer/csvwriter.py
@@ -5,6 +5,7 @@
 from ..event.event import Event
 import csv
 import os.path
+import logging
 
 
 class CSVWriter(Writer):
@@ -72,6 +73,8 @@ def insert(self, event_or_events):
         if len(events) < 1:
             return
 
+        logging.debug('class:%s insert %d events', __class__, len(events))
+
         if not self.opened():
             self.open()
diff --git a/src/writer/poolwriter.py b/src/writer/poolwriter.py
index 84413bb..be6a662 100644
--- a/src/writer/poolwriter.py
+++ b/src/writer/poolwriter.py
@@ -4,6 +4,7 @@
 from .writer import Writer
 from ..event.event import Event
 from ..pool.bbpool import BBPool
+import logging
 
 
 class PoolWriter(Writer):
@@ -29,6 +30,7 @@ def __init__(
         )
 
     def insert(self, event_or_events):
+        logging.debug('class:%s insert', __class__)
         self.pool.insert(event_or_events)
 
     def flush(self):
diff --git a/src/writer/processwriter.py b/src/writer/processwriter.py
index 93db354..6f67df6 100644
--- a/src/writer/processwriter.py
+++ b/src/writer/processwriter.py
@@ -3,6 +3,7 @@
 
 from .writer import Writer
 import multiprocessing as mp
+import logging
 
 
 class ProcessWriter(Writer):
@@ -23,11 +24,13 @@ def open(self):
         pass
 
     def process(self, event_or_events=None):
+        logging.debug('class:%s process()', __class__)
         writer = self.next_writer_builder.get()
         writer.insert(event_or_events)
         writer.close()
         writer.push()
         writer.destroy()
+        logging.debug('class:%s process() done', __class__)
 
     def insert(self, event_or_events=None):
         # event_or_events = [
@@ -38,12 +41,14 @@ def insert(self, event_or_events=None):
         #   row: {'id': 3, 'a': 3}
         #  },
         # ]
+        logging.debug('class:%s insert', __class__)
         process = mp.Process(target=self.process, args=(event_or_events,))
-        #print('Start Process')
+
+        logging.debug('class:%s insert.process.start()', __class__)
         process.start()
-        #print('Join Process')
+
         #process.join()
-        #print('Done Process')
+        logging.debug('class:%s insert done', __class__)
         pass
 
     def flush(self):
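
For reference, a minimal sketch of a run that exercises the new logging options introduced above (`--log-file`, `--log-level`, `--nice-pause`); the log file name is illustrative, and the source/destination flags mirror run_ontime.sh:

```bash
#!/bin/bash

# replicator.log is a placeholder path; pick any writable location
python3 main.py \
    --src-resume \
    --src-wait \
    --nice-pause=1 \
    --src-host=127.0.0.1 \
    --src-user=root \
    --dst-host=127.0.0.1 \
    --log-file=replicator.log \
    --log-level=debug
```

Log level names are case-insensitive, since `log_level_from_string()` upper-cases its argument before matching, so `debug` and `DEBUG` are equivalent; unrecognized names fall back to `NOTSET`.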