Skip to content

Commit f064beb

Browse files
authored
Merge pull request #33 from sunsingerus/master
memory consumtion
2 parents 5a36c45 + 17e676f commit f064beb

File tree

6 files changed

+44
-21
lines changed

6 files changed

+44
-21
lines changed

run_datatypes.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"
44

5-
python3 main.py \
5+
python3 main.py ${*:1} \
66
--src-resume \
77
--src-wait \
88
--src-host=127.0.0.1 \

run_ontime.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"
44

5-
python3.6 main.py \
5+
python3.6 main.py ${*:1} \
66
--src-resume \
77
--src-wait \
88
--nice-pause=1 \
9+
--log-level=info \
910
--src-host=127.0.0.1 \
1011
--src-user=root \
1112
--dst-host=127.0.0.1 \
@@ -21,3 +22,4 @@ python3.6 main.py \
2122
# --dst-schema=db
2223
# --dst-table=datatypes
2324
# --csvpool-keep-files
25+
# --log-level=info \

src/event/event.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ class Event(object):
1010
# table name
1111
table = None
1212

13+
#for row in mysql_event.rows:
14+
# event.rows.append(row['values'])
15+
mysql_event = None
16+
1317
# {'id':1, 'col1':1}
1418
row = None
1519

@@ -22,14 +26,33 @@ class Event(object):
2226
# ['id', 'col1', 'col2']
2327
fieldnames = None
2428

25-
def first(self):
26-
if self.rows is None:
27-
return self.row
29+
_iter = None
30+
31+
def __iter__(self):
32+
if self.mysql_event is not None:
33+
self._iter = iter(self.mysql_event.rows)
34+
35+
elif self.row is not None:
36+
self._iter = iter([self.row])
37+
2838
else:
29-
return self.rows[0]
39+
self._iter = iter(self.rows)
40+
41+
return self
3042

31-
def all(self):
32-
if self.rows is None:
33-
return [self.row]
43+
def __next__(self):
44+
item = next(self._iter)
45+
46+
if self.mysql_event is not None:
47+
return item['values']
3448
else:
35-
return self.rows
49+
return item
50+
51+
def column_names(self):
52+
if self.mysql_event is not None:
53+
return self.mysql_event.rows[0]['values'].keys()
54+
55+
if self.row is not None:
56+
return self.row.keys()
57+
58+
return self.rows[0].keys()

src/reader/mysqlreader.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,7 @@ def read(self):
8787
event = Event()
8888
event.schema = mysql_event.schema
8989
event.table = mysql_event.table
90-
event.rows = []
91-
for row in mysql_event.rows:
92-
event.rows.append(row['values'])
90+
event.mysql_event = mysql_event
9391
self.notify('WriteRowsEvent', event=event)
9492

9593
if self.subscribers('WriteRowsEvent.EachRow'):

src/writer/chwriter.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ def insert(self, event_or_events=None):
4242

4343
logging.debug('class:%s insert %d rows', __class__, len(events))
4444

45-
values = []
45+
rows = []
4646
event_converted = None
4747
for event in events:
4848
event_converted = self.convert(event)
49-
for value in event_converted.all():
50-
values.append(value)
49+
for row in event_converted:
50+
rows.append(row)
5151

5252
schema = self.dst_schema if self.dst_schema else event_converted.schema
5353
table = self.dst_table if self.dst_table else event_converted.table
@@ -56,13 +56,13 @@ def insert(self, event_or_events=None):
5656
sql = 'INSERT INTO `{0}`.`{1}` ({2}) VALUES'.format(
5757
schema,
5858
table,
59-
', '.join(map(lambda column: '`%s`' % column, values[0].keys()))
59+
', '.join(map(lambda column: '`%s`' % column, rows[0].keys()))
6060
)
61-
self.client.execute(sql, values)
61+
self.client.execute(sql, rows)
6262
except:
6363
print('QUERY FAILED -------------------------')
6464
print(sql)
65-
print(values)
65+
print(rows)
6666

6767

6868
if __name__ == '__main__':

src/writer/csvwriter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def insert(self, event_or_events):
7979
self.open()
8080

8181
if not self.writer:
82-
self.fieldnames = sorted(events[0].first().keys())
82+
self.fieldnames = sorted(events[0].column_names())
8383
if self.dst_schema is None:
8484
self.dst_schema = events[0].schema
8585
if self.dst_table is None:
@@ -90,7 +90,7 @@ def insert(self, event_or_events):
9090
self.writer.writeheader()
9191

9292
for event in events:
93-
for row in event.all():
93+
for row in event:
9494
self.writer.writerow(self.convert(row))
9595

9696
def push(self):

0 commit comments

Comments
 (0)