Skip to content

Introduce data objects for Table / Column #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ class BinLogStreamReader(object):
'''Connect to replication stream and read event'''

def __init__(self, connection_settings={}, resume_stream=False, blocking=False, only_events=None, server_id=255):
'''
"""
resume_stream: Start for latest event of binlog or from older available event
blocking: Read on stream is blocking
only_events: Array of allowed events
'''
"""
self.__connection_settings = connection_settings
self.__connection_settings['charset'] = 'utf8'

Expand All @@ -29,7 +29,7 @@ def __init__(self, connection_settings={}, resume_stream=False, blocking=False,
self.__log_pos = None
self.__log_file = None

#Store table meta informations
#Store table meta information
self.table_map = {}

def close(self):
Expand Down Expand Up @@ -97,7 +97,7 @@ def fetchone(self):
return None
binlog_event = BinLogPacketWrapper(pkt, self.table_map, self._ctl_connection)
if binlog_event.event_type == TABLE_MAP_EVENT:
self.table_map[binlog_event.event.table_id] = binlog_event.event
self.table_map[binlog_event.event.table_id] = binlog_event.event.get_table()
if self.__filter_event(binlog_event.event):
continue
if binlog_event.event_type == ROTATE_EVENT:
Expand Down
81 changes: 53 additions & 28 deletions pymysqlreplication/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,85 @@
from .constants import FIELD_TYPE
from pymysql.util import byte2int, int2byte


class Column(object):
'''Definition of a column'''
"""Definition of a column"""

def __init__(self, *args, **kwargs):
self.data = {}
if len(args) == 3:
self.__parse_column_definition(*args)
else:
self.data = kwargs

def __init__(self, column_type, column_schema, packet):
self.type = column_type
self.name = column_schema["COLUMN_NAME"]
self.collation_name = column_schema["COLLATION_NAME"]
self.character_set_name = column_schema["CHARACTER_SET_NAME"]
self.comment = column_schema["COLUMN_COMMENT"]
self.unsigned = False
def __parse_column_definition(self, column_type, column_schema, packet):
self.data["type"] = column_type
self.data["name"] = column_schema["COLUMN_NAME"]
self.data["collation_name"] = column_schema["COLLATION_NAME"]
self.data["character_set_name"] = column_schema["CHARACTER_SET_NAME"]
self.data["comment"] = column_schema["COLUMN_COMMENT"]
self.data["unsigned"] = False
self.data["type_is_bool"] = False

if column_schema["COLUMN_TYPE"].find("unsigned") != -1:
self.unsigned = True
self.data["unsigned"] = True
if self.type == FIELD_TYPE.VAR_STRING or self.type == FIELD_TYPE.STRING:
self.__read_string_metadata(packet, column_schema)
elif self.type == FIELD_TYPE.VARCHAR:
self.max_length = struct.unpack('<H', packet.read(2))[0]
self.data["max_length"] = struct.unpack('<H', packet.read(2))[0]
elif self.type == FIELD_TYPE.BLOB:
self.length_size = packet.read_uint8()
self.data["length_size"] = packet.read_uint8()
elif self.type == FIELD_TYPE.GEOMETRY:
self.length_size = packet.read_uint8()
self.data["length_size"] = packet.read_uint8()
elif self.type == FIELD_TYPE.NEWDECIMAL:
self.precision = packet.read_uint8()
self.decimals = packet.read_uint8()
self.data["precision"] = packet.read_uint8()
self.data["decimals"] = packet.read_uint8()
elif self.type == FIELD_TYPE.DOUBLE:
self.size = packet.read_uint8()
self.data["size"] = packet.read_uint8()
elif self.type == FIELD_TYPE.FLOAT:
self.size = packet.read_uint8()
self.data["size"] = packet.read_uint8()
elif self.type == FIELD_TYPE.BIT:
bits = packet.read_uint8()
bytes = packet.read_uint8()
self.bits = (bytes * 8) + bits
self.bytes = int((self.bits + 7) / 8)
self.data["bits"] = (bytes * 8) + bits
self.data["bytes"] = int((self.bits + 7) / 8)
elif self.type == FIELD_TYPE.TIMESTAMP2:
self.fsp = packet.read_uint8()
self.data["fsp"] = packet.read_uint8()
elif self.type == FIELD_TYPE.DATETIME2:
self.fsp = packet.read_uint8()
self.data["fsp"] = packet.read_uint8()
elif self.type == FIELD_TYPE.TIME2:
self.fsp = packet.read_uint8()

self.data["fsp"] = packet.read_uint8()
elif self.type == FIELD_TYPE.TINY and column_schema["COLUMN_TYPE"] == "tinyint(1)":
self.data["type_is_bool"] = True

def __read_string_metadata(self, packet, column_schema):
metadata = (packet.read_uint8() << 8) + packet.read_uint8()
metadata = (packet.read_uint8() << 8) + packet.read_uint8()
real_type = metadata >> 8
if real_type == FIELD_TYPE.SET or real_type == FIELD_TYPE.ENUM:
self.type = real_type
self.size = metadata & 0x00ff
self.data["type"] = real_type
self.data["size"] = metadata & 0x00ff
self.__read_enum_metadata(column_schema)
else:
self.max_length = (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff)
self.data["max_length"] = (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff)

def __read_enum_metadata(self, column_schema):
enums = column_schema["COLUMN_TYPE"]
if self.type == FIELD_TYPE.ENUM:
self.enum_values = enums.replace('enum(', '').replace(')', '').replace('\'', '').split(',')
self.data["enum_values"] = enums.replace('enum(', '').replace(')', '').replace('\'', '').split(',')
else:
self.data["set_values"] = enums.replace('set(', '').replace(')', '').replace('\'', '').split(',')

def __eq__(self, other):
return self.data == other.data

def __ne__(self, other):
return not self.__eq__(other)

def serializable_data(self):
return self.data

def __getattr__(self, item):
if item in self.data:
return self.data[item]
else:
self.set_values = enums.replace('set(', '').replace(')', '').replace('\'', '').split(',')
raise AttributeError("{0} not found".format(item))
8 changes: 4 additions & 4 deletions pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ def read_length_coded_binary(self):
"""
c = byte2int(self.read(1))
if c == NULL_COLUMN:
return None
return None
if c < UNSIGNED_CHAR_COLUMN:
return c
return c
elif c == UNSIGNED_SHORT_COLUMN:
return self.unpack_uint16(self.read(UNSIGNED_SHORT_LENGTH))
elif c == UNSIGNED_INT24_COLUMN:
return self.unpack_int24(self.read(UNSIGNED_INT24_LENGTH))
return self.unpack_int24(self.read(UNSIGNED_INT24_LENGTH))
elif c == UNSIGNED_INT64_COLUMN:
return self.unpack_int64(self.read(UNSIGNED_INT64_LENGTH))
return self.unpack_int64(self.read(UNSIGNED_INT64_LENGTH))

def read_length_coded_string(self):
"""Read a 'Length Coded String' from the data buffer.
Expand Down
Loading