Skip to content

Commit 362b489

Browse files
authored
Merge pull request #521 from sean-k1/feature/partial-update-row-event
Feature/partial update row event
2 parents fbeaead + 3f11f3d commit 362b489

File tree

8 files changed

+521
-169
lines changed

8 files changed

+521
-169
lines changed

pymysqlreplication/binlogstream.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,13 @@
3232
from .exceptions import BinLogNotEnabled
3333
from .gtid import GtidSet
3434
from .packet import BinLogPacketWrapper
35-
from .row_event import UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent
35+
from .row_event import (
36+
UpdateRowsEvent,
37+
WriteRowsEvent,
38+
DeleteRowsEvent,
39+
TableMapEvent,
40+
PartialUpdateRowsEvent,
41+
)
3642

3743
try:
3844
from pymysql.constants.COMMAND import COM_BINLOG_DUMP_GTID
@@ -719,6 +725,7 @@ def _allowed_event_list(
719725
MariadbBinLogCheckPointEvent,
720726
UserVarEvent,
721727
PreviousGtidsEvent,
728+
PartialUpdateRowsEvent,
722729
)
723730
)
724731
if ignored_events is not None:

pymysqlreplication/constants/BINLOG.py

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
ANONYMOUS_GTID_LOG_EVENT = 0x22
3636
PREVIOUS_GTIDS_LOG_EVENT = 0x23
3737
XA_PREPARE_EVENT = 0x26
38+
PARTIAL_UPDATE_ROWS_EVENT = 0x27
3839

3940
# INTVAR types
4041
INTVAR_INVALID_INT_EVENT = 0x00

pymysqlreplication/constants/NONE_SOURCE.py

+1
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
OUT_OF_DATETIME2_RANGE = "out of datetime2 range"
55
EMPTY_SET = "empty set"
66
COLS_BITMAP = "cols bitmap"
7+
JSON_PARTIAL_UPDATE = "same with before values"

pymysqlreplication/json_binary.py

+186
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
from pymysqlreplication.util.bytes import *
2+
from pymysqlreplication.constants import FIELD_TYPE
3+
from enum import Enum
4+
5+
# JSONB value-type markers: the first byte of a serialized MySQL JSON value
# identifies how the payload that follows is encoded.
JSONB_TYPE_SMALL_OBJECT = 0x0
JSONB_TYPE_LARGE_OBJECT = 0x1
JSONB_TYPE_SMALL_ARRAY = 0x2
JSONB_TYPE_LARGE_ARRAY = 0x3
JSONB_TYPE_LITERAL = 0x4
JSONB_TYPE_INT16 = 0x5
JSONB_TYPE_UINT16 = 0x6
JSONB_TYPE_INT32 = 0x7
JSONB_TYPE_UINT32 = 0x8
JSONB_TYPE_INT64 = 0x9
JSONB_TYPE_UINT64 = 0xA
JSONB_TYPE_DOUBLE = 0xB
JSONB_TYPE_STRING = 0xC
# Opaque wraps a MySQL field-type byte plus a length-prefixed payload
# (decimals, times, dates, ...).
JSONB_TYPE_OPAQUE = 0xF

# Payload byte for JSONB_TYPE_LITERAL values.
JSONB_LITERAL_NULL = 0x0
JSONB_LITERAL_TRUE = 0x1
JSONB_LITERAL_FALSE = 0x2

# "Small" objects/arrays use 2-byte counts/offsets, "large" ones 4-byte.
JSONB_SMALL_OFFSET_SIZE = 2
JSONB_LARGE_OFFSET_SIZE = 4
# A key entry is (offset, 2-byte key length).
JSONB_KEY_ENTRY_SIZE_SMALL = 2 + JSONB_SMALL_OFFSET_SIZE
JSONB_KEY_ENTRY_SIZE_LARGE = 2 + JSONB_LARGE_OFFSET_SIZE
# A value entry is (1-byte type marker, offset-or-inline-value).
JSONB_VALUE_ENTRY_SIZE_SMALL = 1 + JSONB_SMALL_OFFSET_SIZE
JSONB_VALUE_ENTRY_SIZE_LARGE = 1 + JSONB_LARGE_OFFSET_SIZE
30+
31+
32+
def is_json_inline_value(type: int, is_small: bool) -> bool:
    """Return True when a JSONB value of the given type is stored inline
    inside its value entry, rather than at a separate offset.

    :param type: JSONB type marker (an int such as ``data[i]``; the
        original ``bytes`` annotation was wrong — indexing ``bytes``
        yields ``int``).
    :param is_small: True when the enclosing object/array uses the
        "small" format (2-byte value slots).
    :return: True if the value lives inside the value entry itself.
    """
    if type in (JSONB_TYPE_UINT16, JSONB_TYPE_INT16, JSONB_TYPE_LITERAL):
        # 16-bit ints and literals fit even in the small 2-byte slot.
        return True
    elif type in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32):
        # 32-bit ints only fit inline in the large format's 4-byte slot.
        return not is_small
    return False
