Skip to content

Commit 4766123

Browse files
Merge pull request #90 from noplay/speedup
Speedup event filtering
2 parents 93be38b + 9d41eb7 commit 4766123

File tree

4 files changed

+53
-29
lines changed

4 files changed

+53
-29
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99

1010
from .packet import BinLogPacketWrapper
1111
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
12-
from .event import NotImplementedEvent
1312
from .gtid import GtidSet
13+
from .event import QueryEvent, RotateEvent, FormatDescriptionEvent, XidEvent, GtidEvent, NotImplementedEvent
14+
from .row_event import UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent
1415

1516
try:
1617
from pymysql.constants.COMMAND import COM_BINLOG_DUMP_GTID
@@ -48,9 +49,12 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
4849
self.__connected_ctl = False
4950
self.__resume_stream = resume_stream
5051
self.__blocking = blocking
51-
self.__only_events = only_events
52-
self.__ignored_events = ignored_events
53-
self.__filter_non_implemented_events = filter_non_implemented_events
52+
self.__allowed_events = self._allowed_event_list(only_events, ignored_events, filter_non_implemented_events)
53+
54+
# TABLE_MAP and ROTATE events can't be filtered out at packet level because
55+
# we need them to handle other operations
56+
self.__allowed_events_in_packet = frozenset([TableMapEvent, RotateEvent]).union(self.__allowed_events)
57+
5458
self.__server_id = server_id
5559
self.__use_checksum = False
5660

@@ -156,7 +160,7 @@ def __connect_to_stream(self):
156160
# A gtid set looks like:
157161
# 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10,
158162
# 1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140
159-
#
163+
#
160164
# In this particular gtid set, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10
161165
# is the first member of the set, it is called a gtid.
162166
# In this gtid, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457 is the sid
@@ -228,7 +232,8 @@ def fetchone(self):
228232

229233
binlog_event = BinLogPacketWrapper(pkt, self.table_map,
230234
self._ctl_connection,
231-
self.__use_checksum)
235+
self.__use_checksum,
236+
self.__allowed_events_in_packet)
232237
if binlog_event.event_type == TABLE_MAP_EVENT:
233238
self.table_map[binlog_event.event.table_id] = \
234239
binlog_event.event.get_table()
@@ -242,35 +247,44 @@ def fetchone(self):
242247
# wrong table schema.
243248
# The fix is to rely on the fact that MySQL will also rotate to a new binlog file every time it
244249
# restarts. That means every rotation we see *could* be a sign of restart and so potentially
245-
# invalidates all our cached table id to schema mappings. This means we have to load them all
250+
# invalidates all our cached table id to schema mappings. This means we have to load them all
246251
# again for each logfile which is potentially wasted effort but we can't really do much better
247252
# without being broken in restart case
248253
self.table_map = {}
249254
elif binlog_event.log_pos:
250255
self.log_pos = binlog_event.log_pos
251256

252-
if self.__filter_event(binlog_event.event):
257+
# event is None if it has been filtered out at packet level
258+
# events that are not allowed are filtered out as well
259+
if binlog_event.event is None or (binlog_event.event.__class__ not in self.__allowed_events):
253260
continue
254261

255262
return binlog_event.event
256263

257-
def __filter_event(self, event):
258-
if self.__filter_non_implemented_events and isinstance(event, NotImplementedEvent):
259-
return True
260-
261-
if self.__ignored_events is not None:
262-
for ignored_event in self.__ignored_events:
263-
if isinstance(event, ignored_event):
264-
return True
265-
266-
if self.__only_events is not None:
267-
for allowed_event in self.__only_events:
268-
if isinstance(event, allowed_event):
269-
return False
270-
else:
271-
return True
272-
273-
return False
264+
def _allowed_event_list(self, only_events, ignored_events, filter_non_implemented_events):
265+
if only_events is not None:
266+
events = set(only_events)
267+
else:
268+
events = set((
269+
QueryEvent,
270+
RotateEvent,
271+
FormatDescriptionEvent,
272+
XidEvent,
273+
GtidEvent,
274+
UpdateRowsEvent,
275+
WriteRowsEvent,
276+
DeleteRowsEvent,
277+
TableMapEvent,
278+
NotImplementedEvent))
279+
if ignored_events is not None:
280+
for e in ignored_events:
281+
events.remove(e)
282+
if filter_non_implemented_events:
283+
try:
284+
events.remove(NotImplementedEvent)
285+
except KeyError:
286+
pass
287+
return frozenset(events)
274288

275289
def __get_table_information(self, schema, table):
276290
for i in range(1, 3):

pymysqlreplication/packet.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class BinLogPacketWrapper(object):
4848

4949
}
5050

51-
def __init__(self, from_packet, table_map, ctl_connection, use_checksum):
51+
def __init__(self, from_packet, table_map, ctl_connection, use_checksum, allowed_events = None):
5252
if not from_packet.is_ok_packet():
5353
raise ValueError(
5454
"Cannot create %s object from invalid packet type" %
@@ -79,8 +79,11 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum):
7979
else:
8080
event_size_without_header = self.event_size - 19
8181

82-
event_class = self.__event_map.get(self.event_type,
83-
event.NotImplementedEvent)
82+
self.event = None
83+
event_class = self.__event_map.get(self.event_type, event.NotImplementedEvent)
84+
85+
if event_class not in allowed_events:
86+
return
8487
self.event = event_class(self, event_size_without_header, table_map,
8588
ctl_connection)
8689

pymysqlreplication/tests/benchmark.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,6 @@ def consume_events():
5858
while True:
5959
execute(conn, "UPDATE test SET i = i + 1;")
6060
else:
61-
cProfile.run('consume_events()')
61+
consume_events()
62+
#cProfile.run('consume_events()')
6263

pymysqlreplication/tests/test_basic.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
1313
def ignoredEvents(self):
1414
return [GtidEvent]
1515

16+
def test_allowed_event_list(self):
    """Check the size of the allowed-event set for several filter combinations."""
    # Each entry: (only_events, ignored_events, filter_non_implemented_events)
    # paired with the expected number of allowed event classes.
    cases = (
        ((None, None, False), 10),
        ((None, None, True), 9),
        ((None, [RotateEvent], False), 9),
        (([RotateEvent], None, False), 1),
    )
    for args, expected_size in cases:
        allowed = self.stream._allowed_event_list(*args)
        self.assertEqual(len(allowed), expected_size)
21+
1622
def test_read_query_event(self):
1723
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
1824
self.execute(query)

0 commit comments

Comments
 (0)