diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 1c2154f6..4d0c2b53 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -130,7 +130,8 @@ class BinLogStreamReader(object): def __init__(self, connection_settings, server_id, ctl_connection_settings=None, resume_stream=False, blocking=False, only_events=None, log_file=None, - log_pos=None, filter_non_implemented_events=True, + log_pos=None, end_log_pos=None, + filter_non_implemented_events=True, ignored_events=None, auto_position=None, only_tables=None, ignored_tables=None, only_schemas=None, ignored_schemas=None, @@ -152,6 +153,7 @@ def __init__(self, connection_settings, server_id, log_file: Set replication start log file log_pos: Set replication start log pos (resume_stream should be true) + end_log_pos: Set replication end log pos auto_position: Use master_auto_position gtid to set position only_tables: An array with the tables you want to watch (only works in binlog_format ROW) @@ -205,6 +207,7 @@ def __init__(self, connection_settings, server_id, # Store table meta information self.table_map = {} self.log_pos = log_pos + self.end_log_pos = end_log_pos self.log_file = log_file self.auto_position = auto_position self.skip_to_timestamp = skip_to_timestamp @@ -416,7 +419,8 @@ def __connect_to_stream(self): self.__connected_stream = True def fetchone(self): - while True: + should_continue = True + while should_continue: if not self.__connected_stream: self.__connect_to_stream() @@ -470,6 +474,10 @@ def fetchone(self): elif binlog_event.log_pos: self.log_pos = binlog_event.log_pos + if self.end_log_pos and self.log_pos >= self.end_log_pos: + # We're currently at, or past, the specified end log position. + should_continue = False + # This check must not occur before clearing the ``table_map`` as a # result of a RotateEvent. #