Skip to content

Commit ad88669

Browse files
Filters events by table names
1 parent 06fd091 commit ad88669

File tree

6 files changed

+83
-39
lines changed

6 files changed

+83
-39
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ class BinLogStreamReader(object):
3030
def __init__(self, connection_settings, server_id, resume_stream=False,
3131
blocking=False, only_events=None, log_file=None, log_pos=None,
3232
filter_non_implemented_events=True,
33-
ignored_events=None, auto_position=None):
33+
ignored_events=None, auto_position=None,
34+
only_tables = None):
3435
"""
3536
Attributes:
3637
resume_stream: Start for event from position or the latest event of
@@ -41,6 +42,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
4142
log_file: Set replication start log file
4243
log_pos: Set replication start log pos
4344
auto_position: Use master_auto_position gtid to set position
45+
only_tables: An array with the tables you want to watch
4446
"""
4547
self.__connection_settings = connection_settings
4648
self.__connection_settings["charset"] = "utf8"
@@ -49,6 +51,8 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
4951
self.__connected_ctl = False
5052
self.__resume_stream = resume_stream
5153
self.__blocking = blocking
54+
55+
self.__only_tables = only_tables
5256
self.__allowed_events = self._allowed_event_list(only_events, ignored_events, filter_non_implemented_events)
5357

5458
# We can't filter on packet level TABLE_MAP and rotate event because we need
@@ -233,7 +237,8 @@ def fetchone(self):
233237
binlog_event = BinLogPacketWrapper(pkt, self.table_map,
234238
self._ctl_connection,
235239
self.__use_checksum,
236-
self.__allowed_events_in_packet)
240+
allowed_events = self.__allowed_events_in_packet,
241+
only_tables = self.__only_tables)
237242
if binlog_event.event_type == TABLE_MAP_EVENT:
238243
self.table_map[binlog_event.event.table_id] = \
239244
binlog_event.event.get_table()

pymysqlreplication/event.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@
77

88

99
class BinLogEvent(object):
10-
def __init__(self, from_packet, event_size, table_map, ctl_connection):
10+
def __init__(self, from_packet, event_size, table_map, ctl_connection, only_tables = None):
1111
self.packet = from_packet
1212
self.table_map = table_map
1313
self.event_type = self.packet.event_type
1414
self.timestamp = self.packet.timestamp
1515
self.event_size = event_size
1616
self._ctl_connection = ctl_connection
17+
# The event have been fully processed, if processed is false
18+
# the event will be skipped
19+
self._processed = True
1720

1821
def _read_table_id(self):
1922
# Table ID is 6 byte
@@ -39,9 +42,9 @@ def _dump(self):
3942
class GtidEvent(BinLogEvent):
4043
"""GTID change in binlog event
4144
"""
42-
def __init__(self, from_packet, event_size, table_map, ctl_connection):
45+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
4346
super(GtidEvent, self).__init__(from_packet, event_size, table_map,
44-
ctl_connection)
47+
ctl_connection, **kwargs)
4548

4649
self.commit_flag = byte2int(self.packet.read(1)) == 1
4750
self.sid = self.packet.read(16)
@@ -72,9 +75,9 @@ class RotateEvent(BinLogEvent):
7275
position: Position inside next binlog
7376
next_binlog: Name of next binlog file
7477
"""
75-
def __init__(self, from_packet, event_size, table_map, ctl_connection):
78+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
7679
super(RotateEvent, self).__init__(from_packet, event_size, table_map,
77-
ctl_connection)
80+
ctl_connection, **kwargs)
7881
self.position = struct.unpack('<Q', self.packet.read(8))[0]
7982
self.next_binlog = self.packet.read(event_size - 8).decode()
8083

@@ -96,9 +99,9 @@ class XidEvent(BinLogEvent):
9699
xid: Transaction ID for 2PC
97100
"""
98101

99-
def __init__(self, from_packet, event_size, table_map, ctl_connection):
102+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
100103
super(XidEvent, self).__init__(from_packet, event_size, table_map,
101-
ctl_connection)
104+
ctl_connection, **kwargs)
102105
self.xid = struct.unpack('<Q', self.packet.read(8))[0]
103106

