Skip to content

Commit c4f6ec1

Browse files
committed
pass schema.table info through csv pool
1 parent cb3f30e commit c4f6ec1

File tree

7 files changed

+46
-32
lines changed

7 files changed

+46
-32
lines changed

run.sh

+16-7
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,22 @@
11
#!/bin/bash
22

33
python3 main.py \
4-
--src-resume --src-wait \
5-
--src-host=127.0.0.1 --src-user=reader --src-password=qwerty \
6-
--dst-host=192.168.74.251 \
7-
--dst-db=db --dst-table=datatypes \
8-
--csvpool --csvpool-file-path-prefix=qwe_ \
9-
--csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03
4+
--src-resume \
5+
--src-wait \
6+
--src-host=127.0.0.1 \
7+
--src-user=reader \
8+
--src-password=qwerty \
9+
--dst-host=192.168.74.251 \
10+
--csvpool \
11+
--csvpool-file-path-prefix=qwe_ \
12+
--csv-column-default-value date_1=2000-01-01 datetime_1=2000-01-01\ 01:02:03 time_1=2001-01-01\ 01:02:03 timestamp_1=2002-01-01\ 01:02:03 \
13+
--mempool-max-flush-interval=600 \
14+
--mempool-max-events-num=900000
1015

11-
# --mempool --mempool-max-events-num=3 --mempool-max-flush-interval=30 \
16+
# --mempool
17+
# --mempool-max-events-num=3
18+
# --mempool-max-flush-interval=30
1219
# --dst-file=dst.csv
20+
# --dst-schema=db
21+
# --dst-table=datatypes
1322
# --csvpool-keep-files

src/cliopts.py

+11-11
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,7 @@ def config():
5252
argparser.add_argument(
5353
'--dry',
5454
action='store_true',
55-
help='Dry mode - do not do anything that can harm.'
55+
help='Dry mode - do not do anything that can harm. '
5656
'Useful for debugging.'
5757
)
5858
argparser.add_argument(
@@ -75,29 +75,29 @@ def config():
7575
'--mempool-max-events-num',
7676
type=int,
7777
default=100000,
78-
help='max events num to pool before batch write'
78+
help='Max events number to pool - triggering pool flush'
7979
)
8080
argparser.add_argument(
8181
'--mempool-max-flush-interval',
8282
type=int,
8383
default=60,
84-
help='max seconds num between flushes'
84+
help='Max seconds number between pool flushes'
8585
)
8686
argparser.add_argument(
8787
'--csvpool',
8888
action='store_true',
89-
help='Cache data in csv files.'
89+
help='Cache data in CSV pool files on disk. Requires memory pooling, thus enables --mempool even if it is not explicitly specified'
9090
)
9191
argparser.add_argument(
9292
'--csvpool-file-path-prefix',
9393
type=str,
9494
default='/tmp/csvpool_',
95-
help='file path prefix to CSV pool files'
95+
help='File path prefix to CSV pool files'
9696
)
9797
argparser.add_argument(
9898
'--csvpool-keep-files',
9999
action='store_true',
100-
help='Keep pool csv files.'
100+
help='Keep CSV pool files. Useful for debugging'
101101
)
102102

103103
argparser.add_argument(
@@ -156,7 +156,7 @@ def config():
156156
'--src-file',
157157
type=str,
158158
default=None,
159-
help='Source file tp read data from'
159+
help='Source file to read data from'
160160
)
161161

162162
argparser.add_argument(
@@ -190,10 +190,10 @@ def config():
190190
help='Password to be used when writing to dst'
191191
)
192192
argparser.add_argument(
193-
'--dst-db',
193+
'--dst-schema',
194194
type=str,
195195
default=None,
196-
help='Database to be used when writing to dst'
196+
help='Database/schema to be used when writing to dst'
197197
)
198198
argparser.add_argument(
199199
'--dst-table',
@@ -262,15 +262,15 @@ def config():
262262
'user': args.dst_user,
263263
'password': args.dst_password,
264264
},
265-
'dst_db': args.dst_db,
265+
'dst_schema': args.dst_schema,
266266
'dst_table': args.dst_table,
267267
},
268268
'file': {
269269
'csv_file_path': args.dst_file,
270270
'csv_file_path_prefix': args.csvpool_file_path_prefix,
271271
'csv_file_path_suffix_parts': [],
272272
'csv_keep_file': args.csvpool_keep_files,
273-
'dst_db': args.dst_db,
273+
'dst_schema': args.dst_schema,
274274
'dst_table': args.dst_table,
275275
},
276276
},

src/pool/bbpool.py

+4-4
Original file line number | Diff line number | Diff line change
@@ -89,17 +89,17 @@ def flush(self, key=None):
8989
def rotate_belt(self, belt_index, flush=False):
9090
now = int(time.time())
9191
need_rotation = True if flush else False
92-
rotate_by = "FLUSH"
92+
rotate_reason = "FLUSH"
9393

