diff --git a/pymysqlreplication/column.py b/pymysqlreplication/column.py index faa1ab7e..5cc85e47 100644 --- a/pymysqlreplication/column.py +++ b/pymysqlreplication/column.py @@ -3,32 +3,36 @@ import struct from .constants import FIELD_TYPE - +from typing import Any, Dict, Optional class Column(object): """Definition of a column """ - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: if len(args) == 3: self.__parse_column_definition(*args) else: self.__dict__.update(kwargs) - def __parse_column_definition(self, column_type, column_schema, packet): - self.type = column_type - self.name = column_schema["COLUMN_NAME"] - self.collation_name = column_schema["COLLATION_NAME"] - self.character_set_name = column_schema["CHARACTER_SET_NAME"] - self.comment = column_schema["COLUMN_COMMENT"] - self.unsigned = column_schema["COLUMN_TYPE"].find("unsigned") != -1 - self.zerofill = column_schema["COLUMN_TYPE"].find("zerofill") != -1 - self.type_is_bool = False - self.is_primary = column_schema["COLUMN_KEY"] == "PRI" + def __parse_column_definition(self, + column_type: str, + column_schema: Dict[str, Any], + packet: Any + ) -> None: + self.type: str = column_type + self.name: str = column_schema["COLUMN_NAME"] + self.collation_name: str = column_schema["COLLATION_NAME"] + self.character_set_name: str = column_schema["CHARACTER_SET_NAME"] + self.comment: str = column_schema["COLUMN_COMMENT"] + self.unsigned: bool = column_schema["COLUMN_TYPE"].find("unsigned") != -1 + self.zerofill: bool = column_schema["COLUMN_TYPE"].find("zerofill") != -1 + self.type_is_bool: bool = False + self.is_primary: bool = column_schema["COLUMN_KEY"] == "PRI" # Check for fixed-length binary type. When that's the case then we need # to zero-pad the values to full length at read time. - self.fixed_binary_length = None + self.fixed_binary_length: Optional[str] = None if column_schema["DATA_TYPE"] == "binary": self.fixed_binary_length = column_schema["CHARACTER_OCTET_LENGTH"] @@ -65,7 +69,7 @@ def __parse_column_definition(self, column_type, column_schema, packet): self.bits = (bytes * 8) + bits self.bytes = int((self.bits + 7) / 8) - def __read_string_metadata(self, packet, column_schema): + def __read_string_metadata(self, packet: Any, column_schema: Dict[str, Any]) -> None: metadata = (packet.read_uint8() << 8) + packet.read_uint8() real_type = metadata >> 8 if real_type == FIELD_TYPE.SET or real_type == FIELD_TYPE.ENUM: @@ -76,7 +80,7 @@ def __read_string_metadata(self, packet, column_schema): self.max_length = (((metadata >> 4) & 0x300) ^ 0x300) \ + (metadata & 0x00ff) - def __read_enum_metadata(self, column_schema): + def __read_enum_metadata(self, column_schema: Dict[str, Any]) -> None: enums = column_schema["COLUMN_TYPE"] if self.type == FIELD_TYPE.ENUM: self.enum_values = [''] + enums.replace('enum(', '')\ @@ -85,15 +89,15 @@ def __read_enum_metadata(self, column_schema): self.set_values = enums.replace('set(', '')\ .replace(')', '').replace('\'', '').split(',') - def __eq__(self, other): + def __eq__(self, other: 'Column') -> bool: return self.data == other.data - def __ne__(self, other): + def __ne__(self, other: 'Column') -> bool: return not self.__eq__(other) - def serializable_data(self): + def serializable_data(self) -> Dict[str, Any]: return self.data @property - def data(self): + def data(self) -> Dict[str, Any]: return dict((k, v) for (k, v) in self.__dict__.items() if not k.startswith('_')) diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index fd18cb3d..ee952275 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -1,10 +1,14 @@ # -*- coding: utf-8 -*- -import pymysql import copy -from pymysqlreplication import BinLogStreamReader import os import sys +from pymysql.cursors import Cursor +import pymysql +from pymysql import Connection +from typing import Optional + +from pymysqlreplication import BinLogStreamReader if sys.version_info < (2, 7): import unittest2 as unittest @@ -15,22 +19,22 @@ class PyMySQLReplicationTestCase(base): - def ignoredEvents(self): + def ignoredEvents(self) -> list: return [] - def setUp(self): + def setUp(self) -> None: # default - self.database = { + self.database: dict = { "host": os.environ.get("MYSQL_5_7") or "localhost", "user": "root", "passwd": "", "port": 3306, "use_unicode": True, "charset": "utf8", - "db": "pymysqlreplication_test" + "db": "pymysqlreplication_test", } - self.conn_control = None + self.conn_control: Optional[Connection] = None db = copy.copy(self.database) db["db"] = None self.connect_conn_control(db) @@ -38,73 +42,76 @@ def setUp(self): self.execute("CREATE DATABASE pymysqlreplication_test") db = copy.copy(self.database) self.connect_conn_control(db) - self.stream = None + self.stream: Optional[BinLogStreamReader] = None self.resetBinLog() self.isMySQL56AndMore() - self.__is_mariaDB = None + self.__is_mariaDB: Optional[bool] = None - def getMySQLVersion(self): + def getMySQLVersion(self) -> str: """Return the MySQL version of the server If version is 5.6.10-log the result is 5.6.10 """ - return self.execute("SELECT VERSION()").fetchone()[0].split('-')[0] + return self.execute("SELECT VERSION()").fetchone()[0].split("-")[0] - def isMySQL56AndMore(self): - version = float(self.getMySQLVersion().rsplit('.', 1)[0]) + def isMySQL56AndMore(self) -> bool: + version = float(self.getMySQLVersion().rsplit(".", 1)[0]) if version >= 5.6: return True return False - def isMySQL57(self): - version = float(self.getMySQLVersion().rsplit('.', 1)[0]) + def isMySQL57(self) -> bool: + version = float(self.getMySQLVersion().rsplit(".", 1)[0]) return version == 5.7 - def isMySQL80AndMore(self): - version = float(self.getMySQLVersion().rsplit('.', 1)[0]) + def isMySQL80AndMore(self) -> bool: + version = float(self.getMySQLVersion().rsplit(".", 1)[0]) return version >= 8.0 - def isMariaDB(self): + def isMariaDB(self) -> bool: if self.__is_mariaDB is None: - self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0] + self.__is_mariaDB: bool = ( + "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0] + ) return self.__is_mariaDB @property - def supportsGTID(self): + def supportsGTID(self) -> bool: if not self.isMySQL56AndMore(): return False return self.execute("SELECT @@global.gtid_mode ").fetchone()[0] == "ON" - def connect_conn_control(self, db): + def connect_conn_control(self, db) -> None: if self.conn_control is not None: self.conn_control.close() - self.conn_control = pymysql.connect(**db) + self.conn_control: Connection = pymysql.connect(**db) - def tearDown(self): + def tearDown(self) -> None: self.conn_control.close() - self.conn_control = None + self.conn_control: Optional[Connection] = None self.stream.close() - self.stream = None + self.stream: Optional[BinLogStreamReader] = None - def execute(self, query): + def execute(self, query: str) -> Cursor: c = self.conn_control.cursor() c.execute(query) return c - - def execute_with_args(self, query, args): + + def execute_with_args(self, query: str, args) -> Cursor: c = self.conn_control.cursor() c.execute(query, args) return c - def resetBinLog(self): + def resetBinLog(self) -> None: self.execute("RESET MASTER") if self.stream is not None: self.stream.close() - self.stream = BinLogStreamReader(self.database, server_id=1024, - ignored_events=self.ignoredEvents()) + self.stream: BinLogStreamReader = BinLogStreamReader( + self.database, server_id=1024, ignored_events=self.ignoredEvents() + ) - def set_sql_mode(self): + def set_sql_mode(self) -> None: """set sql_mode to test with same sql_mode (mysql 5.7 sql_mode default is changed)""" - version = float(self.getMySQLVersion().rsplit('.', 1)[0]) + version = float(self.getMySQLVersion().rsplit(".", 1)[0]) if version == 5.7: self.execute("SET @@sql_mode='NO_ENGINE_SUBSTITUTION'") @@ -114,15 +121,15 @@ def bin_log_format(self): result = cursor.fetchone() return result[0] - def bin_log_basename(self): - cursor = self.execute('SELECT @@log_bin_basename') + def bin_log_basename(self) -> str: + cursor: Cursor = self.execute("SELECT @@log_bin_basename") bin_log_basename = cursor.fetchone()[0] bin_log_basename = bin_log_basename.split("/")[-1] return bin_log_basename class PyMySQLReplicationMariaDbTestCase(PyMySQLReplicationTestCase): - def setUp(self): + def setUp(self) -> None: # default self.database = { "host": os.environ.get("MARIADB_10_6") or "localhost", @@ -131,10 +138,10 @@ def setUp(self): "port": int(os.environ.get("MARIADB_10_6_PORT") or 3308), "use_unicode": True, "charset": "utf8", - "db": "pymysqlreplication_test" + "db": "pymysqlreplication_test", } - self.conn_control = None + self.conn_control: Optional[Connection] = None db = copy.copy(self.database) db["db"] = None self.connect_conn_control(db) @@ -142,11 +149,11 @@ def setUp(self): self.execute("CREATE DATABASE pymysqlreplication_test") db = copy.copy(self.database) self.connect_conn_control(db) - self.stream = None + self.stream: Optional[BinLogStreamReader] = None self.resetBinLog() - - def bin_log_basename(self): - cursor = self.execute('SELECT @@log_bin_basename') + + def bin_log_basename(self) -> str: + cursor: Cursor = self.execute("SELECT @@log_bin_basename") bin_log_basename = cursor.fetchone()[0] bin_log_basename = bin_log_basename.split("/")[-1] return bin_log_basename diff --git a/pymysqlreplication/tests/benchmark.py b/pymysqlreplication/tests/benchmark.py index c947d116..67d698b2 100644 --- a/pymysqlreplication/tests/benchmark.py +++ b/pymysqlreplication/tests/benchmark.py @@ -11,15 +11,19 @@ import os from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import * +from pymysql.connections import Connection +from pymysql.cursors import Cursor + +from typing import Any import cProfile -def execute(con, query): +def execute(con: Connection, query: str) -> Cursor: c = con.cursor() c.execute(query) return c -def consume_events(): +def consume_events() -> None: stream = BinLogStreamReader(connection_settings=database, server_id=3, resume_stream=False, @@ -44,11 +48,11 @@ def consume_events(): "db": "pymysqlreplication_test" } -conn = pymysql.connect(**database) +conn: Connection = pymysql.connect(**database) execute(conn, "DROP DATABASE IF EXISTS pymysqlreplication_test") execute(conn, "CREATE DATABASE pymysqlreplication_test") -conn = pymysql.connect(**database) +conn: Connection = pymysql.connect(**database) execute(conn, "CREATE TABLE test (i INT) ENGINE = MEMORY") execute(conn, "INSERT INTO test VALUES(1)") execute(conn, "CREATE TABLE test2 (i INT) ENGINE = MEMORY") diff --git a/pymysqlreplication/tests/test_abnormal.py b/pymysqlreplication/tests/test_abnormal.py index a3e75d3a..7548ca2e 100644 --- a/pymysqlreplication/tests/test_abnormal.py +++ b/pymysqlreplication/tests/test_abnormal.py @@ -18,7 +18,7 @@ def ignored_events(): '''Events the BinLogStreamReader should ignore''' return [GtidEvent] - def test_no_trailing_rotate_event(self): + def test_no_trailing_rotate_event(self) -> None: '''A missing RotateEvent and skip_to_timestamp cause corruption This test shows that a binlog file which lacks the trailing RotateEvent @@ -42,7 +42,7 @@ def test_no_trailing_rotate_event(self): binlog = self.execute("SHOW BINARY LOGS").fetchone()[0] - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, log_pos=4, @@ -54,7 +54,7 @@ def test_no_trailing_rotate_event(self): # The table_map should be empty because of the binlog being rotated. self.assertEqual({}, self.stream.table_map) - def _remove_trailing_rotate_event_from_first_binlog(self): + def _remove_trailing_rotate_event_from_first_binlog(self) -> None: '''Remove the trailing RotateEvent from the first binlog According to the MySQL Internals Manual, a RotateEvent will be added to diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index cb27dada..5f981c5b 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -4,8 +4,8 @@ import os import sys import time - import pymysql +from typing import List, Type if sys.version_info < (2, 7): import unittest2 as unittest @@ -18,59 +18,61 @@ from pymysqlreplication.event import * from pymysqlreplication.constants.BINLOG import * from pymysqlreplication.row_event import * +from pymysql import Connection +from pymysqlreplication.event import BinLogEvent __all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", "TestRowsQueryLogEvents"] class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase): - def ignoredEvents(self): + def ignoredEvents(self) -> List[Type[BinLogEvent]]: return [GtidEvent] - def test_allowed_event_list(self): + def test_allowed_event_list(self) -> None: self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 22) self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 21) self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 21) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) - def test_read_query_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_read_query_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.position, 4) self.assertEqual(event.next_binlog, self.bin_log_basename() + ".000001") self.assertIsInstance(event, RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, QueryEvent) self.assertEqual(event.query, query) - def test_read_query_event_with_unicode(self): - query = u"CREATE TABLE `testÈ` (id INT NOT NULL AUTO_INCREMENT, dataÈ VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_read_query_event_with_unicode(self) -> None: + query: str = u"CREATE TABLE `testÈ` (id INT NOT NULL AUTO_INCREMENT, dataÈ VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.position, 4) self.assertEqual(event.next_binlog, self.bin_log_basename() + ".000001") self.assertIsInstance(event, RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, QueryEvent) self.assertEqual(event.query, query) - def test_reading_rotate_event(self): - query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_reading_rotate_event(self) -> None: + query: str = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.stream.close() - query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) # Rotate event @@ -103,15 +105,15 @@ def test_load_query_event(self): self.assertIsInstance(self.stream.fetchone(), XidEvent) """ - def test_connection_stream_lost_event(self): + def test_connection_stream_lost_event(self) -> None: self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, blocking=True, ignored_events=self.ignoredEvents()) - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query2 = "INSERT INTO test (data) VALUES('a')" + query2: str = "INSERT INTO test (data) VALUES('a')" for i in range(0, 10000): self.execute(query2) self.execute("COMMIT") @@ -119,87 +121,87 @@ def test_connection_stream_lost_event(self): self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, QueryEvent) self.assertEqual(event.query, query) self.conn_control.kill(self.stream._stream_connection.thread_id()) for i in range(0, 10000): - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsNotNone(event) - def test_filtering_only_events(self): + def test_filtering_only_events(self) -> None: self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=[QueryEvent]) - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, QueryEvent) self.assertEqual(event.query, query) - def test_filtering_ignore_events(self): + def test_filtering_ignore_events(self) -> None: self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, ignored_events=[QueryEvent]) - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, RotateEvent) - def test_filtering_table_event_with_only_tables(self): + def test_filtering_table_event_with_only_tables(self) -> None: self.stream.close() self.assertEqual(self.bin_log_format(), "ROW") - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=[WriteRowsEvent], only_tables = ["test_2"] ) - query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.execute("INSERT INTO test_2 (data) VALUES ('alpha')") self.execute("INSERT INTO test_3 (data) VALUES ('alpha')") self.execute("INSERT INTO test_2 (data) VALUES ('beta')") self.execute("COMMIT") - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.table, "test_2") - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.table, "test_2") - def test_filtering_table_event_with_ignored_tables(self): + def test_filtering_table_event_with_ignored_tables(self) -> None: self.stream.close() self.assertEqual(self.bin_log_format(), "ROW") - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=[WriteRowsEvent], ignored_tables = ["test_2"] ) - query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.execute("INSERT INTO test_2 (data) VALUES ('alpha')") self.execute("INSERT INTO test_3 (data) VALUES ('alpha')") self.execute("INSERT INTO test_2 (data) VALUES ('beta')") self.execute("COMMIT") - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.table, "test_3") - def test_filtering_table_event_with_only_tables_and_ignored_tables(self): + def test_filtering_table_event_with_only_tables_and_ignored_tables(self) -> None: self.stream.close() self.assertEqual(self.bin_log_format(), "ROW") - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=[WriteRowsEvent], @@ -207,22 +209,22 @@ def test_filtering_table_event_with_only_tables_and_ignored_tables(self): ignored_tables = ["test_3"] ) - query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.execute("INSERT INTO test_2 (data) VALUES ('alpha')") self.execute("INSERT INTO test_3 (data) VALUES ('alpha')") self.execute("INSERT INTO test_2 (data) VALUES ('beta')") self.execute("COMMIT") - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.table, "test_2") - def test_write_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_write_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello World')" + query: str = "INSERT INTO test (data) VALUES('Hello World')" self.execute(query) self.execute("COMMIT") @@ -235,7 +237,7 @@ def test_write_row_event(self): self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, WRITE_ROWS_EVENT_V2) else: @@ -247,15 +249,15 @@ def test_write_row_event(self): self.assertEqual(event.table, "test") self.assertEqual(event.columns[1].name, 'data') - def test_delete_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_delete_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello World')" + query: str = "INSERT INTO test (data) VALUES('Hello World')" self.execute(query) self.resetBinLog() - query = "DELETE FROM test WHERE id = 1" + query: str = "DELETE FROM test WHERE id = 1" self.execute(query) self.execute("COMMIT") @@ -276,15 +278,15 @@ def test_delete_row_event(self): self.assertEqual(event.rows[0]["values"]["id"], 1) self.assertEqual(event.rows[0]["values"]["data"], "Hello World") - def test_update_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_update_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello')" + query: str = "INSERT INTO test (data) VALUES('Hello')" self.execute(query) self.resetBinLog() - query = "UPDATE test SET data = 'World' WHERE id = 1" + query: str = "UPDATE test SET data = 'World' WHERE id = 1" self.execute(query) self.execute("COMMIT") @@ -296,7 +298,7 @@ def test_update_row_event(self): self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) else: @@ -307,12 +309,12 @@ def test_update_row_event(self): self.assertEqual(event.rows[0]["after_values"]["id"], 1) self.assertEqual(event.rows[0]["after_values"]["data"], "World") - def test_minimal_image_write_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_minimal_image_write_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "SET SESSION binlog_row_image = 'minimal'" + query: str = "SET SESSION binlog_row_image = 'minimal'" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello World')" + query: str = "INSERT INTO test (data) VALUES('Hello World')" self.execute(query) self.execute("COMMIT") @@ -325,7 +327,7 @@ def test_minimal_image_write_row_event(self): self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, WRITE_ROWS_EVENT_V2) else: @@ -337,16 +339,16 @@ def test_minimal_image_write_row_event(self): self.assertEqual(event.table, "test") self.assertEqual(event.columns[1].name, 'data') - def test_minimal_image_delete_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_minimal_image_delete_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello World')" + query: str = "INSERT INTO test (data) VALUES('Hello World')" self.execute(query) - query = "SET SESSION binlog_row_image = 'minimal'" + query: str = "SET SESSION binlog_row_image = 'minimal'" self.execute(query) self.resetBinLog() - query = "DELETE FROM test WHERE id = 1" + query: str = "DELETE FROM test WHERE id = 1" self.execute(query) self.execute("COMMIT") @@ -358,7 +360,7 @@ def test_minimal_image_delete_row_event(self): self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, DELETE_ROWS_EVENT_V2) else: @@ -367,16 +369,16 @@ def test_minimal_image_delete_row_event(self): self.assertEqual(event.rows[0]["values"]["id"], 1) self.assertEqual(event.rows[0]["values"]["data"], None) - def test_minimal_image_update_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_minimal_image_update_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello')" + query: str = "INSERT INTO test (data) VALUES('Hello')" self.execute(query) - query = "SET SESSION binlog_row_image = 'minimal'" + query: str = "SET SESSION binlog_row_image = 'minimal'" self.execute(query) self.resetBinLog() - query = "UPDATE test SET data = 'World' WHERE id = 1" + query: str = "UPDATE test SET data = 'World' WHERE id = 1" self.execute(query) self.execute("COMMIT") @@ -388,7 +390,7 @@ def test_minimal_image_update_row_event(self): self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event = self.stream.fetchone() + event: UpdateRowsEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) else: @@ -399,10 +401,10 @@ def test_minimal_image_update_row_event(self): self.assertEqual(event.rows[0]["after_values"]["id"], None) self.assertEqual(event.rows[0]["after_values"]["data"], "World") - def test_log_pos(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_log_pos(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello')" + query: str = "INSERT INTO test (data) VALUES('Hello')" self.execute(query) self.execute("COMMIT") @@ -411,14 +413,14 @@ def test_log_pos(self): # record position after insert log_file, log_pos = self.stream.log_file, self.stream.log_pos - query = "UPDATE test SET data = 'World' WHERE id = 1" + query: str = "UPDATE test SET data = 'World' WHERE id = 1" self.execute(query) self.execute("COMMIT") # resume stream from previous position if self.stream is not None: self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, resume_stream=True, @@ -437,18 +439,18 @@ def test_log_pos(self): self.assertIsInstance(self.stream.fetchone(), XidEvent) - def test_log_pos_handles_disconnects(self): + def test_log_pos_handles_disconnects(self) -> None: self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, resume_stream=False, only_events = [FormatDescriptionEvent, QueryEvent, TableMapEvent, WriteRowsEvent, XidEvent] ) - query = "CREATE TABLE test (id INT PRIMARY KEY AUTO_INCREMENT, data VARCHAR (50) NOT NULL)" + query: str = "CREATE TABLE test (id INT PRIMARY KEY AUTO_INCREMENT, data VARCHAR (50) NOT NULL)" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello')" + query: str = "INSERT INTO test (data) VALUES('Hello')" self.execute(query) self.execute("COMMIT") @@ -464,27 +466,27 @@ def test_log_pos_handles_disconnects(self): self.assertGreater(self.stream.log_pos, 0) - def test_skip_to_timestamp(self): + def test_skip_to_timestamp(self) -> None: self.stream.close() - query = "CREATE TABLE test_1 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test_1 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) time.sleep(1) - query = "SELECT UNIX_TIMESTAMP();" + query: str = "SELECT UNIX_TIMESTAMP();" timestamp = self.execute(query).fetchone()[0] query2 = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query2) - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, skip_to_timestamp=timestamp, ignored_events=self.ignoredEvents(), ) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, QueryEvent) self.assertEqual(event.query, query2) - def test_end_log_pos(self): + def test_end_log_pos(self) -> None: """Test end_log_pos parameter for BinLogStreamReader MUST BE TESTED IN DEFAULT SYSTEM VARIABLES SETTING @@ -506,15 +508,15 @@ def test_end_log_pos(self): binlog = self.execute("SHOW BINARY LOGS").fetchone()[0] self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, log_pos=0, log_file=binlog, end_log_pos=888) - last_log_pos = 0 - last_event_type = 0 + last_log_pos: int = 0 + last_event_type: int = 0 for event in self.stream: last_log_pos = self.stream.log_pos last_event_type = event.event_type @@ -524,16 +526,16 @@ def test_end_log_pos(self): class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): - def ignoredEvents(self): + def ignoredEvents(self) -> List[Type[BinLogEvent]]: return [GtidEvent] - def test_insert_multiple_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_insert_multiple_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.resetBinLog() - query = "INSERT INTO test (data) VALUES('Hello'),('World')" + query: str = "INSERT INTO test (data) VALUES('Hello'),('World')" self.execute(query) self.execute("COMMIT") @@ -544,7 +546,7 @@ def test_insert_multiple_row_event(self): self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - event = self.stream.fetchone() + event: str = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, WRITE_ROWS_EVENT_V2) else: @@ -557,17 +559,17 @@ def test_insert_multiple_row_event(self): self.assertEqual(event.rows[1]["values"]["id"], 2) self.assertEqual(event.rows[1]["values"]["data"], "World") - def test_update_multiple_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_update_multiple_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello')" + query: str = "INSERT INTO test (data) VALUES('Hello')" self.execute(query) - query = "INSERT INTO test (data) VALUES('World')" + query: str = "INSERT INTO test (data) VALUES('World')" self.execute(query) self.resetBinLog() - query = "UPDATE test SET data = 'Toto'" + query: str = "UPDATE test SET data = 'Toto'" self.execute(query) self.execute("COMMIT") @@ -595,17 +597,17 @@ def test_update_multiple_row_event(self): self.assertEqual(event.rows[1]["after_values"]["id"], 2) self.assertEqual(event.rows[1]["after_values"]["data"], "Toto") - def test_delete_multiple_row_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + def test_delete_multiple_row_event(self) -> None: + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello')" + query: str = "INSERT INTO test (data) VALUES('Hello')" self.execute(query) - query = "INSERT INTO test (data) VALUES('World')" + query: str = "INSERT INTO test (data) VALUES('World')" self.execute(query) self.resetBinLog() - query = "DELETE FROM test" + query: str = "DELETE FROM test" self.execute(query) self.execute("COMMIT") @@ -630,7 +632,7 @@ def test_delete_multiple_row_event(self): self.assertEqual(event.rows[1]["values"]["id"], 2) self.assertEqual(event.rows[1]["values"]["data"], "World") - def test_drop_table(self): + def test_drop_table(self) -> None: self.execute("CREATE TABLE test (id INTEGER(11))") self.execute("INSERT INTO test VALUES (1)") self.execute("DROP TABLE test") @@ -646,10 +648,10 @@ def test_drop_table(self): #QueryEvent for the BEGIN self.stream.fetchone() - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, TableMapEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() if self.isMySQL56AndMore(): self.assertEqual(event.event_type, WRITE_ROWS_EVENT_V2) else: @@ -658,51 +660,51 @@ def test_drop_table(self): self.assertEqual([], event.rows) - def test_drop_table_tablemetadata_unavailable(self): + def test_drop_table_tablemetadata_unavailable(self) -> None: self.stream.close() self.execute("CREATE TABLE test (id INTEGER(11))") self.execute("INSERT INTO test VALUES (1)") self.execute("DROP TABLE test") self.execute("COMMIT") - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=(WriteRowsEvent,), fail_on_table_metadata_unavailable=True ) - had_error = False + had_error: bool = False try: - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() except TableMetadataUnavailableError as e: - had_error = True + had_error: bool = True assert "test" in e.args[0] finally: self.resetBinLog() assert had_error - def test_ignore_decode_errors(self): - problematic_unicode_string = b'[{"text":"\xed\xa0\xbd \xed\xb1\x8d Some string"}]' + def test_ignore_decode_errors(self) -> None: + problematic_unicode_string: bytes = b'[{"text":"\xed\xa0\xbd \xed\xb1\x8d Some string"}]' self.stream.close() self.execute("CREATE TABLE test (data VARCHAR(50) CHARACTER SET utf8mb4)") self.execute_with_args("INSERT INTO test (data) VALUES (%s)", (problematic_unicode_string)) self.execute("COMMIT") # Initialize with ignore_decode_errors=False - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=(WriteRowsEvent,), ignore_decode_errors=False ) - event = self.stream.fetchone() - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() with self.assertRaises(UnicodeError) as exception: - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() data = event.rows[0]["values"]["data"] # Initialize with ignore_decode_errors=True - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=(WriteRowsEvent,), @@ -710,11 +712,11 @@ def test_ignore_decode_errors(self): ) self.stream.fetchone() self.stream.fetchone() - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() data = event.rows[0]["values"]["data"] self.assertEqual(data, '[{"text":" Some string"}]') - def test_drop_column(self): + def test_drop_column(self) -> None: self.stream.close() self.execute("CREATE TABLE test_drop_column (id INTEGER(11), data VARCHAR(50))") self.execute("INSERT INTO test_drop_column VALUES (1, 'A value')") @@ -723,7 +725,7 @@ def test_drop_column(self): self.execute("INSERT INTO test_drop_column VALUES (2)") self.execute("COMMIT") - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=(WriteRowsEvent,) @@ -737,7 +739,7 @@ def test_drop_column(self): self.resetBinLog() @unittest.expectedFailure - def test_alter_column(self): + def test_alter_column(self) -> None: self.stream.close() self.execute("CREATE TABLE test_alter_column (id INTEGER(11), data VARCHAR(50))") self.execute("INSERT INTO test_alter_column VALUES (1, 'A value')") @@ -747,12 +749,12 @@ def test_alter_column(self): self.execute("INSERT INTO test_alter_column VALUES (2, 'Another value', 'A value')") self.execute("COMMIT") - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=(WriteRowsEvent,), ) - event = self.stream.fetchone() # insert with two values + event: BinLogEvent = self.stream.fetchone() # insert with two values # both of these asserts fail because of issue underlying proble described in issue #118 # because it got table schema info after the alter table, it wrongly assumes the second # column of the first insert is 'another_data' @@ -769,17 +771,17 @@ class TestCTLConnectionSettings(base.PyMySQLReplicationTestCase): def setUp(self): super().setUp() self.stream.close() - ctl_db = copy.copy(self.database) - ctl_db["db"] = None - ctl_db["port"] = int(os.environ.get("MYSQL_5_7_CTL_PORT") or 3307) - ctl_db["host"] = os.environ.get("MYSQL_5_7_CTL") or "localhost" - self.ctl_conn_control = pymysql.connect(**ctl_db) + ctl_db: dict = copy.copy(self.database) + ctl_db["db"]: str = None + ctl_db["port"]: int = int(os.environ.get("MYSQL_5_7_CTL_PORT") or 3307) + ctl_db["host"]: str = os.environ.get("MYSQL_5_7_CTL") or "localhost" + self.ctl_conn_control: Connection = pymysql.connect(**ctl_db) self.ctl_conn_control.cursor().execute("DROP DATABASE IF EXISTS pymysqlreplication_test") self.ctl_conn_control.cursor().execute("CREATE DATABASE pymysqlreplication_test") self.ctl_conn_control.close() - ctl_db["db"] = "pymysqlreplication_test" - self.ctl_conn_control = pymysql.connect(**ctl_db) - self.stream = BinLogStreamReader( + ctl_db["db"]: str = "pymysqlreplication_test" + self.ctl_conn_control: Connection = pymysql.connect(**ctl_db) + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, ctl_connection_settings=ctl_db, server_id=1024, @@ -787,20 +789,20 @@ def setUp(self): fail_on_table_metadata_unavailable=True ) - def tearDown(self): + def tearDown(self) -> None: super().tearDown() self.ctl_conn_control.close() - def test_separate_ctl_settings_table_metadata_unavailable(self): + def test_separate_ctl_settings_table_metadata_unavailable(self) -> None: self.execute("CREATE TABLE test (id INTEGER(11))") self.execute("INSERT INTO test VALUES (1)") self.execute("COMMIT") had_error = False try: - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() except TableMetadataUnavailableError as e: - had_error = True + had_error: bool = True assert "test" in e.args[0] finally: self.resetBinLog() @@ -829,13 +831,13 @@ def setUp(self): raise unittest.SkipTest("database does not support GTID, skipping GTID tests") def test_read_query_event(self): - query = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "SELECT @@global.gtid_executed;" - gtid = self.execute(query).fetchone()[0] + query: str = "SELECT @@global.gtid_executed;" + gtid: int = self.execute(query).fetchone()[0] self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, blocking=True, auto_position=gtid, ignored_events=[HeartbeatLogEvent]) @@ -843,14 +845,14 @@ def test_read_query_event(self): self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) # Insert first event - query = "BEGIN;" + query: str = "BEGIN;" self.execute(query) - query = "INSERT INTO test (id, data) VALUES(1, 'Hello');" + query: str = "INSERT INTO test (id, data) VALUES(1, 'Hello');" self.execute(query) - query = "COMMIT;" + query: str = "COMMIT;" self.execute(query) - firstevent = self.stream.fetchone() + firstevent: BinLogEvent = self.stream.fetchone() self.assertIsInstance(firstevent, GtidEvent) self.assertIsInstance(self.stream.fetchone(), QueryEvent) @@ -859,14 +861,14 @@ def test_read_query_event(self): self.assertIsInstance(self.stream.fetchone(), XidEvent) # Insert second event - query = "BEGIN;" + query: str = "BEGIN;" self.execute(query) - query = "INSERT INTO test (id, data) VALUES(2, 'Hello');" + query: str = "INSERT INTO test (id, data) VALUES(2, 'Hello');" self.execute(query) - query = "COMMIT;" + query: str = "COMMIT;" self.execute(query) - secondevent = self.stream.fetchone() + secondevent: BinLogEvent = self.stream.fetchone() self.assertIsInstance(secondevent, GtidEvent) self.assertIsInstance(self.stream.fetchone(), QueryEvent) @@ -877,180 +879,180 @@ def test_read_query_event(self): self.assertEqual(secondevent.gno, firstevent.gno + 1) def test_position_gtid(self): - query = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "BEGIN;" + query: str = "BEGIN;" self.execute(query) - query = "INSERT INTO test (id, data) VALUES(1, 'Hello');" + query: str = "INSERT INTO test (id, data) VALUES(1, 'Hello');" self.execute(query) - query = "COMMIT;" + query: str = "COMMIT;" self.execute(query) - query = "SELECT @@global.gtid_executed;" - gtid = self.execute(query).fetchone()[0] + query: str = "SELECT @@global.gtid_executed;" + gtid: int = self.execute(query).fetchone()[0] - query = "CREATE TABLE test2 (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test2 (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, blocking=True, auto_position=gtid, ignored_events=[HeartbeatLogEvent]) self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) self.assertIsInstance(self.stream.fetchone(), GtidEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.query, 'CREATE TABLE test2 (id INT NOT NULL, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))'); class TestGtidRepresentation(unittest.TestCase): def test_gtidset_representation(self): - set_repr = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,' \ + set_repr: str = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,' \ '4350f323-7565-4e59-8763-4b1b83a0ce0e:1-20' - myset = GtidSet(set_repr) + myset: GtidSet = GtidSet(set_repr) self.assertEqual(str(myset), set_repr) def test_gtidset_representation_newline(self): - set_repr = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,' \ + set_repr: str = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,' \ '4350f323-7565-4e59-8763-4b1b83a0ce0e:1-20' - mysql_repr = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,\n' \ + mysql_repr: str = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,\n' \ '4350f323-7565-4e59-8763-4b1b83a0ce0e:1-20' - myset = GtidSet(mysql_repr) + myset: GtidSet = GtidSet(mysql_repr) self.assertEqual(str(myset), set_repr) def test_gtidset_representation_payload(self): - set_repr = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,' \ + set_repr: str = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56,' \ '4350f323-7565-4e59-8763-4b1b83a0ce0e:1-20' - myset = GtidSet(set_repr) - payload = myset.encode() - parsedset = myset.decode(io.BytesIO(payload)) + myset: GtidSet = GtidSet(set_repr) + payload: bytes = myset.encode() + parsedset: GtidSet = myset.decode(io.BytesIO(payload)) self.assertEqual(str(myset), str(parsedset)) - set_repr = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1,' \ + set_repr: str = '57b70f4e-20d3-11e5-a393-4a63946f7eac:1,' \ '4350f323-7565-4e59-8763-4b1b83a0ce0e:1-20' - myset = GtidSet(set_repr) - payload = myset.encode() - parsedset = myset.decode(io.BytesIO(payload)) + myset: GtidSet = GtidSet(set_repr) + payload: bytes = myset.encode() + parsedset: GtidSet = myset.decode(io.BytesIO(payload)) self.assertEqual(str(myset), str(parsedset)) class GtidTests(unittest.TestCase): def test_ordering(self): - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - other = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-10") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + other: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-10") assert gtid.__lt__(other) assert gtid.__le__(other) assert other.__gt__(gtid) assert other.__ge__(gtid) - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - other = Gtid("deadbeef-20d3-11e5-a393-4a63946f7eac:5-10") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + other: Gtid = Gtid("deadbeef-20d3-11e5-a393-4a63946f7eac:5-10") assert gtid.__lt__(other) assert gtid.__le__(other) assert other.__gt__(gtid) assert other.__ge__(gtid) def test_encode_decode(self): - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - payload = gtid.encode() - decoded = Gtid.decode(io.BytesIO(payload)) + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + payload: bytes = gtid.encode() + decoded: bytes = Gtid.decode(io.BytesIO(payload)) assert str(gtid) == str(decoded) def test_add_interval(self): - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-56") - end = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:57-58") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-56") + end: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:57-58") assert (gtid + end).intervals == [(5, 59)] - start = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-2") + start: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-2") assert (gtid + start).intervals == [(1, 3), (5, 57)] - sparse = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-4:7-10") - within = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-6") + sparse: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-4:7-10") + within: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:5-6") assert (sparse + within).intervals == [(1, 11)] def test_interval_non_merging(self): - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - other = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:58-59") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + other: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:58-59") gtid = gtid + other self.assertEqual(str(gtid), "57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56:58-59") def test_merging(self): - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - other = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:57-59") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + other: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:57-59") gtid = gtid + other self.assertEqual(str(gtid), "57b70f4e-20d3-11e5-a393-4a63946f7eac:1-59") def test_sub_interval(self): - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - start = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-5") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + start: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-5") assert (gtid - start).intervals == [(6, 57)] - end = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:55-56") + end: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:55-56") assert (gtid - end).intervals == [(1, 55)] - within = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:25-26") + within: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:25-26") assert (gtid - within).intervals == [(1, 25), (27, 57)] def test_parsing(self): with self.assertRaises(ValueError) as exc: - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-5 57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") - gtid = Gtid("NNNNNNNN-20d3-11e5-a393-4a63946f7eac:1-5") - gtid = Gtid("-20d3-11e5-a393-4a63946f7eac:1-5") - gtid = Gtid("-20d3-11e5-a393-4a63946f7eac:1-") - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:A-1") - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:-1") - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1") - gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-5 57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56") + gtid: Gtid = Gtid("NNNNNNNN-20d3-11e5-a393-4a63946f7eac:1-5") + gtid: Gtid = Gtid("-20d3-11e5-a393-4a63946f7eac:1-5") + gtid: Gtid = Gtid("-20d3-11e5-a393-4a63946f7eac:1-") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:A-1") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:-1") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1") + gtid: Gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1") class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase): def test_binlog_checkpoint_event(self): self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1023, blocking=False, is_mariadb=True ) - query = "DROP TABLE IF EXISTS test" + query: str = "DROP TABLE IF EXISTS test" self.execute(query) - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) self.stream.close() - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, RotateEvent) - event = self.stream.fetchone() - self.assertIsInstance(event,FormatDescriptionEvent) + event: BinLogEvent = self.stream.fetchone() + self.assertIsInstance(event, FormatDescriptionEvent) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, MariadbBinLogCheckPointEvent) self.assertEqual(event.filename, self.bin_log_basename()+".000001") class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase): def test_annotate_rows_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) # Insert first event - query = "BEGIN;" + query: str = "BEGIN;" self.execute(query) - insert_query = b"INSERT INTO test (id, data) VALUES(1, 'Hello')" + insert_query: str = b"INSERT INTO test (id, data) VALUES(1, 'Hello')" self.execute(insert_query) - query = "COMMIT;" + query: str = "COMMIT;" self.execute(query) self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, blocking=False, @@ -1059,38 +1061,38 @@ def test_annotate_rows_event(self): annotate_rows_event=True, ) - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() #Check event type 160,MariadbAnnotateRowsEvent - self.assertEqual(event.event_type,160) + self.assertEqual(event.event_type, 160) #Check self.sql_statement - self.assertEqual(event.sql_statement,insert_query) - self.assertIsInstance(event,MariadbAnnotateRowsEvent) + self.assertEqual(event.sql_statement, insert_query) + self.assertIsInstance(event, MariadbAnnotateRowsEvent) def test_start_encryption_event(self): - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello World')" + query: str = "INSERT INTO test (data) VALUES('Hello World')" self.execute(query) self.execute("COMMIT") self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) - start_encryption_event = self.stream.fetchone() + start_encryption_event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(start_encryption_event, MariadbStartEncryptionEvent) - schema = start_encryption_event.schema - key_version = start_encryption_event.key_version - nonce = start_encryption_event.nonce + schema: int = start_encryption_event.schema + key_version: int = start_encryption_event.key_version + nonce: bytes = start_encryption_event.nonce from pathlib import Path - encryption_key_file_path = Path(__file__).parent.parent.parent + encryption_key_file_path: Path = Path(__file__).parent.parent.parent try: with open(f"{encryption_key_file_path}/.mariadb/no_encryption_key.key", "r") as key_file: - first_line = key_file.readline() - key_version_from_key_file = int(first_line.split(";")[0]) + first_line: str = key_file.readline() + key_version_from_key_file: int = int(first_line.split(";")[0]) except Exception as e: self.fail("raised unexpected exception: {exception}".format(exception=e)) finally: @@ -1104,11 +1106,11 @@ def test_start_encryption_event(self): def test_gtid_list_event(self): # set max_binlog_size to create new binlog file - query = 'SET GLOBAL max_binlog_size=4096' + query: str = 'SET GLOBAL max_binlog_size=4096' self.execute(query) # parse only Maradb GTID list event self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, blocking=False, @@ -1116,21 +1118,21 @@ def test_gtid_list_event(self): is_mariadb=True, ) - query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + query: str = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query) - query = "INSERT INTO test (data) VALUES('Hello World')" + query: str = "INSERT INTO test (data) VALUES('Hello World')" - for cnt in range(0,15): + for cnt in range(0, 15): self.execute(query) self.execute("COMMIT") # 'mariadb gtid list event' of first binlog file - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.event_type,163) self.assertIsInstance(event,MariadbGtidListEvent) # 'mariadb gtid list event' of second binlog file - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertEqual(event.event_type,163) self.assertEqual(event.gtid_list[0].gtid, '0-1-15') @@ -1140,7 +1142,7 @@ class TestStatementConnectionSetting(base.PyMySQLReplicationTestCase): def setUp(self): super().setUp() self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=(RandEvent, QueryEvent), @@ -1157,7 +1159,7 @@ def test_rand_event(self): self.assertIsInstance(self.stream.fetchone(), QueryEvent) self.assertIsInstance(self.stream.fetchone(), QueryEvent) - expect_rand_event = self.stream.fetchone() + expect_rand_event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(expect_rand_event, RandEvent) self.assertEqual(type(expect_rand_event.seed1), int) self.assertEqual(type(expect_rand_event.seed2), int) @@ -1179,7 +1181,7 @@ def tearDown(self): def test_rows_query_log_event(self): self.stream.close() - self.stream = BinLogStreamReader( + self.stream: BinLogStreamReader = BinLogStreamReader( self.database, server_id=1024, only_events=[RowsQueryLogEvent], @@ -1187,7 +1189,7 @@ def test_rows_query_log_event(self): self.execute("CREATE TABLE IF NOT EXISTS test (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255))") self.execute("INSERT INTO test (name) VALUES ('Soul Lee')") self.execute("COMMIT") - event = self.stream.fetchone() + event: BinLogEvent = self.stream.fetchone() self.assertIsInstance(event, RowsQueryLogEvent)