Skip to content

Commit 0b011e0

Browse files
author
Arthur Gautier
committed
Fix handling of JSON data
Introduced in 5.7.8 Fixes #181 Signed-off-by: Arthur Gautier <[email protected]>
1 parent 990fc08 commit 0b011e0

File tree

6 files changed

+273
-0
lines changed

6 files changed

+273
-0
lines changed

pymysqlreplication/_compat.py

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import sys

# Py2/Py3 compatibility shim: expose the native unicode text type under
# one name so callers can write `isinstance(x, text_type)` on either major.
if sys.version_info >= (3,):
    text_type = str
else:
    text_type = unicode

pymysqlreplication/column.py

+2
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ def __parse_column_definition(self, column_type, column_schema, packet):
4747
self.length_size = packet.read_uint8()
4848
elif self.type == FIELD_TYPE.GEOMETRY:
4949
self.length_size = packet.read_uint8()
50+
elif self.type == FIELD_TYPE.JSON:
51+
self.length_size = packet.read_uint8()
5052
elif self.type == FIELD_TYPE.NEWDECIMAL:
5153
self.precision = packet.read_uint8()
5254
self.decimals = packet.read_uint8()

pymysqlreplication/constants/FIELD_TYPE.py

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
TIMESTAMP2 = 17
4242
DATETIME2 = 18
4343
TIME2 = 19
44+
JSON = 245 # Introduced in 5.7.8
4445
NEWDECIMAL = 246
4546
ENUM = 247
4647
SET = 248

pymysqlreplication/packet.py

+160
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,40 @@
1818
UNSIGNED_INT64_LENGTH = 8
1919

2020

21+
# jsonb type tags as stored in the binary JSON payload.
JSONB_TYPE_SMALL_OBJECT = 0x0
JSONB_TYPE_LARGE_OBJECT = 0x1
JSONB_TYPE_SMALL_ARRAY = 0x2
JSONB_TYPE_LARGE_ARRAY = 0x3
JSONB_TYPE_LITERAL = 0x4
JSONB_TYPE_INT16 = 0x5
JSONB_TYPE_UINT16 = 0x6
JSONB_TYPE_INT32 = 0x7
JSONB_TYPE_UINT32 = 0x8
JSONB_TYPE_INT64 = 0x9
JSONB_TYPE_UINT64 = 0xA
JSONB_TYPE_DOUBLE = 0xB
JSONB_TYPE_STRING = 0xC
JSONB_TYPE_OPAQUE = 0xF

# Values of a JSONB_TYPE_LITERAL payload.
JSONB_LITERAL_NULL = 0x0
JSONB_LITERAL_TRUE = 0x1
JSONB_LITERAL_FALSE = 0x2


def read_offset_or_inline(packet, large):
    """Read one value entry of a jsonb object/array.

    Returns a ``(type, offset, inlined_value)`` triple: small scalars are
    stored inline in the entry (offset is None), every other value is
    referenced by an offset into the payload (inlined_value is None).
    """
    value_type = packet.read_uint8()

    # Types that fit in the entry itself: literals and 16-bit ints always;
    # 32-bit ints too when the large format's 4-byte entries are in use.
    inlineable = (JSONB_TYPE_LITERAL, JSONB_TYPE_INT16, JSONB_TYPE_UINT16)
    if large:
        inlineable += (JSONB_TYPE_INT32, JSONB_TYPE_UINT32)

    if value_type in inlineable:
        return (value_type, None,
                packet.read_binary_json_type_inlined(value_type))

    offset = packet.read_uint32() if large else packet.read_uint16()
    return (value_type, offset, None)
