Skip to content

Commit 1f4a5d2

Browse files
authored
Merge pull request #494 from julien-duponchelle/mysql-5.7
v0.45 bug fix json object and array
2 parents ab0e20e + f9ec362 commit 1f4a5d2

File tree

6 files changed

+366
-200
lines changed

6 files changed

+366
-200
lines changed

pymysqlreplication/event.py

+2-46
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from pymysqlreplication.constants.STATUS_VAR_KEY import *
1010
from pymysqlreplication.exceptions import StatusVariableMismatch
11+
from pymysqlreplication.util.bytes import parse_decimal_from_bytes
1112
from typing import Union, Optional
1213

1314

@@ -729,7 +730,7 @@ def _read_decimal(self, buffer: bytes) -> decimal.Decimal:
729730
self.precision = self.temp_value_buffer[0]
730731
self.decimals = self.temp_value_buffer[1]
731732
raw_decimal = self.temp_value_buffer[2:]
732-
return self._parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals)
733+
return parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals)
733734

734735
def _read_default(self) -> bytes:
735736
"""
@@ -738,51 +739,6 @@ def _read_default(self) -> bytes:
738739
"""
739740
return self.packet.read(self.value_len)
740741

741-
@staticmethod
742-
def _parse_decimal_from_bytes(raw_decimal: bytes, precision: int, decimals: int) -> decimal.Decimal:
743-
"""
744-
Parse decimal from bytes.
745-
"""
746-
digits_per_integer = 9
747-
compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4]
748-
integral = precision - decimals
749-
750-
uncomp_integral, comp_integral = divmod(integral, digits_per_integer)
751-
uncomp_fractional, comp_fractional = divmod(decimals, digits_per_integer)
752-
753-
res = "-" if not raw_decimal[0] & 0x80 else ""
754-
mask = -1 if res == "-" else 0
755-
raw_decimal = bytearray([raw_decimal[0] ^ 0x80]) + raw_decimal[1:]
756-
757-
def decode_decimal_decompress_value(comp_indx, data, mask):
758-
size = compressed_bytes[comp_indx]
759-
if size > 0:
760-
databuff = bytearray(data[:size])
761-
for i in range(size):
762-
databuff[i] = (databuff[i] ^ mask) & 0xFF
763-
return size, int.from_bytes(databuff, byteorder='big')
764-
return 0, 0
765-
766-
pointer, value = decode_decimal_decompress_value(comp_integral, raw_decimal, mask)
767-
res += str(value)
768-
769-
for _ in range(uncomp_integral):
770-
value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask
771-
res += '%09d' % value
772-
pointer += 4
773-
774-
res += "."
775-
776-
for _ in range(uncomp_fractional):
777-
value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask
778-
res += '%09d' % value
779-
pointer += 4
780-
781-
size, value = decode_decimal_decompress_value(comp_fractional, raw_decimal[pointer:], mask)
782-
if size > 0:
783-
res += '%0*d' % (comp_fractional, value)
784-
return decimal.Decimal(res)
785-
786742
def _dump(self) -> None:
787743
super(UserVarEvent, self)._dump()
788744
print("User variable name: %s" % self.name)

pymysqlreplication/packet.py

+139-132
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import struct
44

55
from pymysqlreplication import constants, event, row_event
6+
from pymysqlreplication.constants import FIELD_TYPE
7+
from pymysqlreplication.util.bytes import *
68

79
# Constants from PyMYSQL source code
810
NULL_COLUMN = 251
@@ -15,7 +17,6 @@
1517
UNSIGNED_INT24_LENGTH = 3
1618
UNSIGNED_INT64_LENGTH = 8
1719

18-
1920
JSONB_TYPE_SMALL_OBJECT = 0x0
2021
JSONB_TYPE_LARGE_OBJECT = 0x1
2122
JSONB_TYPE_SMALL_ARRAY = 0x2
@@ -35,19 +36,141 @@
3536
JSONB_LITERAL_TRUE = 0x1
3637
JSONB_LITERAL_FALSE = 0x2
3738

