Skip to content

Commit 89d645e

Browse files
Working version with example
1 parent 05547f7 commit 89d645e

File tree

4 files changed

+320
-0
lines changed

4 files changed

+320
-0
lines changed

example.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/usr/bin/env python
2+
3+
import pymysql
4+
from pymysqlreplication import BinLogStreamReader
5+
from pymysqlreplication.constants.BINLOG import *
6+
7+
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql')
8+
9+
10+
stream = BinLogStreamReader(conn, blocking = False, resume_stream = False)
11+
12+
for binlogevent in stream:
13+
print binlogevent.timestamp
14+
print binlogevent.event_type
15+
if binlogevent.event is not None:
16+
print binlogevent.event.dump()
17+
18+
conn.close()
19+
20+

pymysqlreplication/__init__.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
'''
2+
Python MySQL Replication:
3+
Pure Python Implementation of MySQL replication protocol build on top of PyMYSQL.
4+
5+
Licence
6+
=======
7+
Copyright 2012 Julien Duponchelle
8+
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
http://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
'''
21+
22+
from binlogstream import BinLogStreamReader

pymysqlreplication/binlogstream.py

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
import struct
2+
from pymysql.util import byte2int, int2byte
3+
from pymysql.constants.COMMAND import *
4+
from constants.BINLOG import *
5+
from pymysql.constants.FIELD_TYPE import *
6+
7+
class BinLogStreamReader(object):
8+
'''Connect to replication stream and read event'''
9+
10+
def __init__(self, connection, resume_stream = False, blocking = False):
11+
'''
12+
resume_stream: Start for latest event of binlog or from older available event
13+
blocking: Read on stream is blocking
14+
'''
15+
self.__connection = connection
16+
17+
cur = self.__connection.cursor()
18+
cur.execute("SHOW MASTER STATUS")
19+
(log_file, log_pos) = cur.fetchone()[:2]
20+
cur.close()
21+
22+
23+
# binlog_pos (4) -- position in the binlog-file to start the stream with
24+
# flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1)
25+
# server_id (4) -- server id of this slave
26+
# binlog-filename (string.EOF) -- filename of the binlog on the master
27+
command = COM_BINLOG_DUMP
28+
prelude = struct.pack('<i', len(log_file) + 11) \
29+
+ int2byte(command)
30+
if resume_stream:
31+
prelude += struct.pack('<I', log_pos)
32+
else:
33+
prelude += struct.pack('<I', 4)
34+
if blocking:
35+
prelude += struct.pack('<h', 0)
36+
else:
37+
prelude += struct.pack('<h', 1)
38+
prelude += struct.pack('<I', 3)
39+
self.__connection.wfile.write(prelude + log_file)
40+
self.__connection.wfile.flush()
41+
42+
#Store table meta informations
43+
self.table_map = {}
44+
45+
def fetchone(self):
46+
pkt = self.__connection.read_packet()
47+
if not pkt.is_ok_packet():
48+
return None
49+
binlog_event = BinLogPacketWrapper(pkt, self.table_map)
50+
if binlog_event.event_type == TABLE_MAP_EVENT:
51+
self.table_map[binlog_event.event.table_id] = binlog_event.event
52+
return binlog_event
53+
54+
def __iter__(self):
55+
return iter(self.fetchone, None)
56+
57+
class BinLogPacketWrapper(object):
58+
"""
59+
Bin Log Packet Wrapper. It uses an existing packet object, and wraps
60+
around it, exposing useful variables while still providing access
61+
to the original packet objects variables and methods.
62+
"""
63+
64+
def __init__(self, from_packet, table_map):
65+
if not from_packet.is_ok_packet():
66+
raise ValueError('Cannot create ' + str(self.__class__.__name__)
67+
+ ' object from invalid packet type')
68+
69+
# Ok Value
70+
self.packet = from_packet
71+
self.packet.advance(1)
72+
73+
# Header
74+
self.timestamp = struct.unpack('<I', self.packet.read(4))[0]
75+
self.event_type = byte2int(self.packet.read(1))
76+
self.server_id = struct.unpack('<I', self.packet.read(4))[0]
77+
self.event_size = struct.unpack('<I', self.packet.read(4))[0]
78+
# position of the next event
79+
self.log_pos = struct.unpack('<I', self.packet.read(4))[0]
80+
self.flags = struct.unpack('<H', self.packet.read(2))[0]
81+
82+
83+
event_size_without_header = self.event_size - 19
84+
if self.event_type == QUERY_EVENT:
85+
self.event = BinLogQueryEvent(self.packet, event_size_without_header, table_map)
86+
elif self.event_type == UPDATE_ROWS_EVENT:
87+
self.event = UpdateRowsEvent(self.packet, event_size_without_header, table_map)
88+
elif self.event_type == WRITE_ROWS_EVENT:
89+
self.event = WriteRowsEvent(self.packet, event_size_without_header, table_map)
90+
elif self.event_type == DELETE_ROWS_EVENT:
91+
self.event = DeleteRowsEvent(self.packet, event_size_without_header, table_map)
92+
elif self.event_type == TABLE_MAP_EVENT:
93+
self.event = BinLogTableMapEvent(self.packet, event_size_without_header, table_map)
94+
else:
95+
self.event = None
96+
97+
def __getattr__(self, key):
98+
if hasattr(self.packet, key):
99+
return getattr(self.packet, key)
100+
101+
raise AttributeError(str(self.__class__)
102+
+ " instance has no attribute '" + key + "'")
103+
104+
105+
class BinLogEvent(object):
106+
def __init__(self, from_packet, event_size, table_map):
107+
self.packet = from_packet
108+
self.table_map = table_map
109+
110+
def _read_table_id(self):
111+
# Table ID is 6 byte
112+
table_id = self.packet.read(6) + int2byte(0) + int2byte(0) # pad little-endian number
113+
return struct.unpack('<Q', table_id)[0]
114+
115+
def _read_column_data(self):
116+
'''Use for WRITE, UPDATE and DELETE events. Return an array of column data'''
117+
values = []
118+
119+
for column in self.table_map[self.table_id].column_type_def:
120+
if column == LONG:
121+
values.append(struct.unpack("<I", self.packet.read(4))[0])
122+
elif column == VARCHAR:
123+
values.append(self.packet.read_length_coded_string())
124+
return values
125+
126+
127+
class DeleteRowsEvent(BinLogEvent):
128+
def __init__(self, from_packet, event_size, table_map):
129+
super(DeleteRowsEvent, self).__init__(from_packet, event_size, table_map)
130+
#Header
131+
self.table_id = self._read_table_id()
132+
self.flags = struct.unpack('<H', self.packet.read(2))[0]
133+
134+
#Body
135+
self.number_of_columns = self.packet.read_length_coded_binary()
136+
self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
137+
138+
#TODO: nul-bitmap, length (bits set in 'columns-present-bitmap'+7)/8
139+
self.packet.advance((self.number_of_columns + 7) / 8)
140+
141+
self.values = self._read_column_data()
142+
143+
def dump(self):
144+
table = self.table_map[self.table_id]
145+
print "== Delete Rows Event =="
146+
print "Table: %s.%s" % (table.schema, table.table)
147+
print "Affected columns: %d" % (self.number_of_columns)
148+
print "Values:"
149+
for i in range(len(self.values)):
150+
print "* ", self.values[i]
151+
print
152+
153+
154+
class WriteRowsEvent(BinLogEvent):
155+
def __init__(self, from_packet, event_size, table_map):
156+
super(WriteRowsEvent, self).__init__(from_packet, event_size, table_map)
157+
#Header
158+
self.table_id = self._read_table_id()
159+
self.flags = struct.unpack('<H', self.packet.read(2))[0]
160+
161+
#Body
162+
self.number_of_columns = self.packet.read_length_coded_binary()
163+
self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
164+
165+
#TODO: nul-bitmap, length (bits set in 'columns-present-bitmap'+7)/8
166+
self.packet.advance((self.number_of_columns + 7) / 8)
167+
168+
self.values = self._read_column_data()
169+
170+
171+
def dump(self):
172+
table = self.table_map[self.table_id]
173+
print "== Write Rows Event =="
174+
print "Table: %s.%s" % (table.schema, table.table)
175+
print "Affected columns: %d" % (self.number_of_columns)
176+
print "Values:"
177+
for i in range(len(self.values)):
178+
print "* ", self.values[i]
179+
print
180+
181+
182+
class UpdateRowsEvent(BinLogEvent):
183+
def __init__(self, from_packet, event_size, table_map):
184+
super(UpdateRowsEvent,self).__init__(from_packet, event_size, table_map)
185+
#Header
186+
self.table_id = self._read_table_id()
187+
self.flags = struct.unpack('<H', self.packet.read(2))[0]
188+
189+
#Body
190+
self.number_of_columns = self.packet.read_length_coded_binary()
191+
self.columns_present_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
192+
self.columns_present_bitmap2 = self.packet.read((self.number_of_columns + 7) / 8)
193+
194+
#TODO: nul-bitmap, length (bits set in 'columns-present-bitmap'+7)/8
195+
self.packet.advance((self.number_of_columns + 7) / 8)
196+
197+
self.before_values = self._read_column_data()
198+
199+
#TODO: nul-bitmap, length (bits set in 'columns-present-bitmap'+7)/8
200+
self.packet.advance((self.number_of_columns + 7) / 8)
201+
202+
self.after_values = self._read_column_data()
203+
204+
def dump(self):
205+
table = self.table_map[self.table_id]
206+
print "== Update Rows Event =="
207+
print "Table: %s.%s" % (table.schema, table.table)
208+
print "Affected columns: %d" % (self.number_of_columns)
209+
print "Values:"
210+
for i in range(len(self.before_values)):
211+
print "* ", self.before_values[i] , " => ", self.after_values[i]
212+
print
213+
214+
215+
class BinLogTableMapEvent(BinLogEvent):
216+
def __init__(self, from_packet, event_size, table_map):
217+
super(BinLogTableMapEvent, self).__init__(from_packet, event_size, table_map)
218+
219+
# Post-Header
220+
self.table_id = self._read_table_id()
221+
self.flags = struct.unpack('<H', self.packet.read(2))[0]
222+
223+
224+
# Payload
225+
self.schema_length = byte2int(self.packet.read(1))
226+
self.schema = self.packet.read(self.schema_length)
227+
self.packet.advance(1)
228+
self.table_length = byte2int(self.packet.read(1))
229+
self.table = self.packet.read(self.table_length)
230+
self.packet.advance(1)
231+
self.column_count = self.packet.read_length_coded_binary()
232+
233+
self.column_type_def = []
234+
for column in list(self.packet.read(self.column_count)):
235+
self.column_type_def.append(byte2int(column))
236+
237+
238+
# TODO: get this informations instead of trashing data
239+
# lenenc-str column-def
240+
# n NULL-bitmask, length: (column-length * 8) / 7
241+
242+
def dump(self):
243+
print "== Table Map Event =="
244+
print "Table id: %d" % (self.table_id)
245+
print "Schema: %s" % (self.schema)
246+
print "Table: %s" % (self.table)
247+
print "Columns: %s" % (self.column_count)
248+
249+
print
250+
#import sys
251+
#sys.exit(0)
252+
253+
class BinLogQueryEvent(BinLogEvent):
254+
def __init__(self, from_packet, event_size, table_map):
255+
super(BinLogQueryEvent, self).__init__(from_packet, event_size, table_map)
256+
257+
# Post-header
258+
self.slave_proxy_id = struct.unpack('<I', self.packet.read(4))[0]
259+
self.execution_time = struct.unpack('<I', self.packet.read(4))[0]
260+
self.schema_length = byte2int(self.packet.read(1))
261+
self.error_code = struct.unpack('<H', self.packet.read(2))[0]
262+
self.status_vars_length = struct.unpack('<H', self.packet.read(2))[0]
263+
264+
# Payload
265+
self.status_vars = self.packet.read(self.status_vars_length)
266+
self.schema = self.packet.read(self.schema_length)
267+
self.packet.advance(1)
268+
269+
self.query = self.packet.read(event_size - 13 - self.status_vars_length - self.schema_length - 1)
270+
#string[EOF] query
271+
272+
def dump(self):
273+
print "== Query Event =="
274+
print "Schema: %s" % (self.schema)
275+
print "Execution time: %d" % (self.execution_time)
276+
print "Query: %s" % (self.query)
277+
278+
print

pymysqlreplication/constants/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)