Skip to content

Commit d7ba9e4

Browse files
committed
butches
1 parent c4f6ec1 commit d7ba9e4

17 files changed

+183
-68
lines changed

run.sh

+5-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ python3 main.py \
99
--dst-host=192.168.74.251 \
1010
--csvpool \
1111
--csvpool-file-path-prefix=qwe_ \
12-
--csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03 \
12+
--csv-column-default-value \
13+
date_1=2000-01-01 \
14+
datetime_1=2000-01-01\ 01:02:03 \
15+
time_1=2001-01-01\ 01:02:03 \
16+
timestamp_1=2002-01-01\ 01:02:03 \
1317
--mempool-max-flush-interval=600 \
1418
--mempool-max-events-num=900000
1519

src/cliopts.py

+16-7
Original file line numberDiff line numberDiff line change
@@ -9,35 +9,44 @@ class CLIOpts(object):
99

1010
@staticmethod
1111
def join(lists_to_join):
12-
# lists_to_join contains something like
13-
# [['a=b', 'c=d'], ['e=f', 'z=x'], ]
12+
"""Join several lists into one
13+
14+
:param lists_to_join: is a list of lists
15+
[['a=b', 'c=d'], ['e=f', 'z=x'], ]
16+
17+
:return: None or dictionary
18+
{'a': 'b', 'c': 'd', 'e': 'f', 'z': 'x'}
19+
20+
"""
21+
1422
if not isinstance(lists_to_join, list):
1523
return None
1624

1725
res = {}
1826
for lst in lists_to_join:
1927
# lst = ['a=b', 'c=d']
2028
for column_value_pair in lst:
21-
# value = 'a=b'
29+
# column_value_value = 'a=b'
2230
column, value = column_value_pair.split('=', 2)
2331
res[column] = value
2432

25-
# dict {
33+
# res = dict {
2634
# 'col1': 'value1',
2735
# 'col2': 'value2',
2836
# }
2937

38+
# return with sanity check
3039
if len(res) > 0:
3140
return res
3241
else:
3342
return None
3443

