Skip to content

Fix handling of JSON data #182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pymysqlreplication/_compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import sys

# Portable alias for the text (unicode) string type: ``str`` on
# Python 3, ``unicode`` on Python 2.
if sys.version_info[0] >= 3:
    text_type = str
else:
    text_type = unicode  # noqa: F821 -- name only exists on Python 2
2 changes: 2 additions & 0 deletions pymysqlreplication/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def __parse_column_definition(self, column_type, column_schema, packet):
self.length_size = packet.read_uint8()
elif self.type == FIELD_TYPE.GEOMETRY:
self.length_size = packet.read_uint8()
elif self.type == FIELD_TYPE.JSON:
self.length_size = packet.read_uint8()
elif self.type == FIELD_TYPE.NEWDECIMAL:
self.precision = packet.read_uint8()
self.decimals = packet.read_uint8()
Expand Down
1 change: 1 addition & 0 deletions pymysqlreplication/constants/FIELD_TYPE.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
TIMESTAMP2 = 17
DATETIME2 = 18
TIME2 = 19
# Column type code used by MySQL for JSON columns in table-map events.
JSON = 245 # Introduced in 5.7.8
NEWDECIMAL = 246
ENUM = 247
SET = 248
Expand Down
160 changes: 160 additions & 0 deletions pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,40 @@
UNSIGNED_INT64_LENGTH = 8


# Type markers of MySQL 5.7's binary JSON (JSONB) storage format.
# "Small" containers use 2-byte counts/offsets, "large" containers use
# 4-byte ones (see sql/json_binary.h in the MySQL server sources).
JSONB_TYPE_SMALL_OBJECT = 0x0
JSONB_TYPE_LARGE_OBJECT = 0x1
JSONB_TYPE_SMALL_ARRAY = 0x2
JSONB_TYPE_LARGE_ARRAY = 0x3
JSONB_TYPE_LITERAL = 0x4
JSONB_TYPE_INT16 = 0x5
JSONB_TYPE_UINT16 = 0x6
JSONB_TYPE_INT32 = 0x7
JSONB_TYPE_UINT32 = 0x8
JSONB_TYPE_INT64 = 0x9
JSONB_TYPE_UINT64 = 0xA
JSONB_TYPE_DOUBLE = 0xB
JSONB_TYPE_STRING = 0xC
JSONB_TYPE_OPAQUE = 0xF

# Payload markers for a JSONB_TYPE_LITERAL value.
JSONB_LITERAL_NULL = 0x0
JSONB_LITERAL_TRUE = 0x1
JSONB_LITERAL_FALSE = 0x2


def read_offset_or_inline(packet, large):
    """Read one value entry of a JSONB container from *packet*.

    Small scalars are stored inline in the entry itself; anything else
    is referenced by an offset into the container payload.

    :return: a ``(type, offset, inlined_value)`` tuple where exactly one
             of ``offset`` / ``inlined_value`` is not None.
    """
    value_type = packet.read_uint8()

    # Literals and 16-bit ints fit the inline slot of both formats;
    # 32-bit ints are inlined only in the large (4-byte slot) format.
    inlined = value_type in (JSONB_TYPE_LITERAL,
                             JSONB_TYPE_INT16, JSONB_TYPE_UINT16)
    if not inlined and large:
        inlined = value_type in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32)

    if inlined:
        return (value_type, None,
                packet.read_binary_json_type_inlined(value_type))

    offset = packet.read_uint32() if large else packet.read_uint16()
    return (value_type, offset, None)


