From 459943223057a83ee66eaf018f1a2445e2556994 Mon Sep 17 00:00:00 2001 From: mirageoasis Date: Thu, 31 Aug 2023 16:25:39 +0900 Subject: [PATCH 01/15] feat : mirageoasis typing complete --- pymysqlreplication/tests/test_basic.py | 223 +++++++++++++------------ 1 file changed, 112 insertions(+), 111 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index cb27dada..0e5d8e81 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -18,6 +18,7 @@ from pymysqlreplication.event import * from pymysqlreplication.constants.BINLOG import * from pymysqlreplication.row_event import * +from pymysql import Connection __all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", "TestRowsQueryLogEvents"] @@ -723,7 +724,7 @@ def test_drop_column(self): self.execute("INSERT INTO test_drop_column VALUES (2)") self.execute("COMMIT") - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=(WriteRowsEvent,) @@ -747,12 +748,12 @@ def test_alter_column(self): self.execute("INSERT INTO test_alter_column VALUES (2, 'Another value', 'A value')") self.execute("COMMIT") - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=(WriteRowsEvent,), ) - event = self.stream.fetchone() # insert with two values + event: BinLogEvent = self.stream.fetchone() # insert with two values # both of these asserts fail because of issue underlying proble described in issue #118 # because it got table schema info after the alter table, it wrongly assumes the second # column of the first insert is 'another_data' @@ -769,17 +770,17 @@ class TestCTLConnectionSettings(base.PyMySQLReplicationTestCase): def setUp(self): super().setUp() self.stream.close() - ctl_db = copy.copy(self.database) - ctl_db["db"] = None - ctl_db["port"] = int(os.environ.get("MYSQL_5_7_CTL_PORT") or 3307) - ctl_db["host"] = os.environ.get("MYSQL_5_7_CTL") or "localhost" - self.ctl_conn_control = pymysql.connect(**ctl_db) + ctl_db: dict = copy.copy(self.database) + ctl_db["db"]: str = None + ctl_db["port"]: int = int(os.environ.get("MYSQL_5_7_CTL_PORT") or 3307) + ctl_db["host"]: str = os.environ.get("MYSQL_5_7_CTL") or "localhost" + self.ctl_conn_control: Connection = pymysql.connect(**ctl_db) self.ctl_conn_control.cursor().execute("DROP DATABASE IF EXISTS pymysqlreplication_test") self.ctl_conn_control.cursor().execute("CREATE DATABASE pymysqlreplication_test") self.ctl_conn_control.close() - ctl_db["db"] = "pymysqlreplication_test" - self.ctl_conn_control = pymysql.connect(**ctl_db) - self.stream = BinLogStreamReader( + ctl_db["db"]: str = "pymysqlreplication_test" + self.ctl_conn_control: Connection = pymysql.connect(**ctl_db) + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, ctl_connection_settings=ctl_db, server_id=1024, @@ -798,9 +799,9 @@ def test_separate_ctl_settings_table_metadata_unavailable(self): had_error = False try: - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() except TableMetadataUnavailableError as e: - had_error = True + had_error: bool = True assert "test" in e.args[0] finally: self.resetBinLog() @@ -829,13 +830,13 @@ def setUp(self): raise unittest.SkipTest("database does not support GTID, skipping GTID tests") def test_read_query_event(self): - query = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "SELECT @@global.gtid_executed;" - gtid = self.execute(query).fetchone()[0] + query: str = "SELECT @@global.gtid_executed;" + gtid: int = self.execute(query).fetchone()[0] self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, blocking=True, auto_position=gtid, ignored_events=[HeartbeatLogEvent]) @@ -843,14 +844,14 @@ def test_read_query_event(self): self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) # Insert first event - query = "BEGIN;" + query: str = "BEGIN;" self.execute(query) - query = "INSERT INTO test (id, data) VALUES(1, 'Hello');" + query: str = "INSERT INTO test (id, data) VALUES(1, 'Hello');" self.execute(query) - query = "COMMIT;" + query: str = "COMMIT;" self.execute(query) - firstevent = self.stream.fetchone() + firstevent: BinLogEvent = self.stream.fetchone() self.assertIsInstance(firstevent, GtidEvent) self.assertIsInstance(self.stream.fetchone(), QueryEvent) @@ -859,14 +860,14 @@ def test_read_query_event(self): self.assertIsInstance(self.stream.fetchone(), XidEvent) # Insert second event - query = "BEGIN;" + query: str = "BEGIN;" self.execute(query) - query = "INSERT INTO test (id, data) VALUES(2, 'Hello');" + query: str = "INSERT INTO test (id, data) VALUES(2, 'Hello');" self.execute(query) - query = "COMMIT;" + query: str = "COMMIT;" self.execute(query) - secondevent = self.stream.fetchone() + secondevent: BinLogEvent = self.stream.fetchone() self.assertIsInstance(secondevent, GtidEvent) self.assertIsInstance(self.stream.fetchone(), QueryEvent) @@ -877,180 +878,180 @@ def test_read_query_event(self): self.assertEqual(secondevent.gno, firstevent.gno + 1) def test_position_gtid(self): - query = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "BEGIN;" + query: str = "BEGIN;" self.execute(query) - query = "INSERT INTO test (id, data) VALUES(1, 'Hello');" + query: str = "INSERT INTO test (id, data) VALUES(1, 'Hello');" self.execute(query) - query = "COMMIT;" + query: str = "COMMIT;" self.execute(query) - query = "SELECT @@global.gtid_executed;" - gtid = self.execute(query).fetchone()[0] + query: str = "SELECT @@global.gtid_executed;" + gtid: int = self.execute(query).fetchone()[0] - query = "CREATE TABLE test2 (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test2 (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, blocking=True, auto_position=gtid, ignored_events=[HeartbeatLogEvent]) self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) self.assertIsInstance(self.stream.fetchone(), GtidEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.query, 'CREATE TABLE test2 (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))'); class TestGtidRepresentation(unittest.TestCase): def test_gtidset_representation(self): - set_repr = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,' \ + set_repr: str = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,' \ '4350f323-7565-4e59-8763-4b1b83a0ce0e:1-20' - myset = GtidSet(set_repr) + myset: GtidSet = GtidSet(set_repr) self.assertEqual(str(myset), set_repr) def test_gtidset_representation_newline(self): - set_repr = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,' \ + set_repr: str = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,' \ '4350f323-7565-4e59-8763-4b1b83a0ce0e:1-20' - mysql_repr = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,\n' \ + mysql_repr: str = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,\n' \ '4350f323-7565-4e59-8763-4b1b83a0ce0e:1-20' - myset = GtidSet(mysql_repr) + myset: GtidSet = GtidSet(mysql_repr) self.assertEqual(str(myset), set_repr) def test_gtidset_representation_payload(self): - set_repr = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,' \ + set_repr: str = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,' \ '4350f323-7565-4e59-8763-4b1b83a0ce0e:1-20' - myset = GtidSet(set_repr) - payload = myset.encode() - parsedset = myset.decode(io.BytesIO(payload)) + myset: GtidSet = GtidSet(set_repr) + payload: bytes = myset.encode() + parsedset: GtidSet = myset.decode(io.BytesIO(payload)) self.assertEqual(str(myset), str(parsedset)) - set_repr = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1,' \ + set_repr: str = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1,' \ '4350f323-7565-4e59-8763-4b1b83a0ce0e:1-20' - myset = GtidSet(set_repr) - payload = myset.encode() - parsedset = myset.decode(io.BytesIO(payload)) + myset: GtidSet = GtidSet(set_repr) + payload: bytes = myset.encode() + parsedset: GtidSet = myset.decode(io.BytesIO(payload)) self.assertEqual(str(myset), str(parsedset)) class GtidTests(unittest.TestCase): def test_ordering(self): - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - other = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-10") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + other: int = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-10") assert gtid.__lt__(other) assert gtid.__le__(other) assert other.__gt__(gtid) assert other.__ge__(gtid) - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - other = Gtid("deadbeef-20d3-11e5-a393-4a63946f7eac:5-10") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + other: int = Gtid("deadbeef-20d3-11e5-a393-4a63946f7eac:5-10") assert gtid.__lt__(other) assert gtid.__le__(other) assert other.__gt__(gtid) assert other.__ge__(gtid) def test_encode_decode(self): - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - payload = gtid.encode() - decoded = Gtid.decode(io.BytesIO(payload)) + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + payload: bytes = gtid.encode() + decoded: bytes = Gtid.decode(io.BytesIO(payload)) assert str(gtid) == str(decoded) def test_add_interval(self): - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-56") - end = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:57-58") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-56") + end: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:57-58") assert (gtid + end).intervals == [(5, 59)] - start = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-2") + start: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-2") assert (gtid + start).intervals == [(1, 3), (5, 57)] - sparse = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-4:7-10") - within = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-6") + sparse: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-4:7-10") + within: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-6") assert (sparse + within).intervals == [(1, 11)] def test_interval_non_merging(self): - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - other = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:58-59") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + other: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:58-59") gtid = gtid + other self.assertEqual(str(gtid), "57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56:58-59") def test_merging(self): - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - other = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:57-59") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + other: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:57-59") gtid = gtid + other self.assertEqual(str(gtid), "57b70f4e-20d3-11e5-a393-4a63946f7eac:1-59") def test_sub_interval(self): - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - start = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-5") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + start: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-5") assert (gtid - start).intervals == [(6, 57)] - end = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:55-56") + end: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:55-56") assert (gtid - end).intervals == [(1, 55)] - within = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:25-26") + within: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:25-26") assert (gtid - within).intervals == [(1, 25), (27, 57)] def test_parsing(self): with self.assertRaises(ValueError) as exc: - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-5 57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - gtid = Gtid("NNNNNNNN-20d3-11e5-a393-4a63946f7eac:1-5") - gtid = Gtid("-20d3-11e5-a393-4a63946f7eac:1-5") - gtid = Gtid("-20d3-11e5-a393-4a63946f7eac:1-") - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:A-1") - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:-1") - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1") - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-5 57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + gtid: Gtid = Gtid("NNNNNNNN-20d3-11e5-a393-4a63946f7eac:1-5") + gtid: Gtid = Gtid("-20d3-11e5-a393-4a63946f7eac:1-5") + gtid: Gtid = Gtid("-20d3-11e5-a393-4a63946f7eac:1-") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:A-1") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:-1") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1") class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase): def test_binlog_checkpoint_event(self): self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1023, blocking=False, is_mariadb=True ) - query = "DROP TABLE IF EXISTS test" + query: str = "DROP TABLE IF EXISTS test" self.execute(query) - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.stream.close() - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, RotateEvent) - event = self.stream.fetchone() - self.assertIsInstance(event,FormatDescriptionEvent) + event: BinLogEvent = self.stream.fetchone() + self.assertIsInstance(event, FormatDescriptionEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, MariadbBinLogCheckPointEvent) self.assertEqual(event.filename, self.bin_log_basename()+".000001") class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase): def test_annotate_rows_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) # Insert first event - query = "BEGIN;" + query: str = "BEGIN;" self.execute(query) - insert_query = b"INSERT INTO test (id, data) VALUES(1, 'Hello')" + insert_query: bytes = b"INSERT INTO test (id, data) VALUES(1, 'Hello')" self.execute(insert_query) - query = "COMMIT;" + query: str = "COMMIT;" self.execute(query) self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, blocking=False, @@ -1059,38 +1060,38 @@ def test_annotate_rows_event(self): annotate_rows_event=True, ) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() #Check event type 160,MariadbAnnotateRowsEvent - self.assertEqual(event.event_type,160) + self.assertEqual(event.event_type, 160) #Check self.sql_statement - self.assertEqual(event.sql_statement,insert_query) - self.assertIsInstance(event,MariadbAnnotateRowsEvent) + self.assertEqual(event.sql_statement, insert_query) + self.assertIsInstance(event, MariadbAnnotateRowsEvent) def test_start_encryption_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello World')" + query: str = "INSERT INTO test (data) VALUES('Hello World')" self.execute(query) self.execute("COMMIT") self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) - start_encryption_event = self.stream.fetchone() + start_encryption_event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(start_encryption_event, MariadbStartEncryptionEvent) - schema = start_encryption_event.schema - key_version = start_encryption_event.key_version - nonce = start_encryption_event.nonce + schema: int = start_encryption_event.schema + key_version: int = start_encryption_event.key_version + nonce: bytes = start_encryption_event.nonce from pathlib import Path - encryption_key_file_path = Path(__file__).parent.parent.parent + encryption_key_file_path: Path = Path(__file__).parent.parent.parent try: with open(f"{encryption_key_file_path}/.mariadb/no_encryption_key.key", "r") as key_file: - first_line = key_file.readline() - key_version_from_key_file = int(first_line.split(";")[0]) + first_line: str = key_file.readline() + key_version_from_key_file: int = int(first_line.split(";")[0]) except Exception as e: self.fail("raised unexpected exception: {exception}".format(exception=e)) finally: @@ -1104,11 +1105,11 @@ def test_start_encryption_event(self): def test_gtid_list_event(self): # set max_binlog_size to create new binlog file - query = 'SET GLOBAL max_binlog_size=4096' + query: str = 'SET GLOBAL max_binlog_size=4096' self.execute(query) # parse only Maradb GTID list event self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, blocking=False, @@ -1116,21 +1117,21 @@ def test_gtid_list_event(self): is_mariadb=True, ) - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello World')" + query: str = "INSERT INTO test (data) VALUES('Hello World')" - for cnt in range(0,15): + for cnt in range(0, 15): self.execute(query) self.execute("COMMIT") # 'mariadb gtid list event' of first binlog file - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.event_type,163) self.assertIsInstance(event,MariadbGtidListEvent) # 'mariadb gtid list event' of second binlog file - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.event_type,163) self.assertEqual(event.gtid_list[0].gtid, '0-1-15') @@ -1140,7 +1141,7 @@ class TestStatementConnectionSetting(base.PyMySQLReplicationTestCase): def setUp(self): super().setUp() self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=(RandEvent, QueryEvent), @@ -1157,7 +1158,7 @@ def test_rand_event(self): self.assertIsInstance(self.stream.fetchone(), QueryEvent) self.assertIsInstance(self.stream.fetchone(), QueryEvent) - expect_rand_event = self.stream.fetchone() + expect_rand_event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(expect_rand_event, RandEvent) self.assertEqual(type(expect_rand_event.seed1), int) self.assertEqual(type(expect_rand_event.seed2), int) @@ -1179,7 +1180,7 @@ def tearDown(self): def test_rows_query_log_event(self): self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=[RowsQueryLogEvent], @@ -1187,7 +1188,7 @@ def test_rows_query_log_event(self): self.execute("CREATE TABLE IF NOT EXISTS test (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255))") self.execute("INSERT INTO test (name) VALUES ('Soul Lee')") self.execute("COMMIT") - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, RowsQueryLogEvent) From 4a6bca2655398c80c74c6f153beb32315320162e Mon Sep 17 00:00:00 2001 From: davinc71998 Date: Thu, 31 Aug 2023 19:13:51 +0900 Subject: [PATCH 02/15] Add type annotations to test_abnormal.py --- pymysqlreplication/tests/test_abnormal.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/tests/test_abnormal.py b/pymysqlreplication/tests/test_abnormal.py index a3e75d3a..26dc9817 100644 --- a/pymysqlreplication/tests/test_abnormal.py +++ b/pymysqlreplication/tests/test_abnormal.py @@ -18,7 +18,7 @@ def ignored_events(): '''Events the BinLogStreamReader should ignore''' return [GtidEvent] - def test_no_trailing_rotate_event(self): + def test_no_trailing_rotate_event(self) -> None: '''A missing RotateEvent and skip_to_timestamp cause corruption This test shows that a binlog file which lacks the trailing RotateEvent @@ -54,7 +54,7 @@ def test_no_trailing_rotate_event(self): # The table_map should be empty because of the binlog being rotated. self.assertEqual({}, self.stream.table_map) - def _remove_trailing_rotate_event_from_first_binlog(self): + def _remove_trailing_rotate_event_from_first_binlog(self) -> None: '''Remove the trailing RotateEvent from the first binlog According to the MySQL Internals Manual, a RotateEvent will be added to From 510b48bfe473426d19694a753064158edac72779 Mon Sep 17 00:00:00 2001 From: davinc71998 Date: Thu, 31 Aug 2023 19:14:28 +0900 Subject: [PATCH 03/15] Add type annotations to column.py --- pymysqlreplication/column.py | 42 ++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/pymysqlreplication/column.py b/pymysqlreplication/column.py index faa1ab7e..5cc85e47 100644 --- a/pymysqlreplication/column.py +++ b/pymysqlreplication/column.py @@ -3,32 +3,36 @@ import struct from .constants import FIELD_TYPE - +from typing import Any, Dict, Optional class Column(object): """Definition of a column """ - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: if len(args) == 3: self.__parse_column_definition(*args) else: self.__dict__.update(kwargs) - def __parse_column_definition(self, column_type, column_schema, packet): - self.type = column_type - self.name = column_schema["COLUMN_NAME"] - self.collation_name = column_schema["COLLATION_NAME"] - self.character_set_name = column_schema["CHARACTER_SET_NAME"] - self.comment = column_schema["COLUMN_COMMENT"] - self.unsigned = column_schema["COLUMN_TYPE"].find("unsigned") != -1 - self.zerofill = column_schema["COLUMN_TYPE"].find("zerofill") != -1 - self.type_is_bool = False - self.is_primary = column_schema["COLUMN_KEY"] == "PRI" + def __parse_column_definition(self, + column_type: str, + column_schema: Dict[str, Any], + packet: Any + ) -> None: + self.type: str = column_type + self.name: str = column_schema["COLUMN_NAME"] + self.collation_name: str = column_schema["COLLATION_NAME"] + self.character_set_name: str = column_schema["CHARACTER_SET_NAME"] + self.comment: str = column_schema["COLUMN_COMMENT"] + self.unsigned: bool = column_schema["COLUMN_TYPE"].find("unsigned") != -1 + self.zerofill: bool = column_schema["COLUMN_TYPE"].find("zerofill") != -1 + self.type_is_bool: bool = False + self.is_primary: bool = column_schema["COLUMN_KEY"] == "PRI" # Check for fixed-length binary type. When that's the case then we need # to zero-pad the values to full length at read time. - self.fixed_binary_length = None + self.fixed_binary_length: Optional[str] = None if column_schema["DATA_TYPE"] == "binary": self.fixed_binary_length = column_schema["CHARACTER_OCTET_LENGTH"] @@ -65,7 +69,7 @@ def __parse_column_definition(self, column_type, column_schema, packet): self.bits = (bytes * 8) + bits self.bytes = int((self.bits + 7) / 8) - def __read_string_metadata(self, packet, column_schema): + def __read_string_metadata(self, packet: Any, column_schema: Dict[str, Any]) -> None: metadata = (packet.read_uint8() << 8) + packet.read_uint8() real_type = metadata >> 8 if real_type == FIELD_TYPE.SET or real_type == FIELD_TYPE.ENUM: @@ -76,7 +80,7 @@ def __read_string_metadata(self, packet, column_schema): self.max_length = (((metadata >> 4) & 0x300) ^ 0x300) \ + (metadata & 0x00ff) - def __read_enum_metadata(self, column_schema): + def __read_enum_metadata(self, column_schema: Dict[str, Any]) -> None: enums = column_schema["COLUMN_TYPE"] if self.type == FIELD_TYPE.ENUM: self.enum_values = [''] + enums.replace('enum(', '')\ @@ -85,15 +89,15 @@ def __read_enum_metadata(self, column_schema): self.set_values = enums.replace('set(', '')\ .replace(')', '').replace('\'', '').split(',') - def __eq__(self, other): + def __eq__(self, other: 'Column') -> bool: return self.data == other.data - def __ne__(self, other): + def __ne__(self, other: 'Column') -> bool: return not self.__eq__(other) - def serializable_data(self): + def serializable_data(self) -> Dict[str, Any]: return self.data @property - def data(self): + def data(self) -> Dict[str, Any]: return dict((k, v) for (k, v) in self.__dict__.items() if not k.startswith('_')) From 2a68bb4af2d81229bcd5c84bd58102430c21202e Mon Sep 17 00:00:00 2001 From: davinc71998 Date: Thu, 31 Aug 2023 19:16:08 +0900 Subject: [PATCH 04/15] Add type annotations to benchmark.py --- pymysqlreplication/tests/benchmark.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/tests/benchmark.py b/pymysqlreplication/tests/benchmark.py index c947d116..c915a9e7 100644 --- a/pymysqlreplication/tests/benchmark.py +++ b/pymysqlreplication/tests/benchmark.py @@ -11,15 +11,19 @@ import os from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import * +from pymysql.connections import Connection +from pymysql.cursors import Cursor + +from typing import Any import cProfile -def execute(con, query): +def execute(con: Connection, query: str) -> Cursor: c = con.cursor() c.execute(query) return c -def consume_events(): +def consume_events() -> None: stream = BinLogStreamReader(connection_settings=database, server_id=3, resume_stream=False, From 248d14463655e76207d9e95e991e6ec083c89a37 Mon Sep 17 00:00:00 2001 From: Soul Lee Date: Thu, 31 Aug 2023 22:28:40 +0900 Subject: [PATCH 05/15] add: type hint for test_basic.py --- pymysqlreplication/tests/test_basic.py | 255 +++++++++++++------------ 1 file changed, 128 insertions(+), 127 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 0e5d8e81..1a697e75 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -4,8 +4,8 @@ import os import sys import time - import pymysql +from typing import List, Type, Union, Optional, Tuple, Dict, Any, Callable if sys.version_info < (2, 7): import unittest2 as unittest @@ -19,59 +19,60 @@ from pymysqlreplication.constants.BINLOG import * from pymysqlreplication.row_event import * from pymysql import Connection +from pymysqlreplication.event import BinLogEvent __all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", "TestRowsQueryLogEvents"] class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase): - def ignoredEvents(self): + def ignoredEvents(self) -> List[Type[BinLogEvent]]: return [GtidEvent] - def test_allowed_event_list(self): + def test_allowed_event_list(self) -> None: self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 22) self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 21) self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 21) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) - def test_read_query_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_read_query_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.position, 4) self.assertEqual(event.next_binlog, self.bin_log_basename() + ".000001") self.assertIsInstance(event, RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, QueryEvent) self.assertEqual(event.query, query) - def test_read_query_event_with_unicode(self): - query = u"CREATE TABLE `testÈ` (id INT NOT NULL AUTO_INCREMENT, dataÈ VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_read_query_event_with_unicode(self) -> None: + query: str = u"CREATE TABLE `testÈ` (id INT NOT NULL AUTO_INCREMENT, dataÈ VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.position, 4) self.assertEqual(event.next_binlog, self.bin_log_basename() + ".000001") self.assertIsInstance(event, RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, QueryEvent) self.assertEqual(event.query, query) - def test_reading_rotate_event(self): - query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_reading_rotate_event(self) -> None: + query: str = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.stream.close() - query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) # Rotate event @@ -104,15 +105,15 @@ def test_load_query_event(self): self.assertIsInstance(self.stream.fetchone(), XidEvent) """ - def test_connection_stream_lost_event(self): + def test_connection_stream_lost_event(self) -> None: self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, blocking=True, ignored_events=self.ignoredEvents()) - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query2 = "INSERT INTO test (data) VALUES('a')" + query2: str = "INSERT INTO test (data) VALUES('a')" for i in range(0, 10000): self.execute(query2) self.execute("COMMIT") @@ -120,87 +121,87 @@ def test_connection_stream_lost_event(self): self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, QueryEvent) self.assertEqual(event.query, query) self.conn_control.kill(self.stream._stream_connection.thread_id()) for i in range(0, 10000): - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsNotNone(event) - def test_filtering_only_events(self): + def test_filtering_only_events(self) -> None: self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=[QueryEvent]) - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: BinLogEvent = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, QueryEvent) self.assertEqual(event.query, query) - def test_filtering_ignore_events(self): + def test_filtering_ignore_events(self) -> None: self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, ignored_events=[QueryEvent]) - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, RotateEvent) - def test_filtering_table_event_with_only_tables(self): + def test_filtering_table_event_with_only_tables(self) -> None: self.stream.close() self.assertEqual(self.bin_log_format(), "ROW") - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=[WriteRowsEvent], only_tables = ["test_2"] ) - query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.execute("INSERT INTO test_2 (data) VALUES ('alpha')") self.execute("INSERT INTO test_3 (data) VALUES ('alpha')") self.execute("INSERT INTO test_2 (data) VALUES ('beta')") self.execute("COMMIT") - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.table, "test_2") - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.table, "test_2") - def test_filtering_table_event_with_ignored_tables(self): + def test_filtering_table_event_with_ignored_tables(self) -> None: self.stream.close() self.assertEqual(self.bin_log_format(), "ROW") - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=[WriteRowsEvent], ignored_tables = ["test_2"] ) - query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.execute("INSERT INTO test_2 (data) VALUES ('alpha')") self.execute("INSERT INTO test_3 (data) VALUES ('alpha')") self.execute("INSERT INTO test_2 (data) VALUES ('beta')") self.execute("COMMIT") - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.table, "test_3") - def test_filtering_table_event_with_only_tables_and_ignored_tables(self): + def test_filtering_table_event_with_only_tables_and_ignored_tables(self) -> None: self.stream.close() self.assertEqual(self.bin_log_format(), "ROW") - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=[WriteRowsEvent], @@ -208,22 +209,22 @@ def test_filtering_table_event_with_only_tables_and_ignored_tables(self): ignored_tables = ["test_3"] ) - query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.execute("INSERT INTO test_2 (data) VALUES ('alpha')") self.execute("INSERT INTO test_3 (data) VALUES ('alpha')") self.execute("INSERT INTO test_2 (data) VALUES ('beta')") self.execute("COMMIT") - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.table, "test_2") - def test_write_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_write_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello World')" + query: str = "INSERT INTO test (data) VALUES('Hello World')" self.execute(query) self.execute("COMMIT") @@ -236,7 +237,7 @@ def test_write_row_event(self): self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, WRITE_ROWS_EVENT_V2) else: @@ -248,15 +249,15 @@ def test_write_row_event(self): self.assertEqual(event.table, "test") self.assertEqual(event.columns[1].name, 'data') - def test_delete_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_delete_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello World')" + query: str = "INSERT INTO test (data) VALUES('Hello World')" self.execute(query) self.resetBinLog() - query = "DELETE FROM test WHERE id = 1" + query: str = "DELETE FROM test WHERE id = 1" self.execute(query) self.execute("COMMIT") @@ -277,15 +278,15 @@ def test_delete_row_event(self): self.assertEqual(event.rows[0]["values"]["id"], 1) self.assertEqual(event.rows[0]["values"]["data"], "Hello World") - def test_update_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_update_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello')" + query: str = "INSERT INTO test (data) VALUES('Hello')" self.execute(query) self.resetBinLog() - query = "UPDATE test SET data = 'World' WHERE id = 1" + query: str = "UPDATE test SET data = 'World' WHERE id = 1" self.execute(query) self.execute("COMMIT") @@ -297,7 +298,7 @@ def test_update_row_event(self): self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) else: @@ -308,12 +309,12 @@ def test_update_row_event(self): self.assertEqual(event.rows[0]["after_values"]["id"], 1) self.assertEqual(event.rows[0]["after_values"]["data"], "World") - def test_minimal_image_write_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_minimal_image_write_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "SET SESSION binlog_row_image = 'minimal'" + query: str = "SET SESSION binlog_row_image = 'minimal'" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello World')" + query: str = "INSERT INTO test (data) VALUES('Hello World')" self.execute(query) self.execute("COMMIT") @@ -326,7 +327,7 @@ def test_minimal_image_write_row_event(self): self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, WRITE_ROWS_EVENT_V2) else: @@ -338,16 +339,16 @@ def test_minimal_image_write_row_event(self): self.assertEqual(event.table, "test") self.assertEqual(event.columns[1].name, 'data') - def test_minimal_image_delete_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_minimal_image_delete_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello World')" + query: str = "INSERT INTO test (data) VALUES('Hello World')" self.execute(query) - query = "SET SESSION binlog_row_image = 'minimal'" + query: str = "SET SESSION binlog_row_image = 'minimal'" self.execute(query) self.resetBinLog() - query = "DELETE FROM test WHERE id = 1" + query: str = "DELETE FROM test WHERE id = 1" self.execute(query) self.execute("COMMIT") @@ -359,7 +360,7 @@ def test_minimal_image_delete_row_event(self): self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, DELETE_ROWS_EVENT_V2) else: @@ -368,16 +369,16 @@ def test_minimal_image_delete_row_event(self): self.assertEqual(event.rows[0]["values"]["id"], 1) self.assertEqual(event.rows[0]["values"]["data"], None) - def test_minimal_image_update_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_minimal_image_update_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello')" + query: str = "INSERT INTO test (data) VALUES('Hello')" self.execute(query) - query = "SET SESSION binlog_row_image = 'minimal'" + query: str = "SET SESSION binlog_row_image = 'minimal'" self.execute(query) self.resetBinLog() - query = "UPDATE test SET data = 'World' WHERE id = 1" + query: str = "UPDATE test SET data = 'World' WHERE id = 1" self.execute(query) self.execute("COMMIT") @@ -389,7 +390,7 @@ def test_minimal_image_update_row_event(self): self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) else: @@ -400,10 +401,10 @@ def test_minimal_image_update_row_event(self): self.assertEqual(event.rows[0]["after_values"]["id"], None) self.assertEqual(event.rows[0]["after_values"]["data"], "World") - def test_log_pos(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_log_pos(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello')" + query: str = "INSERT INTO test (data) VALUES('Hello')" self.execute(query) self.execute("COMMIT") @@ -412,14 +413,14 @@ def test_log_pos(self): # record position after insert log_file, log_pos = self.stream.log_file, self.stream.log_pos - query = "UPDATE test SET data = 'World' WHERE id = 1" + query: str = "UPDATE test SET data = 'World' WHERE id = 1" self.execute(query) self.execute("COMMIT") # resume stream from previous position if self.stream is not None: self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, resume_stream=True, @@ -438,18 +439,18 @@ def test_log_pos(self): self.assertIsInstance(self.stream.fetchone(), XidEvent) - def test_log_pos_handles_disconnects(self): + def test_log_pos_handles_disconnects(self) -> None: self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, resume_stream=False, only_events = [FormatDescriptionEvent, QueryEvent, TableMapEvent, WriteRowsEvent, XidEvent] ) - query = "CREATE TABLE test (id INT PRIMARY KEY AUTO_INCREMENT, data VARCHAR (50) NOT NULL)" + query: str = "CREATE TABLE test (id INT PRIMARY KEY AUTO_INCREMENT, data VARCHAR (50) NOT NULL)" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello')" + query: str = "INSERT INTO test (data) VALUES('Hello')" self.execute(query) self.execute("COMMIT") @@ -465,27 +466,27 @@ def test_log_pos_handles_disconnects(self): self.assertGreater(self.stream.log_pos, 0) - def test_skip_to_timestamp(self): + def test_skip_to_timestamp(self) -> None: self.stream.close() - query = "CREATE TABLE test_1 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_1 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) time.sleep(1) - query = "SELECT UNIX_TIMESTAMP();" + query: str = "SELECT UNIX_TIMESTAMP();" timestamp = self.execute(query).fetchone()[0] query2 = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query2) - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, skip_to_timestamp=timestamp, ignored_events=self.ignoredEvents(), ) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, QueryEvent) self.assertEqual(event.query, query2) - def test_end_log_pos(self): + def test_end_log_pos(self) -> None: """Test end_log_pos parameter for BinLogStreamReader MUST BE TESTED IN DEFAULT SYSTEM VARIABLES SETTING @@ -507,15 +508,15 @@ def test_end_log_pos(self): binlog = self.execute("SHOW BINARY LOGS").fetchone()[0] self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, log_pos=0, log_file=binlog, end_log_pos=888) - last_log_pos = 0 - last_event_type = 0 + last_log_pos: int = 0 + last_event_type: int = 0 for event in self.stream: last_log_pos = self.stream.log_pos last_event_type = event.event_type @@ -525,16 +526,16 @@ def test_end_log_pos(self): class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): - def ignoredEvents(self): + def ignoredEvents(self) -> List[Type[BinLogEvent]]: return [GtidEvent] - def test_insert_multiple_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_insert_multiple_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.resetBinLog() - query = "INSERT INTO test (data) VALUES('Hello'),('World')" + query: str = "INSERT INTO test (data) VALUES('Hello'),('World')" self.execute(query) self.execute("COMMIT") @@ -545,7 +546,7 @@ def test_insert_multiple_row_event(self): self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event = self.stream.fetchone() + event: str = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, WRITE_ROWS_EVENT_V2) else: @@ -558,17 +559,17 @@ def test_insert_multiple_row_event(self): self.assertEqual(event.rows[1]["values"]["id"], 2) self.assertEqual(event.rows[1]["values"]["data"], "World") - def test_update_multiple_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_update_multiple_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello')" + query: str = "INSERT INTO test (data) VALUES('Hello')" self.execute(query) - query = "INSERT INTO test (data) VALUES('World')" + query: str = "INSERT INTO test (data) VALUES('World')" self.execute(query) self.resetBinLog() - query = "UPDATE test SET data = 'Toto'" + query: str = "UPDATE test SET data = 'Toto'" self.execute(query) self.execute("COMMIT") @@ -596,17 +597,17 @@ def test_update_multiple_row_event(self): self.assertEqual(event.rows[1]["after_values"]["id"], 2) self.assertEqual(event.rows[1]["after_values"]["data"], "Toto") - def test_delete_multiple_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_delete_multiple_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello')" + query: str = "INSERT INTO test (data) VALUES('Hello')" self.execute(query) - query = "INSERT INTO test (data) VALUES('World')" + query: str = "INSERT INTO test (data) VALUES('World')" self.execute(query) self.resetBinLog() - query = "DELETE FROM test" + query: str = "DELETE FROM test" self.execute(query) self.execute("COMMIT") @@ -631,7 +632,7 @@ def test_delete_multiple_row_event(self): self.assertEqual(event.rows[1]["values"]["id"], 2) self.assertEqual(event.rows[1]["values"]["data"], "World") - def test_drop_table(self): + def test_drop_table(self) -> None: self.execute("CREATE TABLE test (id INTEGER(11))") self.execute("INSERT INTO test VALUES (1)") self.execute("DROP TABLE test") @@ -647,10 +648,10 @@ def test_drop_table(self): #QueryEvent for the BEGIN self.stream.fetchone() - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, TableMapEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, WRITE_ROWS_EVENT_V2) else: @@ -659,51 +660,51 @@ def test_drop_table(self): self.assertEqual([], event.rows) - def test_drop_table_tablemetadata_unavailable(self): + def test_drop_table_tablemetadata_unavailable(self) -> None: self.stream.close() self.execute("CREATE TABLE test (id INTEGER(11))") self.execute("INSERT INTO test VALUES (1)") self.execute("DROP TABLE test") self.execute("COMMIT") - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=(WriteRowsEvent,), fail_on_table_metadata_unavailable=True ) - had_error = False + had_error: bool = False try: - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() except TableMetadataUnavailableError as e: - had_error = True + had_error: bool = True assert "test" in e.args[0] finally: self.resetBinLog() assert had_error - def test_ignore_decode_errors(self): - problematic_unicode_string = b'[{"text":"\xed\xa0\xbd \xed\xb1\x8d Some string"}]' + def test_ignore_decode_errors(self) -> None: + problematic_unicode_string: bytes = b'[{"text":"\xed\xa0\xbd \xed\xb1\x8d Some string"}]' self.stream.close() self.execute("CREATE TABLE test (data VARCHAR(50) CHARACTER SET utf8mb4)") self.execute_with_args("INSERT INTO test (data) VALUES (%s)", (problematic_unicode_string)) self.execute("COMMIT") # Initialize with ignore_decode_errors=False - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=(WriteRowsEvent,), ignore_decode_errors=False ) - event = self.stream.fetchone() - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() with self.assertRaises(UnicodeError) as exception: - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() data = event.rows[0]["values"]["data"] # Initialize with ignore_decode_errors=True - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=(WriteRowsEvent,), @@ -711,11 +712,11 @@ def test_ignore_decode_errors(self): ) self.stream.fetchone() self.stream.fetchone() - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() data = event.rows[0]["values"]["data"] self.assertEqual(data, '[{"text":" Some string"}]') - def test_drop_column(self): + def test_drop_column(self) -> None self.stream.close() self.execute("CREATE TABLE test_drop_column (id INTEGER(11), data VARCHAR(50))") self.execute("INSERT INTO test_drop_column VALUES (1, 'A value')") @@ -738,7 +739,7 @@ def test_drop_column(self): self.resetBinLog() @unittest.expectedFailure - def test_alter_column(self): + def test_alter_column(self) -> None: self.stream.close() self.execute("CREATE TABLE test_alter_column (id INTEGER(11), data VARCHAR(50))") self.execute("INSERT INTO test_alter_column VALUES (1, 'A value')") @@ -788,11 +789,11 @@ def setUp(self): fail_on_table_metadata_unavailable=True ) - def tearDown(self): + def tearDown(self) -> None: super().tearDown() self.ctl_conn_control.close() - def test_separate_ctl_settings_table_metadata_unavailable(self): + def test_separate_ctl_settings_table_metadata_unavailable(self) -> None: self.execute("CREATE TABLE test (id INTEGER(11))") self.execute("INSERT INTO test VALUES (1)") self.execute("COMMIT") From 3730e1846958a3d100e28256c125d1bfde8c911d Mon Sep 17 00:00:00 2001 From: Soul Lee Date: Thu, 31 Aug 2023 22:34:11 +0900 Subject: [PATCH 06/15] fix: return value for test_drop_column --- pymysqlreplication/tests/test_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 1a697e75..3b52175c 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -716,7 +716,7 @@ def test_ignore_decode_errors(self) -> None: data = event.rows[0]["values"]["data"] self.assertEqual(data, '[{"text":" Some string"}]') - def test_drop_column(self) -> None + def test_drop_column(self) -> None: self.stream.close() self.execute("CREATE TABLE test_drop_column (id INTEGER(11), data VARCHAR(50))") self.execute("INSERT INTO test_drop_column VALUES (1, 'A value')") From 351b974e110fc6289337c249920f251e762a6ad4 Mon Sep 17 00:00:00 2001 From: lre12 Date: Thu, 31 Aug 2023 22:41:23 +0900 Subject: [PATCH 07/15] Add type annotations to base.py --- pymysqlreplication/tests/base.py | 41 +++++++++++++++++--------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index fd18cb3d..46ffea03 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -2,6 +2,9 @@ import pymysql import copy + +from pymysql.cursors import Cursor + from pymysqlreplication import BinLogStreamReader import os import sys @@ -15,10 +18,10 @@ class PyMySQLReplicationTestCase(base): - def ignoredEvents(self): + def ignoredEvents(self) -> list: return [] - def setUp(self): + def setUp(self) -> None: # default self.database = { "host": os.environ.get("MYSQL_5_7") or "localhost", @@ -43,66 +46,66 @@ def setUp(self): self.isMySQL56AndMore() self.__is_mariaDB = None - def getMySQLVersion(self): + def getMySQLVersion(self) -> str: """Return the MySQL version of the server If version is 5.6.10-log the result is 5.6.10 """ return self.execute("SELECT VERSION()").fetchone()[0].split('-')[0] - def isMySQL56AndMore(self): + def isMySQL56AndMore(self) -> bool: version = float(self.getMySQLVersion().rsplit('.', 1)[0]) if version >= 5.6: return True return False - def isMySQL57(self): + def isMySQL57(self) -> bool: version = float(self.getMySQLVersion().rsplit('.', 1)[0]) return version == 5.7 - def isMySQL80AndMore(self): + def isMySQL80AndMore(self) -> bool: version = float(self.getMySQLVersion().rsplit('.', 1)[0]) return version >= 8.0 - def isMariaDB(self): + def isMariaDB(self) -> bool: if self.__is_mariaDB is None: self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0] return self.__is_mariaDB @property - def supportsGTID(self): + def supportsGTID(self) -> bool: if not self.isMySQL56AndMore(): return False return self.execute("SELECT @@global.gtid_mode ").fetchone()[0] == "ON" - def connect_conn_control(self, db): + def connect_conn_control(self, db) -> None: if self.conn_control is not None: self.conn_control.close() self.conn_control = pymysql.connect(**db) - def tearDown(self): + def tearDown(self) -> None: self.conn_control.close() self.conn_control = None self.stream.close() self.stream = None - def execute(self, query): + def execute(self, query: str) -> Cursor: c = self.conn_control.cursor() c.execute(query) return c - def execute_with_args(self, query, args): + def execute_with_args(self, query: str, args) -> Cursor: c = self.conn_control.cursor() c.execute(query, args) return c - def resetBinLog(self): + def resetBinLog(self) -> None: self.execute("RESET MASTER") if self.stream is not None: self.stream.close() self.stream = BinLogStreamReader(self.database, server_id=1024, ignored_events=self.ignoredEvents()) - def set_sql_mode(self): + def set_sql_mode(self) -> None: """set sql_mode to test with same sql_mode (mysql 5.7 sql_mode default is changed)""" version = float(self.getMySQLVersion().rsplit('.', 1)[0]) if version == 5.7: @@ -114,15 +117,15 @@ def bin_log_format(self): result = cursor.fetchone() return result[0] - def bin_log_basename(self): - cursor = self.execute('SELECT @@log_bin_basename') + def bin_log_basename(self) -> str: + cursor: Cursor = self.execute('SELECT @@log_bin_basename') bin_log_basename = cursor.fetchone()[0] bin_log_basename = bin_log_basename.split("/")[-1] return bin_log_basename class PyMySQLReplicationMariaDbTestCase(PyMySQLReplicationTestCase): - def setUp(self): + def setUp(self) -> None: # default self.database = { "host": os.environ.get("MARIADB_10_6") or "localhost", @@ -145,8 +148,8 @@ def setUp(self): self.stream = None self.resetBinLog() - def bin_log_basename(self): - cursor = self.execute('SELECT @@log_bin_basename') + def bin_log_basename(self) -> str: + cursor: Cursor = self.execute('SELECT @@log_bin_basename') bin_log_basename = cursor.fetchone()[0] bin_log_basename = bin_log_basename.split("/")[-1] return bin_log_basename From 0778c83e359f36b93833d9fe0aaeceb95a8935b3 Mon Sep 17 00:00:00 2001 From: lre12 Date: Thu, 31 Aug 2023 22:46:02 +0900 Subject: [PATCH 08/15] Fix formatting to base.py --- pymysqlreplication/tests/base.py | 43 ++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index 46ffea03..32e09422 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -1,19 +1,21 @@ # -*- coding: utf-8 -*- -import pymysql import copy - -from pymysql.cursors import Cursor - -from pymysqlreplication import BinLogStreamReader import os import sys +import typing +import pymysql + +from pymysqlreplication import BinLogStreamReader if sys.version_info < (2, 7): import unittest2 as unittest else: import unittest +if typing.TYPE_CHECKING: + from pymysql.cursors import Cursor + base = unittest.TestCase @@ -30,7 +32,7 @@ def setUp(self) -> None: "port": 3306, "use_unicode": True, "charset": "utf8", - "db": "pymysqlreplication_test" + "db": "pymysqlreplication_test", } self.conn_control = None @@ -50,25 +52,27 @@ def getMySQLVersion(self) -> str: """Return the MySQL version of the server If version is 5.6.10-log the result is 5.6.10 """ - return self.execute("SELECT VERSION()").fetchone()[0].split('-')[0] + return self.execute("SELECT VERSION()").fetchone()[0].split("-")[0] def isMySQL56AndMore(self) -> bool: - version = float(self.getMySQLVersion().rsplit('.', 1)[0]) + version = float(self.getMySQLVersion().rsplit(".", 1)[0]) if version >= 5.6: return True return False def isMySQL57(self) -> bool: - version = float(self.getMySQLVersion().rsplit('.', 1)[0]) + version = float(self.getMySQLVersion().rsplit(".", 1)[0]) return version == 5.7 def isMySQL80AndMore(self) -> bool: - version = float(self.getMySQLVersion().rsplit('.', 1)[0]) + version = float(self.getMySQLVersion().rsplit(".", 1)[0]) return version >= 8.0 def isMariaDB(self) -> bool: if self.__is_mariaDB is None: - self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0] + self.__is_mariaDB = ( + "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0] + ) return self.__is_mariaDB @property @@ -92,7 +96,7 @@ def execute(self, query: str) -> Cursor: c = self.conn_control.cursor() c.execute(query) return c - + def execute_with_args(self, query: str, args) -> Cursor: c = self.conn_control.cursor() c.execute(query, args) @@ -102,12 +106,13 @@ def resetBinLog(self) -> None: self.execute("RESET MASTER") if self.stream is not None: self.stream.close() - self.stream = BinLogStreamReader(self.database, server_id=1024, - ignored_events=self.ignoredEvents()) + self.stream = BinLogStreamReader( + self.database, server_id=1024, ignored_events=self.ignoredEvents() + ) def set_sql_mode(self) -> None: """set sql_mode to test with same sql_mode (mysql 5.7 sql_mode default is changed)""" - version = float(self.getMySQLVersion().rsplit('.', 1)[0]) + version = float(self.getMySQLVersion().rsplit(".", 1)[0]) if version == 5.7: self.execute("SET @@sql_mode='NO_ENGINE_SUBSTITUTION'") @@ -118,7 +123,7 @@ def bin_log_format(self): return result[0] def bin_log_basename(self) -> str: - cursor: Cursor = self.execute('SELECT @@log_bin_basename') + cursor: Cursor = self.execute("SELECT @@log_bin_basename") bin_log_basename = cursor.fetchone()[0] bin_log_basename = bin_log_basename.split("/")[-1] return bin_log_basename @@ -134,7 +139,7 @@ def setUp(self) -> None: "port": int(os.environ.get("MARIADB_10_6_PORT") or 3308), "use_unicode": True, "charset": "utf8", - "db": "pymysqlreplication_test" + "db": "pymysqlreplication_test", } self.conn_control = None @@ -147,9 +152,9 @@ def setUp(self) -> None: self.connect_conn_control(db) self.stream = None self.resetBinLog() - + def bin_log_basename(self) -> str: - cursor: Cursor = self.execute('SELECT @@log_bin_basename') + cursor: Cursor = self.execute("SELECT @@log_bin_basename") bin_log_basename = cursor.fetchone()[0] bin_log_basename = bin_log_basename.split("/")[-1] return bin_log_basename From 7b847b20c8acb5fbf4b9c97715687dd0587d2fd5 Mon Sep 17 00:00:00 2001 From: Soul Lee Date: Fri, 1 Sep 2023 00:40:10 +0900 Subject: [PATCH 09/15] fix: remove unused imports --- pymysqlreplication/tests/test_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 3b52175c..9309354f 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -5,7 +5,7 @@ import sys import time import pymysql -from typing import List, Type, Union, Optional, Tuple, Dict, Any, Callable +from typing import List, Type if sys.version_info < (2, 7): import unittest2 as unittest From 89d34bb1467ed6d6e871e45a49bc0d529d328a4a Mon Sep 17 00:00:00 2001 From: Soul Lee Date: Fri, 1 Sep 2023 00:45:51 +0900 Subject: [PATCH 10/15] fix: import pymysql.cursors always --- pymysqlreplication/tests/base.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index 32e09422..17dd10b1 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -3,7 +3,7 @@ import copy import os import sys -import typing +from pymysql.cursors import Cursor import pymysql from pymysqlreplication import BinLogStreamReader @@ -13,9 +13,6 @@ else: import unittest -if typing.TYPE_CHECKING: - from pymysql.cursors import Cursor - base = unittest.TestCase From ec8d8fdb7eec32bb913e93bd9e4ee413aebae5a7 Mon Sep 17 00:00:00 2001 From: Soul Lee Date: Fri, 1 Sep 2023 01:32:59 +0900 Subject: [PATCH 11/15] add: type hints for PyMyReplTestCase --- pymysqlreplication/tests/base.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index 17dd10b1..ee952275 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -5,6 +5,8 @@ import sys from pymysql.cursors import Cursor import pymysql +from pymysql import Connection +from typing import Optional from pymysqlreplication import BinLogStreamReader @@ -22,7 +24,7 @@ def ignoredEvents(self) -> list: def setUp(self) -> None: # default - self.database = { + self.database: dict = { "host": os.environ.get("MYSQL_5_7") or "localhost", "user": "root", "passwd": "", @@ -32,7 +34,7 @@ def setUp(self) -> None: "db": "pymysqlreplication_test", } - self.conn_control = None + self.conn_control: Optional[Connection] = None db = copy.copy(self.database) db["db"] = None self.connect_conn_control(db) @@ -40,10 +42,10 @@ def setUp(self) -> None: self.execute("CREATE DATABASE pymysqlreplication_test") db = copy.copy(self.database) self.connect_conn_control(db) - self.stream = None + self.stream: Optional[BinLogStreamReader] = None self.resetBinLog() self.isMySQL56AndMore() - self.__is_mariaDB = None + self.__is_mariaDB: Optional[bool] = None def getMySQLVersion(self) -> str: """Return the MySQL version of the server @@ -67,7 +69,7 @@ def isMySQL80AndMore(self) -> bool: def isMariaDB(self) -> bool: if self.__is_mariaDB is None: - self.__is_mariaDB = ( + self.__is_mariaDB: bool = ( "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0] ) return self.__is_mariaDB @@ -81,13 +83,13 @@ def supportsGTID(self) -> bool: def connect_conn_control(self, db) -> None: if self.conn_control is not None: self.conn_control.close() - self.conn_control = pymysql.connect(**db) + self.conn_control: Connection = pymysql.connect(**db) def tearDown(self) -> None: self.conn_control.close() - self.conn_control = None + self.conn_control: Optional[Connection] = None self.stream.close() - self.stream = None + self.stream: Optional[BinLogStreamReader] = None def execute(self, query: str) -> Cursor: c = self.conn_control.cursor() @@ -103,7 +105,7 @@ def resetBinLog(self) -> None: self.execute("RESET MASTER") if self.stream is not None: self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, ignored_events=self.ignoredEvents() ) @@ -139,7 +141,7 @@ def setUp(self) -> None: "db": "pymysqlreplication_test", } - self.conn_control = None + self.conn_control: Optional[Connection] = None db = copy.copy(self.database) db["db"] = None self.connect_conn_control(db) @@ -147,7 +149,7 @@ def setUp(self) -> None: self.execute("CREATE DATABASE pymysqlreplication_test") db = copy.copy(self.database) self.connect_conn_control(db) - self.stream = None + self.stream: Optional[BinLogStreamReader] = None self.resetBinLog() def bin_log_basename(self) -> str: From a56f4ce8edb0e8ea336037d84b2fe113992c1520 Mon Sep 17 00:00:00 2001 From: Soul Lee Date: Fri, 1 Sep 2023 01:33:25 +0900 Subject: [PATCH 12/15] add: type hints for connections --- pymysqlreplication/tests/benchmark.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/tests/benchmark.py b/pymysqlreplication/tests/benchmark.py index c915a9e7..67d698b2 100644 --- a/pymysqlreplication/tests/benchmark.py +++ b/pymysqlreplication/tests/benchmark.py @@ -48,11 +48,11 @@ def consume_events() -> None: "db": "pymysqlreplication_test" } -conn = pymysql.connect(**database) +conn: Connection = pymysql.connect(**database) execute(conn, "DROP DATABASE IF EXISTS pymysqlreplication_test") execute(conn, "CREATE DATABASE pymysqlreplication_test") -conn = pymysql.connect(**database) +conn: Connection = pymysql.connect(**database) execute(conn, "CREATE TABLE test (i INT) ENGINE = MEMORY") execute(conn, "INSERT INTO test VALUES(1)") execute(conn, "CREATE TABLE test2 (i INT) ENGINE = MEMORY") From 7bca90c738ce473f4c04805a2c5b4f8d803986b9 Mon Sep 17 00:00:00 2001 From: Soul Lee Date: Fri, 1 Sep 2023 01:33:41 +0900 Subject: [PATCH 13/15] add: type hints for stream --- pymysqlreplication/tests/test_abnormal.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_abnormal.py b/pymysqlreplication/tests/test_abnormal.py index 26dc9817..7548ca2e 100644 --- a/pymysqlreplication/tests/test_abnormal.py +++ b/pymysqlreplication/tests/test_abnormal.py @@ -42,7 +42,7 @@ def test_no_trailing_rotate_event(self) -> None: binlog = self.execute("SHOW BINARY LOGS").fetchone()[0] - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, log_pos=4, From 7ad5e1a16e22f1db7256fe4e4bbde97b6d334345 Mon Sep 17 00:00:00 2001 From: Soul Lee Date: Fri, 1 Sep 2023 01:33:56 +0900 Subject: [PATCH 14/15] fix: wrong type hint for Gtid --- pymysqlreplication/tests/test_basic.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 9309354f..36d5a871 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -135,7 +135,7 @@ def test_filtering_only_events(self) -> None: self.stream.close() self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=[QueryEvent]) - query: BinLogEvent = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) event: BinLogEvent = self.stream.fetchone() @@ -390,7 +390,7 @@ def test_minimal_image_update_row_event(self) -> None: self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event: BinLogEvent = self.stream.fetchone() + event: UpdateRowsEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) else: @@ -947,13 +947,13 @@ def test_gtidset_representation_payload(self): class GtidTests(unittest.TestCase): def test_ordering(self): gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - other: int = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-10") + other: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-10") assert gtid.__lt__(other) assert gtid.__le__(other) assert other.__gt__(gtid) assert other.__ge__(gtid) gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - other: int = Gtid("deadbeef-20d3-11e5-a393-4a63946f7eac:5-10") + other: Gtid = Gtid("deadbeef-20d3-11e5-a393-4a63946f7eac:5-10") assert gtid.__lt__(other) assert gtid.__le__(other) assert other.__gt__(gtid) @@ -1046,7 +1046,7 @@ def test_annotate_rows_event(self): # Insert first event query: str = "BEGIN;" self.execute(query) - insert_query: bytes = b"INSERT INTO test (id, data) VALUES(1, 'Hello')" + insert_query: str = "INSERT INTO test (id, data) VALUES(1, 'Hello')" self.execute(insert_query) query: str = "COMMIT;" self.execute(query) From 28b1f5ce0a20a0129dbd4bbe97c07bd31c08fa95 Mon Sep 17 00:00:00 2001 From: Soul Lee Date: Fri, 1 Sep 2023 01:35:22 +0900 Subject: [PATCH 15/15] fix: add bytes to query string --- pymysqlreplication/tests/test_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 36d5a871..5f981c5b 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1046,7 +1046,7 @@ def test_annotate_rows_event(self): # Insert first event query: str = "BEGIN;" self.execute(query) - insert_query: str = "INSERT INTO test (id, data) VALUES(1, 'Hello')" + insert_query: str = b"INSERT INTO test (id, data) VALUES(1, 'Hello')" self.execute(insert_query) query: str = "COMMIT;" self.execute(query)