Skip to content

Commit 3386cda

Browse files
author
Bartek Ogryczak
committed
Adding support for skipping the binlog until reaching specified timestamp.
1 parent fe5d8f4 commit 3386cda

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
4040
filter_non_implemented_events=True,
4141
ignored_events=None, auto_position=None,
4242
only_tables=None, only_schemas=None,
43-
freeze_schema=False):
43+
freeze_schema=False, skip_to_timestamp=None):
4444
"""
4545
Attributes:
4646
resume_stream: Start for event from position or the latest event of
@@ -54,6 +54,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
5454
only_tables: An array with the tables you want to watch
5555
only_schemas: An array with the schemas you want to watch
5656
freeze_schema: If true do not support ALTER TABLE. It's faster.
57+
skip_to_timestamp: Ignore all events until reaching specified timestamp.
5758
"""
5859
self.__connection_settings = connection_settings
5960
self.__connection_settings["charset"] = "utf8"
@@ -82,6 +83,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
8283
self.log_pos = log_pos
8384
self.log_file = log_file
8485
self.auto_position = auto_position
86+
self.skip_to_timestamp = skip_to_timestamp
8587

8688
def close(self):
8789
if self.__connected_stream:
@@ -259,6 +261,9 @@ def fetchone(self):
259261
self.__only_schemas,
260262
self.__freeze_schema)
261263

264+
if self.skip_to_timestamp and binlog_event.timestamp < self.skip_to_timestamp:
265+
continue
266+
262267
if binlog_event.event_type == TABLE_MAP_EVENT and \
263268
binlog_event.event is not None:
264269
self.table_map[binlog_event.event.table_id] = \

pymysqlreplication/tests/test_basic.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# -*- coding: utf-8 -*-
22

3+
import time
4+
35
from pymysqlreplication.tests import base
46
from pymysqlreplication import BinLogStreamReader
57
from pymysqlreplication.event import *
@@ -404,6 +406,26 @@ def test_log_pos_handles_disconnects(self):
404406

405407
self.assertGreater(self.stream.log_pos, 0)
406408

409+
def test_skip_to_timestamp(self):
410+
self.stream.close()
411+
query = "CREATE TABLE test_1 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
412+
self.execute(query)
413+
time.sleep(1)
414+
query = "SELECT UNIX_TIMESTAMP();"
415+
timestamp = self.execute(query).fetchone()[0]
416+
query2 = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
417+
self.execute(query2)
418+
419+
self.stream = BinLogStreamReader(
420+
self.database,
421+
server_id=1024,
422+
skip_to_timestamp=timestamp,
423+
ignored_events=self.ignoredEvents(),
424+
)
425+
event = self.stream.fetchone()
426+
self.assertIsInstance(event, QueryEvent)
427+
self.assertEqual(event.query, query2)
428+
407429

408430
class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
409431
def ignoredEvents(self):

0 commit comments

Comments
 (0)