Skip to content

Commit 06fd091

Browse files
Merge pull request #93 from noplay/speedup2
Speed up event processing
2 parents 4766123 + 818f0e0 commit 06fd091

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

pymysqlreplication/packet.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,29 +49,30 @@ class BinLogPacketWrapper(object):
4949
}
5050

5151
def __init__(self, from_packet, table_map, ctl_connection, use_checksum, allowed_events = None):
52-
if not from_packet.is_ok_packet():
53-
raise ValueError(
54-
"Cannot create %s object from invalid packet type" %
55-
self.__class__.__name__)
56-
5752
# -1 because we ignore the ok byte
5853
self.read_bytes = 0
5954
# Used when we want to override a value in the data buffer
6055
self.__data_buffer = b''
6156

62-
# Ok Value
6357
self.packet = from_packet
64-
self.packet.advance(1)
6558
self.charset = ctl_connection.charset
6659

60+
# OK value
61+
# timestamp
62+
# event_type
63+
# server_id
64+
# log_pos
65+
# flags
66+
unpack = struct.unpack('<cIcIIIH', self.packet.read(20))
67+
6768
# Header
68-
self.timestamp = struct.unpack('<I', self.packet.read(4))[0]
69-
self.event_type = byte2int(self.packet.read(1))
70-
self.server_id = struct.unpack('<I', self.packet.read(4))[0]
71-
self.event_size = struct.unpack('<I', self.packet.read(4))[0]
69+
self.timestamp = unpack[1]
70+
self.event_type = byte2int(unpack[2])
71+
self.server_id = unpack[3]
72+
self.event_size = unpack[4]
7273
# position of the next event
73-
self.log_pos = struct.unpack('<I', self.packet.read(4))[0]
74-
self.flags = struct.unpack('<H', self.packet.read(2))[0]
74+
self.log_pos = unpack[5]
75+
self.flags = unpack[6]
7576

7677
# MySQL 5.6 and more if binlog-checksum = CRC32
7778
if use_checksum:

pymysqlreplication/row_event.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,16 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection):
2222
#Header
2323
self.table_id = self._read_table_id()
2424
self.primary_key = table_map[self.table_id].data["primary_key"]
25-
self.flags = struct.unpack('<H', self.packet.read(2))[0]
2625

2726
#Event V2
2827
if self.event_type == BINLOG.WRITE_ROWS_EVENT_V2 or \
2928
self.event_type == BINLOG.DELETE_ROWS_EVENT_V2 or \
3029
self.event_type == BINLOG.UPDATE_ROWS_EVENT_V2:
31-
self.extra_data_length = struct.unpack('<H',
32-
self.packet.read(2))[0]
30+
self.flags, self.extra_data_length = struct.unpack('<HH', self.packet.read(4))
3331
self.extra_data = self.packet.read(self.extra_data_length / 8)
32+
else:
33+
self.flags = struct.unpack('<H', self.packet.read(2))[0]
34+
3435
#Body
3536
self.number_of_columns = self.packet.read_length_coded_binary()
3637
self.columns = self.table_map[self.table_id].columns

0 commit comments

Comments
 (0)