104107
def _dump(self):
@@ -109,9 +112,9 @@ def _dump(self):
109112
class QueryEvent(BinLogEvent):
110113
'''This evenement is trigger when a query is run of the database.
111114
Only replicated queries are logged.'''
112-
def __init__(self, from_packet, event_size, table_map, ctl_connection):
115+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
113116
super(QueryEvent, self).__init__(from_packet, event_size, table_map,
114-
ctl_connection)
117+
ctl_connection, **kwargs)
115118

116119
# Post-header
117120
self.slave_proxy_id = self.packet.read_uint32()
@@ -137,7 +140,7 @@ def _dump(self):
137140

138141

139142
class NotImplementedEvent(BinLogEvent):
140-
def __init__(self, from_packet, event_size, table_map, ctl_connection):
143+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
141144
super(NotImplementedEvent, self).__init__(
142-
from_packet, event_size, table_map, ctl_connection)
145+
from_packet, event_size, table_map, ctl_connection, **kwargs)
143146
self.packet.advance(event_size)

pymysqlreplication/packet.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class BinLogPacketWrapper(object):
4848

4949
}
5050

51-
def __init__(self, from_packet, table_map, ctl_connection, use_checksum, allowed_events = None):
51+
def __init__(self, from_packet, table_map, ctl_connection, use_checksum, allowed_events = None, only_tables = None):
5252
# -1 because we ignore the ok byte
5353
self.read_bytes = 0
5454
# Used when we want to override a value in the data buffer
@@ -86,7 +86,9 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum, allowed
8686
if event_class not in allowed_events:
8787
return
8888
self.event = event_class(self, event_size_without_header, table_map,
89-
ctl_connection)
89+
ctl_connection, only_tables = only_tables)
90+
if self.event._processed == False:
91+
self.event = None
9092

9193
def read(self, size):
9294
size = int(size)

pymysqlreplication/row_event.py

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,24 @@
1414

1515

1616
class RowsEvent(BinLogEvent):
17-
def __init__(self, from_packet, event_size, table_map, ctl_connection):
17+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
1818
super(RowsEvent, self).__init__(from_packet, event_size, table_map,
19-
ctl_connection)
19+
ctl_connection, **kwargs)
2020
self.__rows = None
21+
self.__only_tables = kwargs["only_tables"]
2122

2223
#Header
2324
self.table_id = self._read_table_id()
2425
self.primary_key = table_map[self.table_id].data["primary_key"]
2526

27+
# Additional information
28+
self.schema = self.table_map[self.table_id].schema
29+
self.table = self.table_map[self.table_id].table
30+
31+
if self.__only_tables is not None and self.table not in self.__only_tables:
32+
self._processed = False
33+
return
34+
2635
#Event V2
2736
if self.event_type == BINLOG.WRITE_ROWS_EVENT_V2 or \
2837
self.event_type == BINLOG.DELETE_ROWS_EVENT_V2 or \
@@ -36,10 +45,6 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection):
3645
self.number_of_columns = self.packet.read_length_coded_binary()
3746
self.columns = self.table_map[self.table_id].columns
3847

39-
# Additional information
40-
self.schema = self.table_map[self.table_id].schema
41-
self.table = self.table_map[self.table_id].table
42-
4348
def __is_null(self, null_bitmap, position):
4449
bit = null_bitmap[int(position / 8)]
4550
if type(bit) is str:
@@ -370,11 +375,12 @@ class DeleteRowsEvent(RowsEvent):
370375
For each row you have a hash with a single key: values which contain the data of the removed line.
371376
"""
372377

373-
def __init__(self, from_packet, event_size, table_map, ctl_connection):
378+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
374379
super(DeleteRowsEvent, self).__init__(from_packet, event_size,
375-
table_map, ctl_connection)
376-
self.columns_present_bitmap = self.packet.read(
377-
(self.number_of_columns + 7) / 8)
380+
table_map, ctl_connection, **kwargs)
381+
if self._processed:
382+
self.columns_present_bitmap = self.packet.read(
383+
(self.number_of_columns + 7) / 8)
378384

379385
def _fetch_one_row(self):
380386
row = {}
@@ -398,11 +404,12 @@ class WriteRowsEvent(RowsEvent):
398404
For each row you have a hash with a single key: values which contain the data of the new line.
399405
"""
400406