9494
if len(self.belts[belt_index][0]) >= self.max_bucket_size:
9595
# 0-index bucket is full
9696
need_rotation = True
97-
rotate_by = "SIZE"
97+
rotate_reason = "SIZE"
9898

9999
elif now >= self.belts_rotated_at[belt_index] + self.max_interval_between_rotations:
100100
# time interval reached
101101
need_rotation = True
102-
rotate_by = "TIME"
102+
rotate_reason = "TIME"
103103

104104
if not need_rotation:
105105
# belt not rotated
@@ -121,7 +121,7 @@ def rotate_belt(self, belt_index, flush=False):
121121

122122
buckets_num = len(self.belts[belt_index])
123123
last_bucket_size = len(self.belts[belt_index][buckets_num-1])
124-
print(now, self.buckets_count, 'rotating belt', belt_index, 'rotate by', rotate_by, 'buckets_num', buckets_num, 'last bucket size', last_bucket_size, 'belts:', len(self.belts))
124+
print('rotating belt. now:', now, 'bucket number:', self.buckets_count, 'index:', belt_index, 'reason:', rotate_reason, 'buckets on belt:', buckets_num, 'last bucket size:', last_bucket_size, 'belts count:', len(self.belts))
125125

126126
# time to flush data for specified key
127127
self.writer_builder.param('csv_file_path_suffix_parts', [str(now), str(self.buckets_count)])

src/writer/chcsvwriter.py

+1-1
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ def insert(self, event_or_events=None):
5555
sql,
5656
)
5757

58-
print('running:', bash)
58+
# print('running:', bash)
5959
os.system(bash)
6060

6161
pass

src/writer/chwriter.py

+4-4
Original file line number | Diff line number | Diff line change
@@ -8,21 +8,21 @@
88
class CHWriter(Writer):
99

1010
client = None
11-
dst_db = None
11+
dst_schema = None
1212
dst_table = None
1313

1414
def __init__(
1515
self,
1616
connection_settings,
17-
dst_db=None,
17+
dst_schema=None,
1818
dst_table=None,
1919
next_writer_builder=None,
2020
converter_builder=None,
2121
):
2222
super().__init__(next_writer_builder=next_writer_builder, converter_builder=converter_builder)
2323

2424
self.client = Client(**connection_settings)
25-
self.dst_db = dst_db
25+
self.dst_schema = dst_schema
2626
self.dst_table = dst_table
2727

2828
def insert(self, event_or_events=None):
@@ -45,7 +45,7 @@ def insert(self, event_or_events=None):
4545
event_converted = self.convert(event)
4646
values.append(event_converted.row)
4747

48-
schema = self.dst_db if self.dst_db else event_converted.schema
48+
schema = self.dst_schema if self.dst_schema else event_converted.schema
4949
table = self.dst_table if self.dst_table else event_converted.table
5050

5151
try:

src/writer/csvwriter.py

+9-4
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ class CSVWriter(Writer):
1212
file = None
1313
path = None
1414
writer = None
15-
dst_db = None
15+
dst_schema = None
1616
dst_table = None
1717
fieldnames = None
1818
header_written = False
@@ -26,7 +26,7 @@ def __init__(
2626
csv_file_path_prefix=None,
2727
csv_file_path_suffix_parts=[],
2828
csv_keep_file=False,
29-
dst_db=None,
29+
dst_schema=None,
3030
dst_table=None,
3131
next_writer_builder=None,
3232
converter_builder=None,
@@ -36,7 +36,7 @@ def __init__(
3636
self.path = csv_file_path
3737
self.path_prefix = csv_file_path_prefix
3838
self.path_suffix_parts = csv_file_path_suffix_parts
39-
self.dst_db = dst_db
39+
self.dst_schema = dst_schema
4040
self.dst_table = dst_table
4141

4242
if self.path is None:
@@ -77,6 +77,11 @@ def insert(self, event_or_events):
7777

7878
if not self.writer:
7979
self.fieldnames = sorted(events[0].row.keys())
80+
if self.dst_schema is None:
81+
self.dst_schema = events[0].schema
82+
if self.dst_table is None:
83+
self.dst_table = events[0].table
84+
8085
self.writer = csv.DictWriter(self.file, fieldnames=self.fieldnames)
8186
if not self.header_written:
8287
self.writer.writeheader()
@@ -89,7 +94,7 @@ def push(self):
8994
return
9095

9196
event = Event()
92-
event.schema = self.dst_db
97+
event.schema = self.dst_schema
9398
event.table = self.dst_table
9499
event.file = self.path
95100
event.fieldnames = self.fieldnames

src/writer/processwriter.py

+1-1
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ def insert(self, event_or_events=None):
3939
# },
4040
# ]
4141
process = mp.Process(target=self.process, args=(event_or_events,))
42-
print('Start Process')
42+
#print('Start Process')
4343
process.start()
4444
#print('Join Process')
4545
#process.join()

0 commit comments

Comments
 (0)