From b61572598ccbd94baf68e97ab4facc82f1900466 Mon Sep 17 00:00:00 2001 From: woods-chen Date: Tue, 30 Nov 2021 15:39:56 +0800 Subject: [PATCH 1/3] improve performance and documentation * symplify BinLogStreamReader.fetchone * improve documentation of BinLogStreamReader * tests/test_data_type.py: fix syntax errors * tests/test_basic.py: dynamic get an end position instead a fixed one --- pymysqlreplication/binlogstream.py | 127 +++++++++++++++------ pymysqlreplication/tests/test_basic.py | 18 ++- pymysqlreplication/tests/test_data_type.py | 6 +- 3 files changed, 108 insertions(+), 43 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index f2f29e65..a69b5e5e 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -141,20 +141,48 @@ def __init__(self, connection_settings, server_id, fail_on_table_metadata_unavailable=False, slave_heartbeat=None): """ - Attributes: + Parameters: + connection_settings: a dict of parameters passed to `pymysql.connect` + or `pymysql_wrapper`, of which "db" parameter is not necessary + pymysql_wrapper: custom replacement for `pymysql.connect` ctl_connection_settings: Connection settings for cluster holding - schema information - resume_stream: Start for event from position or the latest event of - binlog or from older available event + schema information, which could be None, in which case + `connection_settings` will be used as ctl_connection_settings, + except for that "db" will be replaced to "information_schema" + resume_stream: True or False. control the start point of the returned + events, only works when `auto_position` is None. + `fetchone` will fetch data from: + 1.the begining of `log_file`: if `resume_stream` is False + 2.`log_pos` of `log_file`: if resume_stream is True, and it's + the first time to fetch the data + 3.the event right next to the last fetched event: when resume_stream + is True and it's not the first time to fetch data + note: the log position will be set back to the begging of `log_file` + each time the client is disconnected and then reconnected + to the mysql server (OperationalError 2006/2013) if resume_stream + is False. so it's suggested to set resume_stream to True. + blocking: When master has finished reading/sending binlog it will send EOF instead of blocking connection. only_events: Array of allowed events ignored_events: Array of ignored events - log_file: Set replication start log file + log_file: Set replication start log file. if ether `log_file` or + `log_pos` is None, and auto_position is None, then log_pos + and log_file will be set as the values returned by the query + "SHOW MASTER STATUS" log_pos: Set replication start log pos (resume_stream should be - true) + true). if ether `log_file` or `log_pos` is None, and auto_position + is None, then log_pos and log_file will be set as the values + returned by the query "SHOW MASTER STATUS", and log_pos will + be set as 4 (the start position of any log file) if resume_stream + is a false value end_log_pos: Set replication end log pos - auto_position: Use master_auto_position gtid to set position + auto_position: a string of replicated GTIDs. all the events except + for thoses included in `auto_position` and those purged by + the source server will be sent to the client. a valid `auto_position` + looks like: + 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10, + 1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140 only_tables: An array with the tables you want to watch (only works in binlog_format ROW) ignored_tables: An array with the tables you want to skip @@ -174,13 +202,22 @@ def __init__(self, connection_settings, server_id, many event to skip in binlog). See MASTER_HEARTBEAT_PERIOD in mysql documentation for semantics + + Notes: + the log position will be set back to the begging of `log_file` + each time the client is disconnected and then auto-reconnected + to the mysql server (OperationalError 2006/2013) if resume_stream + is False. so it's suggested to set resume_stream to True. + + an additional RotateEvent and FormatDescriptionEvent will be + fetched each time the client is disconnected and then auto- + reconnected to the server. (no matter resume_stream is True + or False) """ self.__connection_settings = connection_settings self.__connection_settings.setdefault("charset", "utf8") - self.__connected_stream = False - self.__connected_ctl = False self.__resume_stream = resume_stream self.__blocking = blocking self._ctl_connection_settings = ctl_connection_settings @@ -226,24 +263,26 @@ def __init__(self, connection_settings, server_id, self.pymysql_wrapper = pymysql.connect def close(self): - if self.__connected_stream: + if getattr(self, '_stream_connection', None) and self._stream_connection.open: self._stream_connection.close() - self.__connected_stream = False - if self.__connected_ctl: + if getattr(self, '_ctl_connection', None): # break reference cycle between stream reader and underlying # mysql connection object self._ctl_connection._get_table_information = None - self._ctl_connection.close() - self.__connected_ctl = False + if self._ctl_connection.open: + self._ctl_connection.close() - def __connect_to_ctl(self): + def __connect_to_ctl(self, force_reconnect=False): + if self.__connected_ctl: + if not force_reconnect: + return + self._ctl_connection.close() if not self._ctl_connection_settings: self._ctl_connection_settings = dict(self.__connection_settings) self._ctl_connection_settings["db"] = "information_schema" self._ctl_connection_settings["cursorclass"] = DictCursor self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings) self._ctl_connection._get_table_information = self.__get_table_information - self.__connected_ctl = True def __checksum_enabled(self): """Return True if binlog-checksum = CRC32. Only for MySQL > 5.6""" @@ -274,12 +313,29 @@ def _register_slave(self): self._stream_connection._next_seq_id = 1 self._stream_connection._read_packet() - def __connect_to_stream(self): + @property + def __connected_stream(self): + return bool(getattr(self, '_stream_connection', None) and \ + self._stream_connection.open) + + @property + def __connected_ctl(self): + return bool(getattr(self, '_ctl_connection', None) and \ + self._ctl_connection.open) + + def __connect_to_stream(self, force_reconnect=False): + if self.__connected_stream: + if not force_reconnect: + return + self._stream_connection.close() + # log_pos (4) -- position in the binlog-file to start the stream with # flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1) # server_id (4) -- server id of this slave # log_file (string.EOF) -- filename of the binlog on the master self._stream_connection = self.pymysql_wrapper(**self.__connection_settings) + if pymysql.__version__ < LooseVersion("0.6"): + self._stream_connection._read_packet = self._stream_connection.read_packet self.__use_checksum = self.__checksum_enabled() @@ -301,9 +357,7 @@ def __connect_to_stream(self): 4294967)) # If heartbeat is too low, the connection will disconnect before, # this is also the behavior in mysql - heartbeat = float(min(net_timeout/2., self.slave_heartbeat)) - if heartbeat > 4294967: - heartbeat = 4294967 + heartbeat = float(min(net_timeout/2., self.slave_heartbeat, 4294967)) # master_heartbeat_period is nanoseconds heartbeat = int(heartbeat * 1000000000) @@ -419,29 +473,27 @@ def __connect_to_stream(self): else: self._stream_connection._write_bytes(prelude) self._stream_connection._next_seq_id = 1 - self.__connected_stream = True - def fetchone(self): + def fetchone(self, force_reconnect=False): + self.__prefetch(force_reconnect=force_reconnect) + return self.__fetchone() + + def __prefetch(self, force_reconnect=False): + self.__connect_to_ctl(force_reconnect=force_reconnect) + self.__connect_to_stream(force_reconnect=force_reconnect) + + def __fetchone(self): + # let `__fetchone` be as light weight as possible. while True: if self.end_log_pos and self.is_past_end_log_pos: return None - if not self.__connected_stream: - self.__connect_to_stream() - - if not self.__connected_ctl: - self.__connect_to_ctl() - try: - if pymysql.__version__ < LooseVersion("0.6"): - pkt = self._stream_connection.read_packet() - else: - pkt = self._stream_connection._read_packet() + pkt = self._stream_connection._read_packet() except pymysql.OperationalError as error: code, message = error.args if code in MYSQL_EXPECTED_ERROR_CODES: - self._stream_connection.close() - self.__connected_stream = False + self.__connect_to_stream(force_reconnect=True) continue raise @@ -555,9 +607,8 @@ def _allowed_event_list(self, only_events, ignored_events, def __get_table_information(self, schema, table): for i in range(1, 3): + self.__connect_to_ctl() try: - if not self.__connected_ctl: - self.__connect_to_ctl() cur = self._ctl_connection.cursor() cur.execute(""" @@ -575,10 +626,10 @@ def __get_table_information(self, schema, table): except pymysql.OperationalError as error: code, message = error.args if code in MYSQL_EXPECTED_ERROR_CODES: - self.__connected_ctl = False continue else: raise error def __iter__(self): - return iter(self.fetchone, None) + self.__prefetch(force_reconnect=False) + return iter(self.__fetchone, None) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index a2ea52fa..66d51616 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -503,13 +503,27 @@ def test_end_log_pos(self): binlog = self.execute("SHOW BINARY LOGS").fetchone()[0] + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + log_pos=0, + log_file=binlog) + # fetch several events then get the end position of + # the last event + # do not use a fixed int as end position, cause that + # may be an invalid position + for i in range(13): + _ = self.stream.fetchone() + binlog = self.stream.log_file + end_position = self.stream.log_pos self.stream.close() self.stream = BinLogStreamReader( self.database, server_id=1024, log_pos=0, log_file=binlog, - end_log_pos=888) + end_log_pos=end_position) last_log_pos = 0 last_event_type = 0 @@ -517,7 +531,7 @@ def test_end_log_pos(self): last_log_pos = self.stream.log_pos last_event_type = event.event_type - self.assertEqual(last_log_pos, 888) + self.assertEqual(last_log_pos, end_position) self.assertEqual(last_event_type, TABLE_MAP_EVENT) class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): diff --git a/pymysqlreplication/tests/test_data_type.py b/pymysqlreplication/tests/test_data_type.py index 2fca96f0..74ab64cf 100644 --- a/pymysqlreplication/tests/test_data_type.py +++ b/pymysqlreplication/tests/test_data_type.py @@ -701,7 +701,7 @@ def test_status_vars(self): self.assertEqual(event.catalog_nz_code, b'std') self.assertEqual(event.mts_accessed_db_names, [b'pymysqlreplication_test']) - def test_null_bitmask(self) + def test_null_bitmask(self): """Test parse of null-bitmask in table map events Create table with 16 columns with nullability specified by 'bit_mask' variable @@ -730,7 +730,7 @@ def test_null_bitmask(self) column_type = "INT" column_definition.append(column_type) - nullability = "NOT NULL" if not RowsEvent.__is_null(bit_mask, i) else "" + nullability = "NOT NULL" if not RowsEvent._RowsEvent__is_null(None, bit_mask, i) else "" column_definition.append(nullability) columns.append(" ".join(column_definition)) @@ -744,7 +744,7 @@ def test_null_bitmask(self) for i in range(16): values.append('0') - insert_query += f' ({",".join(values)})') + insert_query += f' ({",".join(values)})' self.execute(create_query) self.execute(insert_query) From cbe2234cac6ec9b2cf9f0ef08181ea70d8340b1d Mon Sep 17 00:00:00 2001 From: woods-chen Date: Thu, 2 Dec 2021 00:55:03 +0000 Subject: [PATCH 2/3] skip the first 2 events after reconnected * sleep 5s after disconnected accidently and skip the first 2 events after reconnected * some cleanup and compacting works --- pymysqlreplication/binlogstream.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index a69b5e5e..15ac60bc 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -2,6 +2,7 @@ import pymysql import struct +import time from distutils.version import LooseVersion from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE @@ -30,6 +31,7 @@ # 2006 MySQL server has gone away MYSQL_EXPECTED_ERROR_CODES = [2013, 2006] +PYMYSQL_VERSION_LT_06 = pymysql.__version__ < LooseVersion("0.6") class ReportSlave(object): @@ -208,11 +210,6 @@ def __init__(self, connection_settings, server_id, each time the client is disconnected and then auto-reconnected to the mysql server (OperationalError 2006/2013) if resume_stream is False. so it's suggested to set resume_stream to True. - - an additional RotateEvent and FormatDescriptionEvent will be - fetched each time the client is disconnected and then auto- - reconnected to the server. (no matter resume_stream is True - or False) """ self.__connection_settings = connection_settings @@ -263,7 +260,7 @@ def __init__(self, connection_settings, server_id, self.pymysql_wrapper = pymysql.connect def close(self): - if getattr(self, '_stream_connection', None) and self._stream_connection.open: + if self.__connected_stream: self._stream_connection.close() if getattr(self, '_ctl_connection', None): # break reference cycle between stream reader and underlying @@ -304,7 +301,7 @@ def _register_slave(self): packet = self.report_slave.encoded(self.__server_id) - if pymysql.__version__ < LooseVersion("0.6"): + if PYMYSQL_VERSION_LT_06: self._stream_connection.wfile.write(packet) self._stream_connection.wfile.flush() self._stream_connection.read_packet() @@ -334,7 +331,7 @@ def __connect_to_stream(self, force_reconnect=False): # server_id (4) -- server id of this slave # log_file (string.EOF) -- filename of the binlog on the master self._stream_connection = self.pymysql_wrapper(**self.__connection_settings) - if pymysql.__version__ < LooseVersion("0.6"): + if PYMYSQL_VERSION_LT_06: self._stream_connection._read_packet = self._stream_connection.read_packet self.__use_checksum = self.__checksum_enabled() @@ -467,7 +464,7 @@ def __connect_to_stream(self, force_reconnect=False): # encoded_data prelude += gtid_set.encoded() - if pymysql.__version__ < LooseVersion("0.6"): + if PYMYSQL_VERSION_LT_06: self._stream_connection.wfile.write(prelude) self._stream_connection.wfile.flush() else: @@ -493,7 +490,10 @@ def __fetchone(self): except pymysql.OperationalError as error: code, message = error.args if code in MYSQL_EXPECTED_ERROR_CODES: + time.sleep(5) self.__connect_to_stream(force_reconnect=True) + # skip the first 2 events (RotateEvent and FormatDescriptionEvent) + _ = self.__fetchone(), self.__fetchone() continue raise From 02baf9923331ba5facd32cd311ce409bed560fc4 Mon Sep 17 00:00:00 2001 From: woods-chen Date: Thu, 10 Mar 2022 22:45:21 +0800 Subject: [PATCH 3/3] undo the sleeping and event skipping process when reconnecting modified: pymysqlreplication/binlogstream.py --- pymysqlreplication/binlogstream.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 15ac60bc..ebe7ca60 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -490,10 +490,7 @@ def __fetchone(self): except pymysql.OperationalError as error: code, message = error.args if code in MYSQL_EXPECTED_ERROR_CODES: - time.sleep(5) self.__connect_to_stream(force_reconnect=True) - # skip the first 2 events (RotateEvent and FormatDescriptionEvent) - _ = self.__fetchone(), self.__fetchone() continue raise