diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 1c2154f6..f2f29e65 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -130,7 +130,8 @@ class BinLogStreamReader(object): def __init__(self, connection_settings, server_id, ctl_connection_settings=None, resume_stream=False, blocking=False, only_events=None, log_file=None, - log_pos=None, filter_non_implemented_events=True, + log_pos=None, end_log_pos=None, + filter_non_implemented_events=True, ignored_events=None, auto_position=None, only_tables=None, ignored_tables=None, only_schemas=None, ignored_schemas=None, @@ -152,6 +153,7 @@ def __init__(self, connection_settings, server_id, log_file: Set replication start log file log_pos: Set replication start log pos (resume_stream should be true) + end_log_pos: Set replication end log pos auto_position: Use master_auto_position gtid to set position only_tables: An array with the tables you want to watch (only works in binlog_format ROW) @@ -205,10 +207,14 @@ def __init__(self, connection_settings, server_id, # Store table meta information self.table_map = {} self.log_pos = log_pos + self.end_log_pos = end_log_pos self.log_file = log_file self.auto_position = auto_position self.skip_to_timestamp = skip_to_timestamp + if end_log_pos: + self.is_past_end_log_pos = False + if report_slave: self.report_slave = ReportSlave(report_slave) self.slave_uuid = slave_uuid @@ -417,6 +423,9 @@ def __connect_to_stream(self): def fetchone(self): while True: + if self.end_log_pos and self.is_past_end_log_pos: + return None + if not self.__connected_stream: self.__connect_to_stream() @@ -470,6 +479,10 @@ def fetchone(self): elif binlog_event.log_pos: self.log_pos = binlog_event.log_pos + if self.end_log_pos and self.log_pos >= self.end_log_pos: + # We're currently at, or past, the specified end log position. + self.is_past_end_log_pos = True + # This check must not occur before clearing the ``table_map`` as a # result of a RotateEvent. # diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index d5ee1060..f5bedc1a 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -188,11 +188,6 @@ def _dump(self): print("Execution time: %d" % (self.execution_time)) print("Query: %s" % (self.query)) - - # TODO: check if instance attribute with the same name already exists - # TODO: put all the instace attribute in separate class? called status_vars - # TODO: does length need to be remembered? - # TODO: ref(mysql doc. and mysql-server) for each hunk def _read_status_vars_value_for_key(self, key): """parse status variable VALUE for given KEY diff --git a/pymysqlreplication/tests/test_abnormal.py b/pymysqlreplication/tests/test_abnormal.py index 9b0f6818..a3e75d3a 100644 --- a/pymysqlreplication/tests/test_abnormal.py +++ b/pymysqlreplication/tests/test_abnormal.py @@ -72,3 +72,4 @@ def _remove_trailing_rotate_event_from_first_binlog(self): for _ in reader: reader.truncatebinlog() break + diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 42e3b340..a2ea52fa 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -482,6 +482,43 @@ def test_skip_to_timestamp(self): self.assertIsInstance(event, QueryEvent) self.assertEqual(event.query, query2) + def test_end_log_pos(self): + """Test end_log_pos parameter for BinLogStreamReader + + MUST BE TESTED IN DEFAULT SYSTEM VARIABLES SETTING + + Raises: + AssertionError: if null_bitmask isn't set as specified in 'bit_mask' variable + """ + + self.execute('CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, PRIMARY KEY(id))') + self.execute('INSERT INTO test values (NULL)') + self.execute('INSERT INTO test values (NULL)') + self.execute('INSERT INTO test values (NULL)') + self.execute('INSERT INTO test values (NULL)') + self.execute('INSERT INTO test values (NULL)') + self.execute('COMMIT') + #import os + #os._exit(1) + + binlog = self.execute("SHOW BINARY LOGS").fetchone()[0] + + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + log_pos=0, + log_file=binlog, + end_log_pos=888) + + last_log_pos = 0 + last_event_type = 0 + for event in self.stream: + last_log_pos = self.stream.log_pos + last_event_type = event.event_type + + self.assertEqual(last_log_pos, 888) + self.assertEqual(last_event_type, TABLE_MAP_EVENT) class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): def ignoredEvents(self):