-
Notifications
You must be signed in to change notification settings - Fork 2
typing - packet.py #73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
7f26fc4
68632c5
6d29cc9
e54e9a5
87d93fe
59e15c7
daa12bd
4ccf81a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,9 @@ | |
|
||
from pymysqlreplication import constants, event, row_event | ||
|
||
from typing import List, Tuple, Dict, Optional, Union | ||
from pymysql.connections import MysqlPacket | ||
|
||
# Constants from PyMYSQL source code | ||
NULL_COLUMN = 251 | ||
UNSIGNED_CHAR_COLUMN = 251 | ||
|
@@ -15,7 +18,6 @@ | |
UNSIGNED_INT24_LENGTH = 3 | ||
UNSIGNED_INT64_LENGTH = 8 | ||
|
||
|
||
JSONB_TYPE_SMALL_OBJECT = 0x0 | ||
JSONB_TYPE_LARGE_OBJECT = 0x1 | ||
JSONB_TYPE_SMALL_ARRAY = 0x2 | ||
|
@@ -36,24 +38,10 @@ | |
JSONB_LITERAL_FALSE = 0x2 | ||
|
||
|
||
def read_offset_or_inline(packet, large): | ||
t = packet.read_uint8() | ||
|
||
if t in (JSONB_TYPE_LITERAL, | ||
JSONB_TYPE_INT16, JSONB_TYPE_UINT16): | ||
return (t, None, packet.read_binary_json_type_inlined(t, large)) | ||
if large and t in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32): | ||
return (t, None, packet.read_binary_json_type_inlined(t, large)) | ||
|
||
if large: | ||
return (t, packet.read_uint32(), None) | ||
return (t, packet.read_uint16(), None) | ||
|
||
|
||
class BinLogPacketWrapper(object): | ||
""" | ||
Bin Log Packet Wrapper. It uses an existing packet object, and wraps | ||
around it, exposing useful variables while still providing access | ||
Bin Log Packet Wrapper uses an existing packet object and wraps around it, | ||
exposing useful variables while still providing access | ||
to the original packet objects variables and methods. | ||
""" | ||
|
||
|
@@ -81,7 +69,7 @@ class BinLogPacketWrapper(object): | |
constants.DELETE_ROWS_EVENT_V2: row_event.DeleteRowsEvent, | ||
constants.TABLE_MAP_EVENT: row_event.TableMapEvent, | ||
|
||
#5.6 GTID enabled replication events | ||
# 5.6 GTID enabled replication events | ||
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent, | ||
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent, | ||
constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent, | ||
|
@@ -94,7 +82,9 @@ class BinLogPacketWrapper(object): | |
constants.MARIADB_START_ENCRYPTION_EVENT: event.MariadbStartEncryptionEvent | ||
} | ||
|
||
def __init__(self, from_packet, table_map, | ||
def __init__(self, | ||
from_packet, | ||
table_map, | ||
ctl_connection, | ||
mysql_version, | ||
use_checksum, | ||
|
@@ -107,9 +97,9 @@ def __init__(self, from_packet, table_map, | |
fail_on_table_metadata_unavailable, | ||
ignore_decode_errors): | ||
# -1 because we ignore the ok byte | ||
self.read_bytes = 0 | ||
self.read_bytes: int = 0 | ||
# Used when we want to override a value in the data buffer | ||
self.__data_buffer = b'' | ||
self.__data_buffer: bytes = b'' | ||
heehehe marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
self.packet = from_packet | ||
self.charset = ctl_connection.charset | ||
|
@@ -155,7 +145,7 @@ def __init__(self, from_packet, table_map, | |
if self.event._processed == False: | ||
self.event = None | ||
|
||
def read(self, size): | ||
def read(self, size: int) -> Union[int, bytes]: | ||
size = int(size) | ||
self.read_bytes += size | ||
if len(self.__data_buffer) > 0: | ||
|
@@ -167,14 +157,15 @@ def read(self, size): | |
return data + self.packet.read(size - len(data)) | ||
return self.packet.read(size) | ||
|
||
def unread(self, data): | ||
'''Push again data in data buffer. It's use when you want | ||
to extract a bit from a value a let the rest of the code normally | ||
read the datas''' | ||
def unread(self, data: str): | ||
""" | ||
Push again data in data buffer. | ||
Use to extract a bit from a value and ensure that the rest of the code reads data normally | ||
""" | ||
self.read_bytes -= len(data) | ||
self.__data_buffer += data | ||
|
||
def advance(self, size): | ||
def advance(self, size: int): | ||
size = int(size) | ||
self.read_bytes += size | ||
buffer_len = len(self.__data_buffer) | ||
|
@@ -185,13 +176,11 @@ def advance(self, size): | |
else: | ||
self.packet.advance(size) | ||
|
||
def read_length_coded_binary(self): | ||
"""Read a 'Length Coded Binary' number from the data buffer. | ||
|
||
def read_length_coded_binary(self) -> Optional[int]: | ||
""" | ||
heehehe marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Read a 'Length Coded Binary' number from the data buffer. | ||
Length coded numbers can be anywhere from 1 to 9 bytes depending | ||
on the value of the first byte. | ||
|
||
From PyMYSQL source code | ||
on the value of the first byte. (From PyMYSQL source code) | ||
""" | ||
c = struct.unpack("!B", self.read(1))[0] | ||
if c == NULL_COLUMN: | ||
|
@@ -205,14 +194,12 @@ def read_length_coded_binary(self): | |
elif c == UNSIGNED_INT64_COLUMN: | ||
return self.unpack_int64(self.read(UNSIGNED_INT64_LENGTH)) | ||
|
||
def read_length_coded_string(self): | ||
"""Read a 'Length Coded String' from the data buffer. | ||
|
||
A 'Length Coded String' consists first of a length coded | ||
(unsigned, positive) integer represented in 1-9 bytes followed by | ||
that many bytes of binary data. (For example "cat" would be "3cat".) | ||
|
||
From PyMYSQL source code | ||
def read_length_coded_string(self) -> Optional[str]: | ||
""" | ||
Read a 'Length Coded String' from the data buffer. | ||
A 'Length Coded String' consists first of a length coded (unsigned, positive) integer | ||
represented in 1-9 bytes followed by that many bytes of binary data. | ||
(For example, "cat" would be "3cat". - From PyMYSQL source code) | ||
""" | ||
length = self.read_length_coded_binary() | ||
if length is None: | ||
|
@@ -226,8 +213,10 @@ def __getattr__(self, key): | |
raise AttributeError("%s instance has no attribute '%s'" % | ||
(self.__class__, key)) | ||
|
||
def read_int_be_by_size(self, size): | ||
'''Read a big endian integer values based on byte number''' | ||
def read_int_be_by_size(self, size: int): | ||
""" | ||
Read a big endian integer values based on byte number | ||
""" | ||
if size == 1: | ||
return struct.unpack('>b', self.read(size))[0] | ||
elif size == 2: | ||
|
@@ -241,8 +230,10 @@ def read_int_be_by_size(self, size): | |
elif size == 8: | ||
return struct.unpack('>l', self.read(size))[0] | ||
|
||
def read_uint_by_size(self, size): | ||
'''Read a little endian integer values based on byte number''' | ||
def read_uint_by_size(self, size: int) -> int: | ||
""" | ||
Read a little endian integer values based on byte number | ||
""" | ||
if size == 1: | ||
return self.read_uint8() | ||
elif size == 2: | ||
|
@@ -260,19 +251,18 @@ def read_uint_by_size(self, size): | |
elif size == 8: | ||
return self.read_uint64() | ||
|
||
def read_length_coded_pascal_string(self, size): | ||
"""Read a string with length coded using pascal style. | ||
def read_length_coded_pascal_string(self, size: int) -> Union[int, bytes]: | ||
""" | ||
Read a string with length coded using pascal style. | ||
The string start by the size of the string | ||
""" | ||
length = self.read_uint_by_size(size) | ||
return self.read(length) | ||
Comment on lines
+260
to
266
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 어떤경우에 int가 나오고 어떤경우 byte가나오나요? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 원래는 아래 부분에서 pymysql 코드 살펴보니까 모두 bytes로 반환될 것 같네요..! 자세히 살펴봐주셔서 감사합니다 수정해볼게요!! - 4c1286c 반영 완료 |
||
|
||
def read_variable_length_string(self): | ||
"""Read a variable length string where the first 1-5 bytes stores the | ||
length of the string. | ||
|
||
For each byte, the first bit being high indicates another byte must be | ||
read. | ||
def read_variable_length_string(self) -> Union[int, bytes]: | ||
""" | ||
Read a variable length string where the first 1-5 bytes stores the length of the string. | ||
For each byte, the first bit being high indicates another byte must be read. | ||
""" | ||
byte = 0x80 | ||
length = 0 | ||
|
@@ -283,73 +273,73 @@ def read_variable_length_string(self): | |
bits_read = bits_read + 7 | ||
return self.read(length) | ||
|
||
def read_int24(self): | ||
def read_int24(self) -> int: | ||
a, b, c = struct.unpack("BBB", self.read(3)) | ||
res = a | (b << 8) | (c << 16) | ||
if res >= 0x800000: | ||
res -= 0x1000000 | ||
return res | ||
|
||
def read_int24_be(self): | ||
def read_int24_be(self) -> int: | ||
a, b, c = struct.unpack('BBB', self.read(3)) | ||
res = (a << 16) | (b << 8) | c | ||
if res >= 0x800000: | ||
res -= 0x1000000 | ||
return res | ||
|
||
def read_uint8(self): | ||
def read_uint8(self) -> int: | ||
return struct.unpack('<B', self.read(1))[0] | ||
|
||
def read_int16(self): | ||
def read_int16(self) -> int: | ||
return struct.unpack('<h', self.read(2))[0] | ||
|
||
def read_uint16(self): | ||
def read_uint16(self) -> int: | ||
return struct.unpack('<H', self.read(2))[0] | ||
|
||
def read_uint24(self): | ||
def read_uint24(self) -> int: | ||
a, b, c = struct.unpack("<BBB", self.read(3)) | ||
return a + (b << 8) + (c << 16) | ||
|
||
def read_uint32(self): | ||
def read_uint32(self) -> int: | ||
return struct.unpack('<I', self.read(4))[0] | ||
|
||
def read_int32(self): | ||
def read_int32(self) -> int: | ||
return struct.unpack('<i', self.read(4))[0] | ||
|
||
def read_uint40(self): | ||
def read_uint40(self) -> int: | ||
a, b = struct.unpack("<BI", self.read(5)) | ||
return a + (b << 8) | ||
|
||
def read_int40_be(self): | ||
def read_int40_be(self) -> int: | ||
a, b = struct.unpack(">IB", self.read(5)) | ||
return b + (a << 8) | ||
|
||
def read_uint48(self): | ||
def read_uint48(self) -> int: | ||
a, b, c = struct.unpack("<HHH", self.read(6)) | ||
return a + (b << 16) + (c << 32) | ||
|
||
def read_uint56(self): | ||
def read_uint56(self) -> int: | ||
a, b, c = struct.unpack("<BHI", self.read(7)) | ||
return a + (b << 8) + (c << 24) | ||
|
||
def read_uint64(self): | ||
def read_uint64(self) -> int: | ||
return struct.unpack('<Q', self.read(8))[0] | ||
|
||
def read_int64(self): | ||
def read_int64(self) -> int: | ||
return struct.unpack('<q', self.read(8))[0] | ||
|
||
def unpack_uint16(self, n): | ||
def unpack_uint16(self, n: bytes) -> int: | ||
return struct.unpack('<H', n[0:2])[0] | ||
|
||
def unpack_int24(self, n): | ||
def unpack_int24(self, n: bytes) -> Optional[Union[int, Tuple[str, int]]]: | ||
try: | ||
return struct.unpack('B', n[0])[0] \ | ||
+ (struct.unpack('B', n[1])[0] << 8) \ | ||
+ (struct.unpack('B', n[2])[0] << 16) | ||
except TypeError: | ||
return n[0] + (n[1] << 8) + (n[2] << 16) | ||
|
||
def unpack_int32(self, n): | ||
def unpack_int32(self, n: bytes) -> Optional[Union[int, Tuple[str, int]]]: | ||
try: | ||
return struct.unpack('B', n[0])[0] \ | ||
+ (struct.unpack('B', n[1])[0] << 8) \ | ||
|
@@ -358,7 +348,7 @@ def unpack_int32(self, n): | |
except TypeError: | ||
return n[0] + (n[1] << 8) + (n[2] << 16) + (n[3] << 24) | ||
|
||
def read_binary_json(self, size): | ||
def read_binary_json(self, size: int) -> Optional[str]: | ||
length = self.read_uint_by_size(size) | ||
if length == 0: | ||
# handle NULL value | ||
|
@@ -369,7 +359,7 @@ def read_binary_json(self, size): | |
|
||
return self.read_binary_json_type(t, length) | ||
|
||
def read_binary_json_type(self, t, length): | ||
def read_binary_json_type(self, t: int, length: int) -> Optional[Union[bool, str]]: | ||
large = (t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY)) | ||
if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT): | ||
return self.read_binary_json_object(length - 1, large) | ||
|
@@ -402,7 +392,7 @@ def read_binary_json_type(self, t, length): | |
|
||
raise ValueError('Json type %d is not handled' % t) | ||
|
||
def read_binary_json_type_inlined(self, t, large): | ||
def read_binary_json_type_inlined(self, t: bytes, large: bool) -> Optional[Union[bool, str]]: | ||
if t == JSONB_TYPE_LITERAL: | ||
value = self.read_uint32() if large else self.read_uint16() | ||
if value == JSONB_LITERAL_NULL: | ||
|
@@ -422,7 +412,7 @@ def read_binary_json_type_inlined(self, t, large): | |
|
||
raise ValueError('Json type %d is not handled' % t) | ||
|
||
def read_binary_json_object(self, length, large): | ||
def read_binary_json_object(self, length: int, large: bool) -> Dict[str, str]: | ||
if large: | ||
elements = self.read_uint32() | ||
size = self.read_uint32() | ||
|
@@ -436,13 +426,13 @@ def read_binary_json_object(self, length, large): | |
if large: | ||
key_offset_lengths = [( | ||
self.read_uint32(), # offset (we don't actually need that) | ||
self.read_uint16() # size of the key | ||
) for _ in range(elements)] | ||
self.read_uint16() # size of the key | ||
) for _ in range(elements)] | ||
else: | ||
key_offset_lengths = [( | ||
self.read_uint16(), # offset (we don't actually need that) | ||
self.read_uint16() # size of key | ||
) for _ in range(elements)] | ||
self.read_uint16() # size of key | ||
) for _ in range(elements)] | ||
|
||
value_type_inlined_lengths = [read_offset_or_inline(self, large) | ||
for _ in range(elements)] | ||
|
@@ -460,7 +450,7 @@ def read_binary_json_object(self, length, large): | |
|
||
return out | ||
|
||
def read_binary_json_array(self, length, large): | ||
def read_binary_json_array(self, length: int, large: bool) -> List: | ||
if large: | ||
elements = self.read_uint32() | ||
size = self.read_uint32() | ||
|
@@ -482,13 +472,11 @@ def _read(x): | |
|
||
return [_read(x) for x in values_type_offset_inline] | ||
|
||
def read_string(self): | ||
"""Read a 'Length Coded String' from the data buffer. | ||
|
||
def read_string(self) -> bytes: | ||
""" | ||
Read a 'Length Coded String' from the data buffer. | ||
Read __data_buffer until NULL character (0 = \0 = \x00) | ||
|
||
Returns: | ||
Binary string parsed from __data_buffer | ||
:return string: Binary string parsed from __data_buffer | ||
""" | ||
string = b'' | ||
while True: | ||
|
@@ -498,3 +486,18 @@ def read_string(self): | |
string += char | ||
|
||
return string | ||
|
||
|
||
def read_offset_or_inline(packet: Union[MysqlPacket, BinLogPacketWrapper], large: bool) \ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 일단 packet.py가 쉽지 않아 보이던데 엄청 꼼꼼하게 해주셔서 감탄하면서 봤네요 😲😲 고생하셨어요!! 🥰🥰 혹시 이 함수가 밑으로 옮겨진 이유가 따로 있을까요? 오류는 아니고 궁금해서 여쭤봅니다...! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 감사합니다🥺 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 아하 그렇군요!! 알려주셔서 감사해요🥰🥰🥰 |
||
-> Tuple[int, Optional[int], Optional[Union[bool, str]]]: | ||
t = packet.read_uint8() | ||
|
||
if t in (JSONB_TYPE_LITERAL, | ||
JSONB_TYPE_INT16, JSONB_TYPE_UINT16): | ||
return t, None, packet.read_binary_json_type_inlined(t, large) | ||
if large and t in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32): | ||
return t, None, packet.read_binary_json_type_inlined(t, large) | ||
|
||
if large: | ||
return t, packet.read_uint32(), None | ||
return t, packet.read_uint16(), None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any라고 적고 끝내실수도 있는데 끈질기게 해법을 고민해주셨다는 게 대단한 것 같아요!
끈기는 개발자의 핵심 역량이죠 👍👍👍
다음번에도 막히는 부분 있다면 머리를 맞대어 해결해봅시다!!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
정승님께서 발견해주신 덕분에 잘 바꿀 수 있었습니다ㅎㅎ 넘 감사합니다!!😆