Skip to content

butches #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ python3 main.py \
--dst-host=192.168.74.251 \
--csvpool \
--csvpool-file-path-prefix=qwe_ \
--csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03 \
--csv-column-default-value \
date_1=2000-01-01 \
datetime_1=2000-01-01\ 01:02:03 \
time_1=2001-01-01\ 01:02:03 \
timestamp_1=2002-01-01\ 01:02:03 \
--mempool-max-flush-interval=600 \
--mempool-max-events-num=900000

Expand Down
23 changes: 16 additions & 7 deletions src/cliopts.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,44 @@ class CLIOpts(object):

@staticmethod
def join(lists_to_join):
    """Join several lists of 'column=value' strings into one dictionary

    :param lists_to_join: is a list of lists
    [['a=b', 'c=d'], ['e=f', 'z=x'], ]

    :return: None or dictionary
    {'a': 'b', 'c': 'd', 'e': 'f', 'z': 'x'}
    None is returned when the argument is not a list or when no pairs were found.
    When the same column appears more than once, the last value wins.

    :raises ValueError: when an item has no '=' separator
    """

    if not isinstance(lists_to_join, list):
        return None

    res = {}
    for lst in lists_to_join:
        # lst = ['a=b', 'c=d']
        for column_value_pair in lst:
            # column_value_pair = 'a=b'
            # Split on the FIRST '=' only: the value itself may contain '=',
            # and everything after the first separator belongs to the value.
            column, separator, value = column_value_pair.partition('=')
            if not separator:
                raise ValueError(
                    "expected 'column=value', got %r" % (column_value_pair,)
                )
            res[column] = value

    # res = dict {
    #   'col1': 'value1',
    #   'col2': 'value2',
    # }

    # an empty result is reported as None
    return res or None

