
Commit cafcb60

Author: Arthur Gautier

Fix handling of JSON data

The JSON column type was introduced in MySQL 5.7.8.

Fixes julien-duponchelle#181
Signed-off-by: Arthur Gautier <[email protected]>

1 parent: 990fc08

5 files changed: 100 additions, 0 deletions

pymysqlreplication/column.py (+2)

@@ -47,6 +47,8 @@ def __parse_column_definition(self, column_type, column_schema, packet):
             self.length_size = packet.read_uint8()
         elif self.type == FIELD_TYPE.GEOMETRY:
             self.length_size = packet.read_uint8()
+        elif self.type == FIELD_TYPE.JSON:
+            self.length_size = packet.read_uint8()
         elif self.type == FIELD_TYPE.NEWDECIMAL:
             self.precision = packet.read_uint8()
             self.decimals = packet.read_uint8()
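
Context for this change (not part of the diff): as with BLOB and GEOMETRY columns, the table-map metadata for a JSON column is read here as a single byte, length_size, which tells the row parser how many bytes later encode the total length of the stored value. The new read_binary_json() in packet.py consumes that length through read_uint_by_size(). A minimal standalone sketch of what such a helper does, assuming Python 3 and a little-endian unsigned integer (the function name and buffer below are hypothetical, for illustration only):

    def read_uint_by_size_sketch(buf, size):
        # Interpret the first `size` bytes of `buf` as a little-endian
        # unsigned integer, e.g. the length prefix of a JSON value.
        return int.from_bytes(buf[:size], "little")

    # A 4-byte little-endian length prefix encoding 42:
    assert read_uint_by_size_sketch(b"\x2a\x00\x00\x00", 4) == 42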

pymysqlreplication/constants/FIELD_TYPE.py (+1)

@@ -41,6 +41,7 @@
 TIMESTAMP2 = 17
 DATETIME2 = 18
 TIME2 = 19
+JSON = 245  # Introduced in 5.7.8
 NEWDECIMAL = 246
 ENUM = 247
 SET = 248

pymysqlreplication/packet.py (+77)

@@ -18,6 +18,23 @@
 UNSIGNED_INT64_LENGTH = 8
 
 
+JSONB_TYPE_SMALL_OBJECT = 0x0
+JSONB_TYPE_LARGE_OBJECT = 0x1
+JSONB_TYPE_SMALL_ARRAY = 0x2
+JSONB_TYPE_LARGE_ARRAY = 0x3
+JSONB_TYPE_LITERAL = 0x4
+JSONB_TYPE_INT16 = 0x5
+JSONB_TYPE_UINT16 = 0x6
+JSONB_TYPE_INT32 = 0x7
+JSONB_TYPE_UINT32 = 0x8
+JSONB_TYPE_INT64 = 0x9
+JSONB_TYPE_UINT64 = 0xA
+JSONB_TYPE_DOUBLE = 0xB
+JSONB_TYPE_STRING = 0xC
+JSONB_TYPE_OPAQUE = 0xF
+
+
+
 class BinLogPacketWrapper(object):
     """
     Bin Log Packet Wrapper. It uses an existing packet object, and wraps
@@ -281,3 +298,63 @@ def unpack_int32(self, n):
                 + (struct.unpack('B', n[3])[0] << 24)
         except TypeError:
             return n[0] + (n[1] << 8) + (n[2] << 16) + (n[3] << 24)
+
+    def read_binary_json(self, size):
+        length = self.read_uint_by_size(size)
+        t = self.read_uint8()
+
+        return self.read_binary_json_type(t, length)
+
+    def read_binary_json_type(self, t, length):
+        if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT):
+            return self.read_binary_json_object(length - 1,
+                                                large=(t == JSONB_TYPE_LARGE_OBJECT))
+        elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY):
+            return self.read_binary_json_array(length - 1,
+                                               large=(t == JSONB_TYPE_LARGE_ARRAY))
+        elif t in (JSONB_TYPE_STRING,):
+            return self.read_length_coded_pascal_string(1)
+
+    def read_binary_json_object(self, length, large):
+        if large:
+            elements = self.read_uint32()
+            size = self.read_uint32()
+        else:
+            elements = self.read_uint16()
+            size = self.read_uint16()
+
+        if size > length:
+            raise ValueError('Json length is larger than packet length')
+
+        if large:
+            key_offset_lengths = [(
+                self.read_uint32(),  # offset (we don't actually need that)
+                self.read_uint32()   # size of the key
+            ) for _ in range(elements)]
+
+            value_type_lengths = [(
+                self.read_uint8(),   # type
+                self.read_uint32()   # offset
+            ) for _ in range(elements)]
+        else:
+            key_offset_lengths = [(
+                self.read_uint16(),  # offset (we don't actually need that)
+                self.read_uint16()   # size of key
+            ) for _ in range(elements)]
+
+            value_type_lengths = [(
+                self.read_uint8(),   # type
+                self.read_uint16()   # offset
+            ) for _ in range(elements)]
+
+        keys = [self.read(x[1]) for x in key_offset_lengths]
+
+        out = {}
+        for i in range(elements):
+            t = value_type_lengths[i][0]
+            data = self.read_binary_json_type(t, length)
+            out[keys[i]] = data
+
+        return out
+
+
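
To make the object layout the new reader expects more concrete, here is a standalone sketch (not part of the commit) that parses a small JSONB object out of a bytes buffer instead of the packet stream. It assumes Python 3, little-endian integers, offsets relative to the start of the object value, and string values only; the payload at the end is hand-assembled for illustration, not captured from a real server.

    import struct

    JSONB_TYPE_STRING = 0x0C

    def parse_small_jsonb_object(buf):
        # Header: element count and total object size, little-endian uint16 each.
        elements, size = struct.unpack_from("<HH", buf, 0)
        pos = 4

        # One (key offset, key length) entry per element.
        key_entries = []
        for _ in range(elements):
            key_entries.append(struct.unpack_from("<HH", buf, pos))
            pos += 4

        # One (value type, value offset) entry per element.
        value_entries = []
        for _ in range(elements):
            value_entries.append(struct.unpack_from("<BH", buf, pos))
            pos += 3

        out = {}
        for (key_off, key_len), (value_type, value_off) in zip(key_entries, value_entries):
            key = buf[key_off:key_off + key_len]
            if value_type == JSONB_TYPE_STRING:
                # Small strings are stored as a 1-byte length followed by the data.
                str_len = buf[value_off]
                out[key] = buf[value_off + 1:value_off + 1 + str_len]
            else:
                raise NotImplementedError("only string values in this sketch")
        return out

    # Hand-assembled payload for {"a": "b"}: 4-byte header, one 4-byte key
    # entry, one 3-byte value entry, then the key bytes and the value bytes.
    payload = struct.pack("<HH", 1, 14)      # 1 element, total size 14
    payload += struct.pack("<HH", 11, 1)     # key at offset 11, length 1
    payload += struct.pack("<BH", 0x0C, 12)  # string value at offset 12
    payload += b"a"                          # key
    payload += b"\x01b"                      # value: length byte + "b"
    assert parse_small_jsonb_object(payload) == {b"a": b"b"}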

pymysqlreplication/row_event.py (+3)

@@ -3,6 +3,7 @@
 import struct
 import decimal
 import datetime
+import json
 
 from pymysql.util import byte2int
 from pymysql.charset import charset_to_encoding
@@ -167,6 +168,8 @@ def _read_column_data(self, cols_bitmap):
             elif column.type == FIELD_TYPE.GEOMETRY:
                 values[name] = self.packet.read_length_coded_pascal_string(
                     column.length_size)
+            elif column.type == FIELD_TYPE.JSON:
+                values[name] = self.packet.read_binary_json(column.length_size)
             else:
                 raise NotImplementedError("Unknown MySQL column type: %d" %
                                           (column.type))
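
With the change above, a decoded JSON column shows up in a row event's "values" dict like any other column. A minimal usage sketch, assuming a local MySQL server and default credentials; BinLogStreamReader and WriteRowsEvent are the library's existing public API, everything else here is illustrative:

    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import WriteRowsEvent

    mysql_settings = {"host": "127.0.0.1", "port": 3306,
                      "user": "root", "passwd": ""}

    stream = BinLogStreamReader(connection_settings=mysql_settings,
                                server_id=100,
                                only_events=[WriteRowsEvent])

    for event in stream:
        for row in event.rows:
            # With this commit, a JSON column arrives as a dict with raw
            # bytes keys and values, e.g. {b"my_key": b"my_val"}.
            print(row["values"])

    stream.close()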

pymysqlreplication/tests/test_data_type.py (+17)

@@ -413,6 +413,23 @@ def test_geometry(self):
         event = self.create_and_insert_value(create_query, insert_query)
         self.assertEqual(event.rows[0]["values"]["test"], b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?')
 
+    def test_json(self):
+        if not self.isMySQL57():
+            self.skipTest("Json is only supported in mysql 5.7")
+        create_query = "CREATE TABLE test (id int, value json);"
+        insert_query = """INSERT INTO test (id, value) VALUES (1, '{"my_key": "my_val", "my_key2": "my_val2"}');"""
+        event = self.create_and_insert_value(create_query, insert_query)
+        self.assertEqual(event.rows[0]["values"]["value"], {b"my_key": b"my_val", b"my_key2": b"my_val2"})
+
+    def test_json_unicode(self):
+        if not self.isMySQL57():
+            self.skipTest("Json is only supported in mysql 5.7")
+        create_query = "CREATE TABLE test (id int, value json);"
+        insert_query = u"""INSERT INTO test (id, value) VALUES (1, '{"miam": "🍔"}');"""
+        event = self.create_and_insert_value(create_query, insert_query)
+        print(event.rows[0]["values"])
+        self.assertEqual(event.rows[0]["values"]["value"][b"miam"], u'🍔'.encode())
+
     def test_null(self):
         create_query = "CREATE TABLE test ( \
             test TINYINT NULL DEFAULT NULL, \
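
As the expected values in these tests show, the decoded object is returned with raw bytes keys and bytes string values rather than text. A caller that wants native strings can decode them; a minimal sketch, assuming UTF-8 data and a flat, string-valued object like the ones exercised here:

    def decode_json_object(obj, encoding="utf-8"):
        # Decode the bytes keys/values produced by read_binary_json into str.
        return {key.decode(encoding): value.decode(encoding)
                for key, value in obj.items()}

    assert decode_json_object({b"my_key": b"my_val"}) == {"my_key": "my_val"}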
