Skip to content

Commit e760cb8

Browse files
author
Arthur Gautier
committed
Fix handling of JSON data
Introduced in 5.7.8. Fixes #181. Signed-off-by: Arthur Gautier <[email protected]>
1 parent 990fc08 commit e760cb8

File tree

6 files changed

+246
-0
lines changed

6 files changed

+246
-0
lines changed

pymysqlreplication/_compat.py

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import sys
2+
3+
4+
# Alias for the interpreter's native text string type, so isinstance checks
# can be written once for both Python 2 and Python 3.
if sys.version_info > (3,):
    text_type = str
else:
    # Only reached on Python 2, where the ``unicode`` builtin exists.
    text_type = unicode

pymysqlreplication/column.py

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ def __parse_column_definition(self, column_type, column_schema, packet):
4747
self.length_size = packet.read_uint8()
4848
elif self.type == FIELD_TYPE.GEOMETRY:
4949
self.length_size = packet.read_uint8()
50+
elif self.type == FIELD_TYPE.JSON:
51+
self.length_size = packet.read_uint8()
5052
elif self.type == FIELD_TYPE.NEWDECIMAL:
5153
self.precision = packet.read_uint8()
5254
self.decimals = packet.read_uint8()

pymysqlreplication/constants/FIELD_TYPE.py

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
TIMESTAMP2 = 17
4242
DATETIME2 = 18
4343
TIME2 = 19
44+
JSON = 245 # Introduced in 5.7.8
4445
NEWDECIMAL = 246
4546
ENUM = 247
4647
SET = 248

pymysqlreplication/packet.py

+157
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,40 @@
1818
UNSIGNED_INT64_LENGTH = 8
1919

2020

21+
# Type tags used by MySQL's binary JSON serialization.
JSONB_TYPE_SMALL_OBJECT = 0x0
JSONB_TYPE_LARGE_OBJECT = 0x1
JSONB_TYPE_SMALL_ARRAY = 0x2
JSONB_TYPE_LARGE_ARRAY = 0x3
JSONB_TYPE_LITERAL = 0x4
JSONB_TYPE_INT16 = 0x5
JSONB_TYPE_UINT16 = 0x6
JSONB_TYPE_INT32 = 0x7
JSONB_TYPE_UINT32 = 0x8
JSONB_TYPE_INT64 = 0x9
JSONB_TYPE_UINT64 = 0xA
JSONB_TYPE_DOUBLE = 0xB
JSONB_TYPE_STRING = 0xC
JSONB_TYPE_OPAQUE = 0xF

# Values carried by a JSONB_TYPE_LITERAL entry.
JSONB_LITERAL_NULL = 0x0
JSONB_LITERAL_TRUE = 0x1
JSONB_LITERAL_FALSE = 0x2


def read_offset_or_inline(packet, large):
    """Read one value entry of a binary JSON object or array.

    A value entry is a one-byte type tag followed by either the value itself
    (for small scalar types) or the offset of the value.

    :param packet: reader exposing ``read``, ``read_uint8``, ``read_uint16``,
        ``read_uint32`` and ``read_binary_json_type_inlined``.
    :param large: True when the enclosing object/array uses the large storage
        format (32-bit offsets), False for the small format (16-bit offsets).
    :return: a ``(type, offset, value)`` tuple. For an inlined value
        ``offset`` is None and ``value`` holds the decoded data; otherwise
        ``value`` is None and ``offset`` is the integer read from the entry.
    """
    t = packet.read_uint8()

    if t in (JSONB_TYPE_LITERAL, JSONB_TYPE_INT16, JSONB_TYPE_UINT16):
        value = packet.read_binary_json_type_inlined(t)
        if large:
            # In the large format every value entry is 4 bytes wide, but the
            # inlined reader only consumes 2 bytes for these types. Skip the
            # 2 remaining bytes so the following entries stay aligned.
            packet.read(2)
        return (t, None, value)
    if large and t in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32):
        # 32-bit integers only fit in the entry when offsets are 32 bits wide.
        return (t, None, packet.read_binary_json_type_inlined(t))

    if large:
        return (t, packet.read_uint32(), None)
    return (t, packet.read_uint16(), None)