38-
39-
def read_offset_or_inline(packet, large):
    """
    Read one JSONB value entry from ``packet``.

    Returns a 3-tuple ``(type, offset, inlined_value)``. For types stored
    inline in the entry, ``offset`` is ``None`` and the value is decoded
    immediately; otherwise the offset is read (32-bit when ``large``,
    16-bit when not) and ``inlined_value`` is ``None``.
    """
    entry_type = packet.read_uint8()

    always_inline = entry_type in (
        JSONB_TYPE_LITERAL, JSONB_TYPE_INT16, JSONB_TYPE_UINT16)
    # 32-bit integers only fit inline when the entry uses 4-byte slots.
    inline_when_large = large and entry_type in (
        JSONB_TYPE_INT32, JSONB_TYPE_UINT32)

    if always_inline or inline_when_large:
        inlined = packet.read_binary_json_type_inlined(entry_type, large)
        return (entry_type, None, inlined)

    offset = packet.read_uint32() if large else packet.read_uint16()
    return (entry_type, offset, None)
39+
JSONB_SMALL_OFFSET_SIZE = 2
40+
JSONB_LARGE_OFFSET_SIZE = 4
41+
JSONB_KEY_ENTRY_SIZE_SMALL = 2 + JSONB_SMALL_OFFSET_SIZE
42+
JSONB_KEY_ENTRY_SIZE_LARGE = 2 + JSONB_LARGE_OFFSET_SIZE
43+
JSONB_VALUE_ENTRY_SIZE_SMALL = 1 + JSONB_SMALL_OFFSET_SIZE
44+
JSONB_VALUE_ENTRY_SIZE_LARGE = 1 + JSONB_LARGE_OFFSET_SIZE
45+
46+
47+
def is_json_inline_value(type: bytes, is_small: bool) -> bool:
    """
    Tell whether a value of the given JSONB type lives inside its value entry.

    16-bit integers and literals are always inline; 32-bit integers are
    inline only in the large format (``is_small`` false). Every other
    type is not inline.

    NOTE(review): the visible callers pass ``type`` as a single byte
    indexed out of a bytes object (an int), despite the annotation.
    """
    if type in (JSONB_TYPE_UINT16, JSONB_TYPE_INT16, JSONB_TYPE_LITERAL):
        return True
    fits_only_when_large = type in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32)
    return fits_only_when_large and not is_small
53+
54+
55+
def parse_json(type: bytes, data: bytes):
    """
    Decode one binary JSON value.

    :param type: JSONB type code of the value; it is compared against the
        ``JSONB_TYPE_*`` constants.
    :param data: the bytes of the value, starting right after the type code.
    :return: the decoded Python value (dict, list, number, string, ...).
    :raises ValueError: if ``type`` is not one of the handled type codes.
    """
    if type == JSONB_TYPE_SMALL_OBJECT:
        return parse_json_object_or_array(data, True, True)
    if type == JSONB_TYPE_LARGE_OBJECT:
        return parse_json_object_or_array(data, False, True)
    if type == JSONB_TYPE_SMALL_ARRAY:
        return parse_json_object_or_array(data, True, False)
    if type == JSONB_TYPE_LARGE_ARRAY:
        return parse_json_object_or_array(data, False, False)
    if type == JSONB_TYPE_LITERAL:
        return parse_literal(data)
    if type == JSONB_TYPE_INT16:
        return parse_int16(data)
    if type == JSONB_TYPE_UINT16:
        return parse_uint16(data)
    if type == JSONB_TYPE_INT32:
        return parse_int32(data)
    if type == JSONB_TYPE_UINT32:
        return parse_uint32(data)
    if type == JSONB_TYPE_INT64:
        return parse_int64(data)
    if type == JSONB_TYPE_UINT64:
        return parse_uint64(data)
    if type == JSONB_TYPE_DOUBLE:
        return parse_double(data)
    if type == JSONB_TYPE_STRING:
        # presumably ``n`` is the size of the variable-length prefix --
        # confirm against decode_variable_length in util.bytes
        length, n = decode_variable_length(data)
        return parse_string(n, length, data)
    if type == JSONB_TYPE_OPAQUE:
        return parse_opaque(data)

    # The message must be formatted with the parameter ``type``; the
    # previous code used an undefined name and raised NameError instead.
    raise ValueError("Json type %d is not handled" % type)
