diff --git a/run.sh b/run.sh index 0162783..97fbe28 100755 --- a/run.sh +++ b/run.sh @@ -9,7 +9,11 @@ python3 main.py \ --dst-host=192.168.74.251 \ --csvpool \ --csvpool-file-path-prefix=qwe_ \ - --csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03 \ + --csv-column-default-value \ + date_1=2000-01-01 \ + datetime_1=2000-01-01\ 01:02:03 \ + time_1=2001-01-01\ 01:02:03 \ + timestamp_1=2002-01-01\ 01:02:03 \ --mempool-max-flush-interval=600 \ --mempool-max-events-num=900000 diff --git a/src/cliopts.py b/src/cliopts.py index 9d36566..561ca2b 100644 --- a/src/cliopts.py +++ b/src/cliopts.py @@ -9,8 +9,16 @@ class CLIOpts(object): @staticmethod def join(lists_to_join): - # lists_to_join contains something like - # [['a=b', 'c=d'], ['e=f', 'z=x'], ] + """Join several lists into one + + :param lists_to_join: is a list of lists + [['a=b', 'c=d'], ['e=f', 'z=x'], ] + + :return: None or dictionary + {'a': 'b', 'c': 'd', 'e': 'f', 'z': 'x'} + + """ + if not isinstance(lists_to_join, list): return None @@ -18,15 +26,16 @@ def join(lists_to_join): for lst in lists_to_join: # lst = ['a=b', 'c=d'] for column_value_pair in lst: - # value = 'a=b' + # column_value_value = 'a=b' column, value = column_value_pair.split('=', 2) res[column] = value - # dict { + # res = dict { # 'col1': 'value1', # 'col2': 'value2', # } + # return with sanity check if len(res) > 0: return res else: @@ -34,10 +43,10 @@ def join(lists_to_join): @staticmethod def config(): + """Parse application's CLI options into options dictionary + :return: instance of Config """ - parse CLI options into options dict - :return: dict - """ + argparser = argparse.ArgumentParser( description='ClickHouse data reader', epilog='===============' diff --git a/src/converter/chwriteconverter.py b/src/converter/chwriteconverter.py index 0499077..d4c3950 100644 --- a/src/converter/chwriteconverter.py +++ b/src/converter/chwriteconverter.py @@ -25,30 +25,28 @@ class CHWriteConverter(Converter): set, ] - def convert(self, event): - + def row(self, row): columns_to_delete = [] - for column in event.row: - if (event.row[column] is None) and self.delete_empty_columns: + for column in row: + if (row[column] is None) and self.delete_empty_columns: # include empty column to the list of to be deleted columns columns_to_delete.append(column) # move to next column continue for t in self.types_to_convert: - if isinstance(event.row[column], t): -# print("Converting column", column, "of type", type(event.row[column]), -# event.row[column]) - event.row[column] = str(event.row[column]) -# print("res", event.row[column]) + if isinstance(row[column], t): + # print("Converting column", column, "of type", type(event.row[column]), event.row[column]) + row[column] = str(row[column]) + # print("res", event.row[column]) break else: -# print("Using asis column", column, "of type", type(event.row[column])) + # print("Using asis column", column, "of type", type(event.row[column])) pass # delete columns according to the list for column in columns_to_delete: - event.row.pop(column) + row.pop(column) - return event + return row diff --git a/src/converter/converter.py b/src/converter/converter.py index e398b9f..4020658 100644 --- a/src/converter/converter.py +++ b/src/converter/converter.py @@ -1,7 +1,31 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +from ..event.event import Event + + class Converter(object): - def convert(self, event): + def row(self, row): + return row + + def rows(self, rows): + if rows is None: + return None + + res = [] + for row in rows: + res.append(self.row(row)) + + return res + + def event(self, event): + event.row = self.row(event.row) + event.rows = self.rows(event.rows) return event + + def convert(self, event_or_row): + if isinstance(event_or_row, Event): + return self.event(event_or_row) + else: + return self.row(event_or_row) diff --git a/src/converter/csvreadconverter.py b/src/converter/csvreadconverter.py index 70ab90d..ef3192f 100644 --- a/src/converter/csvreadconverter.py +++ b/src/converter/csvreadconverter.py @@ -7,13 +7,13 @@ class CSVReadConverter(Converter): - def convert(self, event): - for column in event.row: - if event.row[column] == '': - event.row[column] = None + def row(self, row): + for column in row: + if row[column] == '': + row[column] = None # else: # try: # event.row[column] = ast.literal_eval(event.row[column]) # except: # pass - return event + return row diff --git a/src/converter/csvwriteconverter.py b/src/converter/csvwriteconverter.py index c820e14..a33a045 100644 --- a/src/converter/csvwriteconverter.py +++ b/src/converter/csvwriteconverter.py @@ -12,19 +12,20 @@ class CSVWriteConverter(Converter): def __init__(self, defaults=None): self.defaults = defaults - def convert(self, event): + def row(self, row): + for column in row: + if (row[column] is None) and (column in self.defaults): + row[column] = self.defaults[column] + return row + + def convert(self, event_or_row): # no defaults - nothing to convert if not self.defaults: - return event + return event_or_row # defaults are empty - nothing to convert if len(self.defaults) < 1: - return event + return event_or_row # have defaults - for column in event.row: - # replace None column with default value - if event.row[column] is None and column in self.defaults: - event.row[column] = self.defaults[column] - - return event + return super().convert(event_or_row) diff --git a/src/event/event.py b/src/event/event.py index 1e388a3..f1e502c 100644 --- a/src/event/event.py +++ b/src/event/event.py @@ -13,7 +13,23 @@ class Event(object): # {'id':1, 'col1':1} row = None - file = None + # [{'id': 1, 'col1':1}, {'id': 2, 'col1': 2}, {'id': 3, 'col1': 3}] + rows = None + + # /path/to/csv/file.csv + filename = None # ['id', 'col1', 'col2'] - fieldnames = None \ No newline at end of file + fieldnames = None + + def first(self): + if self.rows is None: + return self.row + else: + return self.rows[0] + + def all(self): + if self.rows is None: + return [self.row] + else: + return self.rows diff --git a/src/objectbuilder.py b/src/objectbuilder.py index 08e6762..0a353fe 100644 --- a/src/objectbuilder.py +++ b/src/objectbuilder.py @@ -9,6 +9,14 @@ class ObjectBuilder(object): instance = None def __init__(self, class_name=None, constructor_params=None, instance=None): + """ + Builder/Wrapper for ad object. + In case instance is provided - operates as a wrapper + In case class_name and (optional) constructor_params provided - return instance of specified class + :param class_name: class to instantiate + :param constructor_params: dict of class's contrcutor params. Used as **constructor_params + :param instance: ready-to use instance + """ self.class_name = class_name self.constructor_params = constructor_params self.instance = instance @@ -19,12 +27,39 @@ def param(self, name, value): self.constructor_params[name] = value def get(self): + """ + Get object (in case wrapper) or an instance of a class (in case Object builder) - each time the same object + :return: object + """ if not self.class_name: # no class name - return instance, it may be None return self.instance # have class name + # instantiate object + if self.constructor_params: + self.instance = self.class_name(**self.constructor_params) + else: + self.instance = self.class_name() + + # in order to return instance on next get() call + self.class_name = None + + return self.instance + + def new(self): + """ + Get object (in case wrapper) or an instance of a class (in case Object builder) - each time new object + :return: object + """ + if not self.class_name: + # no class name - return instance, it may be None + return self.instance + + # have class name + + # instantiate object if self.constructor_params: return self.class_name(**self.constructor_params) else: diff --git a/src/observable.py b/src/observable.py new file mode 100644 index 0000000..d8b9495 --- /dev/null +++ b/src/observable.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + + +class Observable(object): + + event_handlers = { + 'Event1': [], + 'Event2': [], + } + + def subscribe(self, event_handlers): + for event_name in event_handlers: + if event_name in self.event_handlers: + if callable(event_handlers[event_name]): + self.event_handlers[event_name].append(event_handlers[event_name]) + else: + if isinstance(event_handlers[event_name], list): + for callback in event_handlers[event_name]: + if callable(callback): + self.event_handlers[event_name].append(callback) + + def notify(self, event_name, **attrs): + for callback in self.event_handlers[event_name]: + callback(**attrs) + + def subscribers(self, event_name): + return event_name in self.event_handlers and (len(self.event_handlers[event_name]) > 0) \ No newline at end of file diff --git a/src/pumper.py b/src/pumper.py index 03f5d31..97c5b32 100644 --- a/src/pumper.py +++ b/src/pumper.py @@ -16,7 +16,7 @@ def __init__(self, reader=None, writer=None): if self.reader: self.reader.subscribe({ 'WriteRowsEvent': self.write_rows_event, - 'WriteRowsEvent.EachRow': self.write_rows_event_each_row, +# 'WriteRowsEvent.EachRow': self.write_rows_event_each_row, 'ReaderIdleEvent': self.reader_idle_event, }) @@ -24,8 +24,7 @@ def run(self): self.reader.read() def write_rows_event(self, event=None): -# binlog_event.dump() - pass + self.writer.insert(event) def write_rows_event_each_row(self, event=None): self.writer.insert(event) diff --git a/src/reader/csvreader.py b/src/reader/csvreader.py index e0a439e..09b53b4 100644 --- a/src/reader/csvreader.py +++ b/src/reader/csvreader.py @@ -41,10 +41,10 @@ def read(self): try: event = Event() event.table = os.path.splitext(self.csv_file_path)[0] - self.fire('WriteRowsEvent', event=event) + self.notify('WriteRowsEvent', event=event) for row in self.reader: event.row = row - self.fire('WriteRowsEvent.EachRow', event=self.converter.convert(event) if self.converter else event) + self.notify('WriteRowsEvent.EachRow', event=self.converter.convert(event) if self.converter else event) except KeyboardInterrupt: pass diff --git a/src/reader/mysqlreader.py b/src/reader/mysqlreader.py index 88044db..ba13fef 100644 --- a/src/reader/mysqlreader.py +++ b/src/reader/mysqlreader.py @@ -69,16 +69,22 @@ def read(self): while True: for mysql_event in self.binlog_stream: if isinstance(mysql_event, WriteRowsEvent): - event = Event() - event.schema = mysql_event.schema - event.table = mysql_event.table - self.fire('WriteRowsEvent', event=event) - for row in mysql_event.rows: + if self.subscribers('WriteRowsEvent'): event = Event() event.schema = mysql_event.schema event.table = mysql_event.table - event.row = row['values'] - self.fire('WriteRowsEvent.EachRow', event=event) + event.rows = [] + for row in mysql_event.rows: + event.rows.append(row['values']) + self.notify('WriteRowsEvent', event=event) + + if self.subscribers('WriteRowsEvent.EachRow'): + for row in mysql_event.rows: + event = Event() + event.schema = mysql_event.schema + event.table = mysql_event.table + event.row = row['values'] + self.notify('WriteRowsEvent.EachRow', event=event) else: # skip non-insert events pass @@ -87,7 +93,7 @@ def read(self): break # blocking - self.fire('ReaderIdleEvent') + self.notify('ReaderIdleEvent') except KeyboardInterrupt: pass diff --git a/src/reader/reader.py b/src/reader/reader.py index 547edc1..23c27b2 100644 --- a/src/reader/reader.py +++ b/src/reader/reader.py @@ -1,12 +1,14 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +from ..observable import Observable -class Reader(object): + +class Reader(Observable): converter = None - callbacks = { + event_handlers = { # called on each WriteRowsEvent 'WriteRowsEvent': [], @@ -21,14 +23,5 @@ def __init__(self, converter=None, callbacks={}): self.converter = converter self.subscribe(callbacks) - def subscribe(self, callbacks): - for callback_name in callbacks: - if callback_name in self.callbacks: - self.callbacks[callback_name].append(callbacks[callback_name]) - - def fire(self, event_name, **attrs): - for callback in self.callbacks[event_name]: - callback(**attrs) - def read(self): pass diff --git a/src/writer/chcsvwriter.py b/src/writer/chcsvwriter.py index e350b2f..fd1900d 100644 --- a/src/writer/chcsvwriter.py +++ b/src/writer/chcsvwriter.py @@ -50,7 +50,7 @@ def insert(self, event_or_events=None): if self.password: choptions += " --password=" + self.password bash = "tail -n +2 '{0}' | clickhouse-client {1} --query='{2}'".format( - event.file, + event.filename, choptions, sql, ) diff --git a/src/writer/chwriter.py b/src/writer/chwriter.py index 4625e6d..fb18f07 100644 --- a/src/writer/chwriter.py +++ b/src/writer/chwriter.py @@ -43,7 +43,8 @@ def insert(self, event_or_events=None): event_converted = None for event in events: event_converted = self.convert(event) - values.append(event_converted.row) + for value in event_converted.all(): + values.append(value) schema = self.dst_schema if self.dst_schema else event_converted.schema table = self.dst_table if self.dst_table else event_converted.table diff --git a/src/writer/csvwriter.py b/src/writer/csvwriter.py index 6ea1f04..9d4d392 100644 --- a/src/writer/csvwriter.py +++ b/src/writer/csvwriter.py @@ -76,7 +76,7 @@ def insert(self, event_or_events): self.open() if not self.writer: - self.fieldnames = sorted(events[0].row.keys()) + self.fieldnames = sorted(events[0].first().keys()) if self.dst_schema is None: self.dst_schema = events[0].schema if self.dst_table is None: @@ -87,7 +87,8 @@ def insert(self, event_or_events): self.writer.writeheader() for event in events: - self.writer.writerow(self.convert(event).row) + for row in event.all(): + self.writer.writerow(self.convert(row)) def push(self): if not self.next_writer_builder: @@ -96,7 +97,7 @@ def push(self): event = Event() event.schema = self.dst_schema event.table = self.dst_table - event.file = self.path + event.filename = self.path event.fieldnames = self.fieldnames self.next_writer_builder.get().insert(event) diff --git a/src/writer/writer.py b/src/writer/writer.py index 6fabd2f..6d5c681 100644 --- a/src/writer/writer.py +++ b/src/writer/writer.py @@ -40,8 +40,8 @@ def listify(self, obj_or_list): # event_or_events is an object return [obj_or_list] - def convert(self, event): - return self.converter_builder.get().convert(event) if self.converter_builder else event + def convert(self, data): + return self.converter_builder.get().convert(data) if self.converter_builder else data def insert(self, event_or_events=None): # event_or_events = [