Skip to content

typing - packet.py #73

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

140 changes: 70 additions & 70 deletions pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import struct

from pymysqlreplication import constants, event, row_event
from typing import List, Tuple, Any, Dict, Optional, Union

# Constants from PyMYSQL source code
NULL_COLUMN = 251
Expand Down Expand Up @@ -36,24 +37,24 @@
JSONB_LITERAL_FALSE = 0x2


def read_offset_or_inline(packet, large):
def read_offset_or_inline(packet, large: bool) -> Tuple[Any, Any, Any]:
t = packet.read_uint8()

if t in (JSONB_TYPE_LITERAL,
JSONB_TYPE_INT16, JSONB_TYPE_UINT16):
return (t, None, packet.read_binary_json_type_inlined(t, large))
return t, None, packet.read_binary_json_type_inlined(t, large)
if large and t in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32):
return (t, None, packet.read_binary_json_type_inlined(t, large))
return t, None, packet.read_binary_json_type_inlined(t, large)

if large:
return (t, packet.read_uint32(), None)
return (t, packet.read_uint16(), None)
return t, packet.read_uint32(), None
return t, packet.read_uint16(), None


class BinLogPacketWrapper(object):
"""
Bin Log Packet Wrapper. It uses an existing packet object, and wraps
around it, exposing useful variables while still providing access
Bin Log Packet Wrapper uses an existing packet object and wraps around it,
exposing useful variables while still providing access
to the original packet objects variables and methods.
"""

Expand Down Expand Up @@ -94,7 +95,8 @@ class BinLogPacketWrapper(object):
constants.MARIADB_START_ENCRYPTION_EVENT: event.MariadbStartEncryptionEvent
}

