Commit 08219f7

bug fix json Parser (julien-duponchelle#481)
* bug fix json Parser
* make bytes util
* error fix json array, json object; ruff fix; delete unused code
* bug fix: optional_meta_data_flag missing
* fix test case error
* add datetime, time parsing; change to object
* fix bug when long json string type; change json array type format; add and update Json testcases; add datetime, decimal, time test cases
1 parent a4e59d6 commit 08219f7

File tree

7 files changed: +369 -215 lines changed

pymysqlreplication/binlogstream.py (+2)

@@ -567,6 +567,8 @@ def __check_optional_meta_data(self):
                 By Applying this, provide properly mapped column information on UPDATE,DELETE,INSERT.
                 """,
             )
+        else:
+            self.__optional_meta_data = True
 
     def fetchone(self):
         while True:
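The added else branch flips __optional_meta_data to True when the server actually ships FULL row metadata, instead of only warning when it does not. A minimal, hedged sketch of the reader this flag feeds (connection settings are placeholders, not from this commit); with binlog_row_metadata=FULL on the server, row events should now carry properly mapped column names:

# Minimal sketch, assuming a local MySQL configured with
# binlog_row_metadata=FULL and binlog_row_image=FULL.
from pymysqlreplication import BinLogStreamReader

stream = BinLogStreamReader(
    connection_settings={"host": "127.0.0.1", "port": 3306,
                         "user": "root", "passwd": ""},
    server_id=100,   # placeholder replica id, must be unique per source
    blocking=False,
)
for binlogevent in stream:
    binlogevent.dump()   # INSERT/UPDATE/DELETE rows show mapped column names
stream.close()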

pymysqlreplication/event.py (+2 -54)

@@ -6,6 +6,7 @@
 
 from pymysqlreplication.constants.STATUS_VAR_KEY import *
 from pymysqlreplication.exceptions import StatusVariableMismatch
+from pymysqlreplication.util.bytes import parse_decimal_from_bytes
 from typing import Union, Optional
 
 
@@ -781,9 +782,7 @@ def _read_decimal(self, buffer: bytes) -> decimal.Decimal:
         self.precision = self.temp_value_buffer[0]
         self.decimals = self.temp_value_buffer[1]
         raw_decimal = self.temp_value_buffer[2:]
-        return self._parse_decimal_from_bytes(
-            raw_decimal, self.precision, self.decimals
-        )
+        return parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals)
 
     def _read_default(self) -> bytes:
         """
@@ -792,57 +791,6 @@ def _read_default(self) -> bytes:
         """
         return self.packet.read(self.value_len)
 
-    @staticmethod
-    def _parse_decimal_from_bytes(
-        raw_decimal: bytes, precision: int, decimals: int
-    ) -> decimal.Decimal:
-        """
-        Parse decimal from bytes.
-        """
-        digits_per_integer = 9
-        compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4]
-        integral = precision - decimals
-
-        uncomp_integral, comp_integral = divmod(integral, digits_per_integer)
-        uncomp_fractional, comp_fractional = divmod(decimals, digits_per_integer)
-
-        res = "-" if not raw_decimal[0] & 0x80 else ""
-        mask = -1 if res == "-" else 0
-        raw_decimal = bytearray([raw_decimal[0] ^ 0x80]) + raw_decimal[1:]
-
-        def decode_decimal_decompress_value(comp_indx, data, mask):
-            size = compressed_bytes[comp_indx]
-            if size > 0:
-                databuff = bytearray(data[:size])
-                for i in range(size):
-                    databuff[i] = (databuff[i] ^ mask) & 0xFF
-                return size, int.from_bytes(databuff, byteorder="big")
-            return 0, 0
-
-        pointer, value = decode_decimal_decompress_value(
-            comp_integral, raw_decimal, mask
-        )
-        res += str(value)
-
-        for _ in range(uncomp_integral):
-            value = struct.unpack(">i", raw_decimal[pointer : pointer + 4])[0] ^ mask
-            res += "%09d" % value
-            pointer += 4
-
-        res += "."
-
-        for _ in range(uncomp_fractional):
-            value = struct.unpack(">i", raw_decimal[pointer : pointer + 4])[0] ^ mask
-            res += "%09d" % value
-            pointer += 4
-
-        size, value = decode_decimal_decompress_value(
-            comp_fractional, raw_decimal[pointer:], mask
-        )
-        if size > 0:
-            res += "%0*d" % (comp_fractional, value)
-        return decimal.Decimal(res)
-
     def _dump(self) -> None:
         super(UserVarEvent, self)._dump()
         print("User variable name: %s" % self.name)

pymysqlreplication/packet.py (+138 -137)

@@ -1,6 +1,8 @@
 import struct
 
 from pymysqlreplication import constants, event, row_event
+from pymysqlreplication.constants import FIELD_TYPE
+from pymysqlreplication.util.bytes import *
 
 # Constants from PyMYSQL source code
 NULL_COLUMN = 251
@@ -13,7 +15,6 @@
 UNSIGNED_INT24_LENGTH = 3
 UNSIGNED_INT64_LENGTH = 8
 
-
 JSONB_TYPE_SMALL_OBJECT = 0x0
 JSONB_TYPE_LARGE_OBJECT = 0x1
 JSONB_TYPE_SMALL_ARRAY = 0x2
@@ -33,18 +34,141 @@
 JSONB_LITERAL_TRUE = 0x1
 JSONB_LITERAL_FALSE = 0x2
 
+JSONB_SMALL_OFFSET_SIZE = 2
+JSONB_LARGE_OFFSET_SIZE = 4
+JSONB_KEY_ENTRY_SIZE_SMALL = 2 + JSONB_SMALL_OFFSET_SIZE
+JSONB_KEY_ENTRY_SIZE_LARGE = 2 + JSONB_LARGE_OFFSET_SIZE
+JSONB_VALUE_ENTRY_SIZE_SMALL = 1 + JSONB_SMALL_OFFSET_SIZE
+JSONB_VALUE_ENTRY_SIZE_LARGE = 1 + JSONB_LARGE_OFFSET_SIZE
+
+
+def is_json_inline_value(type: bytes, is_small: bool) -> bool:
+    if type in [JSONB_TYPE_UINT16, JSONB_TYPE_INT16, JSONB_TYPE_LITERAL]:
+        return True
+    elif type in [JSONB_TYPE_INT32, JSONB_TYPE_UINT32]:
+        return not is_small
+    return False
+
+
+def parse_json(type: bytes, data: bytes):
+    if type == JSONB_TYPE_SMALL_OBJECT:
+        v = parse_json_object_or_array(data, True, True)
+    elif type == JSONB_TYPE_LARGE_OBJECT:
+        v = parse_json_object_or_array(data, False, True)
+    elif type == JSONB_TYPE_SMALL_ARRAY:
+        v = parse_json_object_or_array(data, True, False)
+    elif type == JSONB_TYPE_LARGE_ARRAY:
+        v = parse_json_object_or_array(data, False, False)
+    elif type == JSONB_TYPE_LITERAL:
+        v = parse_literal(data)
+    elif type == JSONB_TYPE_INT16:
+        v = parse_int16(data)
+    elif type == JSONB_TYPE_UINT16:
+        v = parse_uint16(data)
+    elif type == JSONB_TYPE_INT32:
+        v = parse_int32(data)
+    elif type == JSONB_TYPE_UINT32:
+        v = parse_uint32(data)
+    elif type == JSONB_TYPE_INT64:
+        v = parse_int64(data)
+    elif type == JSONB_TYPE_UINT64:
+        v = parse_uint64(data)
+    elif type == JSONB_TYPE_DOUBLE:
+        v = parse_double(data)
+    elif type == JSONB_TYPE_STRING:
+        length, n = decode_variable_length(data)
+        v = parse_string(n, length, data)
+    elif type == JSONB_TYPE_OPAQUE:
+        v = parse_opaque(data)
+    else:
+        raise ValueError("Json type %d is not handled" % type)
+    return v
+
+
+def parse_json_object_or_array(bytes, is_small, is_object):
+    offset_size = JSONB_SMALL_OFFSET_SIZE if is_small else JSONB_LARGE_OFFSET_SIZE
+    count = decode_count(bytes, is_small)
+    size = decode_count(bytes[offset_size:], is_small)
+    if is_small:
+        key_entry_size = JSONB_KEY_ENTRY_SIZE_SMALL
+        value_entry_size = JSONB_VALUE_ENTRY_SIZE_SMALL
+    else:
+        key_entry_size = JSONB_KEY_ENTRY_SIZE_LARGE
+        value_entry_size = JSONB_VALUE_ENTRY_SIZE_LARGE
+    if is_data_short(bytes, size):
+        raise ValueError(
+            "Before MySQL 5.7.22, json type generated column may have invalid value"
+        )
 
-def read_offset_or_inline(packet, large):
-    t = packet.read_uint8()
-
-    if t in (JSONB_TYPE_LITERAL, JSONB_TYPE_INT16, JSONB_TYPE_UINT16):
-        return (t, None, packet.read_binary_json_type_inlined(t, large))
-    if large and t in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32):
-        return (t, None, packet.read_binary_json_type_inlined(t, large))
-
-    if large:
-        return (t, packet.read_uint32(), None)
-    return (t, packet.read_uint16(), None)
+    header_size = 2 * offset_size + count * value_entry_size
+
+    if is_object:
+        header_size += count * key_entry_size
+
+    if header_size > size:
+        raise ValueError("header size > size")
+
+    keys = []
+    if is_object:
+        keys = []
+        for i in range(count):
+            entry_offset = 2 * offset_size + key_entry_size * i
+            key_offset = decode_count(bytes[entry_offset:], is_small)
+            key_length = decode_uint(bytes[entry_offset + offset_size :])
+            keys.append(bytes[key_offset : key_offset + key_length])
+
+    values = {}
+    for i in range(count):
+        entry_offset = 2 * offset_size + value_entry_size * i
+        if is_object:
+            entry_offset += key_entry_size * count
+        json_type = bytes[entry_offset]
+        if is_json_inline_value(json_type, is_small):
+            values[i] = parse_json(
+                json_type, bytes[entry_offset + 1 : entry_offset + value_entry_size]
+            )
+            continue
+        value_offset = decode_count(bytes[entry_offset + 1 :], is_small)
+        if is_data_short(bytes, value_offset):
+            return None
+        values[i] = parse_json(json_type, bytes[value_offset:])
+    if not is_object:
+        return list(values.values())
+    out = {}
+    for i in range(count):
+        out[keys[i]] = values[i]
+    return out
+
+
+def parse_literal(data: bytes):
+    json_type = data[0]
+    if json_type == JSONB_LITERAL_NULL:
+        return None
+    elif json_type == JSONB_LITERAL_TRUE:
+        return True
+    elif json_type == JSONB_LITERAL_FALSE:
+        return False
+
+    raise ValueError("NOT LITERAL TYPE")
+
+
+def parse_opaque(data: bytes):
+    if is_data_short(data, 1):
+        return None
+    type_ = data[0]
+    data = data[1:]
+
+    length, n = decode_variable_length(data)
+    data = data[n : n + length]
+
+    if type_ in [FIELD_TYPE.NEWDECIMAL, FIELD_TYPE.DECIMAL]:
+        return decode_decimal(data)
+    elif type_ in [FIELD_TYPE.TIME, FIELD_TYPE.TIME2]:
+        return decode_time(data)
+    elif type_ in [FIELD_TYPE.DATE, FIELD_TYPE.DATETIME, FIELD_TYPE.DATETIME2]:
+        return decode_datetime(data)
+    else:
+        return data.decode(errors="ignore")
 
 
 class BinLogPacketWrapper(object):
@@ -375,131 +499,8 @@ def read_binary_json(self, size):
         if length == 0:
             # handle NULL value
             return None
-        payload = self.read(length)
-        self.unread(payload)
-        t = self.read_uint8()
-
-        return self.read_binary_json_type(t, length)
-
-    def read_binary_json_type(self, t, length):
-        large = t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY)
-        if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT):
-            return self.read_binary_json_object(length - 1, large)
-        elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY):
-            return self.read_binary_json_array(length - 1, large)
-        elif t in (JSONB_TYPE_STRING,):
-            return self.read_variable_length_string()
-        elif t in (JSONB_TYPE_LITERAL,):
-            value = self.read_uint8()
-            if value == JSONB_LITERAL_NULL:
-                return None
-            elif value == JSONB_LITERAL_TRUE:
-                return True
-            elif value == JSONB_LITERAL_FALSE:
-                return False
-        elif t == JSONB_TYPE_INT16:
-            return self.read_int16()
-        elif t == JSONB_TYPE_UINT16:
-            return self.read_uint16()
-        elif t in (JSONB_TYPE_DOUBLE,):
-            return struct.unpack("<d", self.read(8))[0]
-        elif t == JSONB_TYPE_INT32:
-            return self.read_int32()
-        elif t == JSONB_TYPE_UINT32:
-            return self.read_uint32()
-        elif t == JSONB_TYPE_INT64:
-            return self.read_int64()
-        elif t == JSONB_TYPE_UINT64:
-            return self.read_uint64()
-
-        raise ValueError("Json type %d is not handled" % t)
-
-    def read_binary_json_type_inlined(self, t, large):
-        if t == JSONB_TYPE_LITERAL:
-            value = self.read_uint32() if large else self.read_uint16()
-            if value == JSONB_LITERAL_NULL:
-                return None
-            elif value == JSONB_LITERAL_TRUE:
-                return True
-            elif value == JSONB_LITERAL_FALSE:
-                return False
-        elif t == JSONB_TYPE_INT16:
-            return self.read_int32() if large else self.read_int16()
-        elif t == JSONB_TYPE_UINT16:
-            return self.read_uint32() if large else self.read_uint16()
-        elif t == JSONB_TYPE_INT32:
-            return self.read_int32()
-        elif t == JSONB_TYPE_UINT32:
-            return self.read_uint32()
-
-        raise ValueError("Json type %d is not handled" % t)
-
-    def read_binary_json_object(self, length, large):
-        if large:
-            elements = self.read_uint32()
-            size = self.read_uint32()
-        else:
-            elements = self.read_uint16()
-            size = self.read_uint16()
-
-        if size > length:
-            raise ValueError("Json length is larger than packet length")
-
-        if large:
-            key_offset_lengths = [
-                (
-                    self.read_uint32(),  # offset (we don't actually need that)
-                    self.read_uint16(),  # size of the key
-                )
-                for _ in range(elements)
-            ]
-        else:
-            key_offset_lengths = [
-                (
-                    self.read_uint16(),  # offset (we don't actually need that)
-                    self.read_uint16(),  # size of key
-                )
-                for _ in range(elements)
-            ]
-
-        value_type_inlined_lengths = [
-            read_offset_or_inline(self, large) for _ in range(elements)
-        ]
-
-        keys = [self.read(x[1]) for x in key_offset_lengths]
-
-        out = {}
-        for i in range(elements):
-            if value_type_inlined_lengths[i][1] is None:
-                data = value_type_inlined_lengths[i][2]
-            else:
-                t = value_type_inlined_lengths[i][0]
-                data = self.read_binary_json_type(t, length)
-            out[keys[i]] = data
-
-        return out
-
-    def read_binary_json_array(self, length, large):
-        if large:
-            elements = self.read_uint32()
-            size = self.read_uint32()
-        else:
-            elements = self.read_uint16()
-            size = self.read_uint16()
-
-        if size > length:
-            raise ValueError("Json length is larger than packet length")
-
-        values_type_offset_inline = [
-            read_offset_or_inline(self, large) for _ in range(elements)
-        ]
-
-        def _read(x):
-            if x[1] is None:
-                return x[2]
-            return self.read_binary_json_type(x[0], length)
-
-        return [_read(x) for x in values_type_offset_inline]
+        data = self.read(length)
+        return parse_json(data[0], data[1:])
 
     def read_string(self):
         """Read a 'Length Coded String' from the data buffer.
