Skip to content

Commit df857cf

Browse files
Split file
1 parent 89d645e commit df857cf

File tree

2 files changed

+180
-176
lines changed

2 files changed

+180
-176
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 1 addition & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from pymysql.util import byte2int, int2byte
33
from pymysql.constants.COMMAND import *
44
from constants.BINLOG import *
5-
from pymysql.constants.FIELD_TYPE import *
5+
from event import *
66

77
class BinLogStreamReader(object):
88
'''Connect to replication stream and read event'''
@@ -101,178 +101,3 @@ def __getattr__(self, key):
101101
raise AttributeError(str(self.__class__)
102102
+ " instance has no attribute '" + key + "'")
103103

104-
105-
class BinLogEvent(object):
106-
def __init__(self, from_packet, event_size, table_map):
107-
self.packet = from_packet
108-
self.table_map = table_map
109-
110-
def _read_table_id(self):
111-
# Table ID is 6 byte
112-
table_id = self.packet.read(6) + int2byte(0) + int2byte(0) # pad little-endian number
113-
return struct.unpack('<Q', table_id)[0]
114-
115-
def _read_column_data(self):
116-
'''Use for WRITE, UPDATE and DELETE events. Return an array of column data'''
117-
values = []
118-
119-
for column in self.table_map[self.table_id].column_type_def:
120-
if column == LONG:
121-
values.append(struct.unpack("<I", self.packet.read(4))[0])
122-
elif column == VARCHAR:
123-
values.append(self.packet.read_length_coded_string())
124-
return values
125-
126-
127-
class DeleteRowsEvent(BinLogEvent):
128-
def __init__(self, from_packet, event_size, table_map):
129-
super(DeleteRowsEvent, self).__init__(from_packet, event_size, table_map)
130-
#Header
131-
self.table_id = self._read_table_id()
132-
self.flags = struct.unpack('<H', self.packet.read(2))[0]
133-
134-
#Body
135-
self.number_of_columns = self.packet.read_length_coded_binary()
136-
self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
137-
138-
#TODO: nul-bitmap, length (bits set in 'columns-present-bitmap'+7)/8
139-
self.packet.advance((self.number_of_columns + 7) / 8)
140-
141-
self.values = self._read_column_data()
142-
143-
def dump(self):
144-
table = self.table_map[self.table_id]
145-
print "== Delete Rows Event =="
146-
print "Table: %s.%s" % (table.schema, table.table)
147-
print "Affected columns: %d" % (self.number_of_columns)
148-
print "Values:"
149-
for i in range(len(self.values)):
150-
print "* ", self.values[i]
151-
print
152-
153-
154-
class WriteRowsEvent(BinLogEvent):
155-
def __init__(self, from_packet, event_size, table_map):
156-
super(WriteRowsEvent, self).__init__(from_packet, event_size, table_map)
157-
#Header
158-
self.table_id = self._read_table_id()
159-
self.flags = struct.unpack('<H', self.packet.read(2))[0]
160-
161-
#Body
162-
self.number_of_columns = self.packet.read_length_coded_binary()
163-
self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
164-
165-
#TODO: nul-bitmap, length (bits set in 'columns-present-bitmap'+7)/8
166-
self.packet.advance((self.number_of_columns + 7) / 8)
167-
168-
self.values = self._read_column_data()
169-
170-
171-
def dump(self):
172-
table = self.table_map[self.table_id]
173-
print "== Write Rows Event =="
174-
print "Table: %s.%s" % (table.schema, table.table)
175-
print "Affected columns: %d" % (self.number_of_columns)
176-
print "Values:"
177-
for i in range(len(self.values)):
178-
print "* ", self.values[i]
179-
print
180-
181-
182-
class UpdateRowsEvent(BinLogEvent):
183-
def __init__(self, from_packet, event_size, table_map):
184-
super(UpdateRowsEvent,self).__init__(from_packet, event_size, table_map)
185-
#Header
186-
self.table_id = self._read_table_id()
187-
self.flags = struct.unpack('<H', self.packet.read(2))[0]
188-
189-
#Body
190-
self.number_of_columns = self.packet.read_length_coded_binary()
191-
self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
192-
self.columns_present_bitmap2 = self.packet.read((self.number_of_columns + 7) / 8)
193-
194-
#TODO: nul-bitmap, length (bits set in 'columns-present-bitmap'+7)/8
195-
self.packet.advance((self.number_of_columns + 7) / 8)
196-
197-
self.before_values = self._read_column_data()
198-
199-
#TODO: nul-bitmap, length (bits set in 'columns-present-bitmap'+7)/8
200-
self.packet.advance((self.number_of_columns + 7) / 8)
201-
202-
self.after_values = self._read_column_data()
203-
204-
def dump(self):
205-
table = self.table_map[self.table_id]
206-
print "== Update Rows Event =="
207-
print "Table: %s.%s" % (table.schema, table.table)
208-
print "Affected columns: %d" % (self.number_of_columns)
209-
print "Values:"
210-
for i in range(len(self.before_values)):
211-
print "* ", self.before_values[i] , " => ", self.after_values[i]
212-
print
213-
214-
215-
class BinLogTableMapEvent(BinLogEvent):
216-
def __init__(self, from_packet, event_size, table_map):
217-
super(BinLogTableMapEvent, self).__init__(from_packet, event_size, table_map)
218-
219-
# Post-Header
220-
self.table_id = self._read_table_id()
221-
self.flags = struct.unpack('<H', self.packet.read(2))[0]
222-
223-
224-
# Payload
225-
self.schema_length = byte2int(self.packet.read(1))
226-
self.schema = self.packet.read(self.schema_length)
227-
self.packet.advance(1)
228-
self.table_length = byte2int(self.packet.read(1))
229-
self.table = self.packet.read(self.table_length)
230-
self.packet.advance(1)
231-
self.column_count = self.packet.read_length_coded_binary()
232-
233-
self.column_type_def = []
234-
for column in list(self.packet.read(self.column_count)):
235-
self.column_type_def.append(byte2int(column))
236-
237-
238-
# TODO: get this informations instead of trashing data
239-
# lenenc-str column-def
240-
# n NULL-bitmask, length: (column-length * 8) / 7
241-
242-
def dump(self):
243-
print "== Table Map Event =="
244-
print "Table id: %d" % (self.table_id)
245-
print "Schema: %s" % (self.schema)
246-
print "Table: %s" % (self.table)
247-
print "Columns: %s" % (self.column_count)
248-
249-
print
250-
#import sys
251-
#sys.exit(0)
252-
253-
class BinLogQueryEvent(BinLogEvent):
254-
def __init__(self, from_packet, event_size, table_map):
255-
super(BinLogQueryEvent, self).__init__(from_packet, event_size, table_map)
256-
257-
# Post-header
258-
self.slave_proxy_id = struct.unpack('<I', self.packet.read(4))[0]
259-
self.execution_time = struct.unpack('<I', self.packet.read(4))[0]
260-
self.schema_length = byte2int(self.packet.read(1))
261-
self.error_code = struct.unpack('<H', self.packet.read(2))[0]
262-
self.status_vars_length = struct.unpack('<H', self.packet.read(2))[0]
263-
264-
# Payload
265-
self.status_vars = self.packet.read(self.status_vars_length)
266-
self.schema = self.packet.read(self.schema_length)
267-
self.packet.advance(1)
268-
269-
self.query = self.packet.read(event_size - 13 - self.status_vars_length - self.schema_length - 1)
270-
#string[EOF] query
271-
272-
def dump(self):
273-
print "== Query Event =="
274-
print "Schema: %s" % (self.schema)
275-
print "Execution time: %d" % (self.execution_time)
276-
print "Query: %s" % (self.query)
277-
278-
print