38+
39+
40+
def parse_json(type: int, data: bytes):
    """Deserialize one MySQL JSONB value.

    :param type: JSONB type marker, one of the ``JSONB_TYPE_*`` constants
        (an int — the original ``bytes`` annotation was wrong).
    :param data: serialized payload that follows the type marker.
    :return: the decoded Python value (dict, list, bool, int, float,
        str, decoded opaque value, or None).
    :raises ValueError: if ``type`` is not a known JSONB type marker.
    """
    if type == JSONB_TYPE_SMALL_OBJECT:
        v = parse_json_object_or_array(data, True, True)
    elif type == JSONB_TYPE_LARGE_OBJECT:
        v = parse_json_object_or_array(data, False, True)
    elif type == JSONB_TYPE_SMALL_ARRAY:
        v = parse_json_object_or_array(data, True, False)
    elif type == JSONB_TYPE_LARGE_ARRAY:
        v = parse_json_object_or_array(data, False, False)
    elif type == JSONB_TYPE_LITERAL:
        v = parse_literal(data)
    elif type == JSONB_TYPE_INT16:
        v = parse_int16(data)
    elif type == JSONB_TYPE_UINT16:
        v = parse_uint16(data)
    elif type == JSONB_TYPE_INT32:
        v = parse_int32(data)
    elif type == JSONB_TYPE_UINT32:
        v = parse_uint32(data)
    elif type == JSONB_TYPE_INT64:
        v = parse_int64(data)
    elif type == JSONB_TYPE_UINT64:
        v = parse_uint64(data)
    elif type == JSONB_TYPE_DOUBLE:
        v = parse_double(data)
    elif type == JSONB_TYPE_STRING:
        # Strings carry a variable-length size prefix before the text.
        length, n = decode_variable_length(data)
        v = parse_string(n, length, data)
    elif type == JSONB_TYPE_OPAQUE:
        v = parse_opaque(data)
    else:
        raise ValueError(f"Json type {type} is not handled")
    return v
73+
74+
75+
def parse_json_object_or_array(bytes, is_small, is_object):
    """Deserialize a JSONB object or array payload.

    :param bytes: serialized payload starting at the element count.
        NOTE(review): the name shadows the builtin; kept for interface
        compatibility.
    :param is_small: True for the "small" format (2-byte counts/offsets),
        False for the "large" format (4-byte counts/offsets).
    :param is_object: True to decode an object, False for an array.
    :return: a dict for objects (keys are raw ``bytes``), a list for
        arrays, or None when a value offset points past the data.
    :raises ValueError: on truncated data or an inconsistent header.
    """
    offset_size = JSONB_SMALL_OFFSET_SIZE if is_small else JSONB_LARGE_OFFSET_SIZE
    count = decode_count(bytes, is_small)
    size = decode_count(bytes[offset_size:], is_small)
    if is_small:
        key_entry_size = JSONB_KEY_ENTRY_SIZE_SMALL
        value_entry_size = JSONB_VALUE_ENTRY_SIZE_SMALL
    else:
        key_entry_size = JSONB_KEY_ENTRY_SIZE_LARGE
        value_entry_size = JSONB_VALUE_ENTRY_SIZE_LARGE
    if is_data_short(bytes, size):
        raise ValueError(
            "Before MySQL 5.7.22, json type generated column may have invalid value"
        )

    # Header: count + size fields, one value entry per element, and (for
    # objects) one key entry per element.
    header_size = 2 * offset_size + count * value_entry_size
    if is_object:
        header_size += count * key_entry_size
    if header_size > size:
        raise ValueError("header size > size")

    keys = []
    if is_object:
        # Key entries precede value entries: (key_offset, key_length) pairs.
        for i in range(count):
            entry_offset = 2 * offset_size + key_entry_size * i
            key_offset = decode_count(bytes[entry_offset:], is_small)
            key_length = decode_uint(bytes[entry_offset + offset_size :])
            keys.append(bytes[key_offset : key_offset + key_length])

    # Values collected in entry order (was an int-keyed dict used as a
    # list; a plain list is equivalent and clearer).
    values = []
    for i in range(count):
        entry_offset = 2 * offset_size + value_entry_size * i
        if is_object:
            entry_offset += key_entry_size * count
        json_type = bytes[entry_offset]
        if is_json_inline_value(json_type, is_small):
            # Small scalars are stored directly inside the value entry.
            values.append(
                parse_json(
                    json_type, bytes[entry_offset + 1 : entry_offset + value_entry_size]
                )
            )
            continue
        value_offset = decode_count(bytes[entry_offset + 1 :], is_small)
        if is_data_short(bytes, value_offset):
            return None
        values.append(parse_json(json_type, bytes[value_offset:]))
    if not is_object:
        return values
    return dict(zip(keys, values))
