diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index aeea07aa..a789a244 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -3,12 +3,15 @@ import binascii import struct import datetime +import pymysql import decimal import zlib - +from pymysql.connection import MysqlPacket +from typing import Dict, List, NoReturn, Tuple, Type, Union from pymysqlreplication.constants.STATUS_VAR_KEY import * from pymysqlreplication.exceptions import StatusVariableMismatch from typing import Union, Optional +from pymysqlreplication.table import Table class BinLogEvent(object): @@ -22,23 +25,24 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, fail_on_table_metadata_unavailable=False, ignore_decode_errors=False, verify_checksum=False,): - self.packet = from_packet - self.table_map = table_map - self.event_type = self.packet.event_type - self.timestamp = self.packet.timestamp - self.event_size = event_size - self._ctl_connection = ctl_connection - self.mysql_version = mysql_version - self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable - self._ignore_decode_errors = ignore_decode_errors + self.packet:Type[MysqlPacket] = from_packet + self.table_map: Dict[int, Table] = table_map + self.event_type: int = self.packet.event_type + self.timestamp: int = self.packet.timestamp + self.event_size: int = event_size + self._ctl_connection: pymysql.connections.Connection = ctl_connection + self.mysql_version: Tuple[int, int ,int] = mysql_version + self._fail_on_table_metadata_unavailable: bool = fail_on_table_metadata_unavailable + self._ignore_decode_errors: bool = ignore_decode_errors self._verify_checksum = verify_checksum self._is_event_valid = None # The event have been fully processed, if processed is false # the event will be skipped - self._processed = True - self.complete = True + self._processed: bool = True + self.complete: bool = True self._verify_event() + def _read_table_id(self): # Table ID is 6 byte # pad little-endian number @@ -75,7 +79,7 @@ class GtidEvent(BinLogEvent): """ GTID change in binlog event - For more information: `[GTID] `_ `[see also] `_ + For more information : `[see GtidEvent] `_ `[see also] `_ :ivar commit_flag: 1byte - 00000001 = Transaction may have changes logged with SBR. In 5.6, 5.7.0-5.7.18, and 8.0.0-8.0.1, this flag is always set. Starting in 5.7.19 and 8.0.2, this flag is cleared if the transaction only contains row events. It is set if any part of the transaction is written in statement format. @@ -89,14 +93,14 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - self.commit_flag = struct.unpack("!B", self.packet.read(1))[0] == 1 - self.sid = self.packet.read(16) - self.gno = struct.unpack('= (5, 7): - self.last_committed = struct.unpack('`_. + for more information: `[see MariadbGtidEvent] `_. :ivar server_id: int - The ID of the server where the GTID event occurred. :ivar gtid_seq_no: int - The sequence number of the GTID event. 
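The GtidEvent reader above pulls a 16-byte SID and an 8-byte little-endian GNO out of the packet. A minimal, self-contained sketch of how those two fields combine into the usual uuid:gno text form; the helper name and the sample bytes are assumptions for illustration, not taken from the patch.

import binascii
import struct

def format_gtid(sid: bytes, gno: int) -> str:
    # Hex-encode the 16-byte SID, group it like a UUID, then append the GNO.
    h = binascii.hexlify(sid).decode("ascii")
    uuid = "-".join((h[0:8], h[8:12], h[12:16], h[16:20], h[20:32]))
    return "%s:%d" % (uuid, gno)

# Example: a made-up SID followed by GNO=42, packed the way the event stores them.
raw = bytes(range(16)) + struct.pack("<Q", 42)
print(format_gtid(raw[:16], struct.unpack("<Q", raw[16:24])[0]))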
@@ -171,11 +175,11 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - self.server_id = self.packet.server_id - self.gtid_seq_no = self.packet.read_uint64() - self.domain_id = self.packet.read_uint32() - self.flags = self.packet.read_uint8() - self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) + self.server_id: int = self.packet.server_id + self.gtid_seq_no: int = self.packet.read_uint64() + self.domain_id: int = self.packet.read_uint32() + self.flags: int = self.packet.read_uint8() + self.gtid: str = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) def _dump(self): super()._dump() @@ -186,8 +190,7 @@ class MariadbBinLogCheckPointEvent(BinLogEvent): """ Represents a checkpoint in a binlog event in MariaDB. - More details are available in the MariaDB Knowledge Base: - https://mariadb.com/kb/en/binlog_checkpoint_event/ + for more information: `[see MaridbBinLogCheckPointEvent] `_ :ivar filename_length: int - The length of the filename. :ivar filename: str - The name of the file saved at the checkpoint. @@ -196,8 +199,8 @@ class MariadbBinLogCheckPointEvent(BinLogEvent): def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(MariadbBinLogCheckPointEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - filename_length = self.packet.read_uint32() - self.filename = self.packet.read(filename_length).decode() + filename_length: int = self.packet.read_uint32() + self.filename: str = self.packet.read(filename_length).decode() def _dump(self): print('Filename:', self.filename) @@ -212,7 +215,7 @@ class MariadbAnnotateRowsEvent(BinLogEvent): """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - self.sql_statement = self.packet.read(event_size) + self.sql_statement: str = self.packet.read(event_size) def _dump(self): super()._dump() @@ -220,17 +223,14 @@ def _dump(self): class MariadbGtidListEvent(BinLogEvent): """ - GTID List event - https://mariadb.com/kb/en/gtid_list_event/ + for more information: `[see MariadbGtidListEvent] `_ :ivar gtid_length: int - Number of GTIDs - :ivar gtid_list: list - list of 'MariadbGtidObejct' - - 'MariadbGtidObejct' Attributes: - domain_id: Replication Domain ID - server_id: Server_ID - gtid_seq_no: GTID sequence - gtid: 'domain_id'+ 'server_id' + 'gtid_seq_no' + :ivar gtid_list: List - list of 'MariadbGtidObejct' + :ivar domain_id: int - Replication Domain ID (MariadbGtidObject) + :ivar server_id: int -Server_ID (MariadbGtidObject) + :ivar gtid_seq_no: int - GTID sequence (MariadbGtidObject) + :ivar gtid: str - 'domain_id'+ 'server_id' + 'gtid_seq_no' (MariadbGtidObject) """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): @@ -242,14 +242,14 @@ class MariadbGtidObejct(BinLogEvent): """ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(MariadbGtidObejct, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - self.domain_id = self.packet.read_uint32() - self.server_id = self.packet.read_uint32() - self.gtid_seq_no = self.packet.read_uint64() - self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) + self.domain_id: int = self.packet.read_uint32() + self.server_id: int = self.packet.read_uint32() + self.gtid_seq_no: int = 
self.packet.read_uint64() + self.gtid: str = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) - self.gtid_length = self.packet.read_uint32() - self.gtid_list = [MariadbGtidObejct(from_packet, event_size, table_map, ctl_connection, **kwargs) for i in range(self.gtid_length)] + self.gtid_length: int = self.packet.read_uint32() + self.gtid_list: List[MariadbGtidObejct] = [MariadbGtidObejct(from_packet, event_size, table_map, ctl_connection, **kwargs) for i in range(self.gtid_length)] class RotateEvent(BinLogEvent): @@ -257,18 +257,26 @@ class RotateEvent(BinLogEvent): Change MySQL bin log file Represents information for the slave to know the name of the binary log it is going to receive. - For more information: `[see details] `_. + For more information: `[see RotateEvent] `_. + + In detail, the class creates the following python objects in the constructor: :ivar position: int - Position inside next binlog :ivar next_binlog: str - Name of next binlog file """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) - self.position = struct.unpack(' None: + """Dumps the python objects for the event.""" print("=== %s ===" % (self.__class__.__name__)) print("Position: %d" % self.position) print("Next binlog file: %s" % self.next_binlog) @@ -280,30 +288,37 @@ class XAPrepareEvent(BinLogEvent): An XA prepare event is generated for a XA prepared transaction. Like Xid_event, it contains XID of the **prepared** transaction. - For more information: `[see details] `_. + For more information: `[see XAPrepareEvent] `_. + + :ivar one_phase: str - current XA transaction commit method + :ivar xid_format_id: int - a number that identifies the format used by the gtrid and bqual values + :ivar xid: str - serialized XID representation of XA transaction (xid_gtrid + xid_bqual) - :ivar one_phase: current XA transaction commit method - :ivar xid_format_id: a number that identifies the format used by the gtrid and bqual values - :ivar xid: serialized XID representation of XA transaction (xid_gtrid + xid_bqual) """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # one_phase is True: XA COMMIT ... ONE PHASE # one_phase is False: XA PREPARE - self.one_phase = (self.packet.read(1) != b'\x00') - self.xid_format_id = struct.unpack(' str: + """Gets the xid python objects.""" return self.xid_gtrid.decode() + self.xid_bqual.decode() - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" print("One phase: %s" % self.one_phase) print("XID formatID: %d" % self.xid_format_id) print("XID: %s" % self.xid) @@ -311,7 +326,7 @@ def _dump(self): class FormatDescriptionEvent(BinLogEvent): """ - Represents a Format Description Event in the MySQL binary log. + The event Represents a Format Description Event in the MySQL binary log. This event is written at the start of a binary log file for binlog version 4. 
It provides the necessary information to decode subsequent events in the file. @@ -319,45 +334,56 @@ class FormatDescriptionEvent(BinLogEvent): :ivar binlog_version: int - Version of the binary log format. :ivar mysql_version_str: str - Server's MySQL version in string format. """ - - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) - self.binlog_version = struct.unpack(' None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + self.binlog_version: str = struct.unpack(' None: + """Dumps the python objects for the event.""" print("Binlog version: %s" % self.binlog_version) print("MySQL version: %s" % self.mysql_version_str) class StopEvent(BinLogEvent): + """Does nothing.""" pass class XidEvent(BinLogEvent): """ - A COMMIT event generated when COMMIT of a transaction that modifies one or more tables of an XA-capable storage engine occurs. + A COMMIT event generated when COMMIT of a transaction that modifies + one or more tables of an XA-capable storage engine occurs. - For more information: `[see details] `_. + For more information: `[see XidEvent] `_. :ivar xid: uint - Transaction ID for 2 Phase Commit. """ + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + self.xid: int = struct.unpack(' None: + """Dumps the python objects for the event.""" super()._dump() print("Transaction ID: %d" % (self.xid)) class HeartbeatLogEvent(BinLogEvent): - """A Heartbeat event + """ + A Heartbeat event Heartbeats are sent by the master. Master sends heartbeats when there are no unsent events in the binary log file after certain period of time. The interval is defined by MASTER_HEARTBEAT_PERIOD connection setting. @@ -377,16 +403,21 @@ class HeartbeatLogEvent(BinLogEvent): (see max_binlog_size, defaults to 1G). In any case, the timestamp is 0 (as in 1970-01-01T00:00:00). - :ivar ident: Name of the current binlog - """ + In detail, the class creates the following python objects in the constructor: - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, - table_map, ctl_connection, - **kwargs) - self.ident = self.packet.read(event_size).decode() + :ivar ident: str - Name of the current binlog + """ + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + self.ident: str = self.packet.read(event_size).decode() - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" super()._dump() print("Current binlog: %s" % (self.ident)) @@ -396,25 +427,31 @@ class QueryEvent(BinLogEvent): QueryEvent is generated for each query that modified database. If row-based replication is used, DML will not be logged as RowsEvent instead. + In detail, the class creates the following python objects in the constructor: + :ivar slave_proxy_id: int - The id of the thread that issued this statement on the master server - :ivar execution_time: int - The time from when the query started to when it was logged in the binlog, in seconds. 
- :ivar schema_length: int - The length of the name of the currently selected database. + :ivar execution_time: int - The time from when the query started to when it was logged in the binlog, in seconds + :ivar schema_length: int - The length of the name of the currently selected database :ivar error_code: int - Error code generated by the master :ivar status_vars_length: int - The length of the status variable - :ivar schema: str - The name of the currently selected database. - :ivar query: str - The query executed. + :ivar schema: str - The name of the currently selected database + :ivar query: str - The query executed """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # Post-header - self.slave_proxy_id = self.packet.read_uint32() - self.execution_time = self.packet.read_uint32() - self.schema_length = struct.unpack("!B", self.packet.read(1))[0] - self.error_code = self.packet.read_uint16() - self.status_vars_length = self.packet.read_uint16() + self.slave_proxy_id: int = self.packet.read_uint32() + self.execution_time: int = self.packet.read_uint32() + self.schema_length: int = struct.unpack("!B", self.packet.read(1))[0] + self.error_code: int = self.packet.read_uint16() + self.status_vars_length: int = self.packet.read_uint16() # Payload status_vars_end_pos = self.packet.read_bytes + self.status_vars_length @@ -424,65 +461,97 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) # read VALUE for status variable self._read_status_vars_value_for_key(status_vars_key) - self.schema = self.packet.read(self.schema_length) + self.schema: str = self.packet.read(self.schema_length) self.packet.advance(1) + #string[EOF] query query = self.packet.read(event_size - 13 - self.status_vars_length - self.schema_length - 1) - self.query = query.decode("utf-8", errors='backslashreplace') - #string[EOF] query + self.query:str = query.decode("utf-8", errors='backslashreplace') - def _dump(self): + def _dump(self) -> None: + """Dump the python objects for the event.""" super()._dump() print("Schema: %s" % (self.schema)) print("Execution time: %d" % (self.execution_time)) print("Query: %s" % (self.query)) - def _read_status_vars_value_for_key(self, key): - """parse status variable VALUE for given KEY + def _read_status_vars_value_for_key(self, key: int) -> Union[None, NoReturn]: + """ + CAUTION: this function returns nothing(not None) or raises StatusVariableMismatch! + + It parses status variable VALUE for given KEY. A status variable in query events is a sequence of status KEY-VALUE pairs. 
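Conceptually, that block is consumed as "read one key byte, then read that key's fixed- or length-prefixed value" until status_vars_length bytes are used up. A stripped-down sketch of the loop over a plain byte buffer, assuming only three of the key codes handled in the dispatch that follows (0x00 flags2, 0x01 sql_mode, 0x03 auto-increment); the buffer, helper name, and error handling are illustrative.

import io
import struct

Q_FLAGS2_CODE, Q_SQL_MODE_CODE, Q_AUTO_INCREMENT = 0x00, 0x01, 0x03

def read_status_vars(buf: io.BytesIO, status_vars_length: int) -> dict:
    end = buf.tell() + status_vars_length
    out = {}
    while buf.tell() < end:
        key = buf.read(1)[0]
        if key == Q_FLAGS2_CODE:            # 4-byte value
            out["flags2"] = struct.unpack("<I", buf.read(4))[0]
        elif key == Q_SQL_MODE_CODE:        # 8-byte value
            out["sql_mode"] = struct.unpack("<Q", buf.read(8))[0]
        elif key == Q_AUTO_INCREMENT:       # two 2-byte values
            out["auto_increment_increment"], out["auto_increment_offset"] = \
                struct.unpack("<HH", buf.read(4))
        else:
            raise ValueError("unhandled status variable key: %#x" % key)
    return out

blob = bytes([0x00]) + struct.pack("<I", 0) + bytes([0x03]) + struct.pack("<HH", 1, 1)
print(read_status_vars(io.BytesIO(blob), len(blob)))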
Parsing logic from mysql-server source code edited by dongwook-chan https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/src/statement_events.cpp#L181-L336 - :ivar key: key for status variable + See details about status variable at: + https://mariadb.com/kb/en/query_event/ + https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Query__event.html + + In detail, the function creates python objects depending on the given key: + + :ivar flags2: int + :ivar sql_mode: int + :ivar auto_increment_increment: int + :ivar auto_increment_offset: int + :ivar character_set_client: int + :ivar collation_connection: int + :ivar collation_server: int + :ivar time_zone: str + :ivar catalog_nz_code: str + :ivar lc_time_names_number: int + :ivar charset_database_number: int + :ivar table_map_for_update: int + :ivar user: str + :ivar host: str + :ivar mts_accessed_db_names: [str, ...] + :ivar microseconds: int + :ivar explicit_defaults_ts: bool + :ivar ddl_xid: int + :ivar default_collation_for_utf8mb4_number: int + :ivar sql_require_primary_key: int + :ivar default_table_encryption: int + :ivar hrnow: int + :ivar xid: int """ if key == Q_FLAGS2_CODE: # 0x00 - self.flags2 = self.packet.read_uint32() + self.flags2: int = self.packet.read_uint32() elif key == Q_SQL_MODE_CODE: # 0x01 - self.sql_mode = self.packet.read_uint64() + self.sql_mode: int = self.packet.read_uint64() elif key == Q_CATALOG_CODE: # 0x02 for MySQL 5.0.x pass elif key == Q_AUTO_INCREMENT: # 0x03 - self.auto_increment_increment = self.packet.read_uint16() - self.auto_increment_offset = self.packet.read_uint16() + self.auto_increment_increment: int = self.packet.read_uint16() + self.auto_increment_offset: int = self.packet.read_uint16() elif key == Q_CHARSET_CODE: # 0x04 - self.character_set_client = self.packet.read_uint16() - self.collation_connection = self.packet.read_uint16() - self.collation_server = self.packet.read_uint16() + self.character_set_client: int = self.packet.read_uint16() + self.collation_connection: int = self.packet.read_uint16() + self.collation_server: int = self.packet.read_uint16() elif key == Q_TIME_ZONE_CODE: # 0x05 time_zone_len = self.packet.read_uint8() if time_zone_len: - self.time_zone = self.packet.read(time_zone_len) + self.time_zone: str = self.packet.read(time_zone_len) elif key == Q_CATALOG_NZ_CODE: # 0x06 catalog_len = self.packet.read_uint8() if catalog_len: - self.catalog_nz_code = self.packet.read(catalog_len) + self.catalog_nz_code: str = self.packet.read(catalog_len) elif key == Q_LC_TIME_NAMES_CODE: # 0x07 - self.lc_time_names_number = self.packet.read_uint16() + self.lc_time_names_number: int = self.packet.read_uint16() elif key == Q_CHARSET_DATABASE_CODE: # 0x08 - self.charset_database_number = self.packet.read_uint16() + self.charset_database_number: int = self.packet.read_uint16() elif key == Q_TABLE_MAP_FOR_UPDATE_CODE: # 0x09 - self.table_map_for_update = self.packet.read_uint64() + self.table_map_for_update: int = self.packet.read_uint64() elif key == Q_MASTER_DATA_WRITTEN_CODE: # 0x0A pass elif key == Q_INVOKER: # 0x0B user_len = self.packet.read_uint8() if user_len: - self.user = self.packet.read(user_len) + self.user: str = self.packet.read(user_len) host_len = self.packet.read_uint8() if host_len: - self.host = self.packet.read(host_len) + self.host: str = self.packet.read(host_len) elif key == Q_UPDATED_DB_NAMES: # 0x0C mts_accessed_dbs = self.packet.read_uint8() """ @@ -500,47 +569,55 @@ def 
_read_status_vars_value_for_key(self, key): for i in range(mts_accessed_dbs): db = self.packet.read_string() dbs.append(db) - self.mts_accessed_db_names = dbs + self.mts_accessed_db_names: [str, ...] = dbs elif key == Q_MICROSECONDS: # 0x0D - self.microseconds = self.packet.read_uint24() + self.microseconds: int = self.packet.read_uint24() elif key == Q_COMMIT_TS: # 0x0E pass elif key == Q_COMMIT_TS2: # 0x0F pass elif key == Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP:# 0x10 - self.explicit_defaults_ts = self.packet.read_uint8() + self.explicit_defaults_ts: bool = self.packet.read_uint8() elif key == Q_DDL_LOGGED_WITH_XID: # 0x11 - self.ddl_xid = self.packet.read_uint64() + self.ddl_xid: int = self.packet.read_uint64() elif key == Q_DEFAULT_COLLATION_FOR_UTF8MB4: # 0x12 - self.default_collation_for_utf8mb4_number = self.packet.read_uint16() + self.default_collation_for_utf8mb4_number: int = self.packet.read_uint16() elif key == Q_SQL_REQUIRE_PRIMARY_KEY: # 0x13 - self.sql_require_primary_key = self.packet.read_uint8() + self.sql_require_primary_key: int = self.packet.read_uint8() elif key == Q_DEFAULT_TABLE_ENCRYPTION: # 0x14 - self.default_table_encryption = self.packet.read_uint8() + self.default_table_encryption: int = self.packet.read_uint8() elif key == Q_HRNOW: - self.hrnow = self.packet.read_uint24() + self.hrnow: int = self.packet.read_uint24() elif key == Q_XID: - self.xid = self.packet.read_uint64() + self.xid: int = self.packet.read_uint64() else: raise StatusVariableMismatch + class BeginLoadQueryEvent(BinLogEvent): """ This event is written into the binary log file for LOAD DATA INFILE events if the server variable binlog_mode was set to "STATEMENT". - :ivar file_id: the id of the file - :ivar block-data: data block about "LOAD DATA INFILE" + In detail, the class creates the following python objects in the constructor: + + :ivar file_id: int - the id of the file + :ivar block-data: str - data block about "LOAD DATA INFILE" """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # Payload - self.file_id = self.packet.read_uint32() - self.block_data = self.packet.read(event_size - 4) + self.file_id: int = self.packet.read_uint32() + self.block_data: str = self.packet.read(event_size - 4) - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" super()._dump() print("File id: %d" % (self.file_id)) print("Block data: %s" % (self.block_data)) @@ -553,9 +630,11 @@ class ExecuteLoadQueryEvent(BinLogEvent): Since QueryEvent cannot explain this special action, ExecuteLoadQueryEvent is needed. So it is similar to a QUERY_EVENT except that it has extra static fields. + In detail, the class creates the following python objects in the constructor: + :ivar slave_proxy_id: int - The id of the thread that issued this statement on the master server :ivar execution_time: int - The number of seconds that the statement took to execute - :ivar schema_length: int - The length of the default database's name when the statement was executed. 
+ :ivar schema_length: int - The length of the default database's name when the statement was executed :ivar error_code: int - The error code resulting from execution of the statement on the master :ivar status_vars_length: int - The length of the status variable block :ivar file_id: int - The id of the loaded file @@ -563,24 +642,29 @@ class ExecuteLoadQueryEvent(BinLogEvent): :ivar end_pos: int - Offset from the start of the statement to the end of the filename :ivar dup_handling_flags: int - How LOAD DATA INFILE handles duplicated data (0x0: error, 0x1: ignore, 0x2: replace) """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # Post-header - self.slave_proxy_id = self.packet.read_uint32() - self.execution_time = self.packet.read_uint32() - self.schema_length = self.packet.read_uint8() - self.error_code = self.packet.read_uint16() - self.status_vars_length = self.packet.read_uint16() + self.slave_proxy_id: int = self.packet.read_uint32() + self.execution_time: int = self.packet.read_uint32() + self.schema_length: int = self.packet.read_uint8() + self.error_code: int = self.packet.read_uint16() + self.status_vars_length: int = self.packet.read_uint16() # Payload - self.file_id = self.packet.read_uint32() - self.start_pos = self.packet.read_uint32() - self.end_pos = self.packet.read_uint32() - self.dup_handling_flags = self.packet.read_uint8() + self.file_id: int = self.packet.read_uint32() + self.start_pos: int = self.packet.read_uint32() + self.end_pos: int = self.packet.read_uint32() + self.dup_handling_flags: int = self.packet.read_uint8() - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" super(ExecuteLoadQueryEvent, self)._dump() print("Slave proxy id: %d" % (self.slave_proxy_id)) print("Execution time: %d" % (self.execution_time)) @@ -599,18 +683,24 @@ class IntvarEvent(BinLogEvent): This event will be created just before a QueryEvent. :ivar type: int - 1 byte identifying the type of variable stored. + Can be either LAST_INSERT_ID_EVENT (1) or INSERT_ID_EVENT (2). :ivar value: int - The value of the variable """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) # Payload - self.type = self.packet.read_uint8() - self.value = self.packet.read_uint32() + self.type: int = self.packet.read_uint8() + self.value: int = self.packet.read_uint32() - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" super()._dump() print("type: %d" % (self.type)) print("Value: %d" % (self.value)) @@ -619,32 +709,41 @@ def _dump(self): class RandEvent(BinLogEvent): """ RandEvent is generated every time a statement uses the RAND() function. - Indicates the seed values to use for generating a random number with RAND() in the next statement. 
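The ExecuteLoadQueryEvent constructor above reads nine fixed-width fields back to back (uint32 ids and times, uint8 lengths and flags, uint16 codes). Under that assumption the same layout can be unpacked in a single call; the format string and the sample buffer below are illustrative, not part of the library.

import struct

# slave_proxy_id, execution_time: uint32; schema_length: uint8; error_code,
# status_vars_length: uint16; file_id, start_pos, end_pos: uint32; dup_handling_flags: uint8
EXEC_LOAD_FIXED = struct.Struct("<IIBHHIIIB")

sample = EXEC_LOAD_FIXED.pack(7, 0, 4, 0, 0, 1, 0, 120, 0x1)   # 0x1 = ignore duplicates
fields = EXEC_LOAD_FIXED.unpack(sample)
print(dict(zip(
    ("slave_proxy_id", "execution_time", "schema_length", "error_code",
     "status_vars_length", "file_id", "start_pos", "end_pos", "dup_handling_flags"),
    fields)))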
+ It indicates the seed values to use for generating a random number with RAND() in the next statement. RandEvent only works in statement-based logging (need to set binlog_format as 'STATEMENT') and only works when the seed number is not specified. + + In detail, the class creates the following python objects in the constructor: :ivar seed1: int - value for the first seed - :ivar seed2: int - value for the second seed + :ivar seed2: int - value for the second seed """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) + + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str): + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + # Payload - self._seed1 = self.packet.read_uint64() - self._seed2 = self.packet.read_uint64() + self._seed1: int = self.packet.read_uint64() + self._seed2: int = self.packet.read_uint64() @property - def seed1(self): - """Get the first seed value""" + def seed1(self) -> int: + """Gets the first seed value.""" return self._seed1 @property - def seed2(self): - """Get the second seed value""" + def seed2(self) -> int: + """Gets the second seed value.""" return self._seed2 - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" super()._dump() print("seed1: %d" % (self.seed1)) print("seed2: %d" % (self.seed2)) @@ -801,22 +900,28 @@ class MariadbStartEncryptionEvent(BinLogEvent): additional configuration steps are required in MariaDB. (Link: https://mariadb.com/kb/en/encrypting-binary-logs/) - This event is written just once, after the Format Description event + This event is written just once, after the Format Description event. - Attributes: - schema: The Encryption scheme, always set to 1 for system files. - key_version: The Encryption key version. - nonce: Nonce (12 random bytes) of current binlog file. - """ + In detail, the class creates the following python objects in the constructor: - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + :ivar schema: int - The Encryption scheme, always set to 1 for system files + :ivar key_version: int - The Encryption key version + :ivar nonce: bytes - Nonce (12 random bytes) of current binlog file + """ + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) - self.schema = self.packet.read_uint8() - self.key_version = self.packet.read_uint32() - self.nonce = self.packet.read(12) + self.schema: int = self.packet.read_uint8() + self.key_version: int = self.packet.read_uint32() + self.nonce: bytes = self.packet.read(12) - def _dump(self): + def _dump(self) -> None: + """Dumps the python objects for the event.""" print("Schema: %d" % self.schema) print("Key version: %d" % self.key_version) print(f"Nonce: {self.nonce}") @@ -824,20 +929,28 @@ def _dump(self): class RowsQueryLogEvent(BinLogEvent): """ - Record original query for the row events in Row-Based Replication + This class records original query for the row events in Row-Based Replication. 
- More details are available in the MySQL Knowledge Base: - https://dev.mysql.com/doc/dev/mysql-server/latest/classRows__query__log__event.html + In detail, it creates the following python objects in the constructor: - :ivar query_length: uint - Length of the SQL statement + :ivar query_length: int - Length of the SQL statement :ivar query: str - The executed SQL statement + + More details are available in the MySQL Knowledge Base: + https://dev.mysql.com/doc/dev/mysql-server/latest/classRows__query__log__event.html """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super(RowsQueryLogEvent, self).__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) - self.query_length = self.packet.read_uint8() - self.query = self.packet.read(self.query_length).decode('utf-8') - def dump(self): + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super(RowsQueryLogEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + self.query_length: int = self.packet.read_uint8() + self.query: str = self.packet.read(self.query_length).decode('utf-8') + + def dump(self) -> None: + """Dumps the python objects for the event.""" print("=== %s ===" % (self.__class__.__name__)) print("Query length: %d" % self.query_length) print("Query: %s" % self.query) @@ -845,11 +958,15 @@ def dump(self): class NotImplementedEvent(BinLogEvent): """ - Used as a temporary class for events that have not yet been implemented. + This class is used as a temporary class for events that have not yet been implemented. - The event referencing this class skips parsing. + The event class referencing this class skips parsing. 
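For RowsQueryLogEvent above, the framing is simply a one-byte length prefix followed by the SQL text. A tiny standalone sketch of that framing over a raw payload; the sample bytes are made up.

payload = bytes([len(b"INSERT INTO t VALUES (1)")]) + b"INSERT INTO t VALUES (1)"
query_length = payload[0]                       # uint8 length prefix
query = payload[1:1 + query_length].decode("utf-8")
print(query_length, query)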
""" - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): - super().__init__( - from_packet, event_size, table_map, ctl_connection, **kwargs) + def __init__(self, + from_packet: Type[MysqlPacket], + event_size: int, + table_map: Dict[int, Table], + ctl_connection: pymysql.connections.Connection, + **kwargs: str) -> None: + super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) self.packet.advance(event_size) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index fcd138d3..b5ff096e 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -5,6 +5,7 @@ import datetime import json +from typing import List, Dict, Any, Optional from pymysql.charset import charset_by_name from .event import BinLogEvent @@ -15,25 +16,33 @@ from .table import Table from .bitmap import BitCount, BitGet + class RowsEvent(BinLogEvent): - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + def __init__( + self, + from_packet: Any, + event_size: int, + table_map: Dict[int, Any], + ctl_connection: Any, + **kwargs: Any + ) -> None: super().__init__(from_packet, event_size, table_map, - ctl_connection, **kwargs) - self.__rows = None + ctl_connection, **kwargs) + self.__rows: Optional[List[Any]] = None self.__only_tables = kwargs["only_tables"] self.__ignored_tables = kwargs["ignored_tables"] self.__only_schemas = kwargs["only_schemas"] self.__ignored_schemas = kwargs["ignored_schemas"] - #Header - self.table_id = self._read_table_id() + # Header + self.table_id: int = self._read_table_id() # Additional information try: - self.primary_key = table_map[self.table_id].data["primary_key"] - self.schema = self.table_map[self.table_id].schema - self.table = self.table_map[self.table_id].table - except KeyError: #If we have filter the corresponding TableMap Event + self.primary_key: Any = table_map[self.table_id].data["primary_key"] + self.schema: str = self.table_map[self.table_id].schema + self.table: str = self.table_map[self.table_id].table + except KeyError: # If we have filter the corresponding TableMap Event self._processed = False return @@ -51,32 +60,31 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self._processed = False return - - #Event V2 + # Event V2 if self.event_type == BINLOG.WRITE_ROWS_EVENT_V2 or \ self.event_type == BINLOG.DELETE_ROWS_EVENT_V2 or \ self.event_type == BINLOG.UPDATE_ROWS_EVENT_V2: - self.flags, self.extra_data_length = struct.unpack(' 2: - self.extra_data_type = struct.unpack(' 2: + self.extra_data_type = struct.unpack(' int: bit = null_bitmap[int(position / 8)] if type(bit) is str: bit = ord(bit) return bit & (1 << (position % 8)) - def _read_column_data(self, cols_bitmap): + def _read_column_data(self, + cols_bitmap: int + ) -> Dict[str, Any]: """Use for WRITE, UPDATE and DELETE events. 
Return an array of column data """ - values = {} + values: Dict[str, Any] = {} # null bitmap length = (bits set in 'columns-present-bitmap'+7)/8 # See http://dev.mysql.com/doc/internals/en/rows-event.html @@ -121,8 +133,16 @@ def _read_column_data(self, cols_bitmap): return values - def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap, unsigned, zerofill, - fixed_binary_length, i): + def __read_values_name(self, + column: Column, + null_bitmap: Any, + null_bitmap_index: int, + cols_bitmap: Any, + unsigned: bool, + zerofill: bool, + fixed_binary_length: int, + i: int + ): if BitGet(cols_bitmap, i) == 0: return None @@ -231,19 +251,33 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap return self.packet.read_binary_json(column.length_size) else: raise NotImplementedError("Unknown MySQL column type: %d" % - (column.type)) + column.type) + + def __add_fsp_to_time(self, + time: datetime, + column: Column + ) -> datetime: + """ + Read and add the fractional part of time - def __add_fsp_to_time(self, time, column): - """Read and add the fractional part of time For more details about new date format: - http://dev.mysql.com/doc/internals/en/date-and-time-data-type-representation.html + https://dev.mysql.com/doc/refman/8.0/en/date-and-time-types.html + + :param time: The datetime object representing the time. + :type time: datetime.datetime + :param column: The MySQL column containing fractional seconds information. + :type column: Any (actual type should be defined) + :return: The datetime object with added fractional seconds. + :rtype: datetime.datetime """ microsecond = self.__read_fsp(column) if microsecond > 0: time = time.replace(microsecond=microsecond) return time - def __read_fsp(self, column): + def __read_fsp(self, + column: Column + ) -> int: read = 0 if column.fsp == 1 or column.fsp == 2: read = 1 @@ -255,15 +289,18 @@ def __read_fsp(self, column): microsecond = self.packet.read_int_be_by_size(read) if column.fsp % 2: microsecond = int(microsecond / 10) - return microsecond * (10 ** (6-column.fsp)) + return microsecond * (10 ** (6 - column.fsp)) return 0 @staticmethod - def charset_to_encoding(name): + def charset_to_encoding(name: str) -> str: charset = charset_by_name(name) return charset.encoding if charset else name - def __read_string(self, size, column): + def __read_string(self, + size: int, + column: Column + ) -> str: string = self.packet.read_length_coded_pascal_string(size) if column.character_set_name is not None: encoding = self.charset_to_encoding(column.character_set_name) @@ -271,7 +308,9 @@ def __read_string(self, size, column): string = string.decode(encoding, decode_errors) return string - def __read_bit(self, column): + def __read_bit(self, + column: Column + ) -> str: """Read MySQL BIT type""" resp = "" for byte in range(0, column.bytes): @@ -294,7 +333,7 @@ def __read_bit(self, column): resp += current_byte[::-1] return resp - def __read_time(self): + def __read_time(self) -> datetime.timedelta: time = self.packet.read_uint24() date = datetime.timedelta( hours=int(time / 10000), @@ -302,7 +341,9 @@ def __read_time(self): seconds=int(time % 100)) return date - def __read_time2(self, column): + def __read_time2(self, + column: Column + ) -> datetime.timedelta: """TIME encoding for nonfractional part: 1 bit sign (1= non-negative, 0= negative) @@ -329,7 +370,7 @@ def __read_time2(self, column): ) * sign return t - def __read_date(self): + def __read_date(self) -> Optional[datetime.date]: time = 
self.packet.read_uint24() if time == 0: # nasty mysql 0000-00-00 dates return None @@ -347,7 +388,7 @@ def __read_date(self): ) return date - def __read_datetime(self): + def __read_datetime(self) -> Optional[datetime.date]: value = self.packet.read_uint64() if value == 0: # nasty mysql 0000-00-00 dates return None @@ -370,7 +411,9 @@ def __read_datetime(self): second=int(time % 100)) return date - def __read_datetime2(self, column): + def __read_datetime2(self, + column: Column + ) -> Optional[datetime.datetime]: """DATETIME 1 bit sign (1= non-negative, 0= negative) @@ -397,13 +440,21 @@ def __read_datetime2(self, column): return None return self.__add_fsp_to_time(t, column) - def __read_new_decimal(self, column): - """Read MySQL's new decimal format introduced in MySQL 5""" + def __read_new_decimal(self, + column: Column + ) -> decimal.Decimal: + """ + Read MySQL's new decimal format introduced in MySQL 5 - # This project was a great source of inspiration for - # understanding this storage format. - # https://github.com/jeremycole/mysql_binlog + This project was a great source of inspiration for + understanding this storage format. + https://github.com/jeremycole/mysql_binlog + :param column: The MySQL column containing the new decimal value. + :type column: Any (actual type should be defined) + :return: The Python Decimal object representing the new decimal value. + :rtype: decimal.Decimal + """ digits_per_integer = 9 compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4] integral = (column.precision - column.decimals) @@ -414,7 +465,7 @@ def __read_new_decimal(self, column): * digits_per_integer) # Support negative - # The sign is encoded in the high bit of the the byte + # The sign is encoded in the high bit of the byte # But this bit can also be used in the value value = self.packet.read_uint8() if value & 0x80 != 0: @@ -447,13 +498,25 @@ def __read_new_decimal(self, column): return decimal.Decimal(res) - def __read_binary_slice(self, binary, start, size, data_length): + def __read_binary_slice(self, + binary: int, + start: int, + size: int, + data_length: int + ) -> int: """ - Read a part of binary data and extract a number - binary: the data - start: From which bit (1 to X) - size: How many bits should be read - data_length: data size + Read a part of binary data and extract a number. + + :param binary: The binary data. + :type binary: int + :param start: From which bit (1 to X). + :type start: int + :param size: How many bits should be read. + :type size: int + :param data_length: Size of the data. + :type data_length: int + :return: Extracted number from binary data. + :rtype: int """ binary = binary >> data_length - (start + size) mask = ((1 << size) - 1) @@ -482,25 +545,30 @@ def rows(self): class DeleteRowsEvent(RowsEvent): - """This event is trigger when a row in the database is removed + """ + This event is trigger when a row in the database is removed For each row you have a hash with a single key: values which contain the data of the removed line. 
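The __read_binary_slice helper above extracts a bit field from an integer by shifting the unwanted low-order bits away and masking the requested width. The same arithmetic, shown standalone with illustrative names and a worked value:

def read_binary_slice(binary: int, start: int, size: int, data_length: int) -> int:
    # Drop everything to the right of the requested field, then mask its width.
    shifted = binary >> (data_length - (start + size))
    return shifted & ((1 << size) - 1)

# 0b10110110 read as an 8-bit value: bits 2..5 (counted from the left) are 1101 = 13.
print(read_binary_slice(0b10110110, 2, 4, 8))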
""" - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + def __init__(self, + from_packet: Any, + event_size: int, + table_map: Dict[int, Any], + ctl_connection: Any, + **kwargs: Any + ) -> None: super().__init__(from_packet, event_size, - table_map, ctl_connection, **kwargs) + table_map, ctl_connection, **kwargs) if self._processed: self.columns_present_bitmap = self.packet.read( (self.number_of_columns + 7) / 8) - def _fetch_one_row(self): - row = {} - - row["values"] = self._read_column_data(self.columns_present_bitmap) + def _fetch_one_row(self) -> Dict[str, Any]: + row: Dict[str, Any] = {"values": self._read_column_data(self.columns_present_bitmap)} return row - def _dump(self): + def _dump(self) -> None: super()._dump() print("Values:") for row in self.rows: @@ -510,25 +578,30 @@ def _dump(self): class WriteRowsEvent(RowsEvent): - """This event is triggered when a row in database is added + """ + This event is triggered when a row in database is added For each row you have a hash with a single key: values which contain the data of the new line. """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + def __init__(self, + from_packet: Any, + event_size: int, + table_map: Dict[int, Any], + ctl_connection: Any, + **kwargs: Any + ) -> None: super().__init__(from_packet, event_size, - table_map, ctl_connection, **kwargs) + table_map, ctl_connection, **kwargs) if self._processed: self.columns_present_bitmap = self.packet.read( (self.number_of_columns + 7) / 8) - def _fetch_one_row(self): - row = {} - - row["values"] = self._read_column_data(self.columns_present_bitmap) + def _fetch_one_row(self) -> Dict[str, Any]: + row: Dict[str, Any] = {"values": self._read_column_data(self.columns_present_bitmap)} return row - def _dump(self): + def _dump(self) -> None: super()._dump() print("Values:") for row in self.rows: @@ -548,25 +621,28 @@ class UpdateRowsEvent(RowsEvent): http://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#sysvar_binlog_row_image """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + def __init__(self, + from_packet: Any, + event_size: int, + table_map: Dict[int, Any], + ctl_connection: Any, + **kwargs: Any + ) -> None: super().__init__(from_packet, event_size, - table_map, ctl_connection, **kwargs) + table_map, ctl_connection, **kwargs) if self._processed: - #Body + # Body self.columns_present_bitmap = self.packet.read( (self.number_of_columns + 7) / 8) self.columns_present_bitmap2 = self.packet.read( (self.number_of_columns + 7) / 8) - def _fetch_one_row(self): - row = {} - - row["before_values"] = self._read_column_data(self.columns_present_bitmap) - - row["after_values"] = self._read_column_data(self.columns_present_bitmap2) + def _fetch_one_row(self) -> Dict[str, Any]: + row: Dict[str, Any] = {"before_values": self._read_column_data(self.columns_present_bitmap), + "after_values": self._read_column_data(self.columns_present_bitmap2)} return row - def _dump(self): + def _dump(self) -> None: super()._dump() print("Affected columns: %d" % self.number_of_columns) print("Values:") @@ -579,14 +655,21 @@ def _dump(self): class TableMapEvent(BinLogEvent): - """This event describes the structure of a table. + """ + This event describes the structure of a table. It's sent before a change happens on a table. 
An end user of the lib should have no usage of this """ - def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + def __init__(self, + from_packet: Any, + event_size: int, + table_map: Dict[int, Any], + ctl_connection: Any, + **kwargs: Any + ) -> None: super().__init__(from_packet, event_size, - table_map, ctl_connection, **kwargs) + table_map, ctl_connection, **kwargs) self.__only_tables = kwargs["only_tables"] self.__ignored_tables = kwargs["ignored_tables"] self.__only_schemas = kwargs["only_schemas"] @@ -676,12 +759,12 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) ## Refer to definition of and call to row.event._is_null() to interpret bitmap corresponding to columns self.null_bitmask = self.packet.read((self.column_count + 7) / 8) - def get_table(self): + def get_table(self) -> Table: return self.table_obj - def _dump(self): + def _dump(self) -> None: super()._dump() - print("Table id: %d" % (self.table_id)) - print("Schema: %s" % (self.schema)) - print("Table: %s" % (self.table)) - print("Columns: %s" % (self.column_count)) + print("Table id: %d" % self.table_id) + print("Schema: %s" % self.schema) + print("Table: %s" % self.table) + print("Columns: %s" % self.column_count) diff --git a/pymysqlreplication/tests/binlogfilereader.py b/pymysqlreplication/tests/binlogfilereader.py index 7075039e..3b197a84 100644 --- a/pymysqlreplication/tests/binlogfilereader.py +++ b/pymysqlreplication/tests/binlogfilereader.py @@ -6,22 +6,24 @@ from pymysqlreplication.event import QueryEvent from pymysqlreplication.event import RotateEvent from pymysqlreplication.event import XidEvent +from pymysqlreplication.event import BinLogEvent from pymysqlreplication.row_event import TableMapEvent from pymysqlreplication.row_event import WriteRowsEvent +from typing import Optional, List, Type, Union, Iterator, IO class SimpleBinLogFileReader(object): '''Read binlog files''' - + _expected_magic = b'\xfebin' - def __init__(self, file_path, only_events=None): - self._current_event = None - self._file = None - self._file_path = file_path - self._only_events = only_events - self._pos = None + def __init__(self, file_path: str, only_events: Optional[List[BinLogEvent]] = None) -> None: + self._current_event: Optional[SimpleBinLogEvent] = None + self._file: Optional[IO[bytes]] = None + self._file_path: str = file_path + self._only_events: Optional[List[BinLogEvent]] = only_events + self._pos: Optional[int] = None - def fetchone(self): + def fetchone(self) -> Optional["SimpleBinLogEvent"]: '''Fetch one record from the binlog file''' if self._pos is None or self._pos < 4: self._read_magic() @@ -33,12 +35,12 @@ def fetchone(self): if self._filter_events(event): return event - def truncatebinlog(self): + def truncatebinlog(self) -> None: '''Truncate the binlog file at the current event''' if self._current_event is not None: self._file.truncate(self._current_event.pos) - def _filter_events(self, event): + def _filter_events(self, event: BinLogEvent) -> bool: '''Return True if an event can be returned''' # It would be good if we could reuse the __event_map in # packet.BinLogPacketWrapper. 
@@ -52,14 +54,14 @@ def _filter_events(self, event): }.get(event.event_type) return event_type in self._only_events - def _open_file(self): + def _open_file(self) -> None: '''Open the file at ``self._file_path``''' if self._file is None: self._file = open(self._file_path, 'rb+') self._pos = self._file.tell() assert self._pos == 0 - def _read_event(self): + def _read_event(self) -> Optional["SimpleBinLogEvent"]: '''Read an event from the binlog file''' # Assuming a binlog version > 1 headerlength = 19 @@ -80,7 +82,7 @@ def _read_event(self): event.set_body(body) return event - def _read_magic(self): + def _read_magic(self) -> None: '''Read the first four *magic* bytes of the binlog file''' self._open_file() if self._pos == 0: @@ -92,10 +94,10 @@ def _read_magic(self): message = messagefmt.format(magic, self._expected_magic) raise BadMagicBytesError(message) - def __iter__(self): + def __iter__(self) -> Iterator[Optional["SimpleBinLogEvent"]]: return iter(self.fetchone, None) - def __repr__(self): + def __repr__(self) -> str: cls = self.__class__ mod = cls.__module__ name = cls.__name__ @@ -108,28 +110,28 @@ def __repr__(self): class SimpleBinLogEvent(object): '''An event from a binlog file''' - def __init__(self, header): + def __init__(self, header: bytes) -> None: '''Initialize the Event with the event header''' unpacked = struct.unpack(' None: '''Save the body bytes''' self.body = body - def set_pos(self, pos): + def set_pos(self, pos: int) -> None: '''Save the event position''' self.pos = pos - def __repr__(self): + def __repr__(self) -> str: cls = self.__class__ mod = cls.__module__ name = cls.__name__ @@ -146,4 +148,4 @@ class BadMagicBytesError(Exception): '''The binlog file magic bytes did not match the specification''' class EventSizeTooSmallError(Exception): - '''The event size was smaller than the length of the event header''' + '''The event size was smaller than the length of the event header''' \ No newline at end of file
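For reference, the SimpleBinLogFileReader/SimpleBinLogEvent pair boils down to: verify the four magic bytes b'\xfebin', then repeatedly read a 19-byte v4 event header (timestamp, type, server id, event size, log pos, flags) plus event_size - 19 bytes of body. A self-contained sketch of that framing, assuming the common '<IBIIIH' header layout (the original format string is elided in the hunk above) and with purely illustrative file handling.

import io
import struct

MAGIC = b"\xfebin"
HEADER = struct.Struct("<IBIIIH")   # timestamp, type, server_id, event_size, log_pos, flags

def iter_events(stream: io.BufferedIOBase):
    if stream.read(4) != MAGIC:
        raise ValueError("not a binlog file")
    while True:
        header = stream.read(HEADER.size)
        if len(header) < HEADER.size:
            return
        timestamp, event_type, server_id, event_size, log_pos, flags = HEADER.unpack(header)
        body = stream.read(event_size - HEADER.size)
        yield event_type, event_size, log_pos, body

# Example with a fabricated one-event "file".
fake_event = HEADER.pack(0, 0x0F, 1, HEADER.size + 3, 0, 0) + b"abc"
for event in iter_events(io.BytesIO(MAGIC + fake_event)):
    print(event)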