3544
@staticmethod
3645
def config():
46+
"""Parse application's CLI options into options dictionary
47+
:return: instance of Config
3748
"""
38-
parse CLI options into options dict
39-
:return: dict
40-
"""
49+
4150
argparser = argparse.ArgumentParser(
4251
description='ClickHouse data reader',
4352
epilog='==============='

src/converter/chwriteconverter.py

+10-12
Original file line numberDiff line numberDiff line change
@@ -25,30 +25,28 @@ class CHWriteConverter(Converter):
2525
set,
2626
]
2727

28-
def convert(self, event):
29-
28+
def row(self, row):
3029
columns_to_delete = []
3130

32-
for column in event.row:
33-
if (event.row[column] is None) and self.delete_empty_columns:
31+
for column in row:
32+
if (row[column] is None) and self.delete_empty_columns:
3433
# include empty column to the list of to be deleted columns
3534
columns_to_delete.append(column)
3635
# move to next column
3736
continue
3837

3938
for t in self.types_to_convert:
40-
if isinstance(event.row[column], t):
41-
# print("Converting column", column, "of type", type(event.row[column]),
42-
# event.row[column])
43-
event.row[column] = str(event.row[column])
44-
# print("res", event.row[column])
39+
if isinstance(row[column], t):
40+
# print("Converting column", column, "of type", type(event.row[column]), event.row[column])
41+
row[column] = str(row[column])
42+
# print("res", event.row[column])
4543
break
4644
else:
47-
# print("Using asis column", column, "of type", type(event.row[column]))
45+
# print("Using asis column", column, "of type", type(event.row[column]))
4846
pass
4947

5048
# delete columns according to the list
5149
for column in columns_to_delete:
52-
event.row.pop(column)
50+
row.pop(column)
5351

54-
return event
52+
return row

src/converter/converter.py

+25-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,31 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33

4+
from ..event.event import Event
5+
6+
47
class Converter(object):
58

6-
def convert(self, event):
9+
def row(self, row):
10+
return row
11+
12+
def rows(self, rows):
13+
if rows is None:
14+
return None
15+
16+
res = []
17+
for row in rows:
18+
res.append(self.row(row))
19+
20+
return res
21+
22+
def event(self, event):
23+
event.row = self.row(event.row)
24+
event.rows = self.rows(event.rows)
725
return event
26+
27+
def convert(self, event_or_row):
28+
if isinstance(event_or_row, Event):
29+
return self.event(event_or_row)
30+
else:
31+
return self.row(event_or_row)

src/converter/csvreadconverter.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77

88
class CSVReadConverter(Converter):
99

10-
def convert(self, event):
11-
for column in event.row:
12-
if event.row[column] == '':
13-
event.row[column] = None
10+
def row(self, row):
11+
for column in row:
12+
if row[column] == '':
13+
row[column] = None
1414
# else:
1515
# try:
1616
# event.row[column] = ast.literal_eval(event.row[column])
1717
# except:
1818
# pass
19-
return event
19+
return row

src/converter/csvwriteconverter.py

+10-9
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,20 @@ class CSVWriteConverter(Converter):
1212
def __init__(self, defaults=None):
1313
self.defaults = defaults
1414

15-
def convert(self, event):
15+
def row(self, row):
16+
for column in row:
17+
if (row[column] is None) and (column in self.defaults):
18+
row[column] = self.defaults[column]
19+
return row
20+
21+
def convert(self, event_or_row):
1622
# no defaults - nothing to convert
1723
if not self.defaults:
18-
return event
24+
return event_or_row
1925

2026
# defaults are empty - nothing to convert
2127
if len(self.defaults) < 1:
22-
return event
28+
return event_or_row
2329

2430
# have defaults
25-
for column in event.row:
26-
# replace None column with default value
27-
if event.row[column] is None and column in self.defaults:
28-
event.row[column] = self.defaults[column]
29-
30-
return event
31+
return super().convert(event_or_row)

src/event/event.py

+18-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,23 @@ class Event(object):
1313
# {'id':1, 'col1':1}
1414
row = None
1515

16-
file = None
16+
# [{'id': 1, 'col1':1}, {'id': 2, 'col1': 2}, {'id': 3, 'col1': 3}]
17+
rows = None
18+
19+
# /path/to/csv/file.csv
20+
filename = None
1721

1822
# ['id', 'col1', 'col2']
19-
fieldnames = None
23+
fieldnames = None
24+
25+
def first(self):
26+
if self.rows is None:
27+
return self.row
28+
else:
29+
return self.rows[0]
30+
31+
def all(self):
32+
if self.rows is None:
33+
return [self.row]
34+
else:
35+
return self.rows

src/objectbuilder.py

+35
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@ class ObjectBuilder(object):
99
instance = None
1010

1111
def __init__(self, class_name=None, constructor_params=None, instance=None):
12+
"""
13+
Builder/Wrapper for ad object.
14+
In case instance is provided - operates as a wrapper
15+
In case class_name and (optional) constructor_params provided - return instance of specified class
16+
:param class_name: class to instantiate
17+
:param constructor_params: dict of class's contrcutor params. Used as **constructor_params
18+
:param instance: ready-to use instance
19+
"""
1220
self.class_name = class_name
1321
self.constructor_params = constructor_params
1422
self.instance = instance
@@ -19,12 +27,39 @@ def param(self, name, value):
1927
self.constructor_params[name] = value
2028

2129
def get(self):
30+
"""
31+
Get object (in case wrapper) or an instance of a class (in case Object builder) - each time the same object
32+
:return: object
33+
"""
2234
if not self.class_name:
2335
# no class name - return instance, it may be None
2436
return self.instance
2537

2638
# have class name
2739

40+
# instantiate object
41+
if self.constructor_params:
42+
self.instance = self.class_name(**self.constructor_params)
43+
else:
44+
self.instance = self.class_name()
45+
46+
# in order to return instance on next get() call
47+
self.class_name = None
48+
49+
return self.instance
50+
51+
def new(self):
52+
"""
53+
Get object (in case wrapper) or an instance of a class (in case Object builder) - each time new object
54+
:return: object
55+
"""
56+
if not self.class_name:
57+
# no class name - return instance, it may be None
58+
return self.instance
59+
60+
# have class name
61+
62+
# instantiate object
2863
if self.constructor_params:
2964
return self.class_name(**self.constructor_params)
3065
else:

src/observable.py

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
5+
class Observable(object):
6+
7+
event_handlers = {
8+
'Event1': [],
9+
'Event2': [],
10+
}
11+
12+
def subscribe(self, event_handlers):
13+
for event_name in event_handlers:
14+
if event_name in self.event_handlers:
15+
if callable(event_handlers[event_name]):
16+
self.event_handlers[event_name].append(event_handlers[event_name])
17+
else:
18+
if isinstance(event_handlers[event_name], list):
19+
for callback in event_handlers[event_name]:
20+
if callable(callback):
21+
self.event_handlers[event_name].append(callback)
22+
23+
def notify(self, event_name, **attrs):
24+
for callback in self.event_handlers[event_name]:
25+
callback(**attrs)
26+
27+
def subscribers(self, event_name):
28+
return event_name in self.event_handlers and (len(self.event_handlers[event_name]) > 0)

src/pumper.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,15 @@ def __init__(self, reader=None, writer=None):
1616
if self.reader:
1717
self.reader.subscribe({
1818
'WriteRowsEvent': self.write_rows_event,
19-
'WriteRowsEvent.EachRow': self.write_rows_event_each_row,
19+
# 'WriteRowsEvent.EachRow': self.write_rows_event_each_row,
2020
'ReaderIdleEvent': self.reader_idle_event,
2121
})
2222

2323
def run(self):
2424
self.reader.read()
2525

2626
def write_rows_event(self, event=None):
27-
# binlog_event.dump()
28-
pass
27+
self.writer.insert(event)
2928

3029
def write_rows_event_each_row(self, event=None):
3130
self.writer.insert(event)

src/reader/csvreader.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ def read(self):
4141
try:
4242
event = Event()
4343
event.table = os.path.splitext(self.csv_file_path)[0]
44-
self.fire('WriteRowsEvent', event=event)
44+
self.notify('WriteRowsEvent', event=event)
4545
for row in self.reader:
4646
event.row = row
47-
self.fire('WriteRowsEvent.EachRow', event=self.converter.convert(event) if self.converter else event)
47+
self.notify('WriteRowsEvent.EachRow', event=self.converter.convert(event) if self.converter else event)
4848
except KeyboardInterrupt:
4949
pass
5050

src/reader/mysqlreader.py

+14-8
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,22 @@ def read(self):
6969
while True:
7070
for mysql_event in self.binlog_stream:
7171
if isinstance(mysql_event, WriteRowsEvent):
72-
event = Event()
73-
event.schema = mysql_event.schema
74-
event.table = mysql_event.table
75-
self.fire('WriteRowsEvent', event=event)
76-
for row in mysql_event.rows:
72+
if self.subscribers('WriteRowsEvent'):
7773
event = Event()
7874
event.schema = mysql_event.schema
7975
event.table = mysql_event.table
80-
event.row = row['values']
81-
self.fire('WriteRowsEvent.EachRow', event=event)
76+
event.rows = []
77+
for row in mysql_event.rows:
78+
event.rows.append(row['values'])
79+
self.notify('WriteRowsEvent', event=event)
80+
81+
if self.subscribers('WriteRowsEvent.EachRow'):
82+
for row in mysql_event.rows:
83+
event = Event()
84+
event.schema = mysql_event.schema
85+
event.table = mysql_event.table
86+
event.row = row['values']
87+
self.notify('WriteRowsEvent.EachRow', event=event)
8288
else:
8389
# skip non-insert events
8490
pass
@@ -87,7 +93,7 @@ def read(self):
8793
break
8894

8995
# blocking
90-
self.fire('ReaderIdleEvent')
96+
self.notify('ReaderIdleEvent')
9197

9298
except KeyboardInterrupt:
9399
pass

0 commit comments

Comments
 (0)