53+
54+
2155
class BinLogPacketWrapper(object):
2256
"""
2357
Bin Log Packet Wrapper. It uses an existing packet object, and wraps
@@ -230,6 +264,9 @@ def read_int24_be(self):
230264
def read_uint8(self):
    """Read one byte and return it as an unsigned integer."""
    return struct.unpack('<B', self.read(1))[0]
232266

267+
def read_int16(self):
    """Read two bytes and return them as a little-endian signed integer."""
    return struct.unpack('<h', self.read(2))[0]
269+
233270
def read_uint16(self):
    """Read two bytes and return them as a little-endian unsigned integer."""
    return struct.unpack('<H', self.read(2))[0]
235272

@@ -281,3 +318,123 @@ def unpack_int32(self, n):
281318
+ (struct.unpack('B', n[3])[0] << 24)
282319
except TypeError:
283320
return n[0] + (n[1] << 8) + (n[2] << 16) + (n[3] << 24)
321+
322+
def read_binary_json(self, size):
    """Read one binary JSON column value and return it as Python data.

    :param size: width in bytes of the length prefix that precedes the
        JSON payload (passed to ``read_uint_by_size``).
    :return: the decoded value, or None when the payload is empty.
    """
    length = self.read_uint_by_size(size)
    if length == 0:
        # An empty payload carries no type byte; reading one here would
        # consume data belonging to whatever follows this value.
        return None

    t = self.read_uint8()

    return self.read_binary_json_type(t, length)
327+
328+
def read_binary_json_type(self, t, length):
    """Decode a binary JSON value of type ``t`` from the current position.

    :param t: JSONB type tag of the value (one of the ``JSONB_TYPE_*``
        constants).
    :param length: byte length used to bound nested objects/arrays; it is
        passed on minus one to the container readers.
    :return: dict, list, bytes, float, int, bool or None depending on ``t``.
    :raises ValueError: if ``t`` (or a literal's value) is not handled.
    """
    large = (t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY))
    if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT):
        return self.read_binary_json_object(length - 1, large)
    elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY):
        return self.read_binary_json_array(length - 1, large)
    elif t == JSONB_TYPE_STRING:
        # The string length is variable-length encoded: each byte carries
        # 7 bits of the length (least significant group first) and its high
        # bit says whether another length byte follows. A plain one-byte
        # length would misread any string of 128 bytes or more.
        string_length = 0
        shift = 0
        while True:
            byte = self.read_uint8()
            string_length |= (byte & 0x7F) << shift
            if not byte & 0x80:
                break
            shift += 7
        return self.read(string_length)
    elif t == JSONB_TYPE_LITERAL:
        # A scalar stored as a standalone value (e.g. a document that is
        # just ``true``) keeps its literal in a single byte.
        value = self.read_uint8()
        if value == JSONB_LITERAL_NULL:
            return None
        elif value == JSONB_LITERAL_TRUE:
            return True
        elif value == JSONB_LITERAL_FALSE:
            return False
    elif t == JSONB_TYPE_INT16:
        return self.read_int16()
    elif t == JSONB_TYPE_UINT16:
        return self.read_uint16()
    elif t == JSONB_TYPE_DOUBLE:
        return struct.unpack('<d', self.read(8))[0]
    elif t == JSONB_TYPE_INT32:
        return self.read_int32()
    elif t == JSONB_TYPE_UINT32:
        return self.read_uint32()
    elif t == JSONB_TYPE_INT64:
        return self.read_int64()
    elif t == JSONB_TYPE_UINT64:
        return self.read_uint64()

    raise ValueError('Json type %d is not handled' % t)
361+
362+
def read_binary_json_type_inlined(self, t):
    """Decode a scalar that is stored inline in an object/array value entry.

    Literals and 16-bit integers are read as 2 bytes, 32-bit integers as
    4 bytes.

    :param t: JSONB type tag of the inlined value.
    :return: None, True or False for a literal, otherwise an int.
    :raises ValueError: if ``t`` is not an inlinable type, or if a literal
        carries a value other than null/true/false.
    """
    if t == JSONB_TYPE_LITERAL:
        value = self.read_uint16()
        if value == JSONB_LITERAL_NULL:
            return None
        elif value == JSONB_LITERAL_TRUE:
            return True
        elif value == JSONB_LITERAL_FALSE:
            return False
    elif t == JSONB_TYPE_INT16:
        return self.read_int16()
    elif t == JSONB_TYPE_UINT16:
        return self.read_uint16()
    elif t == JSONB_TYPE_INT32:
        return self.read_int32()
    elif t == JSONB_TYPE_UINT32:
        return self.read_uint32()

    raise ValueError('Json type %d is not handled' % t)
381+
382+
def read_binary_json_object(self, length, large):
    """Decode a binary JSON object into a dict.

    Reads, in this order: the element count and byte size, one key entry
    (offset, key length) per element, one value entry per element, the key
    bytes, and finally every value that was not stored inline.

    :param length: number of bytes available for this object.
    :param large: True for the large storage format (32-bit counts and
        offsets), False for the small one (16-bit).
    :return: dict mapping each key (as read from the packet) to its value.
    :raises ValueError: if the object's declared size exceeds ``length``.
    """
    if large:
        elements = self.read_uint32()
        size = self.read_uint32()
    else:
        elements = self.read_uint16()
        size = self.read_uint16()

    if size > length:
        raise ValueError('Json length is larger than packet length')

    # Key entries: the offset is read only to advance the stream, because
    # keys are consumed sequentially below; the key length is always 16-bit.
    read_key_offset = self.read_uint32 if large else self.read_uint16
    key_lengths = []
    for _ in range(elements):
        read_key_offset()
        key_lengths.append(self.read_uint16())

    value_entries = [read_offset_or_inline(self, large)
                     for _ in range(elements)]

    keys = [self.read(key_length) for key_length in key_lengths]

    out = {}
    for key, (t, offset, inlined) in zip(keys, value_entries):
        if offset is None:
            # The value was stored directly in its value entry.
            out[key] = inlined
        else:
            out[key] = self.read_binary_json_type(t, length)

    return out
419+
420+
def read_binary_json_array(self, length, large):
    """Decode a binary JSON array into a list.

    Reads the element count and byte size, then one value entry per
    element, then every value that was not stored inline.

    :param length: number of bytes available for this array.
    :param large: True for the large storage format (32-bit counts and
        offsets), False for the small one (16-bit).
    :return: list of decoded values in stored order.
    :raises ValueError: if the array's declared size exceeds ``length``.
    """
    if large:
        elements = self.read_uint32()
        size = self.read_uint32()
    else:
        elements = self.read_uint16()
        size = self.read_uint16()

    if size > length:
        raise ValueError('Json length is larger than packet length')

    # All value entries are read before any out-of-line value is decoded.
    value_entries = [read_offset_or_inline(self, large)
                     for _ in range(elements)]

    values = []
    for t, offset, inlined in value_entries:
        if offset is None:
            # The value was stored directly in its value entry.
            values.append(inlined)
        else:
            values.append(self.read_binary_json_type(t, length))
    return values

pymysqlreplication/row_event.py

+3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import struct
44
import decimal
55
import datetime
6+
import json
67

78
from pymysql.util import byte2int
89
from pymysql.charset import charset_to_encoding
@@ -167,6 +168,8 @@ def _read_column_data(self, cols_bitmap):
167168
elif column.type == FIELD_TYPE.GEOMETRY:
168169
values[name] = self.packet.read_length_coded_pascal_string(
169170
column.length_size)
171+
elif column.type == FIELD_TYPE.JSON:
172+
values[name] = self.packet.read_binary_json(column.length_size)
170173
else:
171174
raise NotImplementedError("Unknown MySQL column type: %d" %
172175
(column.type))

pymysqlreplication/tests/test_data_type.py

+76
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,22 @@
1414
from pymysqlreplication.constants.BINLOG import *
1515
from pymysqlreplication.row_event import *
1616
from pymysqlreplication.event import *
17+
from pymysqlreplication._compat import text_type
18+
1719

1820
__all__ = ["TestDataType"]
1921

2022

23+
def to_binary_dict(d):
    """Return a copy of ``d`` with its text keys and values encoded to bytes.

    Used to build the expected value for JSON columns, whose keys and
    strings the tests compare as bytes. Keys are always encoded; values are
    encoded when they are text, and lists and nested dicts are converted
    recursively. Any other value is returned unchanged.
    """
    def encode_value(v):
        if isinstance(v, dict):
            return to_binary_dict(v)
        if isinstance(v, list):
            return [encode_value(x) for x in v]
        if isinstance(v, text_type):
            # Explicit codec: the implicit default is ASCII on Python 2.
            return v.encode('utf-8')
        return v
    return dict((k.encode('utf-8'), encode_value(v)) for (k, v) in d.items())
31+
32+
2133
class TestDataType(base.PyMySQLReplicationTestCase):
2234
def ignoredEvents(self):
    # Event classes this test case does not want to receive.
    return [GtidEvent]
@@ -413,6 +425,70 @@ def test_geometry(self):
413425
event = self.create_and_insert_value(create_query, insert_query)
414426
self.assertEqual(event.rows[0]["values"]["test"], b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?')
415427

428+
def test_json(self):
    # A JSON object column is expected back as a dict with bytes keys and
    # bytes string values.
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")
    create_query = "CREATE TABLE test (id int, value json);"
    insert_query = """INSERT INTO test (id, value) VALUES (1, '{"my_key": "my_val", "my_key2": "my_val2"}');"""
    event = self.create_and_insert_value(create_query, insert_query)
    self.assertEqual(event.rows[0]["values"]["value"], {b"my_key": b"my_val", b"my_key2": b"my_val2"})
435+
436+
def test_json_array(self):
    # A top-level JSON array is expected back as a list of bytes strings.
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")
    create_query = "CREATE TABLE test (id int, value json);"
    insert_query = """INSERT INTO test (id, value) VALUES (1, '["my_val", "my_val2"]');"""
    event = self.create_and_insert_value(create_query, insert_query)
    self.assertEqual(event.rows[0]["values"]["value"], [b'my_val', b'my_val2'])
443+
444+
def test_json_large(self):
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")
    # 2560 entries make the serialized document exceed 2**16 bytes, which
    # is meant to push the server into the large object storage format.
    data = dict([('foooo%i'%i, 'baaaaar%i'%i) for i in range(2560)])
    create_query = "CREATE TABLE test (id int, value json);"
    insert_query = """INSERT INTO test (id, value) VALUES (1, '%s');""" % json.dumps(data)
    event = self.create_and_insert_value(create_query, insert_query)

    self.assertEqual(event.rows[0]["values"]["value"], to_binary_dict(data))
453+
454+
def test_json_types(self):
    # Round-trips one value per JSON scalar/container kind through a
    # single-key object.
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")

    # ``**`` is exponentiation; ``^`` is bitwise XOR in Python, so the
    # former ``2^14`` was just 12 and never left the 16-bit range. These
    # magnitudes are chosen to need 16-, 32- and 64-bit integer storage.
    types = [
        True,
        False,
        None,
        1.2,
        2 ** 14,
        2 ** 30,
        2 ** 62,
        -1 * 2 ** 14,
        -1 * 2 ** 30,
        -1 * 2 ** 62,
        ['foo', 'bar']
    ]

    for t in types:
        data = {'foo': t}
        create_query = "CREATE TABLE test (id int, value json);"
        insert_query = """INSERT INTO test (id, value) VALUES (1, '%s');""" % json.dumps(data)
        event = self.create_and_insert_value(create_query, insert_query)
        self.assertEqual(event.rows[0]["values"]["value"], to_binary_dict(data))

        # Reset the fixture so the next iteration can recreate the table.
        self.tearDown()
        self.setUp()
482+
483+
def test_json_unicode(self):
    # A non-ASCII JSON string is expected back as its UTF-8 encoded bytes.
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")
    create_query = "CREATE TABLE test (id int, value json);"
    insert_query = u"""INSERT INTO test (id, value) VALUES (1, '{"miam": "🍔"}');"""
    event = self.create_and_insert_value(create_query, insert_query)
    # Explicit codec: a bare ``.encode()`` uses ASCII on Python 2 and would
    # raise on the emoji before the assertion is reached.
    self.assertEqual(event.rows[0]["values"]["value"][b"miam"], u'🍔'.encode('utf-8'))
491+
416492
def test_null(self):
417493
create_query = "CREATE TABLE test ( \
418494
test TINYINT NULL DEFAULT NULL, \

0 commit comments

Comments
 (0)