From 7f26fc493c5d5ef6ff9af9c30e5011a31c8f16ae Mon Sep 17 00:00:00 2001 From: heehehe Date: Wed, 30 Aug 2023 01:19:31 +0900 Subject: [PATCH 1/7] docs: add typing in packet.py --- pymysqlreplication/packet.py | 131 +++++++++++++++++------------------ 1 file changed, 65 insertions(+), 66 deletions(-) diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index 1d2e408b..b7809b9f 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -3,6 +3,7 @@ import struct from pymysqlreplication import constants, event, row_event +from typing import List, Tuple, Any, Dict, Optional, Union # Constants from PyMYSQL source code NULL_COLUMN = 251 @@ -36,24 +37,24 @@ JSONB_LITERAL_FALSE = 0x2 -def read_offset_or_inline(packet, large): +def read_offset_or_inline(packet, large: bool) -> Tuple[Any, Any, Any]: t = packet.read_uint8() if t in (JSONB_TYPE_LITERAL, JSONB_TYPE_INT16, JSONB_TYPE_UINT16): - return (t, None, packet.read_binary_json_type_inlined(t, large)) + return t, None, packet.read_binary_json_type_inlined(t, large) if large and t in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32): - return (t, None, packet.read_binary_json_type_inlined(t, large)) + return t, None, packet.read_binary_json_type_inlined(t, large) if large: - return (t, packet.read_uint32(), None) - return (t, packet.read_uint16(), None) + return t, packet.read_uint32(), None + return t, packet.read_uint16(), None class BinLogPacketWrapper(object): """ - Bin Log Packet Wrapper. It uses an existing packet object, and wraps - around it, exposing useful variables while still providing access + Bin Log Packet Wrapper uses an existing packet object and wraps around it, + exposing useful variables while still providing access to the original packet objects variables and methods. """ @@ -155,7 +156,7 @@ def __init__(self, from_packet, table_map, if self.event._processed == False: self.event = None - def read(self, size): + def read(self, size: int) -> Union[int, bytes]: size = int(size) self.read_bytes += size if len(self.__data_buffer) > 0: @@ -167,14 +168,15 @@ def read(self, size): return data + self.packet.read(size - len(data)) return self.packet.read(size) - def unread(self, data): - '''Push again data in data buffer. It's use when you want - to extract a bit from a value a let the rest of the code normally - read the datas''' + def unread(self, data: str): + """ + Push again data in data buffer. + Use to extract a bit from a value and ensure that the rest of the code reads data normally + """ self.read_bytes -= len(data) self.__data_buffer += data - def advance(self, size): + def advance(self, size: int): size = int(size) self.read_bytes += size buffer_len = len(self.__data_buffer) @@ -185,13 +187,11 @@ def advance(self, size): else: self.packet.advance(size) - def read_length_coded_binary(self): - """Read a 'Length Coded Binary' number from the data buffer. - + def read_length_coded_binary(self) -> Optional[str]: + """ + Read a 'Length Coded Binary' number from the data buffer. Length coded numbers can be anywhere from 1 to 9 bytes depending - on the value of the first byte. - - From PyMYSQL source code + on the value of the first byte. (From PyMYSQL source code) """ c = struct.unpack("!B", self.read(1))[0] if c == NULL_COLUMN: @@ -205,14 +205,12 @@ def read_length_coded_binary(self): elif c == UNSIGNED_INT64_COLUMN: return self.unpack_int64(self.read(UNSIGNED_INT64_LENGTH)) - def read_length_coded_string(self): - """Read a 'Length Coded String' from the data buffer. - - A 'Length Coded String' consists first of a length coded - (unsigned, positive) integer represented in 1-9 bytes followed by - that many bytes of binary data. (For example "cat" would be "3cat".) - - From PyMYSQL source code + def read_length_coded_string(self) -> Optional[str]: + """ + Read a 'Length Coded String' from the data buffer. + A 'Length Coded String' consists first of a length coded (unsigned, positive) integer + represented in 1-9 bytes followed by that many bytes of binary data. + (For example, "cat" would be "3cat". - From PyMYSQL source code) """ length = self.read_length_coded_binary() if length is None: @@ -226,8 +224,10 @@ def __getattr__(self, key): raise AttributeError("%s instance has no attribute '%s'" % (self.__class__, key)) - def read_int_be_by_size(self, size): - '''Read a big endian integer values based on byte number''' + def read_int_be_by_size(self, size: int): + """ + Read a big endian integer values based on byte number + """ if size == 1: return struct.unpack('>b', self.read(size))[0] elif size == 2: @@ -241,8 +241,10 @@ def read_int_be_by_size(self, size): elif size == 8: return struct.unpack('>l', self.read(size))[0] - def read_uint_by_size(self, size): - '''Read a little endian integer values based on byte number''' + def read_uint_by_size(self, size: int) -> str: + """ + Read a little endian integer values based on byte number + """ if size == 1: return self.read_uint8() elif size == 2: @@ -260,19 +262,18 @@ def read_uint_by_size(self, size): elif size == 8: return self.read_uint64() - def read_length_coded_pascal_string(self, size): - """Read a string with length coded using pascal style. + def read_length_coded_pascal_string(self, size: int) -> Union[int, bytes]: + """ + Read a string with length coded using pascal style. The string start by the size of the string """ length = self.read_uint_by_size(size) return self.read(length) - def read_variable_length_string(self): - """Read a variable length string where the first 1-5 bytes stores the - length of the string. - - For each byte, the first bit being high indicates another byte must be - read. + def read_variable_length_string(self) -> Union[int, bytes]: + """ + Read a variable length string where the first 1-5 bytes stores the length of the string. + For each byte, the first bit being high indicates another byte must be read. """ byte = 0x80 length = 0 @@ -283,65 +284,65 @@ def read_variable_length_string(self): bits_read = bits_read + 7 return self.read(length) - def read_int24(self): + def read_int24(self) -> str: a, b, c = struct.unpack("BBB", self.read(3)) res = a | (b << 8) | (c << 16) if res >= 0x800000: res -= 0x1000000 return res - def read_int24_be(self): + def read_int24_be(self) -> str: a, b, c = struct.unpack('BBB', self.read(3)) res = (a << 16) | (b << 8) | c if res >= 0x800000: res -= 0x1000000 return res - def read_uint8(self): + def read_uint8(self) -> str: return struct.unpack(' str: return struct.unpack(' str: return struct.unpack(' str: a, b, c = struct.unpack(" str: return struct.unpack(' str: return struct.unpack(' str: a, b = struct.unpack(" str: a, b = struct.unpack(">IB", self.read(5)) return b + (a << 8) - def read_uint48(self): + def read_uint48(self) -> str: a, b, c = struct.unpack(" str: a, b, c = struct.unpack(" str: return struct.unpack(' str: return struct.unpack(' str: return struct.unpack(' Optional[str, int]: try: return struct.unpack('B', n[0])[0] \ + (struct.unpack('B', n[1])[0] << 8) \ @@ -358,7 +359,7 @@ def unpack_int32(self, n): except TypeError: return n[0] + (n[1] << 8) + (n[2] << 16) + (n[3] << 24) - def read_binary_json(self, size): + def read_binary_json(self, size: int) -> Optional[str]: length = self.read_uint_by_size(size) if length == 0: # handle NULL value @@ -369,7 +370,7 @@ def read_binary_json(self, size): return self.read_binary_json_type(t, length) - def read_binary_json_type(self, t, length): + def read_binary_json_type(self, t: bytes, length: int) -> Optional[bool, str]: large = (t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY)) if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT): return self.read_binary_json_object(length - 1, large) @@ -402,7 +403,7 @@ def read_binary_json_type(self, t, length): raise ValueError('Json type %d is not handled' % t) - def read_binary_json_type_inlined(self, t, large): + def read_binary_json_type_inlined(self, t: bytes, large: bool) -> Optional[bool, str]: if t == JSONB_TYPE_LITERAL: value = self.read_uint32() if large else self.read_uint16() if value == JSONB_LITERAL_NULL: @@ -422,7 +423,7 @@ def read_binary_json_type_inlined(self, t, large): raise ValueError('Json type %d is not handled' % t) - def read_binary_json_object(self, length, large): + def read_binary_json_object(self, length: int, large: bool) -> Dict[str, str]: if large: elements = self.read_uint32() size = self.read_uint32() @@ -460,7 +461,7 @@ def read_binary_json_object(self, length, large): return out - def read_binary_json_array(self, length, large): + def read_binary_json_array(self, length: int, large: bool) -> List: if large: elements = self.read_uint32() size = self.read_uint32() @@ -482,13 +483,11 @@ def _read(x): return [_read(x) for x in values_type_offset_inline] - def read_string(self): - """Read a 'Length Coded String' from the data buffer. - + def read_string(self) -> bytes: + """ + Read a 'Length Coded String' from the data buffer. Read __data_buffer until NULL character (0 = \0 = \x00) - - Returns: - Binary string parsed from __data_buffer + :return string: Binary string parsed from __data_buffer """ string = b'' while True: From 68632c5796ea4ad6828ba4aabff34ffe1748fbc3 Mon Sep 17 00:00:00 2001 From: heehehe Date: Thu, 31 Aug 2023 21:23:48 +0900 Subject: [PATCH 2/7] fix: modify wrong return types for read_uint --- pymysqlreplication/packet.py | 47 ++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index b7809b9f..bde4ca01 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -95,7 +95,8 @@ class BinLogPacketWrapper(object): constants.MARIADB_START_ENCRYPTION_EVENT: event.MariadbStartEncryptionEvent } - def __init__(self, from_packet, table_map, + def __init__(self, from_packet, + table_map, ctl_connection, mysql_version, use_checksum, @@ -108,9 +109,9 @@ def __init__(self, from_packet, table_map, fail_on_table_metadata_unavailable, ignore_decode_errors): # -1 because we ignore the ok byte - self.read_bytes = 0 + self.read_bytes: int = 0 # Used when we want to override a value in the data buffer - self.__data_buffer = b'' + self.__data_buffer: bytes = b'' self.packet = from_packet self.charset = ctl_connection.charset @@ -187,7 +188,7 @@ def advance(self, size: int): else: self.packet.advance(size) - def read_length_coded_binary(self) -> Optional[str]: + def read_length_coded_binary(self) -> Optional[int]: """ Read a 'Length Coded Binary' number from the data buffer. Length coded numbers can be anywhere from 1 to 9 bytes depending @@ -241,7 +242,7 @@ def read_int_be_by_size(self, size: int): elif size == 8: return struct.unpack('>l', self.read(size))[0] - def read_uint_by_size(self, size: int) -> str: + def read_uint_by_size(self, size: int) -> int: """ Read a little endian integer values based on byte number """ @@ -284,65 +285,65 @@ def read_variable_length_string(self) -> Union[int, bytes]: bits_read = bits_read + 7 return self.read(length) - def read_int24(self) -> str: + def read_int24(self) -> int: a, b, c = struct.unpack("BBB", self.read(3)) res = a | (b << 8) | (c << 16) if res >= 0x800000: res -= 0x1000000 return res - def read_int24_be(self) -> str: + def read_int24_be(self) -> int: a, b, c = struct.unpack('BBB', self.read(3)) res = (a << 16) | (b << 8) | c if res >= 0x800000: res -= 0x1000000 return res - def read_uint8(self) -> str: + def read_uint8(self) -> int: return struct.unpack(' str: + def read_int16(self) -> int: return struct.unpack(' str: + def read_uint16(self) -> int: return struct.unpack(' str: + def read_uint24(self) -> int: a, b, c = struct.unpack(" str: + def read_uint32(self) -> int: return struct.unpack(' str: + def read_int32(self) -> int: return struct.unpack(' str: + def read_uint40(self) -> int: a, b = struct.unpack(" str: + def read_int40_be(self) -> int: a, b = struct.unpack(">IB", self.read(5)) return b + (a << 8) - def read_uint48(self) -> str: + def read_uint48(self) -> int: a, b, c = struct.unpack(" str: + def read_uint56(self) -> int: a, b, c = struct.unpack(" str: + def read_uint64(self) -> int: return struct.unpack(' str: + def read_int64(self) -> int: return struct.unpack(' str: + def unpack_uint16(self, n: bytes) -> int: return struct.unpack(' Optional[str, int]: + def unpack_int24(self, n: bytes) -> Optional[str, Tuple[str, int]]: try: return struct.unpack('B', n[0])[0] \ + (struct.unpack('B', n[1])[0] << 8) \ @@ -350,7 +351,7 @@ def unpack_int24(self, n: bytes) -> Optional[str, int]: except TypeError: return n[0] + (n[1] << 8) + (n[2] << 16) - def unpack_int32(self, n): + def unpack_int32(self, n: bytes) -> Optional[str, Tuple[str, int]]: try: return struct.unpack('B', n[0])[0] \ + (struct.unpack('B', n[1])[0] << 8) \ @@ -370,7 +371,7 @@ def read_binary_json(self, size: int) -> Optional[str]: return self.read_binary_json_type(t, length) - def read_binary_json_type(self, t: bytes, length: int) -> Optional[bool, str]: + def read_binary_json_type(self, t: int, length: int) -> Optional[bool, str]: large = (t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY)) if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT): return self.read_binary_json_object(length - 1, large) From 6d29cc95081b43e23f4101b6bff142bbdd5d9e38 Mon Sep 17 00:00:00 2001 From: heehehe Date: Thu, 31 Aug 2023 21:58:47 +0900 Subject: [PATCH 3/7] fix: modify unpack_int return type --- pymysqlreplication/packet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index bde4ca01..e3a1f54f 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -343,7 +343,7 @@ def read_int64(self) -> int: def unpack_uint16(self, n: bytes) -> int: return struct.unpack(' Optional[str, Tuple[str, int]]: + def unpack_int24(self, n: bytes) -> Optional[Union[int, Tuple[str, int]]]: try: return struct.unpack('B', n[0])[0] \ + (struct.unpack('B', n[1])[0] << 8) \ @@ -351,7 +351,7 @@ def unpack_int24(self, n: bytes) -> Optional[str, Tuple[str, int]]: except TypeError: return n[0] + (n[1] << 8) + (n[2] << 16) - def unpack_int32(self, n: bytes) -> Optional[str, Tuple[str, int]]: + def unpack_int32(self, n: bytes) -> Optional[Union[int, Tuple[str, int]]]: try: return struct.unpack('B', n[0])[0] \ + (struct.unpack('B', n[1])[0] << 8) \ From e54e9a52d2d0a1676fa6afbb828d1a1ae535b568 Mon Sep 17 00:00:00 2001 From: heehehe Date: Thu, 31 Aug 2023 22:06:31 +0900 Subject: [PATCH 4/7] fix: add union for multiple optional types --- pymysqlreplication/packet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index e3a1f54f..d564892b 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -371,7 +371,7 @@ def read_binary_json(self, size: int) -> Optional[str]: return self.read_binary_json_type(t, length) - def read_binary_json_type(self, t: int, length: int) -> Optional[bool, str]: + def read_binary_json_type(self, t: int, length: int) -> Optional[Union[bool, str]]: large = (t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY)) if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT): return self.read_binary_json_object(length - 1, large) @@ -404,7 +404,7 @@ def read_binary_json_type(self, t: int, length: int) -> Optional[bool, str]: raise ValueError('Json type %d is not handled' % t) - def read_binary_json_type_inlined(self, t: bytes, large: bool) -> Optional[bool, str]: + def read_binary_json_type_inlined(self, t: bytes, large: bool) -> Optional[Union[bool, str]]: if t == JSONB_TYPE_LITERAL: value = self.read_uint32() if large else self.read_uint16() if value == JSONB_LITERAL_NULL: From 87d93fec8a6fad9d7e4ee38f5bbffcb6bacc99bd Mon Sep 17 00:00:00 2001 From: heehehe Date: Fri, 1 Sep 2023 00:45:49 +0900 Subject: [PATCH 5/7] feat: add read_offset_or_inline packet type --- pymysqlreplication/packet.py | 47 +++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index d564892b..d007a1a2 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -3,7 +3,9 @@ import struct from pymysqlreplication import constants, event, row_event -from typing import List, Tuple, Any, Dict, Optional, Union + +from typing import List, Tuple, Dict, Optional, Union +from pymysql.connections import MysqlPacket # Constants from PyMYSQL source code NULL_COLUMN = 251 @@ -16,7 +18,6 @@ UNSIGNED_INT24_LENGTH = 3 UNSIGNED_INT64_LENGTH = 8 - JSONB_TYPE_SMALL_OBJECT = 0x0 JSONB_TYPE_LARGE_OBJECT = 0x1 JSONB_TYPE_SMALL_ARRAY = 0x2 @@ -37,20 +38,6 @@ JSONB_LITERAL_FALSE = 0x2 -def read_offset_or_inline(packet, large: bool) -> Tuple[Any, Any, Any]: - t = packet.read_uint8() - - if t in (JSONB_TYPE_LITERAL, - JSONB_TYPE_INT16, JSONB_TYPE_UINT16): - return t, None, packet.read_binary_json_type_inlined(t, large) - if large and t in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32): - return t, None, packet.read_binary_json_type_inlined(t, large) - - if large: - return t, packet.read_uint32(), None - return t, packet.read_uint16(), None - - class BinLogPacketWrapper(object): """ Bin Log Packet Wrapper uses an existing packet object and wraps around it, @@ -82,7 +69,7 @@ class BinLogPacketWrapper(object): constants.DELETE_ROWS_EVENT_V2: row_event.DeleteRowsEvent, constants.TABLE_MAP_EVENT: row_event.TableMapEvent, - #5.6 GTID enabled replication events + # 5.6 GTID enabled replication events constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent, constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent, constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent, @@ -95,7 +82,8 @@ class BinLogPacketWrapper(object): constants.MARIADB_START_ENCRYPTION_EVENT: event.MariadbStartEncryptionEvent } - def __init__(self, from_packet, + def __init__(self, + from_packet, table_map, ctl_connection, mysql_version, @@ -438,13 +426,13 @@ def read_binary_json_object(self, length: int, large: bool) -> Dict[str, str]: if large: key_offset_lengths = [( self.read_uint32(), # offset (we don't actually need that) - self.read_uint16() # size of the key - ) for _ in range(elements)] + self.read_uint16() # size of the key + ) for _ in range(elements)] else: key_offset_lengths = [( self.read_uint16(), # offset (we don't actually need that) - self.read_uint16() # size of key - ) for _ in range(elements)] + self.read_uint16() # size of key + ) for _ in range(elements)] value_type_inlined_lengths = [read_offset_or_inline(self, large) for _ in range(elements)] @@ -498,3 +486,18 @@ def read_string(self) -> bytes: string += char return string + + +def read_offset_or_inline(packet: Union[MysqlPacket, BinLogPacketWrapper], large: bool) \ + -> Tuple[int, Optional[int], Optional[Union[bool, str]]]: + t = packet.read_uint8() + + if t in (JSONB_TYPE_LITERAL, + JSONB_TYPE_INT16, JSONB_TYPE_UINT16): + return t, None, packet.read_binary_json_type_inlined(t, large) + if large and t in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32): + return t, None, packet.read_binary_json_type_inlined(t, large) + + if large: + return t, packet.read_uint32(), None + return t, packet.read_uint16(), None From 59e15c7bb48dc5a59865cfe89fc48ff13e3f5fda Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 2 Sep 2023 14:17:49 +0900 Subject: [PATCH 6/7] feat: add typing of instance variables --- pymysqlreplication/packet.py | 94 +++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 43 deletions(-) diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index d007a1a2..9eff0f32 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -4,8 +4,8 @@ from pymysqlreplication import constants, event, row_event -from typing import List, Tuple, Dict, Optional, Union -from pymysql.connections import MysqlPacket +from typing import List, Tuple, Dict, Optional, Union, FrozenSet +from pymysql.connections import MysqlPacket, Connection # Constants from PyMYSQL source code NULL_COLUMN = 251 @@ -83,26 +83,26 @@ class BinLogPacketWrapper(object): } def __init__(self, - from_packet, - table_map, - ctl_connection, - mysql_version, - use_checksum, - allowed_events, - only_tables, - ignored_tables, - only_schemas, - ignored_schemas, - freeze_schema, - fail_on_table_metadata_unavailable, - ignore_decode_errors): + from_packet: MysqlPacket, + table_map: dict, + ctl_connection: Connection, + mysql_version: Tuple[int, int, int], + use_checksum: bool, + allowed_events: FrozenSet[event.BinLogEvent], + only_tables: Optional[List[str]], + ignored_tables: Optional[List[str]], + only_schemas: Optional[List[str]], + ignored_schemas: Optional[List[str]], + freeze_schema: bool, + fail_on_table_metadata_unavailable: bool, + ignore_decode_errors: bool) -> None: # -1 because we ignore the ok byte - self.read_bytes: int = 0 + self.read_bytes = 0 # Used when we want to override a value in the data buffer - self.__data_buffer: bytes = b'' + self.__data_buffer = b'' - self.packet = from_packet - self.charset = ctl_connection.charset + self.packet: MysqlPacket = from_packet + self.charset: str = ctl_connection.charset # OK value # timestamp @@ -113,13 +113,13 @@ def __init__(self, unpack = struct.unpack(' Union[int, bytes]: return data + self.packet.read(size - len(data)) return self.packet.read(size) - def unread(self, data: str): + def unread(self, data: Union[int, bytes]) -> None: """ Push again data in data buffer. Use to extract a bit from a value and ensure that the rest of the code reads data normally @@ -165,7 +169,7 @@ def unread(self, data: str): self.read_bytes -= len(data) self.__data_buffer += data - def advance(self, size: int): + def advance(self, size: int) -> None: size = int(size) self.read_bytes += size buffer_len = len(self.__data_buffer) @@ -213,7 +217,7 @@ def __getattr__(self, key): raise AttributeError("%s instance has no attribute '%s'" % (self.__class__, key)) - def read_int_be_by_size(self, size: int): + def read_int_be_by_size(self, size: int) -> int: """ Read a big endian integer values based on byte number """ @@ -359,7 +363,10 @@ def read_binary_json(self, size: int) -> Optional[str]: return self.read_binary_json_type(t, length) - def read_binary_json_type(self, t: int, length: int) -> Optional[Union[bool, str]]: + def read_binary_json_type(self, t: int, length: int) \ + -> Optional[Union[ + Dict[Union[int, bytes], Union[bool, str, None]], + List[int], bool, int]]: large = (t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY)) if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT): return self.read_binary_json_object(length - 1, large) @@ -392,7 +399,7 @@ def read_binary_json_type(self, t: int, length: int) -> Optional[Union[bool, str raise ValueError('Json type %d is not handled' % t) - def read_binary_json_type_inlined(self, t: bytes, large: bool) -> Optional[Union[bool, str]]: + def read_binary_json_type_inlined(self, t: bytes, large: bool) -> Optional[Union[bool, int]]: if t == JSONB_TYPE_LITERAL: value = self.read_uint32() if large else self.read_uint16() if value == JSONB_LITERAL_NULL: @@ -412,7 +419,8 @@ def read_binary_json_type_inlined(self, t: bytes, large: bool) -> Optional[Union raise ValueError('Json type %d is not handled' % t) - def read_binary_json_object(self, length: int, large: bool) -> Dict[str, str]: + def read_binary_json_object(self, length: int, large: bool) \ + -> Dict[Union[int, bytes], Union[bool, str, None]]: if large: elements = self.read_uint32() size = self.read_uint32() @@ -450,7 +458,7 @@ def read_binary_json_object(self, length: int, large: bool) -> Dict[str, str]: return out - def read_binary_json_array(self, length: int, large: bool) -> List: + def read_binary_json_array(self, length: int, large: bool) -> List[int]: if large: elements = self.read_uint32() size = self.read_uint32() @@ -465,7 +473,7 @@ def read_binary_json_array(self, length: int, large: bool) -> List: read_offset_or_inline(self, large) for _ in range(elements)] - def _read(x): + def _read(x: Tuple[int, Optional[int], Optional[Union[bool, str]]]) -> int: if x[1] is None: return x[2] return self.read_binary_json_type(x[0], length) From daa12bdd13850a211fc3d816f53ce67d486dab05 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sun, 3 Sep 2023 00:29:59 +0900 Subject: [PATCH 7/7] fix: add Type to allowed_events https://github.com/23-OSSCA-python-mysql-replication/python-mysql-replication/pull/79#discussion_r1313831487 --- pymysqlreplication/packet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index 9eff0f32..364efaea 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -4,7 +4,7 @@ from pymysqlreplication import constants, event, row_event -from typing import List, Tuple, Dict, Optional, Union, FrozenSet +from typing import List, Tuple, Dict, Optional, Union, FrozenSet, Type from pymysql.connections import MysqlPacket, Connection # Constants from PyMYSQL source code @@ -88,7 +88,7 @@ def __init__(self, ctl_connection: Connection, mysql_version: Tuple[int, int, int], use_checksum: bool, - allowed_events: FrozenSet[event.BinLogEvent], + allowed_events: FrozenSet[Type[event.BinLogEvent]], only_tables: Optional[List[str]], ignored_tables: Optional[List[str]], only_schemas: Optional[List[str]],