diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py
index 39208413..fea9d1f8 100644
--- a/pymysqlreplication/binlogstream.py
+++ b/pymysqlreplication/binlogstream.py
@@ -12,11 +12,11 @@ class BinLogStreamReader(object):
     '''Connect to replication stream and read event'''

     def __init__(self, connection_settings={}, resume_stream=False, blocking=False, only_events=None, server_id=255):
-        '''
+        """
        resume_stream: Start for latest event of binlog or from older available event
        blocking: Read on stream is blocking
        only_events: Array of allowed events
-        '''
+        """
         self.__connection_settings = connection_settings
         self.__connection_settings['charset'] = 'utf8'

@@ -29,7 +29,7 @@ def __init__(self, connection_settings={}, resume_stream=False, blocking=False,
         self.__log_pos = None
         self.__log_file = None

-        #Store table meta informations
+        #Store table meta information
         self.table_map = {}

     def close(self):
@@ -97,7 +97,7 @@ def fetchone(self):
                 return None
             binlog_event = BinLogPacketWrapper(pkt, self.table_map, self._ctl_connection)
             if binlog_event.event_type == TABLE_MAP_EVENT:
-                self.table_map[binlog_event.event.table_id] = binlog_event.event
+                self.table_map[binlog_event.event.table_id] = binlog_event.event.get_table()
             if self.__filter_event(binlog_event.event):
                 continue
             if binlog_event.event_type == ROTATE_EVENT:
diff --git a/pymysqlreplication/column.py b/pymysqlreplication/column.py
index a31f7b38..b5aaf13c 100644
--- a/pymysqlreplication/column.py
+++ b/pymysqlreplication/column.py
@@ -2,60 +2,85 @@
 from .constants import FIELD_TYPE
 from pymysql.util import byte2int, int2byte

+
 class Column(object):
-    '''Definition of a column'''
+    """Definition of a column"""
+
+    def __init__(self, *args, **kwargs):
+        self.data = {}
+        if len(args) == 3:
+            self.__parse_column_definition(*args)
+        else:
+            self.data = kwargs

-    def __init__(self, column_type, column_schema, packet):
-        self.type = column_type
-        self.name = column_schema["COLUMN_NAME"]
-        self.collation_name = column_schema["COLLATION_NAME"]
-        self.character_set_name = column_schema["CHARACTER_SET_NAME"]
-        self.comment = column_schema["COLUMN_COMMENT"]
-        self.unsigned = False
+    def __parse_column_definition(self, column_type, column_schema, packet):
+        self.data["type"] = column_type
+        self.data["name"] = column_schema["COLUMN_NAME"]
+        self.data["collation_name"] = column_schema["COLLATION_NAME"]
+        self.data["character_set_name"] = column_schema["CHARACTER_SET_NAME"]
+        self.data["comment"] = column_schema["COLUMN_COMMENT"]
+        self.data["unsigned"] = False
+        self.data["type_is_bool"] = False

         if column_schema["COLUMN_TYPE"].find("unsigned") != -1:
-            self.unsigned = True
+            self.data["unsigned"] = True
         if self.type == FIELD_TYPE.VAR_STRING or self.type == FIELD_TYPE.STRING:
             self.__read_string_metadata(packet, column_schema)
         elif self.type == FIELD_TYPE.VARCHAR:
-            self.max_length = struct.unpack('<H', packet.read(2))[0]
+            self.data["max_length"] = struct.unpack('<H', packet.read(2))[0]

     def __read_string_metadata(self, packet, column_schema):
         metadata = (packet.read_uint8() << 8) + packet.read_uint8()
         real_type = metadata >> 8
         if real_type == FIELD_TYPE.SET or real_type == FIELD_TYPE.ENUM:
-            self.type = real_type
-            self.size = metadata & 0x00ff
+            self.data["type"] = real_type
+            self.data["size"] = metadata & 0x00ff
             self.__read_enum_metadata(column_schema)
         else:
-            self.max_length = (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff)
+            self.data["max_length"] = (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff)

     def __read_enum_metadata(self, column_schema):
         enums = column_schema["COLUMN_TYPE"]
         if self.type == FIELD_TYPE.ENUM:
-            self.enum_values = enums.replace('enum(', '').replace(')', '').replace('\'', '').split(',')
+            self.data["enum_values"] = enums.replace('enum(', '').replace(')', '').replace('\'', '').split(',')
+        else:
+            self.data["set_values"] = enums.replace('set(', '').replace(')', '').replace('\'', '').split(',')
+
+    def __eq__(self, other):
+        return self.data == other.data
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def serializable_data(self):
+        return self.data
+
+    def __getattr__(self, item):
+        if item in self.data:
+            return self.data[item]
         else:
-            self.set_values = enums.replace('set(', '').replace(')', '').replace('\'', '').split(',')
+            raise AttributeError("{0} not found".format(item))
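A rough sketch (not part of the patch) of what the Column refactor above enables: the definition now lives in a plain dict, so a column can be built from keyword arguments, compared with __eq__, and round-tripped through serializable_data(). The keyword names mirror the keys set in __parse_column_definition; the concrete values are made up for illustration.

    from pymysqlreplication.column import Column
    from pymysqlreplication.constants import FIELD_TYPE

    # Keyword construction takes the len(args) != 3 branch of __init__,
    # so self.data is simply the kwargs dict.
    col = Column(type=FIELD_TYPE.VARCHAR, name="email", max_length=255,
                 collation_name="utf8_general_ci", character_set_name="utf8",
                 comment="", unsigned=False, type_is_bool=False)

    assert col.name == "email"                        # resolved by __getattr__ via self.data
    assert Column(**col.serializable_data()) == col   # __eq__ compares the underlying dicts

Presumably this is what lets fetchone() keep lightweight, comparable table definitions in table_map (see the get_table() change above) instead of raw TableMapEvent objects.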
diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py
index a3f4edc9..d9eacbbb 100644
--- a/pymysqlreplication/packet.py
+++ b/pymysqlreplication/packet.py
@@ -107,15 +107,15 @@ def read_length_coded_binary(self):
         """
         c = byte2int(self.read(1))
         if c == NULL_COLUMN:
-            return None
+            return None
         if c < UNSIGNED_CHAR_COLUMN:
-            return c
+            return c
         elif c == UNSIGNED_SHORT_COLUMN:
             return self.unpack_uint16(self.read(UNSIGNED_SHORT_LENGTH))
         elif c == UNSIGNED_INT24_COLUMN:
-            return self.unpack_int24(self.read(UNSIGNED_INT24_LENGTH))
+            return self.unpack_int24(self.read(UNSIGNED_INT24_LENGTH))
         elif c == UNSIGNED_INT64_COLUMN:
-            return self.unpack_int64(self.read(UNSIGNED_INT64_LENGTH))
+            return self.unpack_int64(self.read(UNSIGNED_INT64_LENGTH))

     def read_length_coded_string(self):
         """Read a 'Length Coded String' from the data buffer.
diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py
index 767e5ec8..170c988d 100644
--- a/pymysqlreplication/row_event.py
+++ b/pymysqlreplication/row_event.py
@@ -7,6 +7,8 @@
 from .constants import FIELD_TYPE
 from .constants import BINLOG
 from .column import Column
+from .table import Table
+

 class RowsEvent(BinLogEvent):
     def __init__(self, from_packet, event_size, table_map, ctl_connection):
@@ -19,8 +21,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection):

         #Event V2
         if self.event_type == BINLOG.WRITE_ROWS_EVENT or \
-            self.event_type == BINLOG.DELETE_ROWS_EVENT or \
-            self.event_type == BINLOG.UPDATE_ROWS_EVENT:
+                self.event_type == BINLOG.DELETE_ROWS_EVENT or \
+                self.event_type == BINLOG.UPDATE_ROWS_EVENT:
             self.extra_data_length = struct.unpack('<H', self.packet.read(2))[0]

     def __add_fsp_to_time(self, time, column):
         if read > 0:
             microsecond = self.packet.read_int_be_by_size(read)
             if column.fsp % 2:
-                time = time.replace(microsecond = int(microsecond / 10))
+                time = time.replace(microsecond=int(microsecond / 10))
             else:
-                time = time.replace(microsecond = microsecond)
+                time = time.replace(microsecond=microsecond)
         return time

     def __read_string(self, size, column):
@@ -171,9 +177,9 @@ def __read_bit(self, column):
     def __read_time(self):
         time = self.packet.read_uint24()
         date = datetime.time(
-            hour = int(time / 10000),
-            minute = int((time % 10000) / 100),
-            second = int(time % 100))
+            hour=int(time / 10000),
+            minute=int((time % 10000) / 100),
+            second=int(time % 100))
         return date

     def __read_time2(self, column):
@@ -187,9 +193,9 @@
         24 bits = 3 bytes'''
         data = self.packet.read_int_be_by_size(3)
         t = datetime.time(
-            hour = self.__read_binary_slice(data, 2, 10, 24),
-            minute = self.__read_binary_slice(data, 12, 6, 24),
-            second = self.__read_binary_slice(data, 18, 6, 24))
+            hour=self.__read_binary_slice(data, 2, 10, 24),
+            minute=self.__read_binary_slice(data, 12, 6, 24),
+            second=self.__read_binary_slice(data, 18, 6, 24))
         return self.__add_fsp_to_time(t, column)

     def __read_date(self):
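For orientation, a worked example of the bit slicing used by __read_time2 above. binary_slice is an illustrative stand-in for the private __read_binary_slice helper, whose body is not shown in this diff: it extracts size bits starting start bits from the most significant end of a data_length-bit big-endian integer.

    def binary_slice(binary, start, size, data_length):
        # Illustrative stand-in for RowsEvent.__read_binary_slice (assumed behaviour).
        shifted = binary >> (data_length - (start + size))
        return shifted & ((1 << size) - 1)

    # TIME2 packs hh:mm:ss into 3 bytes: two leading sign/reserved bits,
    # then 10 bits of hour, 6 bits of minute and 6 bits of second,
    # matching the (2, 10, 24), (12, 6, 24), (18, 6, 24) calls above.
    data = (13 << 12) | (35 << 6) | 42           # 13:35:42
    assert binary_slice(data, 2, 10, 24) == 13   # hour
    assert binary_slice(data, 12, 6, 24) == 35   # minute
    assert binary_slice(data, 18, 6, 24) == 42   # second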
@@ -198,9 +204,9 @@
             return None

         date = datetime.date(
-            year = (time & ((1 << 15) - 1) << 9) >> 9,
-            month = (time & ((1 << 4) - 1) << 5) >> 5,
-            day = (time & ((1 << 5) - 1))
+            year=(time & ((1 << 15) - 1) << 9) >> 9,
+            month=(time & ((1 << 4) - 1) << 5) >> 5,
+            day=(time & ((1 << 5) - 1))
         )
         return date

@@ -219,12 +225,12 @@
             return None

         date = datetime.datetime(
-            year = year,
-            month = month,
-            day = day,
-            hour = int(time / 10000),
-            minute = int((time % 10000) / 100),
-            second = int(time % 100))
+            year=year,
+            month=month,
+            day=day,
+            hour=int(time / 10000),
+            minute=int((time % 10000) / 100),
+            second=int(time % 100))
         return date

     def __read_datetime2(self, column):
@@ -241,12 +247,12 @@
         year_month = self.__read_binary_slice(data, 1, 17, 40)
         try:
             t = datetime.datetime(
-                year = int(year_month / 13),
-                month = year_month % 13,
-                day = self.__read_binary_slice(data, 18, 5, 40),
-                hour = self.__read_binary_slice(data, 23, 5, 40),
-                minute = self.__read_binary_slice(data, 28, 6, 40),
-                second = self.__read_binary_slice(data, 34, 6, 40))
+                year=int(year_month / 13),
+                month=year_month % 13,
+                day=self.__read_binary_slice(data, 18, 5, 40),
+                hour=self.__read_binary_slice(data, 23, 5, 40),
+                minute=self.__read_binary_slice(data, 28, 6, 40),
+                second=self.__read_binary_slice(data, 34, 6, 40))
         except ValueError:
             return None
         return self.__add_fsp_to_time(t, column)
@@ -278,7 +284,6 @@ def __read_new_decimal(self, column):
             res = "-"
         self.packet.unread(struct.pack('<B', value ^ 0x80))
         if size > 0:
             value = self.packet.read_int_be_by_size(size) ^ mask
@@ -316,7 +321,7 @@ def __read_binary_slice(self, binary, start, size, data_length):
     def _dump(self):
         super(RowsEvent, self)._dump()
         print("Table: %s.%s" % (self.schema, self.table))
-        print("Affected columns: %d" % (self.number_of_columns))
+        print("Affected columns: %d" % self.number_of_columns)
         print("Changed rows: %d" % (len(self.rows)))

     def _fetch_rows(self):
@@ -332,7 +337,8 @@ def __getattr__(self, name):


 class DeleteRowsEvent(RowsEvent):
-    '''This evenement is trigger when a row in database is removed'''
+    """This event is triggered when a row in the database is removed"""
+
     def __init__(self, from_packet, event_size, table_map, ctl_connection):
         super(DeleteRowsEvent, self).__init__(from_packet, event_size, table_map, ctl_connection)
         self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
@@ -354,7 +360,8 @@ def _dump(self):


 class WriteRowsEvent(RowsEvent):
-    '''This evenement is trigger when a row in database is added'''
+    """This event is triggered when a row in the database is added"""
+
     def __init__(self, from_packet, event_size, table_map, ctl_connection):
         super(WriteRowsEvent, self).__init__(from_packet, event_size, table_map, ctl_connection)
         self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
@@ -376,9 +383,10 @@ def _dump(self):


 class UpdateRowsEvent(RowsEvent):
-    '''This evenement is trigger when a row in database change'''
+    """This event is triggered when a row in the database is changed"""
+
     def __init__(self, from_packet, event_size, table_map, ctl_connection):
-        super(UpdateRowsEvent,self).__init__(from_packet, event_size, table_map, ctl_connection)
+        super(UpdateRowsEvent, self).__init__(from_packet, event_size, table_map, ctl_connection)
         #Body
         self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
         self.columns_present_bitmap2 = self.packet.read((self.number_of_columns + 7) / 8)
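A short note on the (self.number_of_columns + 7) / 8 expression used for the columns-present bitmaps above: it is the usual bits-to-bytes rounding, one presence bit per column padded up to a whole byte (on Python 2 the plain / already yields an integer; // below gives the same result on both Python 2 and 3).

    # One presence bit per column, rounded up to whole bytes.
    for number_of_columns in (1, 8, 9, 16, 17):
        print(number_of_columns, (number_of_columns + 7) // 8)
    # prints 1, 1, 2, 2 and 3 bytes respectively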
print("Affected columns: %d" % (self.number_of_columns)) + print("Affected columns: %d" % self.number_of_columns) print("Values:") for row in self.rows: print("--") @@ -407,6 +415,7 @@ class TableMapEvent(BinLogEvent): '''This evenement describe the structure of a table. It's send before a change append on a table. A end user of the lib should have no usage of this''' + def __init__(self, from_packet, event_size, table_map, ctl_connection): super(TableMapEvent, self).__init__(from_packet, event_size, table_map, ctl_connection) @@ -414,12 +423,11 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection): self.table_id = self._read_table_id() self.flags = struct.unpack('