Skip to content

memory consumtion #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion run_datatypes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"

python3 main.py \
python3 main.py ${*:1} \
--src-resume \
--src-wait \
--src-host=127.0.0.1 \
Expand Down
4 changes: 3 additions & 1 deletion run_ontime.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse"

python3.6 main.py \
python3.6 main.py ${*:1} \
--src-resume \
--src-wait \
--nice-pause=1 \
--log-level=info \
--src-host=127.0.0.1 \
--src-user=root \
--dst-host=127.0.0.1 \
Expand All @@ -21,3 +22,4 @@ python3.6 main.py \
# --dst-schema=db
# --dst-table=datatypes
# --csvpool-keep-files
# --log-level=info \
39 changes: 31 additions & 8 deletions src/event/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ class Event(object):
# table name
table = None

#for row in mysql_event.rows:
# event.rows.append(row['values'])
mysql_event = None

# {'id':1, 'col1':1}
row = None

Expand All @@ -22,14 +26,33 @@ class Event(object):
# ['id', 'col1', 'col2']
fieldnames = None

def first(self):
if self.rows is None:
return self.row
_iter = None

def __iter__(self):
if self.mysql_event is not None:
self._iter = iter(self.mysql_event.rows)

elif self.row is not None:
self._iter = iter([self.row])

else:
return self.rows[0]
self._iter = iter(self.rows)

return self

def all(self):
if self.rows is None:
return [self.row]
def __next__(self):
item = next(self._iter)

if self.mysql_event is not None:
return item['values']
else:
return self.rows
return item

def column_names(self):
if self.mysql_event is not None:
return self.mysql_event.rows[0]['values'].keys()

if self.row is not None:
return self.row.keys()

return self.rows[0].keys()
4 changes: 1 addition & 3 deletions src/reader/mysqlreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ def read(self):
event = Event()
event.schema = mysql_event.schema
event.table = mysql_event.table
event.rows = []
for row in mysql_event.rows:
event.rows.append(row['values'])
event.mysql_event = mysql_event
self.notify('WriteRowsEvent', event=event)

if self.subscribers('WriteRowsEvent.EachRow'):
Expand Down
12 changes: 6 additions & 6 deletions src/writer/chwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ def insert(self, event_or_events=None):

logging.debug('class:%s insert %d rows', __class__, len(events))

values = []
rows = []
event_converted = None
for event in events:
event_converted = self.convert(event)
for value in event_converted.all():
values.append(value)
for row in event_converted:
rows.append(row)

schema = self.dst_schema if self.dst_schema else event_converted.schema
table = self.dst_table if self.dst_table else event_converted.table
Expand All @@ -56,13 +56,13 @@ def insert(self, event_or_events=None):
sql = 'INSERT INTO `{0}`.`{1}` ({2}) VALUES'.format(
schema,
table,
', '.join(map(lambda column: '`%s`' % column, values[0].keys()))
', '.join(map(lambda column: '`%s`' % column, rows[0].keys()))
)
self.client.execute(sql, values)
self.client.execute(sql, rows)
except:
print('QUERY FAILED -------------------------')
print(sql)
print(values)
print(rows)


if __name__ == '__main__':
Expand Down
4 changes: 2 additions & 2 deletions src/writer/csvwriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def insert(self, event_or_events):
self.open()

if not self.writer:
self.fieldnames = sorted(events[0].first().keys())
self.fieldnames = sorted(events[0].column_names())
if self.dst_schema is None:
self.dst_schema = events[0].schema
if self.dst_table is None:
Expand All @@ -90,7 +90,7 @@ def insert(self, event_or_events):
self.writer.writeheader()

for event in events:
for row in event.all():
for row in event:
self.writer.writerow(self.convert(row))

def push(self):
Expand Down