88+
89+
90+
def parse_json_object_or_array(bytes, is_small, is_object):
    """
    Decode a binary JSON object or array.

    The buffer starts with an element count and a total size, followed by
    the key entries (objects only) and then the value entries. Each value
    entry is a type byte plus either the inlined value or an offset into
    ``bytes`` where the value is stored.

    :param bytes: buffer starting at the element count.
    :param is_small: True for the small format (2-byte offsets), False for
        the large format (4-byte offsets).
    :param is_object: True to decode an object (dict), False for an array.
    :return: a dict keyed by the raw key bytes, a list, or ``None`` when a
        value offset points past the available data.
    :raises ValueError: if the declared size exceeds the data or the
        header does not fit in the declared size.
    """
    if is_small:
        offset_size = JSONB_SMALL_OFFSET_SIZE
        key_entry_size = JSONB_KEY_ENTRY_SIZE_SMALL
        value_entry_size = JSONB_VALUE_ENTRY_SIZE_SMALL
    else:
        offset_size = JSONB_LARGE_OFFSET_SIZE
        key_entry_size = JSONB_KEY_ENTRY_SIZE_LARGE
        value_entry_size = JSONB_VALUE_ENTRY_SIZE_LARGE

    count = decode_count(bytes, is_small)
    size = decode_count(bytes[offset_size:], is_small)

    if is_data_short(bytes, size):
        raise ValueError(
            "Before MySQL 5.7.22, json type generated column may have invalid value"
        )

    # Count and size fields come first, then the entry tables.
    entries_start = 2 * offset_size
    key_table_size = count * key_entry_size if is_object else 0
    header_size = entries_start + count * value_entry_size + key_table_size
    if header_size > size:
        raise ValueError("header size > size")

    keys = []
    if is_object:
        for index in range(count):
            key_entry = entries_start + key_entry_size * index
            key_offset = decode_count(bytes[key_entry:], is_small)
            key_length = decode_uint(bytes[key_entry + offset_size:])
            keys.append(bytes[key_offset:key_offset + key_length])

    # Value entries follow the key entries (there are none for arrays).
    values_start = entries_start + key_table_size
    values = []
    for index in range(count):
        value_entry = values_start + value_entry_size * index
        json_type = bytes[value_entry]
        if is_json_inline_value(json_type, is_small):
            inline_bytes = bytes[value_entry + 1:value_entry + value_entry_size]
            values.append(parse_json(json_type, inline_bytes))
            continue
        value_offset = decode_count(bytes[value_entry + 1:], is_small)
        if is_data_short(bytes, value_offset):
            return None
        values.append(parse_json(json_type, bytes[value_offset:]))

    if not is_object:
        return values
    return dict(zip(keys, values))
143+
144+
145+
def parse_literal(data: bytes):
    """
    Decode a JSONB literal from the first byte of ``data``.

    :return: ``None``, ``True`` or ``False`` for the null/true/false codes.
    :raises ValueError: if the byte is not one of the literal codes.
    """
    literal_code = data[0]
    known_literals = (
        (JSONB_LITERAL_NULL, None),
        (JSONB_LITERAL_TRUE, True),
        (JSONB_LITERAL_FALSE, False),
    )
    for code, python_value in known_literals:
        if literal_code == code:
            return python_value

    raise ValueError("NOT LITERAL TYPE")
