Skip to content

Commit cb3f30e

Browse files
committed
converters unification
1 parent 75bd2c1 commit cb3f30e

File tree

6 files changed

+46
-22
lines changed

6 files changed

+46
-22
lines changed

run.sh

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ python3 main.py \
55
--src-host=127.0.0.1 --src-user=reader --src-password=qwerty \
66
--dst-host=192.168.74.251 \
77
--dst-db=db --dst-table=datatypes \
8-
--mempool --mempool-max-events-num=3 --mempool-max-flush-interval=30 \
9-
--csvpool --csvpool-file-path-prefix=qwe \
8+
--csvpool --csvpool-file-path-prefix=qwe_ \
109
--csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03
1110

12-
# --dst-file=dst.csv
11+
# --mempool --mempool-max-events-num=3 --mempool-max-flush-interval=30 \
12+
# --dst-file=dst.csv
1313
# --csvpool-keep-files

src/cliopts.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def config():
7474
argparser.add_argument(
7575
'--mempool-max-events-num',
7676
type=int,
77-
default=1000,
77+
default=100000,
7878
help='max events num to pool before batch write'
7979
)
8080
argparser.add_argument(
@@ -91,7 +91,7 @@ def config():
9191
argparser.add_argument(
9292
'--csvpool-file-path-prefix',
9393
type=str,
94-
default='/tmp/csvpool',
94+
default='/tmp/csvpool_',
9595
help='file path prefix to CSV pool files'
9696
)
9797
argparser.add_argument(
@@ -220,7 +220,7 @@ def config():
220220
'dry': args.dry,
221221
'daemon': args.daemon,
222222
'pid_file': args.pid_file,
223-
'mempool': args.mempool,
223+
'mempool': args.mempool or args.csvpool, # csvpool assumes mempool to be enabled
224224
'mempool-max-events-num': args.mempool_max_events_num,
225225
'mempool-max-flush-interval': args.mempool_max_flush_interval,
226226
'csvpool': args.csvpool,

src/config.py

+20-3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from .objectbuilder import ObjectBuilder
1313

1414
from .converter.csvwriteconverter import CSVWriteConverter
15+
from .converter.chwriteconverter import CHWriteConverter
1516

1617

1718
class Config(object):
@@ -42,6 +43,16 @@ def reader(self):
4243
else:
4344
return MySQLReader(**self.config['reader-config']['mysql'])
4445

46+
def converter_builder(self):
47+
if not self.config['converter-config']['csv']['column_default_value']:
48+
# no default values for CSV columns provided
49+
return None
50+
51+
return ObjectBuilder(
52+
instance=CSVWriteConverter(
53+
defaults=self.config['converter-config']['csv']['column_default_value']
54+
))
55+
4556
def writer_builder(self):
4657
if self.config['app-config']['csvpool']:
4758
return ObjectBuilder(class_name=ProcessWriter, constructor_params={
@@ -51,15 +62,21 @@ def writer_builder(self):
5162
class_name=CHCSVWriter,
5263
constructor_params=self.config['writer-config']['clickhouse']['connection_settings']
5364
),
54-
'converter_builder': ObjectBuilder(instance=CSVWriteConverter(defaults=self.config['converter-config']['csv']['column_default_value'])) if self.config['converter-config']['csv']['column_default_value'] else None,
65+
'converter_builder': self.converter_builder(),
5566
})
5667
})
5768

5869
elif self.config['writer-config']['file']['csv_file_path']:
59-
return ObjectBuilder(class_name=CSVWriter, constructor_params=self.config['writer-config']['file'])
70+
return ObjectBuilder(class_name=CSVWriter, constructor_params={
71+
**self.config['writer-config']['file'],
72+
'converter_builder': self.converter_builder(),
73+
})
6074

6175
else:
62-
return ObjectBuilder(class_name=CHWriter, constructor_params=self.config['writer-config']['clickhouse'])
76+
return ObjectBuilder(class_name=CHWriter, constructor_params={
77+
**self.config['writer-config']['clickhouse'],
78+
'converter_builder': ObjectBuilder(instance=CHWriteConverter()),
79+
})
6380

6481
def writer(self):
6582
if self.config['app-config']['mempool']:

src/writer/chwriter.py

+15-10
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33

44
from clickhouse_driver.client import Client
55
from .writer import Writer
6-
from ..event.event import Event
7-
from ..converter.chwriteconverter import CHWriteConverter
86

97

108
class CHWriter(Writer):
@@ -13,7 +11,16 @@ class CHWriter(Writer):
1311
dst_db = None
1412
dst_table = None
1513

16-
def __init__(self, connection_settings, dst_db, dst_table):
14+
def __init__(
15+
self,
16+
connection_settings,
17+
dst_db=None,
18+
dst_table=None,
19+
next_writer_builder=None,
20+
converter_builder=None,
21+
):
22+
super().__init__(next_writer_builder=next_writer_builder, converter_builder=converter_builder)
23+
1724
self.client = Client(**connection_settings)
1825
self.dst_db = dst_db
1926
self.dst_table = dst_table
@@ -32,16 +39,14 @@ def insert(self, event_or_events=None):
3239
if len(events) < 1:
3340
return
3441

35-
converter = CHWriteConverter()
36-
3742
values = []
38-
ev = None
43+
event_converted = None
3944
for event in events:
40-
ev = converter.convert(event)
41-
values.append(ev.row)
45+
event_converted = self.convert(event)
46+
values.append(event_converted.row)
4247

43-
schema = self.dst_db if self.dst_db else ev.schema
44-
table = self.dst_table if self.dst_table else ev.table
48+
schema = self.dst_db if self.dst_db else event_converted.schema
49+
table = self.dst_table if self.dst_table else event_converted.table
4550

4651
try:
4752
sql = 'INSERT INTO `{0}`.`{1}` ({2}) VALUES'.format(

src/writer/csvwriter.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def __init__(
4040
self.dst_table = dst_table
4141

4242
if self.path is None:
43-
self.path = self.path_prefix + '_' + '_'.join(self.path_suffix_parts) + '.csv'
43+
self.path = self.path_prefix + '_'.join(self.path_suffix_parts) + '.csv'
4444
self.delete = not csv_keep_file
4545

4646
def __del__(self):
@@ -82,7 +82,7 @@ def insert(self, event_or_events):
8282
self.writer.writeheader()
8383

8484
for event in events:
85-
self.writer.writerow(self.converter_builder.get().convert(event).row if self.converter_builder else event.row)
85+
self.writer.writerow(self.convert(event).row)
8686

8787
def push(self):
8888
if not self.next_writer_builder:
@@ -93,7 +93,7 @@ def push(self):
9393
event.table = self.dst_table
9494
event.file = self.path
9595
event.fieldnames = self.fieldnames
96-
self.next_writer_builder.get().insert([event])
96+
self.next_writer_builder.get().insert(event)
9797

9898
def close(self):
9999
if self.opened():

src/writer/writer.py

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ def listify(self, obj_or_list):
4040
# event_or_events is an object
4141
return [obj_or_list]
4242

43+
def convert(self, event):
44+
return self.converter_builder.get().convert(event) if self.converter_builder else event
4345

4446
def insert(self, event_or_events=None):
4547
# event_or_events = [

0 commit comments

Comments
 (0)