@staticmethod
def config():
"""Parse application's CLI options into options dictionary
:return: instance of Config
"""
parse CLI options into options dict
:return: dict
"""

argparser = argparse.ArgumentParser(
description='ClickHouse data reader',
epilog='==============='
Expand Down
22 changes: 10 additions & 12 deletions src/converter/chwriteconverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,28 @@ class CHWriteConverter(Converter):
set,
]

def row(self, row):
    """Stringify values of the configured types and optionally drop empty columns.

    The row is modified in place and also returned.

    :param row: mapping of column name -> value
    :return: the same mapping; values that are instances of any type listed in
        self.types_to_convert are replaced by str(value), and columns holding
        None are removed when self.delete_empty_columns is set
    """
    # isinstance() accepts a tuple of types, so build it once per row
    # instead of looping over the type list for every column
    convertible_types = tuple(self.types_to_convert)
    columns_to_delete = []

    for column in row:
        value = row[column]

        if (value is None) and self.delete_empty_columns:
            # include empty column to the list of to be deleted columns;
            # deletion is postponed because the row is being iterated
            columns_to_delete.append(column)
            # move to next column
            continue

        if isinstance(value, convertible_types):
            row[column] = str(value)

    # delete columns according to the list
    for column in columns_to_delete:
        row.pop(column)

    return row
26 changes: 25 additions & 1 deletion src/converter/converter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,31 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from ..event.event import Event


class Converter(object):
    """Base converter: passes rows through unchanged.

    Subclasses override row() to transform a single row; rows(), event() and
    convert() are built on top of it.
    """

    def row(self, row):
        """Convert a single row. The base implementation returns it unchanged."""
        return row

    def rows(self, rows):
        """Convert each row of a batch.

        :param rows: iterable of rows or None
        :return: list of converted rows, or None when rows is None
        """
        if rows is None:
            return None

        return [self.row(row) for row in rows]

    def event(self, event):
        """Convert both the single row and the batch of rows held by an event.

        :param event: object with 'row' and 'rows' attributes
        :return: the same event object, updated in place
        """
        # An event may carry only a batch (rows) and leave row as None.
        # Subclass row() implementations iterate the row, so None must not
        # be handed to them.
        if event.row is not None:
            event.row = self.row(event.row)
        event.rows = self.rows(event.rows)
        return event

    def convert(self, event_or_row):
        """Convert either an Event instance or a bare row.

        :param event_or_row: Event instance or a row
        :return: converted event or row
        """
        if isinstance(event_or_row, Event):
            return self.event(event_or_row)
        return self.row(event_or_row)
10 changes: 5 additions & 5 deletions src/converter/csvreadconverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

class CSVReadConverter(Converter):
    """Converter for rows read from CSV: empty strings become None."""

    def row(self, row):
        """Replace every empty-string value of the row with None.

        The row is modified in place and also returned.

        :param row: mapping of column name -> value
        :return: the same mapping
        """
        blank_columns = [column for column in row if row[column] == '']
        for column in blank_columns:
            row[column] = None
        # NOTE: evaluating the remaining values with ast.literal_eval was
        # sketched here originally but is intentionally left disabled.
        return row
19 changes: 10 additions & 9 deletions src/converter/csvwriteconverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,20 @@ class CSVWriteConverter(Converter):
def __init__(self, defaults=None):
    """Remember the per-column default values.

    :param defaults: mapping of column name -> default value, or None;
        row() substitutes these for columns whose value is None
    """
    self.defaults = defaults

def convert(self, event):
def row(self, row):
    """Replace None values of the row with the configured column defaults.

    The row is modified in place and also returned.

    :param row: mapping of column name -> value
    :return: the same mapping; a column is changed only when its value is
        None and self.defaults has an entry for it
    """
    # row() can be reached without going through convert() (for example via
    # rows() or event()), so the "no defaults" case must be handled here too
    if not self.defaults:
        return row

    for column in row:
        if (row[column] is None) and (column in self.defaults):
            row[column] = self.defaults[column]
    return row

def convert(self, event_or_row):
    """Apply column defaults to an event or a row.

    :param event_or_row: event or row to process
    :return: the argument untouched when no defaults are configured,
        otherwise whatever the parent class convert() returns
    """
    have_defaults = self.defaults and len(self.defaults) >= 1
    if have_defaults:
        # delegate to the parent, which decides between event and row handling
        return super().convert(event_or_row)

    # no defaults, or defaults are empty - nothing to convert
    return event_or_row
20 changes: 18 additions & 2 deletions src/event/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,23 @@ class Event(object):
# {'id':1, 'col1':1}
row = None

file = None
# [{'id': 1, 'col1':1}, {'id': 2, 'col1': 2}, {'id': 3, 'col1': 3}]
rows = None

# /path/to/csv/file.csv
filename = None

# ['id', 'col1', 'col2']
fieldnames = None
fieldnames = None

def first(self):
    """Return the first row carried by the event.

    :return: self.row when the event has no batch (self.rows is None),
        the first item of self.rows otherwise, or None when the batch is empty
    """
    if self.rows is None:
        return self.row

    # an empty batch has no first row; report that as None instead of
    # failing with IndexError
    return self.rows[0] if self.rows else None

def all(self):
    """Return the rows carried by the event as a list.

    :return: self.rows when a batch is present, otherwise a one-item list
        holding self.row
    """
    batch = self.rows
    return [self.row] if batch is None else batch
35 changes: 35 additions & 0 deletions src/objectbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ class ObjectBuilder(object):
instance = None

def __init__(self, class_name=None, constructor_params=None, instance=None):
    """
    Builder/Wrapper for an object.
    In case instance is provided - operates as a wrapper
    In case class_name and (optional) constructor_params provided - return instance of specified class
    :param class_name: class to instantiate
    :param constructor_params: dict of class's constructor params. Used as **constructor_params
    :param instance: ready-to-use instance
    """
    self.class_name = class_name
    self.constructor_params = constructor_params
    self.instance = instance
Expand All @@ -19,12 +27,39 @@ def param(self, name, value):
self.constructor_params[name] = value

def get(self):
    """
    Get object (in case wrapper) or an instance of a class (in case Object builder) - each time the same object

    The instance is built on the first call and cached in self.instance.
    self.class_name is left intact, so new() keeps building fresh objects
    after get() has been called.
    :return: object, may be None when neither instance nor class_name is set
    """
    if self.instance is None and self.class_name:
        # nothing cached yet, but we know what to build - instantiate once
        if self.constructor_params:
            self.instance = self.class_name(**self.constructor_params)
        else:
            self.instance = self.class_name()

    # either the wrapped/cached instance, or None
    return self.instance

def new(self):
"""
Get object (in case wrapper) or an instance of a class (in case Object builder) - each time new object
:return: object
"""
if not self.class_name:
# no class name - return instance, it may be None
return self.instance

# have class name

# instantiate object
if self.constructor_params:
return self.class_name(**self.constructor_params)
else:
Expand Down
28 changes: 28 additions & 0 deletions src/observable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-


class Observable(object):
    """Minimal publish/subscribe base class.

    The class-level event_handlers dictionary declares which event names can
    be subscribed to. Each instance gets its own copy of that registry on the
    first subscribe() call, so handlers registered on one instance are not
    invoked by another.
    """

    # event name -> list of callbacks; acts as the template of known events
    event_handlers = {
        'Event1': [],
        'Event2': [],
    }

    def _own_handlers(self):
        """Return the per-instance registry, creating it from the class template on first use."""
        if 'event_handlers' not in self.__dict__:
            self.event_handlers = {
                name: list(callbacks)
                for name, callbacks in type(self).event_handlers.items()
            }
        return self.event_handlers

    def subscribe(self, event_handlers):
        """Register callbacks for known events.

        :param event_handlers: dict of event name -> callable or list of callables.
            Unknown event names and non-callable items are silently ignored.
        """
        registry = self._own_handlers()
        for event_name in event_handlers:
            if event_name not in registry:
                continue

            handlers = event_handlers[event_name]
            if callable(handlers):
                handlers = [handlers]
            elif not isinstance(handlers, list):
                continue

            registry[event_name].extend(
                callback for callback in handlers if callable(callback)
            )

    def notify(self, event_name, **attrs):
        """Call every callback subscribed to event_name with **attrs.

        An event name without a registry entry has no subscribers, so nothing is called.
        """
        for callback in self.event_handlers.get(event_name, ()):
            callback(**attrs)

    def subscribers(self, event_name):
        """Return True when at least one callback is subscribed to event_name."""
        return bool(self.event_handlers.get(event_name))
5 changes: 2 additions & 3 deletions src/pumper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ def __init__(self, reader=None, writer=None):
if self.reader:
self.reader.subscribe({
'WriteRowsEvent': self.write_rows_event,
'WriteRowsEvent.EachRow': self.write_rows_event_each_row,
# 'WriteRowsEvent.EachRow': self.write_rows_event_each_row,
'ReaderIdleEvent': self.reader_idle_event,
})

def run(self):
    """Start pumping by calling read() on the reader."""
    self.reader.read()

def write_rows_event(self, event=None):
    """Handler subscribed to the reader's 'WriteRowsEvent': pass the event to the writer.

    :param event: event object delivered by the reader; forwarded unchanged
        to self.writer.insert()
    """
    # binlog_event.dump()
    pass
    self.writer.insert(event)

def write_rows_event_each_row(self, event=None):
self.writer.insert(event)
Expand Down
4 changes: 2 additions & 2 deletions src/reader/csvreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ def read(self):
try:
event = Event()
event.table = os.path.splitext(self.csv_file_path)[0]
self.fire('WriteRowsEvent', event=event)
self.notify('WriteRowsEvent', event=event)
for row in self.reader:
event.row = row
self.fire('WriteRowsEvent.EachRow', event=self.converter.convert(event) if self.converter else event)
self.notify('WriteRowsEvent.EachRow', event=self.converter.convert(event) if self.converter else event)
except KeyboardInterrupt:
pass

Expand Down
22 changes: 14 additions & 8 deletions src/reader/mysqlreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,22 @@ def read(self):
while True:
for mysql_event in self.binlog_stream:
if isinstance(mysql_event, WriteRowsEvent):
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
self.fire('WriteRowsEvent', event=event)
for row in mysql_event.rows:
if self.subscribers('WriteRowsEvent'):
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
event.row = row['values']
self.fire('WriteRowsEvent.EachRow', event=event)
event.rows = []
for row in mysql_event.rows:
event.rows.append(row['values'])
self.notify('WriteRowsEvent', event=event)

if self.subscribers('WriteRowsEvent.EachRow'):
for row in mysql_event.rows:
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
event.row = row['values']
self.notify('WriteRowsEvent.EachRow', event=event)
else:
# skip non-insert events
pass
Expand All @@ -87,7 +93,7 @@ def read(self):
break

# blocking
self.fire('ReaderIdleEvent')
self.notify('ReaderIdleEvent')

except KeyboardInterrupt:
pass
Expand Down
Loading