401-
def __init__(self, from_packet, event_size, table_map, ctl_connection):
407+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
402408
super(WriteRowsEvent, self).__init__(from_packet, event_size,
403-
table_map, ctl_connection)
404-
self.columns_present_bitmap = self.packet.read(
405-
(self.number_of_columns + 7) / 8)
409+
table_map, ctl_connection, **kwargs)
410+
if self._processed:
411+
self.columns_present_bitmap = self.packet.read(
412+
(self.number_of_columns + 7) / 8)
406413

407414
def _fetch_one_row(self):
408415
row = {}
@@ -431,14 +438,15 @@ class UpdateRowsEvent(RowsEvent):
431438
http://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#sysvar_binlog_row_image
432439
"""
433440

434-
def __init__(self, from_packet, event_size, table_map, ctl_connection):
441+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
435442
super(UpdateRowsEvent, self).__init__(from_packet, event_size,
436-
table_map, ctl_connection)
437-
#Body
438-
self.columns_present_bitmap = self.packet.read(
439-
(self.number_of_columns + 7) / 8)
440-
self.columns_present_bitmap2 = self.packet.read(
441-
(self.number_of_columns + 7) / 8)
443+
table_map, ctl_connection, **kwargs)
444+
if self._processed:
445+
#Body
446+
self.columns_present_bitmap = self.packet.read(
447+
(self.number_of_columns + 7) / 8)
448+
self.columns_present_bitmap2 = self.packet.read(
449+
(self.number_of_columns + 7) / 8)
442450

443451
def _fetch_one_row(self):
444452
row = {}
@@ -468,9 +476,9 @@ class TableMapEvent(BinLogEvent):
468476
A end user of the lib should have no usage of this
469477
"""
470478

471-
def __init__(self, from_packet, event_size, table_map, ctl_connection):
479+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
472480
super(TableMapEvent, self).__init__(from_packet, event_size,
473-
table_map, ctl_connection)
481+
table_map, ctl_connection, **kwargs)
474482

475483
# Post-Header
476484
self.table_id = self._read_table_id()

pymysqlreplication/tests/benchmark.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ def consume_events():
2424
server_id=3,
2525
resume_stream=False,
2626
blocking=True,
27-
only_events = [UpdateRowsEvent])
27+
only_events = [UpdateRowsEvent],
28+
only_tables = ['test'] )
2829
start = time.clock()
2930
i = 0.0
3031
for binlogevent in stream:
@@ -50,13 +51,16 @@ def consume_events():
5051
conn = pymysql.connect(**database)
5152
execute(conn, "CREATE TABLE test (i INT) ENGINE = MEMORY")
5253
execute(conn, "INSERT INTO test VALUES(1)")
54+
execute(conn, "CREATE TABLE test2 (i INT) ENGINE = MEMORY")
55+
execute(conn, "INSERT INTO test2 VALUES(1)")
5356
execute(conn, "RESET MASTER")
5457

5558

5659
if os.fork() != 0:
5760
print("Start insert data")
5861
while True:
5962
execute(conn, "UPDATE test SET i = i + 1;")
63+
execute(conn, "UPDATE test2 SET i = i + 1;")
6064
else:
6165
consume_events()
6266
#cProfile.run('consume_events()')

pymysqlreplication/tests/test_basic.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,28 @@ def test_filtering_ignore_events(self):
111111
event = self.stream.fetchone()
112112
self.assertIsInstance(event, RotateEvent)
113113

114+
def test_filtering_table_event(self):
115+
self.stream.close()
116+
self.stream = BinLogStreamReader(
117+
self.database,
118+
server_id=1024,
119+
only_events=[WriteRowsEvent],
120+
only_tables = ["test_2"])
121+
122+
query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
123+
self.execute(query)
124+
query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
125+
self.execute(query)
126+
127+
self.execute("INSERT INTO test_2 (data) VALUES ('alpha')")
128+
self.execute("INSERT INTO test_3 (data) VALUES ('alpha')")
129+
self.execute("INSERT INTO test_2 (data) VALUES ('beta')")
130+
self.execute("COMMIT")
131+
event = self.stream.fetchone()
132+
self.assertEqual(event.table, "test_2")
133+
event = self.stream.fetchone()
134+
self.assertEqual(event.table, "test_2")
135+
114136
def test_write_row_event(self):
115137
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
116138
self.execute(query)

0 commit comments

Comments
 (0)