Skip to content

Add feature to start stream from position #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 24, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 41 additions & 29 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ class BinLogStreamReader(object):
"""

def __init__(self, connection_settings={}, resume_stream=False,
blocking=False, only_events=None, server_id=255):
blocking=False, only_events=None, server_id=255,
log_file=None, log_pos=None):
"""
Attributes:
resume_stream: Start from the given position or from the latest event
of the binlog; otherwise start from the oldest available event
blocking: Read on stream is blocking
only_events: Array of allowed events
log_file: Set replication start log file
log_pos: Set replication start log pos
"""
self.__connection_settings = connection_settings
self.__connection_settings["charset"] = "utf8"
Expand All @@ -33,11 +36,11 @@ def __init__(self, connection_settings={}, resume_stream=False,
self.__blocking = blocking
self.__only_events = only_events
self.__server_id = server_id
self.__log_pos = None
self.__log_file = None

#Store table meta information
self.table_map = {}
self.log_pos = log_pos
self.log_file = log_file

def close(self):
if self.__connected_stream:
Expand All @@ -56,45 +59,48 @@ def __connect_to_ctl(self):
self.__connected_ctl = True

def __connect_to_stream(self):
self._stream_connection = pymysql.connect(**self.__connection_settings)
cur = self._stream_connection.cursor()
cur.execute("SHOW MASTER STATUS")
log_file, log_pos = cur.fetchone()[:2]
cur.close()

if self.__log_file is None:
self.__log_file = log_file
# log_pos (4) -- position in the binlog-file to start the stream with
# flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1)
# server_id (4) -- server id of this slave
# log_file (string.EOF) -- filename of the binlog on the master
command = COM_BINLOG_DUMP
prelude = struct.pack('<i', len(self.__log_file) + 11) \
+ int2byte(command)
if self.__log_pos is None:
if self.__resume_stream:
prelude += struct.pack('<I', log_pos)
else:
prelude += struct.pack('<I', 4)
self._stream_connection = pymysql.connect(**self.__connection_settings)

# The position info is valid only when both log_file and log_pos are
# provided; otherwise, get the current position from the master.
if self.log_file is None or self.log_pos is None:
cur = self._stream_connection.cursor()
cur.execute("SHOW MASTER STATUS")
self.log_file, self.log_pos = cur.fetchone()[:2]
cur.close()

prelude = struct.pack('<i', len(self.log_file) + 11) \
+ int2byte(COM_BINLOG_DUMP)

if self.__resume_stream:
prelude += struct.pack('<I', self.log_pos)
else:
prelude += struct.pack('<I', self.__log_pos)
prelude += struct.pack('<I', 4)

if self.__blocking:
prelude += struct.pack('<h', 0)
else:
prelude += struct.pack('<h', 1)

prelude += struct.pack('<I', self.__server_id)
self._stream_connection.wfile.write(prelude + self.__log_file.encode())
prelude += self.log_file.encode()

self._stream_connection.wfile.write(prelude)
self._stream_connection.wfile.flush()
self.__connected_stream = True

def fetchone(self):
while True:
if not self.__connected_stream:
self.__connect_to_stream()

if not self.__connected_ctl:
self.__connect_to_ctl()
pkt = None

try:
pkt = self._stream_connection.read_packet()
except pymysql.OperationalError as error:
Expand All @@ -103,20 +109,26 @@ def fetchone(self):
if code == 2013:
self.__connected_stream = False
continue

if not pkt.is_ok_packet():
return None
binlog_event = BinLogPacketWrapper(
pkt, self.table_map, self._ctl_connection)

binlog_event = BinLogPacketWrapper(pkt, self.table_map,
self._ctl_connection)

if binlog_event.event_type == TABLE_MAP_EVENT:
self.table_map[binlog_event.event.table_id] = \
binlog_event.event.get_table()
if self.__filter_event(binlog_event.event):
continue

if binlog_event.event_type == ROTATE_EVENT:
self.__log_pos = binlog_event.event.position
self.__log_file = binlog_event.event.next_binlog
self.log_pos = binlog_event.event.position
self.log_file = binlog_event.event.next_binlog
else:
self.__log_pos = binlog_event.log_pos
self.log_pos = binlog_event.log_pos

if self.__filter_event(binlog_event.event):
continue

return binlog_event.event

def __filter_event(self, event):
Expand Down
1 change: 1 addition & 0 deletions pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class BinLogPacketWrapper(object):
constants.XID_EVENT: event.XidEvent,
constants.INTVAR_EVENT: event.NotImplementedEvent,
constants.GTID_LOG_EVENT: event.NotImplementedEvent,
constants.STOP_EVENT: event.NotImplementedEvent,
# row_event
constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent,
Expand Down
56 changes: 51 additions & 5 deletions pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,20 @@ def test_reading_rotate_event(self):
query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
self.execute(query)

rotate_event = self.stream.fetchone()
# Rotate event
self.stream.fetchone()
self.stream.close()

query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
self.execute(query)

rotate_event = self.stream.fetchone()
# Rotate event
self.stream.fetchone()

def test_connection_stream_lost_event(self):
self.stream.close()
self.stream = BinLogStreamReader(connection_settings = self.database, blocking = True)
self.stream = BinLogStreamReader(connection_settings=self.database,
blocking=True)

query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
self.execute(query)
Expand All @@ -66,7 +69,8 @@ def test_connection_stream_lost_event(self):

def test_filtering_events(self):
self.stream.close()
self.stream = BinLogStreamReader(connection_settings = self.database, only_events = [QueryEvent])
self.stream = BinLogStreamReader(connection_settings=self.database,
only_events=[QueryEvent])
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
self.execute(query)

Expand Down Expand Up @@ -172,6 +176,48 @@ def test_update_row_event(self):
self.assertEqual(event.rows[0]["after_values"]["id"], 1)
self.assertEqual(event.rows[0]["after_values"]["data"], "World")

def test_log_pos(self):
    """Check that a stream can be resumed from a recorded binlog position.

    The test creates a table and inserts a row, reads six events from the
    stream, and records the reader's ``log_file`` / ``log_pos``. It then
    issues an UPDATE, reopens the stream with ``resume_stream=True`` and the
    recorded position, and asserts that, after four leading events, the
    next two events are a ``TableMapEvent`` followed by an
    ``UpdateRowsEvent``.
    """
    create_table = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
    self.execute(create_table)
    self.execute("INSERT INTO test (data) VALUES('Hello')")
    self.execute("COMMIT")

    # Consume the events emitted so far so that the reader's position
    # reflects the state after the insert.
    for _ in range(6):
        self.stream.fetchone()
    resume_file = self.stream.log_file
    resume_pos = self.stream.log_pos

    self.execute("UPDATE test SET data = 'World' WHERE id = 1")
    self.execute("COMMIT")

    # Replace the current reader with one that resumes from the position
    # recorded above.
    if self.stream is not None:
        self.stream.close()
    self.stream = BinLogStreamReader(
        connection_settings=self.database,
        resume_stream=True,
        log_file=resume_file,
        log_pos=resume_pos,
    )

    # Skip, in order: RotateEvent, FormatDescription, XidEvent and the
    # QueryEvent for the BEGIN.
    for _ in range(4):
        self.stream.fetchone()

    # The UPDATE must surface as a table map followed by the row change.
    for expected_type in (TableMapEvent, UpdateRowsEvent):
        self.assertIsInstance(self.stream.fetchone(), expected_type)


class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
def test_insert_multiple_row_event(self):
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
Expand Down Expand Up @@ -234,7 +280,7 @@ def test_update_multiple_row_event(self):

event = self.stream.fetchone()
if self.isMySQL56AndMore():
self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2)
self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2)
else:
self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1)
self.assertIsInstance(event, UpdateRowsEvent)
Expand Down