155+
156+
157+
def parse_opaque(data: bytes):
    """
    Decode a JSONB opaque value.

    The first byte is a field type; it is followed by a variable-length
    size and then the payload. Decimal, time and date/datetime payloads
    are handed to their decoders; anything else is decoded as text with
    undecodable bytes dropped.

    :return: the decoded value, or ``None`` if ``is_data_short(data, 1)``
        reports that the type byte is missing.
    """
    if is_data_short(data, 1):
        return None

    field_type = data[0]
    remainder = data[1:]

    # presumably ``prefix_len`` is how many bytes the length prefix
    # occupies -- confirm against decode_variable_length in util.bytes
    length, prefix_len = decode_variable_length(remainder)
    payload = remainder[prefix_len:prefix_len + length]

    if field_type in (FIELD_TYPE.NEWDECIMAL, FIELD_TYPE.DECIMAL):
        return decode_decimal(payload)
    if field_type in (FIELD_TYPE.TIME, FIELD_TYPE.TIME2):
        return decode_time(payload)
    if field_type in (FIELD_TYPE.DATE, FIELD_TYPE.DATETIME, FIELD_TYPE.DATETIME2):
        return decode_datetime(payload)
    return payload.decode(errors="ignore")
51174

52175

53176
class BinLogPacketWrapper(object):
@@ -365,124 +488,8 @@ def read_binary_json(self, size):
365488
if length == 0:
366489
# handle NULL value
367490
return None
368-
payload = self.read(length)
369-
self.unread(payload)
370-
t = self.read_uint8()
371-
372-
return self.read_binary_json_type(t, length)
373-
374-
def read_binary_json_type(self, t, length):
    """
    Read one binary JSON value of type ``t`` from the current position.

    :param t: JSONB type code.
    :param length: length passed on to the object/array readers (minus
        the one type byte).
    :raises ValueError: if ``t`` is not handled, or a literal carries an
        unknown code.
    """
    if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT):
        return self.read_binary_json_object(
            length - 1, t == JSONB_TYPE_LARGE_OBJECT)
    if t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY):
        return self.read_binary_json_array(
            length - 1, t == JSONB_TYPE_LARGE_ARRAY)
    if t == JSONB_TYPE_STRING:
        return self.read_variable_length_string()

    if t == JSONB_TYPE_LITERAL:
        literal_code = self.read_uint8()
        known_literals = (
            (JSONB_LITERAL_NULL, None),
            (JSONB_LITERAL_TRUE, True),
            (JSONB_LITERAL_FALSE, False),
        )
        for code, python_value in known_literals:
            if literal_code == code:
                return python_value
        # An unknown literal code falls through to the error below.
    elif t == JSONB_TYPE_INT16:
        return self.read_int16()
    elif t == JSONB_TYPE_UINT16:
        return self.read_uint16()
    elif t == JSONB_TYPE_DOUBLE:
        return struct.unpack('<d', self.read(8))[0]
    elif t == JSONB_TYPE_INT32:
        return self.read_int32()
    elif t == JSONB_TYPE_UINT32:
        return self.read_uint32()
    elif t == JSONB_TYPE_INT64:
        return self.read_int64()
    elif t == JSONB_TYPE_UINT64:
        return self.read_uint64()

    raise ValueError('Json type %d is not handled' % t)
406-
407-
def read_binary_json_type_inlined(self, t, large):
    """
    Read a value stored inline in a JSONB value entry.

    Literals and 16-bit integers occupy the whole slot, so they are read
    as 32-bit when ``large`` and 16-bit otherwise; 32-bit integers are
    always read as 32-bit.

    :raises ValueError: if ``t`` is not an inlinable type, or a literal
        carries an unknown code.
    """
    if t == JSONB_TYPE_LITERAL:
        literal_code = self.read_uint32() if large else self.read_uint16()
        known_literals = (
            (JSONB_LITERAL_NULL, None),
            (JSONB_LITERAL_TRUE, True),
            (JSONB_LITERAL_FALSE, False),
        )
        for code, python_value in known_literals:
            if literal_code == code:
                return python_value
        # An unknown literal code falls through to the error below.
    elif t == JSONB_TYPE_INT16:
        if large:
            return self.read_int32()
        return self.read_int16()
    elif t == JSONB_TYPE_UINT16:
        if large:
            return self.read_uint32()
        return self.read_uint16()
    elif t == JSONB_TYPE_INT32:
        return self.read_int32()
    elif t == JSONB_TYPE_UINT32:
        return self.read_uint32()

    raise ValueError('Json type %d is not handled' % t)