53+
54+
2155
class BinLogPacketWrapper(object):
2256
"""
2357
Bin Log Packet Wrapper. It uses an existing packet object, and wraps
@@ -230,6 +264,9 @@ def read_int24_be(self):
230264
def read_uint8(self):
    """Read one byte from the stream as an unsigned integer (0-255)."""
    (value,) = struct.unpack('<B', self.read(1))
    return value
232266

267+
def read_int16(self):
    """Read a 2-byte little-endian signed integer from the stream."""
    (value,) = struct.unpack('<h', self.read(2))
    return value
269+
233270
def read_uint16(self):
    """Read a 2-byte little-endian unsigned integer from the stream."""
    (value,) = struct.unpack('<H', self.read(2))
    return value
235272

@@ -281,3 +318,126 @@ def unpack_int32(self, n):
281318
+ (struct.unpack('B', n[3])[0] << 24)
282319
except TypeError:
283320
return n[0] + (n[1] << 8) + (n[2] << 16) + (n[3] << 24)
321+
322+
def read_binary_json(self, size):
    """Decode a MySQL binary JSON (JSONB) column value.

    :param size: width in bytes of the length prefix that precedes the
        jsonb payload (column.length_size)
    :return: the decoded value (dict / list / bytes / int / float /
        bool / None)
    """
    # Fix: drop the leftover debug print and the read()/unread() round
    # trip that existed only to feed it -- it wrote the raw payload to
    # stdout on every JSON column decoded.
    length = self.read_uint_by_size(size)
    payload_type = self.read_uint8()

    # The type byte has been consumed, so container decoders receive
    # length - 1 inside read_binary_json_type.
    return self.read_binary_json_type(payload_type, length)
330+
331+
def read_binary_json_type(self, t, length):
    """Decode a single jsonb value whose type byte *t* was already read.

    :param t: jsonb type tag (one of the JSONB_TYPE_* constants)
    :param length: payload length, forwarded so container decoders can
        bound-check their declared size
    :raises ValueError: for type tags (or literal values) not handled
    """
    # Fix: removed the leftover debug print of the literal value, which
    # polluted stdout on every boolean/null column decoded.
    large = (t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY))
    if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT):
        # length - 1: the type byte itself has already been consumed.
        return self.read_binary_json_object(length - 1, large)
    elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY):
        return self.read_binary_json_array(length - 1, large)
    elif t in (JSONB_TYPE_STRING,):
        return self.read_length_coded_pascal_string(1)
    elif t in (JSONB_TYPE_LITERAL,):
        value = self.read_uint8()
        if value == JSONB_LITERAL_NULL:
            return None
        elif value == JSONB_LITERAL_TRUE:
            return True
        elif value == JSONB_LITERAL_FALSE:
            return False
    elif t == JSONB_TYPE_INT16:
        return self.read_int16()
    elif t == JSONB_TYPE_UINT16:
        return self.read_uint16()
    elif t in (JSONB_TYPE_DOUBLE,):
        return struct.unpack('<d', self.read(8))[0]
    elif t == JSONB_TYPE_INT32:
        return self.read_int32()
    elif t == JSONB_TYPE_UINT32:
        return self.read_uint32()
    elif t == JSONB_TYPE_INT64:
        return self.read_int64()
    elif t == JSONB_TYPE_UINT64:
        return self.read_uint64()

    # Reached for unknown tags and for unknown literal values.
    raise ValueError('Json type %d is not handled' % t)
364+
365+
def read_binary_json_type_inlined(self, t):
    """Decode a jsonb scalar stored inline in an object/array value entry.

    :param t: jsonb type tag of the inlined value
    :raises ValueError: for tags (or literal values) that cannot be inlined
    """
    if t == JSONB_TYPE_LITERAL:
        # NOTE(review): inlined literals are read as 2 bytes here; in the
        # large storage format a value entry is 4 bytes wide -- confirm
        # callers only hit this path where 2 bytes is correct.
        literal = self.read_uint16()
        if literal == JSONB_LITERAL_NULL:
            return None
        if literal == JSONB_LITERAL_TRUE:
            return True
        if literal == JSONB_LITERAL_FALSE:
            return False
    elif t == JSONB_TYPE_INT16:
        return self.read_int16()
    elif t == JSONB_TYPE_UINT16:
        return self.read_uint16()
    elif t == JSONB_TYPE_INT32:
        return self.read_int32()
    elif t == JSONB_TYPE_UINT32:
        return self.read_uint32()

    # Reached for unknown tags and for unknown literal values.
    raise ValueError('Json type %d is not handled' % t)
384+
385+
def read_binary_json_object(self, length, large):
    """Decode a jsonb object payload into a dict.

    :param length: payload bytes available for this object
    :param large: True when the large format (32-bit counts/offsets) is used
    :raises ValueError: when the declared object size exceeds *length*
    """
    if large:
        elements = self.read_uint32()
        size = self.read_uint32()
    else:
        elements = self.read_uint16()
        size = self.read_uint16()

    if size > length:
        raise ValueError('Json length is larger than packet length')

    # Key entries: (offset, key_length) pairs. The offsets are unused
    # because the keys are serialized in entry order right after the
    # value entries.
    if large:
        key_offset_lengths = [(self.read_uint32(), self.read_uint16())
                              for _ in range(elements)]
    else:
        key_offset_lengths = [(self.read_uint16(), self.read_uint16())
                              for _ in range(elements)]

    # Value entries: (type, offset, inlined_value). Small scalars are
    # stored inline; everything else follows after the keys.
    value_entries = [read_offset_or_inline(self, large)
                     for _ in range(elements)]

    keys = [self.read(key_len) for _, key_len in key_offset_lengths]

    out = {}
    for key, (value_type, offset, inlined) in zip(keys, value_entries):
        if offset is None:
            out[key] = inlined
        else:
            out[key] = self.read_binary_json_type(value_type, length)

    return out
422+
423+
def read_binary_json_array(self, length, large):
    """Decode a jsonb array payload into a list.

    :param length: payload bytes available for this array
    :param large: True when the large format (32-bit counts/offsets) is used
    :raises ValueError: when the declared array size exceeds *length*
    """
    if large:
        elements = self.read_uint32()
        size = self.read_uint32()
    else:
        elements = self.read_uint16()
        size = self.read_uint16()

    if size > length:
        raise ValueError('Json length is larger than packet length')

    # Value entries first; non-inlined values follow in serialization
    # order, so reading them sequentially matches their offsets.
    entries = [read_offset_or_inline(self, large) for _ in range(elements)]

    out = []
    for value_type, offset, inlined in entries:
        if offset is None:
            out.append(inlined)
        else:
            out.append(self.read_binary_json_type(value_type, length))
    return out

pymysqlreplication/row_event.py

+3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import struct
44
import decimal
55
import datetime
6+
import json
67

78
from pymysql.util import byte2int
89
from pymysql.charset import charset_to_encoding
@@ -167,6 +168,8 @@ def _read_column_data(self, cols_bitmap):
167168
elif column.type == FIELD_TYPE.GEOMETRY:
168169
values[name] = self.packet.read_length_coded_pascal_string(
169170
column.length_size)
171+
elif column.type == FIELD_TYPE.JSON:
172+
values[name] = self.packet.read_binary_json(column.length_size)
170173
else:
171174
raise NotImplementedError("Unknown MySQL column type: %d" %
172175
(column.type))

pymysqlreplication/tests/test_data_type.py

+100
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,22 @@
1414
from pymysqlreplication.constants.BINLOG import *
1515
from pymysqlreplication.row_event import *
1616
from pymysqlreplication.event import *
17+
from pymysqlreplication._compat import text_type
18+
1719

1820
__all__ = ["TestDataType"]
1921

2022

23+
def to_binary_dict(d):
    """Return a copy of *d* with keys and text values utf-8 encoded.

    Mirrors how decoded jsonb rows come back: dict keys and string
    values are bytes. Lists are converted element-wise; any other value
    is passed through unchanged.
    """
    def encode_value(value):
        if isinstance(value, text_type):
            return value.encode()
        if isinstance(value, list):
            return [encode_value(item) for item in value]
        return value

    return dict((key.encode(), encode_value(value))
                for key, value in d.items())
31+
32+
2133
class TestDataType(base.PyMySQLReplicationTestCase):
2234
def ignoredEvents(self):
2335
return [GtidEvent]
@@ -413,6 +425,94 @@ def test_geometry(self):
413425
event = self.create_and_insert_value(create_query, insert_query)
414426
self.assertEqual(event.rows[0]["values"]["test"], b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?')
415427

428+
def test_json(self):
    """A JSON object column decodes into a dict with bytes keys/values."""
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")
    create_query = "CREATE TABLE test (id int, value json);"
    insert_query = """INSERT INTO test (id, value) VALUES (1, '{"my_key": "my_val", "my_key2": "my_val2"}');"""
    event = self.create_and_insert_value(create_query, insert_query)
    # Keys and string values come back as bytes, not decoded text.
    self.assertEqual(event.rows[0]["values"]["value"], {b"my_key": b"my_val", b"my_key2": b"my_val2"})
435+
436+
def test_json_array(self):
    """A JSON array column decodes into a list of bytes values."""
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")
    create_query = "CREATE TABLE test (id int, value json);"
    insert_query = """INSERT INTO test (id, value) VALUES (1, '["my_val", "my_val2"]');"""
    event = self.create_and_insert_value(create_query, insert_query)
    self.assertEqual(event.rows[0]["values"]["value"], [b'my_val', b'my_val2'])
443+
444+
def test_json_large(self):
    """A JSON document past 2**16 bytes exercises the large jsonb format."""
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")
    # Make it large enough to reach 2^16 length
    data = dict([('foooo%i'%i, 'baaaaar%i'%i) for i in range(2560)])
    create_query = "CREATE TABLE test (id int, value json);"
    insert_query = """INSERT INTO test (id, value) VALUES (1, '%s');""" % json.dumps(data)
    event = self.create_and_insert_value(create_query, insert_query)

    self.assertEqual(event.rows[0]["values"]["value"], to_binary_dict(data))
453+
454+
def test_json_types(self):
    """JSON objects holding each jsonb value type survive a round trip."""
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")

    # Fix: these magnitudes were written as 2^14 etc., but '^' is XOR in
    # Python (2^14 == 12), so the int16/int32/int64 encodings were never
    # actually exercised. Use exponentiation.
    types = [
        True,
        False,
        None,
        1.2,
        2 ** 14,
        2 ** 30,
        2 ** 62,
        -1 * 2 ** 14,
        -1 * 2 ** 30,
        -1 * 2 ** 62,
        ['foo', 'bar']
    ]

    for t in types:
        data = {'foo': t}
        create_query = "CREATE TABLE test (id int, value json);"
        insert_query = """INSERT INTO test (id, value) VALUES (1, '%s');""" % json.dumps(data)
        event = self.create_and_insert_value(create_query, insert_query)
        self.assertEqual(event.rows[0]["values"]["value"], to_binary_dict(data))

        # Recreate the schema so the next iteration can CREATE TABLE again.
        self.tearDown()
        self.setUp()
481+
482+
def test_json_basic(self):
    """Bare JSON scalar values of each jsonb type survive a round trip."""
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")

    # Fix: these magnitudes were written as 2^14 etc., but '^' is XOR in
    # Python (2^14 == 12), so the int16/int32/int64 encodings were never
    # actually exercised. Use exponentiation.
    types = [
        True,
        False,
        None,
        1.2,
        2 ** 14,
        2 ** 30,
        2 ** 62,
        -1 * 2 ** 14,
        -1 * 2 ** 30,
        -1 * 2 ** 62,
    ]

    for data in types:
        create_query = "CREATE TABLE test (id int, value json);"
        insert_query = """INSERT INTO test (id, value) VALUES (1, '%s');""" % json.dumps(data)
        event = self.create_and_insert_value(create_query, insert_query)
        self.assertEqual(event.rows[0]["values"]["value"], data)

        # Recreate the schema so the next iteration can CREATE TABLE again.
        self.tearDown()
        self.setUp()
507+
508+
def test_json_unicode(self):
    """Non-ASCII JSON string values decode to their utf-8 encoded bytes."""
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")
    create_query = "CREATE TABLE test (id int, value json);"
    insert_query = u"""INSERT INTO test (id, value) VALUES (1, '{"miam": "🍔"}');"""
    event = self.create_and_insert_value(create_query, insert_query)
    # String values arrive as raw bytes, so compare against the utf-8 encoding.
    self.assertEqual(event.rows[0]["values"]["value"][b"miam"], u'🍔'.encode('utf8'))
515+
416516
def test_null(self):
417517
create_query = "CREATE TABLE test ( \
418518
test TINYINT NULL DEFAULT NULL, \

0 commit comments

Comments
 (0)