def __init__(self, from_packet, table_map,
def __init__(self, from_packet,
table_map,
ctl_connection,
mysql_version,
use_checksum,
Expand All @@ -107,9 +109,9 @@ def __init__(self, from_packet, table_map,
fail_on_table_metadata_unavailable,
ignore_decode_errors):
# -1 because we ignore the ok byte
self.read_bytes = 0
self.read_bytes: int = 0
# Used when we want to override a value in the data buffer
self.__data_buffer = b''
self.__data_buffer: bytes = b''

self.packet = from_packet
self.charset = ctl_connection.charset
Expand Down Expand Up @@ -155,7 +157,7 @@ def __init__(self, from_packet, table_map,
if self.event._processed == False:
self.event = None

def read(self, size):
def read(self, size: int) -> Union[int, bytes]:
size = int(size)
self.read_bytes += size
if len(self.__data_buffer) > 0:
Expand All @@ -167,14 +169,15 @@ def read(self, size):
return data + self.packet.read(size - len(data))
return self.packet.read(size)

def unread(self, data):
'''Push again data in data buffer. It's use when you want
to extract a bit from a value a let the rest of the code normally
read the datas'''
def unread(self, data: str):
"""
Push again data in data buffer.
Use to extract a bit from a value and ensure that the rest of the code reads data normally
"""
self.read_bytes -= len(data)
self.__data_buffer += data

def advance(self, size):
def advance(self, size: int):
size = int(size)
self.read_bytes += size
buffer_len = len(self.__data_buffer)
Expand All @@ -185,13 +188,11 @@ def advance(self, size):
else:
self.packet.advance(size)

def read_length_coded_binary(self):
"""Read a 'Length Coded Binary' number from the data buffer.

def read_length_coded_binary(self) -> Optional[int]:
"""
Read a 'Length Coded Binary' number from the data buffer.
Length coded numbers can be anywhere from 1 to 9 bytes depending
on the value of the first byte.

From PyMYSQL source code
on the value of the first byte. (From PyMYSQL source code)
"""
c = struct.unpack("!B", self.read(1))[0]
if c == NULL_COLUMN:
Expand All @@ -205,14 +206,12 @@ def read_length_coded_binary(self):
elif c == UNSIGNED_INT64_COLUMN:
return self.unpack_int64(self.read(UNSIGNED_INT64_LENGTH))

def read_length_coded_string(self):
"""Read a 'Length Coded String' from the data buffer.

A 'Length Coded String' consists first of a length coded
(unsigned, positive) integer represented in 1-9 bytes followed by
that many bytes of binary data. (For example "cat" would be "3cat".)

From PyMYSQL source code
def read_length_coded_string(self) -> Optional[str]:
"""
Read a 'Length Coded String' from the data buffer.
A 'Length Coded String' consists first of a length coded (unsigned, positive) integer
represented in 1-9 bytes followed by that many bytes of binary data.
(For example, "cat" would be "3cat". - From PyMYSQL source code)
"""
length = self.read_length_coded_binary()
if length is None:
Expand All @@ -226,8 +225,10 @@ def __getattr__(self, key):
raise AttributeError("%s instance has no attribute '%s'" %
(self.__class__, key))

def read_int_be_by_size(self, size):
'''Read a big endian integer values based on byte number'''
def read_int_be_by_size(self, size: int):
"""
Read a big endian integer values based on byte number
"""
if size == 1:
return struct.unpack('>b', self.read(size))[0]
elif size == 2:
Expand All @@ -241,8 +242,10 @@ def read_int_be_by_size(self, size):
elif size == 8:
return struct.unpack('>l', self.read(size))[0]

def read_uint_by_size(self, size):
'''Read a little endian integer values based on byte number'''
def read_uint_by_size(self, size: int) -> int:
"""
Read a little endian integer values based on byte number
"""
if size == 1:
return self.read_uint8()
elif size == 2:
Expand All @@ -260,19 +263,18 @@ def read_uint_by_size(self, size):
elif size == 8:
return self.read_uint64()

def read_length_coded_pascal_string(self, size):
"""Read a string with length coded using pascal style.
def read_length_coded_pascal_string(self, size: int) -> Union[int, bytes]:
"""
Read a string with length coded using pascal style.
The string start by the size of the string
"""
length = self.read_uint_by_size(size)
return self.read(length)
Comment on lines +260 to 266
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

어떤경우에 int가 나오고 어떤경우 byte가나오나요?

Copy link
Member Author

@heehehe heehehe Sep 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

원래는 아래 부분에서 if len(self.__data_buffer) > 0이면 bytes로 되고 아니면 int로 된다고 생각했는데요..
https://github.com/23-OSSCA-python-mysql-replication/python-mysql-replication/blob/4ccf81ad8024c43bfd9b7b0e8ffbca51e3657e86/pymysqlreplication/packet.py#L154-L164

pymysql 코드 살펴보니까 모두 bytes로 반환될 것 같네요..!
https://github.com/PyMySQL/PyMySQL/blob/8157da51e844f619eb693c5f5dd2758dca1d1c98/pymysql/protocol.py#L62-L76

자세히 살펴봐주셔서 감사합니다 수정해볼게요!! - 4c1286c 반영 완료


def read_variable_length_string(self):
"""Read a variable length string where the first 1-5 bytes stores the
length of the string.

For each byte, the first bit being high indicates another byte must be
read.
def read_variable_length_string(self) -> Union[int, bytes]:
"""
Read a variable length string where the first 1-5 bytes stores the length of the string.
For each byte, the first bit being high indicates another byte must be read.
"""
byte = 0x80
length = 0
Expand All @@ -283,73 +285,73 @@ def read_variable_length_string(self):
bits_read = bits_read + 7
return self.read(length)

def read_int24(self):
def read_int24(self) -> int:
a, b, c = struct.unpack("BBB", self.read(3))
res = a | (b << 8) | (c << 16)
if res >= 0x800000:
res -= 0x1000000
return res

def read_int24_be(self):
def read_int24_be(self) -> int:
a, b, c = struct.unpack('BBB', self.read(3))
res = (a << 16) | (b << 8) | c
if res >= 0x800000:
res -= 0x1000000
return res

def read_uint8(self):
def read_uint8(self) -> int:
return struct.unpack('<B', self.read(1))[0]

def read_int16(self):
def read_int16(self) -> int:
return struct.unpack('<h', self.read(2))[0]

def read_uint16(self):
def read_uint16(self) -> int:
return struct.unpack('<H', self.read(2))[0]

def read_uint24(self):
def read_uint24(self) -> int:
a, b, c = struct.unpack("<BBB", self.read(3))
return a + (b << 8) + (c << 16)

def read_uint32(self):
def read_uint32(self) -> int:
return struct.unpack('<I', self.read(4))[0]

def read_int32(self):
def read_int32(self) -> int:
return struct.unpack('<i', self.read(4))[0]

def read_uint40(self):
def read_uint40(self) -> int:
a, b = struct.unpack("<BI", self.read(5))
return a + (b << 8)

def read_int40_be(self):
def read_int40_be(self) -> int:
a, b = struct.unpack(">IB", self.read(5))
return b + (a << 8)

def read_uint48(self):
def read_uint48(self) -> int:
a, b, c = struct.unpack("<HHH", self.read(6))
return a + (b << 16) + (c << 32)

def read_uint56(self):
def read_uint56(self) -> int:
a, b, c = struct.unpack("<BHI", self.read(7))
return a + (b << 8) + (c << 24)

def read_uint64(self):
def read_uint64(self) -> int:
return struct.unpack('<Q', self.read(8))[0]

def read_int64(self):
def read_int64(self) -> int:
return struct.unpack('<q', self.read(8))[0]

def unpack_uint16(self, n):
def unpack_uint16(self, n: bytes) -> int:
return struct.unpack('<H', n[0:2])[0]

def unpack_int24(self, n):
def unpack_int24(self, n: bytes) -> Optional[Union[int, Tuple[str, int]]]:
try:
return struct.unpack('B', n[0])[0] \
+ (struct.unpack('B', n[1])[0] << 8) \
+ (struct.unpack('B', n[2])[0] << 16)
except TypeError:
return n[0] + (n[1] << 8) + (n[2] << 16)

def unpack_int32(self, n):
def unpack_int32(self, n: bytes) -> Optional[Union[int, Tuple[str, int]]]:
try:
return struct.unpack('B', n[0])[0] \
+ (struct.unpack('B', n[1])[0] << 8) \
Expand All @@ -358,7 +360,7 @@ def unpack_int32(self, n):
except TypeError:
return n[0] + (n[1] << 8) + (n[2] << 16) + (n[3] << 24)

def read_binary_json(self, size):
def read_binary_json(self, size: int) -> Optional[str]:
length = self.read_uint_by_size(size)
if length == 0:
# handle NULL value
Expand All @@ -369,7 +371,7 @@ def read_binary_json(self, size):

return self.read_binary_json_type(t, length)

def read_binary_json_type(self, t, length):
def read_binary_json_type(self, t: int, length: int) -> Optional[Union[bool, str]]:
large = (t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY))
if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT):
return self.read_binary_json_object(length - 1, large)
Expand Down Expand Up @@ -402,7 +404,7 @@ def read_binary_json_type(self, t, length):

raise ValueError('Json type %d is not handled' % t)

def read_binary_json_type_inlined(self, t, large):
def read_binary_json_type_inlined(self, t: bytes, large: bool) -> Optional[Union[bool, str]]:
if t == JSONB_TYPE_LITERAL:
value = self.read_uint32() if large else self.read_uint16()
if value == JSONB_LITERAL_NULL:
Expand All @@ -422,7 +424,7 @@ def read_binary_json_type_inlined(self, t, large):

raise ValueError('Json type %d is not handled' % t)

def read_binary_json_object(self, length, large):
def read_binary_json_object(self, length: int, large: bool) -> Dict[str, str]:
if large:
elements = self.read_uint32()
size = self.read_uint32()
Expand Down Expand Up @@ -460,7 +462,7 @@ def read_binary_json_object(self, length, large):

return out

def read_binary_json_array(self, length, large):
def read_binary_json_array(self, length: int, large: bool) -> List:
if large:
elements = self.read_uint32()
size = self.read_uint32()
Expand All @@ -482,13 +484,11 @@ def _read(x):

return [_read(x) for x in values_type_offset_inline]

def read_string(self):
"""Read a 'Length Coded String' from the data buffer.

def read_string(self) -> bytes:
"""
Read a 'Length Coded String' from the data buffer.
Read __data_buffer until NULL character (0 = \0 = \x00)

Returns:
Binary string parsed from __data_buffer
:return string: Binary string parsed from __data_buffer
"""
string = b''
while True:
Expand Down