Skip to content

Commit 75bd2c1

Browse files
committed
process writer
1 parent d65708b commit 75bd2c1

10 files changed

+128
-44
lines changed

main.py

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from src.daemon import Daemon
77

88
import sys
9+
import multiprocessing as mp
910

1011

1112
if sys.version_info[0] < 3:
@@ -17,6 +18,7 @@ class Main(Daemon):
1718
config = None
1819

1920
def __init__(self):
21+
mp.set_start_method('forkserver')
2022
self.config = CLIOpts.config()
2123
super().__init__(pidfile=self.config.pid_file())
2224

src/config.py

+12-7
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from .writer.csvwriter import CSVWriter
99
from .writer.chcsvwriter import CHCSVWriter
1010
from .writer.poolwriter import PoolWriter
11+
from .writer.processwriter import ProcessWriter
1112
from .objectbuilder import ObjectBuilder
1213

1314
from .converter.csvwriteconverter import CSVWriteConverter
@@ -42,19 +43,23 @@ def reader(self):
4243
return MySQLReader(**self.config['reader-config']['mysql'])
4344

4445
def writer_builder(self):
45-
4646
if self.config['app-config']['csvpool']:
47-
return ObjectBuilder(class_name=CSVWriter, params={
48-
**self.config['writer-config']['file'],
49-
'next_writer_builder': ObjectBuilder(instance=CHCSVWriter(**self.config['writer-config']['clickhouse']['connection_settings'])),
50-
'converter_builder': ObjectBuilder(instance=CSVWriteConverter(defaults=self.config['converter-config']['csv']['column_default_value'])) if self.config['converter-config']['csv']['column_default_value'] else None,
47+
return ObjectBuilder(class_name=ProcessWriter, constructor_params={
48+
'next_writer_builder': ObjectBuilder(class_name=CSVWriter, constructor_params={
49+
**self.config['writer-config']['file'],
50+
'next_writer_builder': ObjectBuilder(
51+
class_name=CHCSVWriter,
52+
constructor_params=self.config['writer-config']['clickhouse']['connection_settings']
53+
),
54+
'converter_builder': ObjectBuilder(instance=CSVWriteConverter(defaults=self.config['converter-config']['csv']['column_default_value'])) if self.config['converter-config']['csv']['column_default_value'] else None,
55+
})
5156
})
5257

5358
elif self.config['writer-config']['file']['csv_file_path']:
54-
return ObjectBuilder(class_name=CSVWriter, params=self.config['writer-config']['file'])
59+
return ObjectBuilder(class_name=CSVWriter, constructor_params=self.config['writer-config']['file'])
5560

5661
else:
57-
return ObjectBuilder(class_name=CHWriter, params=self.config['writer-config']['clickhouse'])
62+
return ObjectBuilder(class_name=CHWriter, constructor_params=self.config['writer-config']['clickhouse'])
5863

5964
def writer(self):
6065
if self.config['app-config']['mempool']:

src/objectbuilder.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,27 @@
55
class ObjectBuilder(object):
    """Deferred object factory.

    A builder is configured either with a ready-made ``instance`` or with a
    ``class_name`` (a callable/class) plus optional ``constructor_params``.
    ``get()`` returns the instance in the first case and a freshly
    constructed object in the second.
    """

    class_name = None
    constructor_params = None
    instance = None

    def __init__(self, class_name=None, constructor_params=None, instance=None):
        """Configure the builder.

        :param class_name: class (or any callable) to instantiate in ``get()``
        :param constructor_params: mapping of keyword arguments for
            ``class_name``; may be None
        :param instance: ready object returned by ``get()`` when no
            ``class_name`` is given
        """
        self.class_name = class_name
        # Keep a private copy: param() adds keys later, and the caller's
        # mapping (e.g. a section of the application config) must not be
        # modified behind its back.
        if constructor_params is None:
            self.constructor_params = None
        else:
            self.constructor_params = dict(constructor_params)
        self.instance = instance

    def param(self, name, value):
        """Set (or override) one keyword argument for the constructor.

        :param name: keyword argument name
        :param value: keyword argument value
        """
        if self.constructor_params is None:
            self.constructor_params = {}
        self.constructor_params[name] = value

    def get(self):
        """Return the configured object.

        :return: ``instance`` (possibly None) when no ``class_name`` is set,
            otherwise a new ``class_name`` object built with
            ``constructor_params`` as keyword arguments
        """
        if not self.class_name:
            # no class name - return instance, it may be None
            return self.instance

        # have class name - build a new object on every call
        return self.class_name(**(self.constructor_params or {}))

src/pumper.py

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33

4+
import time
45

56
class Pumper(object):
67

src/reader/mysqlreader.py

-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ def read(self):
8888

8989
# blocking
9090
self.fire('ReaderIdleEvent')
91-
time.sleep(1)
9291

9392
except KeyboardInterrupt:
9493
pass

src/writer/chcsvwriter.py

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
#!/usr/bin/env python
22
# -*- coding: utf-8 -*-
33

4+
from .writer import Writer
5+
46
import os
7+
import time
58

6-
class CHCSVWriter(object):
9+
class CHCSVWriter(Writer):
710

811
host = None
912
port = None
@@ -26,7 +29,11 @@ def insert(self, event_or_events=None):
2629
# },
2730
# ]
2831

29-
for event in event_or_events:
32+
events = self.listify(event_or_events)
33+
if len(events) < 1:
34+
return
35+
36+
for event in events:
3037
sql = 'INSERT INTO `{0}`.`{1}` ({2}) FORMAT CSV'.format(
3138
event.schema,
3239
event.table,

src/writer/chwriter.py

+3-12
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,15 @@ def insert(self, event_or_events=None):
2828
# },
2929
# ]
3030

31-
if event_or_events is None:
32-
# nothing to insert at all
31+
events = self.listify(event_or_events)
32+
if len(events) < 1:
3333
return
3434

35-
elif isinstance(event_or_events, list):
36-
if len(event_or_events) < 1:
37-
# list is empty - nothing to insert
38-
return
39-
40-
else:
41-
# event_or_events is instance of Event
42-
event_or_events = [event_or_events]
43-
4435
converter = CHWriteConverter()
4536

4637
values = []
4738
ev = None
48-
for event in event_or_events:
39+
for event in events:
4940
ev = converter.convert(event)
5041
values.append(ev.row)
5142

src/writer/csvwriter.py

+6-15
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ def __init__(
3131
next_writer_builder=None,
3232
converter_builder=None,
3333
):
34+
super().__init__(next_writer_builder=next_writer_builder, converter_builder=converter_builder)
35+
3436
self.path = csv_file_path
3537
self.path_prefix = csv_file_path_prefix
3638
self.path_suffix_parts = csv_file_path_suffix_parts
3739
self.dst_db = dst_db
3840
self.dst_table = dst_table
39-
self.next_writer_builder = next_writer_builder
40-
self.converter_builder = converter_builder
4141

4242
if self.path is None:
4343
self.path = self.path_prefix + '_' + '_'.join(self.path_suffix_parts) + '.csv'
@@ -68,29 +68,20 @@ def insert(self, event_or_events):
6868
# },
6969
# ]
7070

71-
if event_or_events is None:
72-
# nothing to insert at all
71+
events = self.listify(event_or_events)
72+
if len(events) < 1:
7373
return
7474

75-
elif isinstance(event_or_events, list):
76-
if len(event_or_events) < 1:
77-
# list is empty - nothing to insert
78-
return
79-
80-
else:
81-
# event_or_events is instance of Event
82-
event_or_events = [event_or_events]
83-
8475
if not self.opened():
8576
self.open()
8677

8778
if not self.writer:
88-
self.fieldnames = sorted(event_or_events[0].row.keys())
79+
self.fieldnames = sorted(events[0].row.keys())
8980
self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames)
9081
if not self.header_written:
9182
self.writer.writeheader()
9283

93-
for event in event_or_events:
84+
for event in events:
9485
self.writer.writerow(self.converter_builder.get().convert(event).row if self.converter_builder else event.row)
9586

9687
def push(self):

src/writer/processwriter.py

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
from .writer import Writer
5+
import multiprocessing as mp
6+
7+
8+
class ProcessWriter(Writer):
    """Writer that runs the next writer of the chain in a child process.

    Every non-empty ``insert()`` call starts one ``multiprocessing.Process``.
    The child builds a fresh writer from ``next_writer_builder`` and drives
    it through insert / close / push / destroy. The parent does not wait for
    the child to finish.
    """

    args = None

    def __init__(self, **kwargs):
        """Store the builders and forward every other keyword argument.

        :param kwargs: ``next_writer_builder`` and ``converter_builder`` are
            consumed here; each remaining key/value pair is handed to
            ``next_writer_builder.param()``.
        """
        next_writer_builder = kwargs.pop('next_writer_builder', None)
        converter_builder = kwargs.pop('converter_builder', None)
        super().__init__(next_writer_builder=next_writer_builder, converter_builder=converter_builder)
        for name, value in kwargs.items():
            self.next_writer_builder.param(name, value)

    def opened(self):
        """No-op: this writer holds no resource of its own."""
        pass

    def open(self):
        """No-op: this writer holds no resource of its own."""
        pass

    def process(self, event_or_events=None):
        """Body of the child process: write the events with a fresh writer.

        :param event_or_events: event or list of events passed to the
            ``insert()`` of the writer built by ``next_writer_builder``
        """
        writer = self.next_writer_builder.get()
        writer.insert(event_or_events)
        writer.close()
        writer.push()
        writer.destroy()

    def insert(self, event_or_events=None):
        """Start a child process that writes the given event(s).

        :param event_or_events: a single event, a list of events, or None
        """
        # event_or_events = [
        #   event: {
        #       row: {'id': 3, 'a': 3}
        #   },
        #   event: {
        #       row: {'id': 3, 'a': 3}
        #   },
        # ]

        events = self.listify(event_or_events)
        if len(events) < 1:
            # nothing to write - same early exit as the other writers,
            # and no process is started for an empty batch
            return

        process = mp.Process(target=self.process, args=(events,))
        print('Start Process')
        # Fire and forget: the child is intentionally not joined here.
        # NOTE(review): per the multiprocessing docs, finished children are
        # joined whenever a new process is started - confirm this is enough
        # for the expected insert rate.
        process.start()

    def flush(self):
        """No-op: nothing is buffered in the parent process."""
        pass

    def push(self):
        """No-op: pushing is done by the writer inside the child process."""
        pass

    def destroy(self):
        """No-op: cleanup is done by the writer inside the child process."""
        pass

    def close(self):
        """No-op: closing is done by the writer inside the child process."""
        pass

src/writer/writer.py

+28
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,40 @@ class Writer(object):
77
next_writer_builder = None
88
converter_builder = None
99

10+
def __init__(self, next_writer_builder=None, converter_builder=None):
    """Remember the two builders this writer works with.

    :param next_writer_builder: builder stored as ``self.next_writer_builder``;
        may be None
    :param converter_builder: builder stored as ``self.converter_builder``;
        may be None
    """
    self.converter_builder = converter_builder
    self.next_writer_builder = next_writer_builder
17+
1018
def opened(self):
1119
pass
1220

1321
def open(self):
1422
pass
1523

24+
def listify(self, obj_or_list):
    """Normalize the argument to a list.

    :param obj_or_list: None, a list, or any single object
    :return: ``[]`` for None or an empty list, the list itself when it is a
        non-empty list, otherwise a one-element list wrapping the object
    """
    if obj_or_list is None:
        # no value at all
        return []

    if not isinstance(obj_or_list, list):
        # a single object - wrap it
        return [obj_or_list]

    # already a list: hand it back untouched unless it is empty
    return obj_or_list if len(obj_or_list) >= 1 else []
42+
43+
1644
def insert(self, event_or_events=None):
1745
# event_or_events = [
1846
# event: {

0 commit comments

Comments
 (0)