diff --git a/pymysqlreplication/_compat.py b/pymysqlreplication/_compat.py new file mode 100644 index 00000000..a61b248c --- /dev/null +++ b/pymysqlreplication/_compat.py @@ -0,0 +1,7 @@ +import sys + + +if sys.version_info > (3,): + text_type = str +else: + text_type = unicode diff --git a/pymysqlreplication/column.py b/pymysqlreplication/column.py index d448a82f..64445b3f 100644 --- a/pymysqlreplication/column.py +++ b/pymysqlreplication/column.py @@ -47,6 +47,8 @@ def __parse_column_definition(self, column_type, column_schema, packet): self.length_size = packet.read_uint8() elif self.type == FIELD_TYPE.GEOMETRY: self.length_size = packet.read_uint8() + elif self.type == FIELD_TYPE.JSON: + self.length_size = packet.read_uint8() elif self.type == FIELD_TYPE.NEWDECIMAL: self.precision = packet.read_uint8() self.decimals = packet.read_uint8() diff --git a/pymysqlreplication/constants/FIELD_TYPE.py b/pymysqlreplication/constants/FIELD_TYPE.py index 6ed230a6..51791d62 100644 --- a/pymysqlreplication/constants/FIELD_TYPE.py +++ b/pymysqlreplication/constants/FIELD_TYPE.py @@ -41,6 +41,7 @@ TIMESTAMP2 = 17 DATETIME2 = 18 TIME2 = 19 +JSON = 245 # Introduced in 5.7.8 NEWDECIMAL = 246 ENUM = 247 SET = 248 diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index e0d8500b..ebb03d81 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -18,6 +18,40 @@ UNSIGNED_INT64_LENGTH = 8 +JSONB_TYPE_SMALL_OBJECT = 0x0 +JSONB_TYPE_LARGE_OBJECT = 0x1 +JSONB_TYPE_SMALL_ARRAY = 0x2 +JSONB_TYPE_LARGE_ARRAY = 0x3 +JSONB_TYPE_LITERAL = 0x4 +JSONB_TYPE_INT16 = 0x5 +JSONB_TYPE_UINT16 = 0x6 +JSONB_TYPE_INT32 = 0x7 +JSONB_TYPE_UINT32 = 0x8 +JSONB_TYPE_INT64 = 0x9 +JSONB_TYPE_UINT64 = 0xA +JSONB_TYPE_DOUBLE = 0xB +JSONB_TYPE_STRING = 0xC +JSONB_TYPE_OPAQUE = 0xF + +JSONB_LITERAL_NULL = 0x0 +JSONB_LITERAL_TRUE = 0x1 +JSONB_LITERAL_FALSE = 0x2 + + +def read_offset_or_inline(packet, large): + t = packet.read_uint8() + + if t in (JSONB_TYPE_LITERAL, + JSONB_TYPE_INT16, JSONB_TYPE_UINT16): + return (t, None, packet.read_binary_json_type_inlined(t)) + if large and t in (JSONB_TYPE_INT32, JSONB_TYPE_UINT32): + return (t, None, packet.read_binary_json_type_inlined(t)) + + if large: + return (t, packet.read_uint32(), None) + return (t, packet.read_uint16(), None) + + class BinLogPacketWrapper(object): """ Bin Log Packet Wrapper. It uses an existing packet object, and wraps @@ -230,6 +264,9 @@ def read_int24_be(self): def read_uint8(self): return struct.unpack(' length: + raise ValueError('Json length is larger than packet length') + + if large: + key_offset_lengths = [( + self.read_uint32(), # offset (we don't actually need that) + self.read_uint16() # size of the key + ) for _ in range(elements)] + else: + key_offset_lengths = [( + self.read_uint16(), # offset (we don't actually need that) + self.read_uint16() # size of key + ) for _ in range(elements)] + + value_type_inlined_lengths = [read_offset_or_inline(self, large) + for _ in range(elements)] + + keys = [self.read(x[1]) for x in key_offset_lengths] + + out = {} + for i in range(elements): + if value_type_inlined_lengths[i][1] is None: + data = value_type_inlined_lengths[i][2] + else: + t = value_type_inlined_lengths[i][0] + data = self.read_binary_json_type(t, length) + out[keys[i]] = data + + return out + + def read_binary_json_array(self, length, large): + if large: + elements = self.read_uint32() + size = self.read_uint32() + else: + elements = self.read_uint16() + size = self.read_uint16() + + if size > length: + raise ValueError('Json length is larger than packet length') + + values_type_offset_inline = [ + read_offset_or_inline(self, large) + for _ in range(elements)] + + def _read(x): + if x[1] is None: + return x[2] + return self.read_binary_json_type(x[0], length) + + return [_read(x) for x in values_type_offset_inline] diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index b3846e2f..aba418b0 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -3,6 +3,7 @@ import struct import decimal import datetime +import json from pymysql.util import byte2int from pymysql.charset import charset_to_encoding @@ -167,6 +168,8 @@ def _read_column_data(self, cols_bitmap): elif column.type == FIELD_TYPE.GEOMETRY: values[name] = self.packet.read_length_coded_pascal_string( column.length_size) + elif column.type == FIELD_TYPE.JSON: + values[name] = self.packet.read_binary_json(column.length_size) else: raise NotImplementedError("Unknown MySQL column type: %d" % (column.type)) diff --git a/pymysqlreplication/tests/test_data_type.py b/pymysqlreplication/tests/test_data_type.py index 87a6d6e8..f3e72ef1 100644 --- a/pymysqlreplication/tests/test_data_type.py +++ b/pymysqlreplication/tests/test_data_type.py @@ -14,10 +14,22 @@ from pymysqlreplication.constants.BINLOG import * from pymysqlreplication.row_event import * from pymysqlreplication.event import * +from pymysqlreplication._compat import text_type + __all__ = ["TestDataType"] +def to_binary_dict(d): + def encode_value(v): + if isinstance(v, text_type): + return v.encode() + if isinstance(v, list): + return [encode_value(x) for x in v] + return v + return dict([(k.encode(), encode_value(v)) for (k, v) in d.items()]) + + class TestDataType(base.PyMySQLReplicationTestCase): def ignoredEvents(self): return [GtidEvent] @@ -413,6 +425,94 @@ def test_geometry(self): event = self.create_and_insert_value(create_query, insert_query) self.assertEqual(event.rows[0]["values"]["test"], b'\x00\x00\x00\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?') + def test_json(self): + if not self.isMySQL57(): + self.skipTest("Json is only supported in mysql 5.7") + create_query = "CREATE TABLE test (id int, value json);" + insert_query = """INSERT INTO test (id, value) VALUES (1, '{"my_key": "my_val", "my_key2": "my_val2"}');""" + event = self.create_and_insert_value(create_query, insert_query) + self.assertEqual(event.rows[0]["values"]["value"], {b"my_key": b"my_val", b"my_key2": b"my_val2"}) + + def test_json_array(self): + if not self.isMySQL57(): + self.skipTest("Json is only supported in mysql 5.7") + create_query = "CREATE TABLE test (id int, value json);" + insert_query = """INSERT INTO test (id, value) VALUES (1, '["my_val", "my_val2"]');""" + event = self.create_and_insert_value(create_query, insert_query) + self.assertEqual(event.rows[0]["values"]["value"], [b'my_val', b'my_val2']) + + def test_json_large(self): + if not self.isMySQL57(): + self.skipTest("Json is only supported in mysql 5.7") + data = dict([('foooo%i'%i, 'baaaaar%i'%i) for i in range(2560)]) # Make it large enough to reach 2^16 length + create_query = "CREATE TABLE test (id int, value json);" + insert_query = """INSERT INTO test (id, value) VALUES (1, '%s');""" % json.dumps(data) + event = self.create_and_insert_value(create_query, insert_query) + + self.assertEqual(event.rows[0]["values"]["value"], to_binary_dict(data)) + + def test_json_types(self): + if not self.isMySQL57(): + self.skipTest("Json is only supported in mysql 5.7") + + types = [ + True, + False, + None, + 1.2, + 2^14, + 2^30, + 2^62, + -1 * 2^14, + -1 * 2^30, + -1 * 2^62, + ['foo', 'bar'] + ] + + for t in types: + data = {'foo': t} + create_query = "CREATE TABLE test (id int, value json);" + insert_query = """INSERT INTO test (id, value) VALUES (1, '%s');""" % json.dumps(data) + event = self.create_and_insert_value(create_query, insert_query) + self.assertEqual(event.rows[0]["values"]["value"], to_binary_dict(data)) + + self.tearDown() + self.setUp() + + def test_json_basic(self): + if not self.isMySQL57(): + self.skipTest("Json is only supported in mysql 5.7") + + types = [ + True, + False, + None, + 1.2, + 2^14, + 2^30, + 2^62, + -1 * 2^14, + -1 * 2^30, + -1 * 2^62, + ] + + for data in types: + create_query = "CREATE TABLE test (id int, value json);" + insert_query = """INSERT INTO test (id, value) VALUES (1, '%s');""" % json.dumps(data) + event = self.create_and_insert_value(create_query, insert_query) + self.assertEqual(event.rows[0]["values"]["value"], data) + + self.tearDown() + self.setUp() + + def test_json_unicode(self): + if not self.isMySQL57(): + self.skipTest("Json is only supported in mysql 5.7") + create_query = "CREATE TABLE test (id int, value json);" + insert_query = u"""INSERT INTO test (id, value) VALUES (1, '{"miam": "🍔"}');""" + event = self.create_and_insert_value(create_query, insert_query) + self.assertEqual(event.rows[0]["values"]["value"][b"miam"], u'🍔'.encode('utf8')) + def test_null(self): create_query = "CREATE TABLE test ( \ test TINYINT NULL DEFAULT NULL, \