forked from julien-duponchelle/python-mysql-replication
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbinlogfilereader.py
149 lines (127 loc) · 4.96 KB
/
binlogfilereader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
'''Read binlog files'''
import struct
from pymysqlreplication import constants
from pymysqlreplication.event import FormatDescriptionEvent
from pymysqlreplication.event import QueryEvent
from pymysqlreplication.event import RotateEvent
from pymysqlreplication.event import XidEvent
from pymysqlreplication.row_event import TableMapEvent
from pymysqlreplication.row_event import WriteRowsEvent
class SimpleBinLogFileReader(object):
'''Read binlog files'''
_expected_magic = b'\xfebin'
def __init__(self, file_path, only_events=None):
self._current_event = None
self._file = None
self._file_path = file_path
self._only_events = only_events
self._pos = None
def fetchone(self):
'''Fetch one record from the binlog file'''
if self._pos is None or self._pos < 4:
self._read_magic()
while True:
event = self._read_event()
self._current_event = event
if event is None:
return None
if self._filter_events(event):
return event
def truncatebinlog(self):
'''Truncate the binlog file at the current event'''
if self._current_event is not None:
self._file.truncate(self._current_event.pos)
def _filter_events(self, event):
'''Return True if an event can be returned'''
# It would be good if we could reuse the __event_map in
# packet.BinLogPacketWrapper.
event_type = {
constants.QUERY_EVENT: QueryEvent,
constants.ROTATE_EVENT: RotateEvent,
constants.FORMAT_DESCRIPTION_EVENT: FormatDescriptionEvent,
constants.XID_EVENT: XidEvent,
constants.TABLE_MAP_EVENT: TableMapEvent,
constants.WRITE_ROWS_EVENT_V2: WriteRowsEvent,
}.get(event.event_type)
return event_type in self._only_events
def _open_file(self):
'''Open the file at ``self._file_path``'''
if self._file is None:
self._file = open(self._file_path, 'rb+')
self._pos = self._file.tell()
assert self._pos == 0
def _read_event(self):
'''Read an event from the binlog file'''
# Assuming a binlog version > 1
headerlength = 19
header = self._file.read(headerlength)
event_pos = self._pos
self._pos += len(header)
if len(header) == 0:
return None
event = SimpleBinLogEvent(header)
event.set_pos(event_pos)
if event.event_size < headerlength:
messagefmt = 'Event size {0} is too small'
message = messagefmt.format(event.event_size)
raise EventSizeTooSmallError(message)
else:
body = self._file.read(event.event_size - headerlength)
self._pos += len(body)
event.set_body(body)
return event
def _read_magic(self):
'''Read the first four *magic* bytes of the binlog file'''
self._open_file()
if self._pos == 0:
magic = self._file.read(4)
if magic == self._expected_magic:
self._pos += len(magic)
else:
messagefmt = 'Magic bytes {0!r} did not match expected {1!r}'
message = messagefmt.format(magic, self._expected_magic)
raise BadMagicBytesError(message)
def __iter__(self):
return iter(self.fetchone, None)
def __repr__(self):
cls = self.__class__
mod = cls.__module__
name = cls.__name__
only = [type(x).__name__ for x in self._only_events]
fmt = '<{mod}.{name}(file_path={fpath}, only_events={only})>'
return fmt.format(mod=mod, name=name, fpath=self._file_path, only=only)
# pylint: disable=too-many-instance-attributes
class SimpleBinLogEvent(object):
'''An event from a binlog file'''
def __init__(self, header):
'''Initialize the Event with the event header'''
unpacked = struct.unpack('<IcIIIH', header)
self.timestamp = unpacked[0]
self.event_type = unpacked[1][0]
self.server_id = unpacked[2]
self.event_size = unpacked[3]
self.log_pos = unpacked[4]
self.flags = unpacked[5]
self.body = None
self.pos = None
def set_body(self, body):
'''Save the body bytes'''
self.body = body
def set_pos(self, pos):
'''Save the event position'''
self.pos = pos
def __repr__(self):
cls = self.__class__
mod = cls.__module__
name = cls.__name__
fmt = '<{mod}.{name}(timestamp={ts}, event_type={et}, log_pos={pos})>'
return fmt.format(
mod=mod,
name=name,
ts=int(self.timestamp),
et=self.event_type,
pos=self.log_pos)
class BadMagicBytesError(Exception):
'''The binlog file magic bytes did not match the specification'''
class EventSizeTooSmallError(Exception):
'''The event size was smaller than the length of the event header'''