426-
427-
def read_binary_json_object(self, length, large):
    """
    Read a binary JSON object from the current position into a dict.

    Reads the element count and size, the key entries, the value entries,
    then the key bytes and finally each non-inlined value, all in stream
    order. The stored key offsets are read but not used for seeking.

    :param length: available length; a larger declared size is an error.
    :param large: True when counts and offsets are 32-bit, else 16-bit.
    :raises ValueError: if the declared size exceeds ``length``.
    """
    read_word = self.read_uint32 if large else self.read_uint16
    elements = read_word()
    size = read_word()

    if size > length:
        raise ValueError('Json length is larger than packet length')

    key_lengths = []
    for _ in range(elements):
        read_word()  # key offset: consumed but not needed
        key_lengths.append(self.read_uint16())

    value_entries = [read_offset_or_inline(self, large)
                     for _ in range(elements)]

    keys = [self.read(key_length) for key_length in key_lengths]

    out = {}
    for key, (value_type, offset, inlined) in zip(keys, value_entries):
        if offset is None:
            out[key] = inlined
        else:
            out[key] = self.read_binary_json_type(value_type, length)

    return out
464-
465-
def read_binary_json_array(self, length, large):
    """
    Read a binary JSON array from the current position into a list.

    Reads the element count and size, then every value entry, then each
    non-inlined value in entry order.

    :param length: available length; a larger declared size is an error.
    :param large: True when counts and offsets are 32-bit, else 16-bit.
    :raises ValueError: if the declared size exceeds ``length``.
    """
    read_word = self.read_uint32 if large else self.read_uint16
    elements = read_word()
    size = read_word()

    if size > length:
        raise ValueError('Json length is larger than packet length')

    value_entries = [read_offset_or_inline(self, large)
                     for _ in range(elements)]

    result = []
    for value_type, offset, inlined in value_entries:
        if offset is None:
            result.append(inlined)
        else:
            result.append(self.read_binary_json_type(value_type, length))
    return result
491+
data = self.read(length)
492+
return parse_json(data[0], data[1:])
486493

487494
def read_string(self):
488495
"""Read a 'Length Coded String' from the data buffer.

pymysqlreplication/tests/test_basic.py

+25
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,31 @@ def create_binlog_packet_wrapper(pkt):
574574
self.assertEqual(binlog_event.event._is_event_valid, True)
575575
self.assertNotEqual(wrong_event.event._is_event_valid, True)
576576

577+
def test_json_update(self):
    """
    Check the before/after JSON values of an UPDATE that removes a key.

    Reopens the stream restricted to UpdateRowsEvent, creates a table
    with a JSON column, inserts a two-key object, removes one key with
    JSON_REMOVE and asserts both row images of the resulting event.
    """
    self.stream.close()
    self.stream = BinLogStreamReader(
        self.database, server_id=1024, only_events=[UpdateRowsEvent]
    )
    create_query = (
        "CREATE TABLE setting_table( id SERIAL AUTO_INCREMENT, setting JSON);"
    )
    insert_query = """INSERT INTO setting_table (setting) VALUES ('{"btn": true, "model": false}');"""

    update_query = """ UPDATE setting_table
SET setting = JSON_REMOVE(setting, '$.model')
WHERE id=1;
"""
    self.execute(create_query)
    self.execute(insert_query)
    self.execute(update_query)
    self.execute("COMMIT;")
    event = self.stream.fetchone()
    # Plain statements: the former trailing commas wrapped each assertion
    # in a discarded one-element tuple.
    self.assertEqual(
        event.rows[0]["before_values"]["setting"],
        {b"btn": True, b"model": False},
    )
    self.assertEqual(event.rows[0]["after_values"]["setting"], {b"btn": True})
601+
577602

578603
class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
579604
def ignoredEvents(self):

0 commit comments

Comments
 (0)