class BinLogPacketWrapper(object):
"""
Bin Log Packet Wrapper. It uses an existing packet object, and wraps
Expand Down Expand Up @@ -230,6 +264,9 @@ def read_int24_be(self):
def read_uint8(self):
    """Read one byte from the stream as an unsigned integer."""
    (value,) = struct.unpack('<B', self.read(1))
    return value

def read_int16(self):
    """Read two bytes as a little-endian signed 16-bit integer."""
    (value,) = struct.unpack('<h', self.read(2))
    return value

def read_uint16(self):
    """Read two bytes as a little-endian unsigned 16-bit integer."""
    (value,) = struct.unpack('<H', self.read(2))
    return value

Expand Down Expand Up @@ -281,3 +318,126 @@ def unpack_int32(self, n):
+ (struct.unpack('B', n[3])[0] << 24)
except TypeError:
return n[0] + (n[1] << 8) + (n[2] << 16) + (n[3] << 24)

def read_binary_json(self, size):
    """Read and decode one binary JSON (JSONB) column value.

    :param size: byte width of the length prefix for this column
                 (``column.length_size`` from the table map event).
    :return: the decoded document -- dict, list, bytes, number, bool
             or None.
    """
    # Total payload length, then the root value's type marker; the
    # type-specific reader consumes the rest of the payload.
    # (Removed a leftover debug print of the raw payload and the
    # read/unread round-trip that existed only to feed it.)
    length = self.read_uint_by_size(size)
    t = self.read_uint8()

    return self.read_binary_json_type(t, length)

def read_binary_json_type(self, t, length):
    """Decode one JSONB value whose type marker ``t`` was already read.

    :param t: JSONB type marker (``JSONB_TYPE_*``).
    :param length: bytes remaining for this value; used to bound-check
                   container payload sizes.
    :raises ValueError: for unrecognised type codes (and unknown
                        literal markers, which fall through).
    """
    # (Removed a leftover debug print of the literal marker.)
    large = t in (JSONB_TYPE_LARGE_OBJECT, JSONB_TYPE_LARGE_ARRAY)
    if t in (JSONB_TYPE_SMALL_OBJECT, JSONB_TYPE_LARGE_OBJECT):
        # length - 1 accounts for the type marker byte already consumed.
        return self.read_binary_json_object(length - 1, large)
    elif t in (JSONB_TYPE_SMALL_ARRAY, JSONB_TYPE_LARGE_ARRAY):
        return self.read_binary_json_array(length - 1, large)
    elif t == JSONB_TYPE_STRING:
        return self.read_length_coded_pascal_string(1)
    elif t == JSONB_TYPE_LITERAL:
        value = self.read_uint8()
        if value == JSONB_LITERAL_NULL:
            return None
        elif value == JSONB_LITERAL_TRUE:
            return True
        elif value == JSONB_LITERAL_FALSE:
            return False
    elif t == JSONB_TYPE_INT16:
        return self.read_int16()
    elif t == JSONB_TYPE_UINT16:
        return self.read_uint16()
    elif t == JSONB_TYPE_DOUBLE:
        return struct.unpack('<d', self.read(8))[0]
    elif t == JSONB_TYPE_INT32:
        return self.read_int32()
    elif t == JSONB_TYPE_UINT32:
        return self.read_uint32()
    elif t == JSONB_TYPE_INT64:
        return self.read_int64()
    elif t == JSONB_TYPE_UINT64:
        return self.read_uint64()

    raise ValueError('Json type %d is not handled' % t)

def read_binary_json_type_inlined(self, t):
    """Decode a JSONB value stored inline in a container value entry.

    Only small scalars are ever inlined; everything else lives at an
    offset and goes through ``read_binary_json_type``.

    :raises ValueError: for type codes that are never inlined (and
                        unknown literal markers, which fall through).
    """
    if t == JSONB_TYPE_LITERAL:
        # The literal marker fills the 2-byte inline slot of the small
        # format.  NOTE(review): in the large format the inline slot is
        # presumably 4 bytes wide -- confirm a 2-byte read suffices.
        marker = self.read_uint16()
        if marker == JSONB_LITERAL_NULL:
            return None
        elif marker == JSONB_LITERAL_TRUE:
            return True
        elif marker == JSONB_LITERAL_FALSE:
            return False
    elif t == JSONB_TYPE_INT16:
        return self.read_int16()
    elif t == JSONB_TYPE_UINT16:
        return self.read_uint16()
    elif t == JSONB_TYPE_INT32:
        return self.read_int32()
    elif t == JSONB_TYPE_UINT32:
        return self.read_uint32()

    raise ValueError('Json type %d is not handled' % t)

def read_binary_json_object(self, length, large):
    """Decode a JSONB object payload into a dict (keys are bytes).

    :param length: bytes available for this object.
    :param large: True for the 4-byte count/offset format,
                  False for the 2-byte one.
    :raises ValueError: if the declared size exceeds ``length``.
    """
    # Counts and offsets share the same width within one format.
    read_count = self.read_uint32 if large else self.read_uint16
    elements = read_count()
    size = read_count()

    if size > length:
        raise ValueError('Json length is larger than packet length')

    # Key entries are (offset, key length).  Offsets are unused here
    # because the payload is consumed strictly sequentially; key
    # lengths are always 2 bytes in both formats.
    key_entries = [(read_count(), self.read_uint16())
                   for _ in range(elements)]

    # Value entries: (type, offset-or-None, inlined-value-or-None).
    value_entries = [read_offset_or_inline(self, large)
                     for _ in range(elements)]

    keys = [self.read(key_length) for _, key_length in key_entries]

    obj = {}
    for key, (value_type, offset, inline_value) in zip(keys, value_entries):
        if offset is None:
            obj[key] = inline_value
        else:
            obj[key] = self.read_binary_json_type(value_type, length)

    return obj

def read_binary_json_array(self, length, large):
    """Decode a JSONB array payload into a list.

    :param length: bytes available for this array.
    :param large: True for the 4-byte count/offset format,
                  False for the 2-byte one.
    :raises ValueError: if the declared size exceeds ``length``.
    """
    read_count = self.read_uint32 if large else self.read_uint16
    elements = read_count()
    size = read_count()

    if size > length:
        raise ValueError('Json length is larger than packet length')

    # One (type, offset-or-None, inlined-value-or-None) per element.
    value_entries = [read_offset_or_inline(self, large)
                     for _ in range(elements)]

    result = []
    for value_type, offset, inline_value in value_entries:
        if offset is None:
            result.append(inline_value)
        else:
            result.append(self.read_binary_json_type(value_type, length))

    return result
3 changes: 3 additions & 0 deletions pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import struct
import decimal
import datetime
import json

from pymysql.util import byte2int
from pymysql.charset import charset_to_encoding
Expand Down Expand Up @@ -167,6 +168,8 @@ def _read_column_data(self, cols_bitmap):
elif column.type == FIELD_TYPE.GEOMETRY:
values[name] = self.packet.read_length_coded_pascal_string(
column.length_size)
elif column.type == FIELD_TYPE.JSON:
values[name] = self.packet.read_binary_json(column.length_size)
else:
raise NotImplementedError("Unknown MySQL column type: %d" %
(column.type))
Expand Down
100 changes: 100 additions & 0 deletions pymysqlreplication/tests/test_data_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,22 @@
from pymysqlreplication.constants.BINLOG import *
from pymysqlreplication.row_event import *
from pymysqlreplication.event import *
from pymysqlreplication._compat import text_type


__all__ = ["TestDataType"]


def to_binary_dict(d):
    """Return a copy of *d* with text keys and values encoded to bytes.

    Values that are lists are encoded element-wise; any other value is
    passed through unchanged.  Mirrors how decoded JSONB rows represent
    strings as bytes.
    """
    def _encode(value):
        if isinstance(value, text_type):
            return value.encode()
        if isinstance(value, list):
            return [_encode(item) for item in value]
        return value

    return {key.encode(): _encode(value) for key, value in d.items()}


class TestDataType(base.PyMySQLReplicationTestCase):
def ignoredEvents(self):
return [GtidEvent]
Expand Down Expand Up @@ -413,6 +425,94 @@ def test_geometry(self):
event = self.create_and_insert_value(create_query, insert_query)
self.assertEqual(event.rows[0]["values"]["test"], b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?')

def test_json(self):
    """A flat JSON object decodes to a dict with bytes keys/values."""
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")
    create_query = "CREATE TABLE test (id int, value json);"
    insert_query = """INSERT INTO test (id, value) VALUES (1, '{"my_key": "my_val", "my_key2": "my_val2"}');"""
    event = self.create_and_insert_value(create_query, insert_query)
    expected = {b"my_key": b"my_val", b"my_key2": b"my_val2"}
    self.assertEqual(event.rows[0]["values"]["value"], expected)

def test_json_array(self):
    """A JSON array decodes to a list of bytes strings."""
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")
    create_query = "CREATE TABLE test (id int, value json);"
    insert_query = """INSERT INTO test (id, value) VALUES (1, '["my_val", "my_val2"]');"""
    event = self.create_and_insert_value(create_query, insert_query)
    expected = [b'my_val', b'my_val2']
    self.assertEqual(event.rows[0]["values"]["value"], expected)

def test_json_large(self):
    """A document big enough to require the 'large' JSONB format."""
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")
    # Large enough for the serialized document to pass 2**16 bytes,
    # which forces the 4-byte count/offset container format.
    data = {'foooo%i' % i: 'baaaaar%i' % i for i in range(2560)}
    create_query = "CREATE TABLE test (id int, value json);"
    insert_query = """INSERT INTO test (id, value) VALUES (1, '%s');""" % json.dumps(data)
    event = self.create_and_insert_value(create_query, insert_query)

    self.assertEqual(event.rows[0]["values"]["value"], to_binary_dict(data))

def test_json_types(self):
    """Each scalar/array JSONB encoding round-trips as an object value."""
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")

    types = [
        True,
        False,
        None,
        1.2,
        # Use ** (power), not ^ (xor): the original '2^14' etc. evaluated
        # to tiny ints (2^14 == 12) and never exercised the int16/int32/
        # int64 JSONB encodings these magnitudes are meant to trigger.
        2 ** 14,
        2 ** 30,
        2 ** 62,
        -1 * 2 ** 14,
        -1 * 2 ** 30,
        -1 * 2 ** 62,
        ['foo', 'bar']
    ]

    for t in types:
        data = {'foo': t}
        create_query = "CREATE TABLE test (id int, value json);"
        insert_query = """INSERT INTO test (id, value) VALUES (1, '%s');""" % json.dumps(data)
        event = self.create_and_insert_value(create_query, insert_query)
        self.assertEqual(event.rows[0]["values"]["value"], to_binary_dict(data))

        # Drop and recreate the table between cases.
        self.tearDown()
        self.setUp()

def test_json_basic(self):
    """Scalar JSON documents (not wrapped in an object) decode directly."""
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")

    types = [
        True,
        False,
        None,
        1.2,
        # Use ** (power), not ^ (xor): '2^14' evaluates to 12, so the
        # wider integer encodings were never actually exercised.
        2 ** 14,
        2 ** 30,
        2 ** 62,
        -1 * 2 ** 14,
        -1 * 2 ** 30,
        -1 * 2 ** 62,
    ]

    for data in types:
        create_query = "CREATE TABLE test (id int, value json);"
        insert_query = """INSERT INTO test (id, value) VALUES (1, '%s');""" % json.dumps(data)
        event = self.create_and_insert_value(create_query, insert_query)
        self.assertEqual(event.rows[0]["values"]["value"], data)

        # Drop and recreate the table between cases.
        self.tearDown()
        self.setUp()

def test_json_unicode(self):
    """Non-ASCII JSON string values come back UTF-8 encoded as bytes."""
    if not self.isMySQL57():
        self.skipTest("Json is only supported in mysql 5.7")
    create_query = "CREATE TABLE test (id int, value json);"
    insert_query = u"""INSERT INTO test (id, value) VALUES (1, '{"miam": "🍔"}');"""
    event = self.create_and_insert_value(create_query, insert_query)
    decoded = event.rows[0]["values"]["value"][b"miam"]
    self.assertEqual(decoded, u'🍔'.encode('utf8'))

def test_null(self):
create_query = "CREATE TABLE test ( \
test TINYINT NULL DEFAULT NULL, \
Expand Down