diff --git a/run_datatypes.sh b/run_datatypes.sh index 2675ff4..fc4e9a3 100755 --- a/run_datatypes.sh +++ b/run_datatypes.sh @@ -2,7 +2,7 @@ sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse" -python3 main.py \ +python3 main.py ${*:1} \ --src-resume \ --src-wait \ --src-host=127.0.0.1 \ diff --git a/run_ontime.sh b/run_ontime.sh index 292e442..520ad4a 100755 --- a/run_ontime.sh +++ b/run_ontime.sh @@ -2,10 +2,11 @@ sudo bash -c "echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse" -python3.6 main.py \ +python3.6 main.py ${*:1} \ --src-resume \ --src-wait \ --nice-pause=1 \ + --log-level=info \ --src-host=127.0.0.1 \ --src-user=root \ --dst-host=127.0.0.1 \ @@ -21,3 +22,4 @@ python3.6 main.py \ # --dst-schema=db # --dst-table=datatypes # --csvpool-keep-files +# --log-level=info \ diff --git a/src/event/event.py b/src/event/event.py index f1e502c..fe4d7f8 100644 --- a/src/event/event.py +++ b/src/event/event.py @@ -10,6 +10,10 @@ class Event(object): # table name table = None + #for row in mysql_event.rows: + # event.rows.append(row['values']) + mysql_event = None + # {'id':1, 'col1':1} row = None @@ -22,14 +26,33 @@ class Event(object): # ['id', 'col1', 'col2'] fieldnames = None - def first(self): - if self.rows is None: - return self.row + _iter = None + + def __iter__(self): + if self.mysql_event is not None: + self._iter = iter(self.mysql_event.rows) + + elif self.row is not None: + self._iter = iter([self.row]) + else: - return self.rows[0] + self._iter = iter(self.rows) + + return self - def all(self): - if self.rows is None: - return [self.row] + def __next__(self): + item = next(self._iter) + + if self.mysql_event is not None: + return item['values'] else: - return self.rows + return item + + def column_names(self): + if self.mysql_event is not None: + return self.mysql_event.rows[0]['values'].keys() + + if self.row is not None: + return self.row.keys() + + return self.rows[0].keys() diff --git a/src/reader/mysqlreader.py b/src/reader/mysqlreader.py index ebb607a..f5589c5 100644 --- a/src/reader/mysqlreader.py +++ b/src/reader/mysqlreader.py @@ -87,9 +87,7 @@ def read(self): event = Event() event.schema = mysql_event.schema event.table = mysql_event.table - event.rows = [] - for row in mysql_event.rows: - event.rows.append(row['values']) + event.mysql_event = mysql_event self.notify('WriteRowsEvent', event=event) if self.subscribers('WriteRowsEvent.EachRow'): diff --git a/src/writer/chwriter.py b/src/writer/chwriter.py index c68a98e..cd307b3 100644 --- a/src/writer/chwriter.py +++ b/src/writer/chwriter.py @@ -42,12 +42,12 @@ def insert(self, event_or_events=None): logging.debug('class:%s insert %d rows', __class__, len(events)) - values = [] + rows = [] event_converted = None for event in events: event_converted = self.convert(event) - for value in event_converted.all(): - values.append(value) + for row in event_converted: + rows.append(row) schema = self.dst_schema if self.dst_schema else event_converted.schema table = self.dst_table if self.dst_table else event_converted.table @@ -56,13 +56,13 @@ def insert(self, event_or_events=None): sql = 'INSERT INTO `{0}`.`{1}` ({2}) VALUES'.format( schema, table, - ', '.join(map(lambda column: '`%s`' % column, values[0].keys())) + ', '.join(map(lambda column: '`%s`' % column, rows[0].keys())) ) - self.client.execute(sql, values) + self.client.execute(sql, rows) except: print('QUERY FAILED -------------------------') print(sql) - print(values) + print(rows) if __name__ == '__main__': diff --git a/src/writer/csvwriter.py b/src/writer/csvwriter.py index 6e70f56..2074683 100644 --- a/src/writer/csvwriter.py +++ b/src/writer/csvwriter.py @@ -79,7 +79,7 @@ def insert(self, event_or_events): self.open() if not self.writer: - self.fieldnames = sorted(events[0].first().keys()) + self.fieldnames = sorted(events[0].column_names()) if self.dst_schema is None: self.dst_schema = events[0].schema if self.dst_table is None: @@ -90,7 +90,7 @@ def insert(self, event_or_events): self.writer.writeheader() for event in events: - for row in event.all(): + for row in event: self.writer.writerow(self.convert(row)) def push(self):