pymysqlreplication/event.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import struct
2+
from pymysql.util import byte2int, int2byte
3+
from pymysql.constants.FIELD_TYPE import *
4+
5+
class BinLogEvent(object):
6+
def __init__(self, from_packet, event_size, table_map):
7+
self.packet = from_packet
8+
self.table_map = table_map
9+
10+
def _read_table_id(self):
11+
# Table ID is 6 byte
12+
table_id = self.packet.read(6) + int2byte(0) + int2byte(0) # pad little-endian number
13+
return struct.unpack('<Q', table_id)[0]
14+
15+
def _read_column_data(self):
16+
'''Use for WRITE, UPDATE and DELETE events. Return an array of column data'''
17+
values = []
18+
19+
for column in self.table_map[self.table_id].column_type_def:
20+
if column == LONG:
21+
values.append(struct.unpack("<I", self.packet.read(4))[0])
22+
elif column == VARCHAR:
23+
values.append(self.packet.read_length_coded_string())
24+
return values
25+
26+
27+
class DeleteRowsEvent(BinLogEvent):
28+
def __init__(self, from_packet, event_size, table_map):
29+
super(DeleteRowsEvent, self).__init__(from_packet, event_size, table_map)
30+
#Header
31+
self.table_id = self._read_table_id()
32+
self.flags = struct.unpack('<H', self.packet.read(2))[0]
33+
34+
#Body
35+
self.number_of_columns = self.packet.read_length_coded_binary()
36+
self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
37+
38+
#TODO: nul-bitmap, length (bits set in 'columns-present-bitmap'+7)/8
39+
self.packet.advance((self.number_of_columns + 7) / 8)
40+
41+
self.values = self._read_column_data()
42+
43+
def dump(self):
44+
table = self.table_map[self.table_id]
45+
print "== Delete Rows Event =="
46+
print "Table: %s.%s" % (table.schema, table.table)
47+
print "Affected columns: %d" % (self.number_of_columns)
48+
print "Values:"
49+
for i in range(len(self.values)):
50+
print "* ", self.values[i]
51+
print
52+
53+
54+
class WriteRowsEvent(BinLogEvent):
55+
def __init__(self, from_packet, event_size, table_map):
56+
super(WriteRowsEvent, self).__init__(from_packet, event_size, table_map)
57+
#Header
58+
self.table_id = self._read_table_id()
59+
self.flags = struct.unpack('<H', self.packet.read(2))[0]
60+
61+
#Body
62+
self.number_of_columns = self.packet.read_length_coded_binary()
63+
self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
64+
65+
#TODO: nul-bitmap, length (bits set in 'columns-present-bitmap'+7)/8
66+
self.packet.advance((self.number_of_columns + 7) / 8)
67+
68+
self.values = self._read_column_data()
69+
70+
71+
def dump(self):
72+
table = self.table_map[self.table_id]
73+
print "== Write Rows Event =="
74+
print "Table: %s.%s" % (table.schema, table.table)
75+
print "Affected columns: %d" % (self.number_of_columns)
76+
print "Values:"
77+
for i in range(len(self.values)):
78+
print "* ", self.values[i]
79+
print
80+
81+
82+
class UpdateRowsEvent(BinLogEvent):
83+
def __init__(self, from_packet, event_size, table_map):
84+
super(UpdateRowsEvent,self).__init__(from_packet, event_size, table_map)
85+
#Header
86+
self.table_id = self._read_table_id()
87+
self.flags = struct.unpack('<H', self.packet.read(2))[0]
88+
89+
#Body
90+
self.number_of_columns = self.packet.read_length_coded_binary()
91+
self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
92+
self.columns_present_bitmap2 = self.packet.read((self.number_of_columns + 7) / 8)
93+
94+
#TODO: nul-bitmap, length (bits set in 'columns-present-bitmap'+7)/8
95+
self.packet.advance((self.number_of_columns + 7) / 8)
96+
97+
self.before_values = self._read_column_data()
98+
99+
#TODO: nul-bitmap, length (bits set in 'columns-present-bitmap'+7)/8
100+
self.packet.advance((self.number_of_columns + 7) / 8)
101+
102+
self.after_values = self._read_column_data()
103+
104+
def dump(self):
105+
table = self.table_map[self.table_id]
106+
print "== Update Rows Event =="
107+
print "Table: %s.%s" % (table.schema, table.table)
108+
print "Affected columns: %d" % (self.number_of_columns)
109+
print "Values:"
110+
for i in range(len(self.before_values)):
111+
print "* ", self.before_values[i] , " => ", self.after_values[i]
112+
print
113+
114+
115+
class BinLogTableMapEvent(BinLogEvent):
116+
def __init__(self, from_packet, event_size, table_map):
117+
super(BinLogTableMapEvent, self).__init__(from_packet, event_size, table_map)
118+
119+
# Post-Header
120+
self.table_id = self._read_table_id()
121+
self.flags = struct.unpack('<H', self.packet.read(2))[0]
122+
123+
124+
# Payload
125+
self.schema_length = byte2int(self.packet.read(1))
126+
self.schema = self.packet.read(self.schema_length)
127+
self.packet.advance(1)
128+
self.table_length = byte2int(self.packet.read(1))
129+
self.table = self.packet.read(self.table_length)
130+
self.packet.advance(1)
131+
self.column_count = self.packet.read_length_coded_binary()
132+
133+
self.column_type_def = []
134+
for column in list(self.packet.read(self.column_count)):
135+
self.column_type_def.append(byte2int(column))
136+
137+
138+
# TODO: get this informations instead of trashing data
139+
# lenenc-str column-def
140+
# n NULL-bitmask, length: (column-length * 8) / 7
141+
142+
def dump(self):
143+
print "== Table Map Event =="
144+
print "Table id: %d" % (self.table_id)
145+
print "Schema: %s" % (self.schema)
146+
print "Table: %s" % (self.table)
147+
print "Columns: %s" % (self.column_count)
148+
149+
print
150+
#import sys
151+
#sys.exit(0)
152+
153+
class BinLogQueryEvent(BinLogEvent):
154+
def __init__(self, from_packet, event_size, table_map):
155+
super(BinLogQueryEvent, self).__init__(from_packet, event_size, table_map)
156+
157+
# Post-header
158+
self.slave_proxy_id = struct.unpack('<I', self.packet.read(4))[0]
159+
self.execution_time = struct.unpack('<I', self.packet.read(4))[0]
160+
self.schema_length = byte2int(self.packet.read(1))
161+
self.error_code = struct.unpack('<H', self.packet.read(2))[0]
162+
self.status_vars_length = struct.unpack('<H', self.packet.read(2))[0]
163+
164+
# Payload
165+
self.status_vars = self.packet.read(self.status_vars_length)
166+
self.schema = self.packet.read(self.schema_length)
167+
self.packet.advance(1)
168+
169+
self.query = self.packet.read(event_size - 13 - self.status_vars_length - self.schema_length - 1)
170+
#string[EOF] query
171+
172+
def dump(self):
173+
print "== Query Event =="
174+
print "Schema: %s" % (self.schema)
175+
print "Execution time: %d" % (self.execution_time)
176+
print "Query: %s" % (self.query)
177+
178+
print
179+

0 commit comments

Comments
 (0)