Skip to content

Commit cf314b2

Browse files
Add support for specifying an end log_pos (julien-duponchelle#357)
Co-authored-by: dongwook-chan <[email protected]>
1 parent f70f05b commit cf314b2

File tree

4 files changed

+52
-6
lines changed

4 files changed

+52
-6
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ class BinLogStreamReader(object):
130130
def __init__(self, connection_settings, server_id,
131131
ctl_connection_settings=None, resume_stream=False,
132132
blocking=False, only_events=None, log_file=None,
133-
log_pos=None, filter_non_implemented_events=True,
133+
log_pos=None, end_log_pos=None,
134+
filter_non_implemented_events=True,
134135
ignored_events=None, auto_position=None,
135136
only_tables=None, ignored_tables=None,
136137
only_schemas=None, ignored_schemas=None,
@@ -152,6 +153,7 @@ def __init__(self, connection_settings, server_id,
152153
log_file: Set replication start log file
153154
log_pos: Set replication start log pos (resume_stream should be
154155
true)
156+
end_log_pos: Set replication end log pos
155157
auto_position: Use master_auto_position gtid to set position
156158
only_tables: An array with the tables you want to watch (only works
157159
in binlog_format ROW)
@@ -205,10 +207,14 @@ def __init__(self, connection_settings, server_id,
205207
# Store table meta information
206208
self.table_map = {}
207209
self.log_pos = log_pos
210+
self.end_log_pos = end_log_pos
208211
self.log_file = log_file
209212
self.auto_position = auto_position
210213
self.skip_to_timestamp = skip_to_timestamp
211214

215+
if end_log_pos:
216+
self.is_past_end_log_pos = False
217+
212218
if report_slave:
213219
self.report_slave = ReportSlave(report_slave)
214220
self.slave_uuid = slave_uuid
@@ -417,6 +423,9 @@ def __connect_to_stream(self):
417423

418424
def fetchone(self):
419425
while True:
426+
if self.end_log_pos and self.is_past_end_log_pos:
427+
return None
428+
420429
if not self.__connected_stream:
421430
self.__connect_to_stream()
422431

@@ -470,6 +479,10 @@ def fetchone(self):
470479
elif binlog_event.log_pos:
471480
self.log_pos = binlog_event.log_pos
472481

482+
if self.end_log_pos and self.log_pos >= self.end_log_pos:
483+
# We're currently at, or past, the specified end log position.
484+
self.is_past_end_log_pos = True
485+
473486
# This check must not occur before clearing the ``table_map`` as a
474487
# result of a RotateEvent.
475488
#

pymysqlreplication/event.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -188,11 +188,6 @@ def _dump(self):
188188
print("Execution time: %d" % (self.execution_time))
189189
print("Query: %s" % (self.query))
190190

191-
192-
# TODO: check if instance attribute with the same name already exists
193-
# TODO: put all the instace attribute in separate class? called status_vars
194-
# TODO: does length need to be remembered?
195-
# TODO: ref(mysql doc. and mysql-server) for each hunk
196191
def _read_status_vars_value_for_key(self, key):
197192
"""parse status variable VALUE for given KEY
198193

pymysqlreplication/tests/test_abnormal.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,4 @@ def _remove_trailing_rotate_event_from_first_binlog(self):
7272
for _ in reader:
7373
reader.truncatebinlog()
7474
break
75+

pymysqlreplication/tests/test_basic.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,43 @@ def test_skip_to_timestamp(self):
482482
self.assertIsInstance(event, QueryEvent)
483483
self.assertEqual(event.query, query2)
484484

485+
def test_end_log_pos(self):
486+
"""Test end_log_pos parameter for BinLogStreamReader
487+
488+
MUST BE TESTED IN DEFAULT SYSTEM VARIABLES SETTING
489+
490+
Raises:
491+
AssertionError: if null_bitmask isn't set as specified in 'bit_mask' variable
492+
"""
493+
494+
self.execute('CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, PRIMARY KEY(id))')
495+
self.execute('INSERT INTO test values (NULL)')
496+
self.execute('INSERT INTO test values (NULL)')
497+
self.execute('INSERT INTO test values (NULL)')
498+
self.execute('INSERT INTO test values (NULL)')
499+
self.execute('INSERT INTO test values (NULL)')
500+
self.execute('COMMIT')
501+
#import os
502+
#os._exit(1)
503+
504+
binlog = self.execute("SHOW BINARY LOGS").fetchone()[0]
505+
506+
self.stream.close()
507+
self.stream = BinLogStreamReader(
508+
self.database,
509+
server_id=1024,
510+
log_pos=0,
511+
log_file=binlog,
512+
end_log_pos=888)
513+
514+
last_log_pos = 0
515+
last_event_type = 0
516+
for event in self.stream:
517+
last_log_pos = self.stream.log_pos
518+
last_event_type = event.event_type
519+
520+
self.assertEqual(last_log_pos, 888)
521+
self.assertEqual(last_event_type, TABLE_MAP_EVENT)
485522

486523
class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
487524
def ignoredEvents(self):

0 commit comments

Comments
 (0)