128+
129+
130+
def parse_literal(data: bytes):
    """Decode a JSONB literal payload to None, True or False.

    :param data: payload whose first byte is a ``JSONB_LITERAL_*`` marker.
    :raises ValueError: if the marker byte is not a known literal.
    """
    marker = data[0]
    literal_values = {
        JSONB_LITERAL_NULL: None,
        JSONB_LITERAL_TRUE: True,
        JSONB_LITERAL_FALSE: False,
    }
    # Membership test (not .get) because None is itself a valid result.
    if marker in literal_values:
        return literal_values[marker]
    raise ValueError("NOT LITERAL TYPE")
140+
141+
142+
def parse_opaque(data: bytes):
    """Decode a JSONB opaque value: a MySQL field-type byte followed by a
    length-prefixed payload.  Decimals, times and datetimes are decoded to
    Python values; any other field type is returned as best-effort text.

    :param data: serialized opaque payload.
    :return: decoded value, or None when the payload is too short.
    """
    if is_data_short(data, 1):
        return None
    field_type = data[0]
    payload = data[1:]

    # The payload carries its own variable-length size prefix.
    length, prefix_len = decode_variable_length(payload)
    payload = payload[prefix_len : prefix_len + length]

    if field_type in (FIELD_TYPE.NEWDECIMAL, FIELD_TYPE.DECIMAL):
        return decode_decimal(payload)
    if field_type in (FIELD_TYPE.TIME, FIELD_TYPE.TIME2):
        return decode_time(payload)
    if field_type in (FIELD_TYPE.DATE, FIELD_TYPE.DATETIME, FIELD_TYPE.DATETIME2):
        return decode_datetime(payload)
    return payload.decode(errors="ignore")
159+
160+
161+
class JsonDiffOperation(Enum):
    """Kind of modification carried by one entry of a partial JSON diff."""

    # Replace the value at the path: same effect as
    # JSON_REPLACE(col, path, value).
    Replace = 0
    # Add a new element at the path: JSON_ARRAY_INSERT(col, path, value)
    # for an array element, JSON_INSERT(col, path, value) for an object
    # member.
    Insert = 1
    # Remove the value at the path from its array or object: same effect
    # as JSON_REMOVE(col, path).
    Remove = 2

    @staticmethod
    def by_index(index):
        """Look up the operation by its wire-format ordinal."""
        return JsonDiffOperation(index)
176+
177+
178+
class JsonDiff:
    """One entry of a partial JSON update: an operation, a path, and
    (except for Remove, which carries none) a value.
    """

    def __init__(self, op: JsonDiffOperation, path: bytes, value=None):
        self.op = op  # JsonDiffOperation member
        self.path = path  # raw JSON path, as bytes
        self.value = value  # raw value bytes, or None for Remove

    def __str__(self):
        rendered_value = self.value.decode() if self.value else ""
        return f"JsonDiff(op :{self.op} path :{self.path.decode()} value :{rendered_value})"

0 commit comments

Comments
 (0)