From 40c4688d4b8103778eac0ee8d73a9eb9b6a9aa65 Mon Sep 17 00:00:00 2001 From: sean Date: Thu, 7 Sep 2023 07:28:01 +0900 Subject: [PATCH 1/2] feat: extract optional meta data Co-authored-by: mjs1995 Co-authored-by: mikaniz Co-authored-by: starcat37 Co-authored-by: heehehe Co-authored-by: dongwook-chan --- pymysqlreplication/binlogstream.py | 13 ++ pymysqlreplication/constants/BINLOG.py | 3 + pymysqlreplication/packet.py | 3 + pymysqlreplication/row_event.py | 289 ++++++++++++++++++++++++- 4 files changed, 307 insertions(+), 1 deletion(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 71eb10a2..8f298f58 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -262,6 +262,7 @@ def __connect_to_ctl(self): self._ctl_connection_settings["autocommit"] = True self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings) self._ctl_connection._get_table_information = self.__get_table_information + self._ctl_connection._get_dbms = self.__get_dbms self.__connected_ctl = True def __checksum_enabled(self): @@ -674,5 +675,17 @@ def __get_table_information(self, schema, table): else: raise error + def __get_dbms(self): + if not self.__connected_ctl: + self.__connect_to_ctl() + + cur = self._ctl_connection.cursor() + cur.execute("SELECT VERSION();") + version_info = cur.fetchone().get('VERSION()', '') + + if 'MariaDB' in version_info: + return 'mariadb' + return 'mysql' + def __iter__(self): return iter(self.fetchone, None) diff --git a/pymysqlreplication/constants/BINLOG.py b/pymysqlreplication/constants/BINLOG.py index a95b28ea..71e5faf4 100644 --- a/pymysqlreplication/constants/BINLOG.py +++ b/pymysqlreplication/constants/BINLOG.py @@ -49,3 +49,6 @@ MARIADB_GTID_EVENT = 0xa2 MARIADB_GTID_GTID_LIST_EVENT = 0xa3 MARIADB_START_ENCRYPTION_EVENT = 0xa4 + +# Common-Footer +BINLOG_CHECKSUM_LEN = 4 \ No newline at end of file diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index 665caebe..5a51670b 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -500,3 +500,6 @@ def read_string(self): string += char return string + + def bytes_to_read(self): + return len(self.packet._data) - self.packet._position \ No newline at end of file diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index fcd138d3..47c942cb 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -6,6 +6,7 @@ import json from pymysql.charset import charset_by_name +from enum import Enum from .event import BinLogEvent from .exceptions import TableMetadataUnavailableError @@ -552,7 +553,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) super().__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) if self._processed: - #Body + # Body self.columns_present_bitmap = self.packet.read( (self.number_of_columns + 7) / 8) self.columns_present_bitmap2 = self.packet.read( @@ -577,6 +578,40 @@ def _dump(self): row["before_values"][key], row["after_values"][key])) +class OptionalMetaData: + def __init__(self): + self.unsigned_column_list = [] + self.default_charset_collation = None + self.charset_collation = {} + self.column_charset = [] + self.column_name_list = [] + self.set_str_value_list = [] + self.set_enum_str_value_list = [] + self.geometry_type_list = [] + self.simple_primary_key_list = [] + self.primary_keys_with_prefix = {} + self.enum_and_set_default_charset = None + self.enum_and_set_charset_collation = {} + self.enum_and_set_default_column_charset_list = [] + self.charset_collation_list = [] + self.enum_and_set_collation_list = [] + self.visibility_list = [] + + def dump(self): + print("=== %s ===" % self.__class__.__name__) + print("unsigned_column_list: %s" % self.unsigned_column_list) + print("default_charset_collation: %s" % self.default_charset_collation) + print("charset_collation: %s" % self.charset_collation) + print("column_charset: %s" % self.column_charset) + print("column_name_list: %s" % self.column_name_list) + print("set_str_value_list : %s" % self.set_str_value_list) + print("set_enum_str_value_list : %s" % self.set_enum_str_value_list) + print("geometry_type_list : %s" % self.geometry_type_list) + print("simple_primary_key_list: %s" % self.simple_primary_key_list) + print("primary_keys_with_prefix: %s" % self.primary_keys_with_prefix) + print("visibility_list: %s" % self.visibility_list) + print("charset_collation_list: %s" % self.charset_collation_list) + print("enum_and_set_collation_list: %s" % self.enum_and_set_collation_list) class TableMapEvent(BinLogEvent): """This event describes the structure of a table. @@ -633,6 +668,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) else: self.column_schemas = self._ctl_connection._get_table_information(self.schema, self.table) + self.dbms = self._ctl_connection._get_dbms() ordinal_pos_loc = 0 if self.column_count != 0: @@ -675,6 +711,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) # ith column is nullable if (i - 1)th bit is set to True, not nullable otherwise ## Refer to definition of and call to row.event._is_null() to interpret bitmap corresponding to columns self.null_bitmask = self.packet.read((self.column_count + 7) / 8) + # optional meta Data + self.optional_metadata = self._get_optional_meta_data() def get_table(self): return self.table_obj @@ -685,3 +723,252 @@ def _dump(self): print("Schema: %s" % (self.schema)) print("Table: %s" % (self.table)) print("Columns: %s" % (self.column_count)) + self.optional_metadata.dump() + + def _get_optional_meta_data(self): + """ + DEFAULT_CHARSET and COLUMN_CHARSET don't appear together, + and ENUM_AND_SET_DEFAULT_CHARSET and ENUM_AND_SET_COLUMN_CHARSET don't appear together. + They are just alternative ways to pack character set information. + When binlogging, it logs character sets in the way that occupies least storage. + + TLV format data (TYPE, LENGTH, VALUE) + """ + optional_metadata = OptionalMetaData() + while self.packet.bytes_to_read() > BINLOG.BINLOG_CHECKSUM_LEN: + option_metadata_type = self.packet.read(1)[0] + length = self.packet.read_length_coded_binary() + field_type: MetadataFieldType = MetadataFieldType.by_index(option_metadata_type) + + if field_type == MetadataFieldType.SIGNEDNESS: + signed_column_list = self._convert_include_non_numeric_column( + self._read_bool_list(length, True)) + optional_metadata.unsigned_column_list = signed_column_list + + elif field_type == MetadataFieldType.DEFAULT_CHARSET: + optional_metadata.default_charset_collation, optional_metadata.charset_collation = self._read_default_charset( + length) + optional_metadata.charset_collation_list = self._parsed_column_charset_by_default_charset( + optional_metadata.default_charset_collation, + optional_metadata.charset_collation, + self._is_character_column) + + elif field_type == MetadataFieldType.COLUMN_CHARSET: + optional_metadata.column_charset = self._read_ints(length) + optional_metadata.charset_collation_list = self._parsed_column_charset_by_column_charset( + optional_metadata.column_charset, self._is_character_column) + + elif field_type == MetadataFieldType.COLUMN_NAME: + optional_metadata.column_name_list = self._read_column_names(length) + + elif field_type == MetadataFieldType.SET_STR_VALUE: + optional_metadata.set_str_value_list = self._read_type_values(length) + + elif field_type == MetadataFieldType.ENUM_STR_VALUE: + optional_metadata.set_enum_str_value_list = self._read_type_values(length) + + elif field_type == MetadataFieldType.GEOMETRY_TYPE: + optional_metadata.geometry_type_list = self._read_ints(length) + + elif field_type == MetadataFieldType.SIMPLE_PRIMARY_KEY: + optional_metadata.simple_primary_key_list = self._read_ints(length) + + elif field_type == MetadataFieldType.PRIMARY_KEY_WITH_PREFIX: + optional_metadata.primary_keys_with_prefix = self._read_primary_keys_with_prefix(length) + + elif field_type == MetadataFieldType.ENUM_AND_SET_DEFAULT_CHARSET: + optional_metadata.enum_and_set_default_charset, optional_metadata.enum_and_set_charset_collation = self._read_default_charset( + length) + + optional_metadata.enum_and_set_collation_list = self._parsed_column_charset_by_default_charset( + optional_metadata.enum_and_set_default_charset, + optional_metadata.enum_and_set_charset_collation, + self._is_enum_or_set_column) + + elif field_type == MetadataFieldType.ENUM_AND_SET_COLUMN_CHARSET: + optional_metadata.enum_and_set_default_column_charset_list = self._read_ints(length) + + optional_metadata.enum_and_set_collation_list = self._parsed_column_charset_by_column_charset( + optional_metadata.enum_and_set_default_column_charset_list, self._is_enum_or_set_column) + + elif field_type == MetadataFieldType.VISIBILITY: + optional_metadata.visibility_list = self._read_bool_list(length, False) + + return optional_metadata + + def _convert_include_non_numeric_column(self, signedness_bool_list): + # The incoming order of columns in the packet represents the indices of the numeric columns. + # Thus, it transforms non-numeric columns to align with the sorting. + bool_list = [] + position = 0 + for i in range(self.column_count): + column_type = self.columns[i].type + if self._is_numeric_column(column_type): + if signedness_bool_list[position]: + bool_list.append(True) + else: + bool_list.append(False) + position += 1 + else: + bool_list.append(False) + + return bool_list + + def _parsed_column_charset_by_default_charset(self, default_charset_collation: int, column_charset_collation: dict, + column_type_detect_function): + column_charset = [] + for i in range(self.column_count): + column_type = self.columns[i].type + if not column_type_detect_function(column_type, dbms=self.dbms): + continue + elif i not in column_charset_collation.keys(): + column_charset.append(default_charset_collation) + else: + column_charset.append(column_charset_collation[i]) + + return column_charset + + def _parsed_column_charset_by_column_charset(self, column_charset_list: list, column_type_detect_function): + column_charset = [] + position = 0 + if len(column_charset_list) == 0: + return + for i in range(self.column_count): + column_type = self.columns[i].type + if not column_type_detect_function(column_type, dbms=self.dbms): + continue + else: + column_charset.append(column_charset_list[position]) + position += 1 + + return column_charset + + def _read_bool_list(self, read_byte_length, signedness_flag): + # if signedness_flag true + # The order of the index in the packet is only the index between the numeric_columns. + # Therefore, we need to use numeric_column_count when calculating bits. + bool_list = [] + bytes_data = self.packet.read(read_byte_length) + + byte = 0 + byte_idx = 0 + bit_idx = 0 + + for i in range(self.column_count): + column_type = self.columns[i].type + if not self._is_numeric_column(column_type) and signedness_flag: + continue + if bit_idx == 0: + byte = bytes_data[byte_idx] + byte_idx += 1 + bool_list.append((byte & (0b10000000 >> bit_idx)) != 0) + bit_idx = (bit_idx + 1) % 8 + return bool_list + + def _read_default_charset(self, length): + charset = {} + read_until = self.packet.read_bytes + length + if self.packet.read_bytes >= read_until: + return + default_charset_collation = self.packet.read_length_coded_binary() + while self.packet.read_bytes < read_until: + column_index = self.packet.read_length_coded_binary() + charset_collation = self.packet.read_length_coded_binary() + charset[column_index] = charset_collation + + return default_charset_collation, charset + + def _read_ints(self, length): + result = [] + read_until = self.packet.read_bytes + length + while self.packet.read_bytes < read_until: + result.append(self.packet.read_length_coded_binary()) + return result + + def _read_column_names(self, length): + result = [] + read_until = self.packet.read_bytes + length + while self.packet.read_bytes < read_until: + result.append(self.packet.read_variable_length_string().decode()) + return result + + def _read_type_values(self, length): + result = [] + read_until = self.packet.read_bytes + length + if self.packet.read_bytes >= read_until: + return + while self.packet.read_bytes < read_until: + type_value_list = [] + value_count = self.packet.read_length_coded_binary() + for i in range(value_count): + value = self.packet.read_variable_length_string() + decode_value = "" + try: + decode_value = value.decode() + except UnicodeDecodeError: + # ignore not utf-8 decode type + pass + type_value_list.append(decode_value) + result.append(type_value_list) + return result + + def _read_primary_keys_with_prefix(self, length): + ints = self._read_ints(length) + result = {} + for i in range(0, len(ints), 2): + result[ints[i]] = ints[i + 1] + return result + + @staticmethod + def _is_character_column(column_type, dbms='mysql'): + if column_type in [FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING, FIELD_TYPE.VARCHAR, FIELD_TYPE.BLOB]: + return True + if column_type == FIELD_TYPE.GEOMETRY and dbms == 'mariadb': + return True + return False + + @staticmethod + def _is_enum_column(column_type): + if column_type == FIELD_TYPE.ENUM: + return True + return False + + @staticmethod + def _is_set_column(column_type): + if column_type == FIELD_TYPE.SET: + return True + return False + + @staticmethod + def _is_enum_or_set_column(column_type, dbms='mysql'): + if column_type in [FIELD_TYPE.ENUM, FIELD_TYPE.SET]: + return True + return False + + @staticmethod + def _is_numeric_column(column_type): + if column_type in [FIELD_TYPE.TINY, FIELD_TYPE.SHORT, FIELD_TYPE.INT24, FIELD_TYPE.LONG, + FIELD_TYPE.LONGLONG, FIELD_TYPE.NEWDECIMAL, FIELD_TYPE.FLOAT, + FIELD_TYPE.DOUBLE, + FIELD_TYPE.YEAR]: + return True + return False + +class MetadataFieldType(Enum): + SIGNEDNESS = 1 # Signedness of numeric columns + DEFAULT_CHARSET = 2 # Charsets of character columns + COLUMN_CHARSET = 3 # Charsets of character columns + COLUMN_NAME = 4 # Names of columns + SET_STR_VALUE = 5 # The string values of SET columns + ENUM_STR_VALUE = 6 # The string values in ENUM columns + GEOMETRY_TYPE = 7 # The real type of geometry columns + SIMPLE_PRIMARY_KEY = 8 # The primary key without any prefix + PRIMARY_KEY_WITH_PREFIX = 9 # The primary key with some prefix + ENUM_AND_SET_DEFAULT_CHARSET = 10 # Charsets of ENUM and SET columns + ENUM_AND_SET_COLUMN_CHARSET = 11 # Charsets of ENUM and SET columns + VISIBILITY = 12 + UNKNOWN_METADATA_FIELD_TYPE = 128 + + @staticmethod + def by_index(index): + return MetadataFieldType(index) From 0db59b77260183e1204958b2e50ed1bd5128f9cb Mon Sep 17 00:00:00 2001 From: sean Date: Thu, 7 Sep 2023 07:50:07 +0900 Subject: [PATCH 2/2] feat: optional meat data test case add Co-authored-by: mjs1995 Co-authored-by: mikaniz Co-authored-by: starcat37 Co-authored-by: heehehe Co-authored-by: dongwook-chan --- pymysqlreplication/tests/base.py | 7 + pymysqlreplication/tests/test_basic.py | 183 ++++++++++++++++++++++++- 2 files changed, 187 insertions(+), 3 deletions(-) diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index 301ee3e9..88acda33 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -63,6 +63,13 @@ def isMySQL80AndMore(self): version = float(self.getMySQLVersion().rsplit('.', 1)[0]) return version >= 8.0 + def isMySQL8014AndMore(self): + version = float(self.getMySQLVersion().rsplit(".", 1)[0]) + version_detail = int(self.getMySQLVersion().rsplit(".", 1)[1]) + if version > 8.0: + return True + return version == 8.0 and version_detail >= 14 + def isMariaDB(self): if self.__is_mariaDB is None: self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0] diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index f03b5663..c07b84c3 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -21,9 +21,11 @@ from pymysqlreplication.packet import BinLogPacketWrapper from pymysql.protocol import MysqlPacket -__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", - "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", - "TestRowsQueryLogEvents"] +__all__ = [ + "TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", + "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", + "TestRowsQueryLogEvents", "TestOptionalMetaData" +] class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase): @@ -1444,6 +1446,181 @@ def test_query_event_latin1(self): assert event.query == r"CREATE TABLE test_latin1_\xd6\xc6\xdb (a INT)" +class TestOptionalMetaData(base.PyMySQLReplicationTestCase): + def setUp(self): + super(TestOptionalMetaData, self).setUp() + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + only_events=(TableMapEvent,), + fail_on_table_metadata_unavailable=True + ) + if not self.isMySQL8014AndMore(): + self.skipTest("Mysql version is under 8.0.14 - pass TestOptionalMetaData") + self.execute("SET GLOBAL binlog_row_metadata='FULL';") + + def test_signedness(self): + create_query = "CREATE TABLE test_signedness (col1 INT, col2 INT UNSIGNED);" + insert_query = "INSERT INTO test_signedness VALUES (-10, 10);" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.unsigned_column_list, [False, True]) + + def test_default_charset(self): + create_query = "CREATE TABLE test_default_charset (name VARCHAR(50)) CHARACTER SET utf8mb4;" + insert_query = "INSERT INTO test_default_charset VALUES ('Hello, World!');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + if self.isMariaDB(): + self.assertEqual(event.optional_metadata.default_charset_collation, 45) + else: + self.assertEqual(event.optional_metadata.default_charset_collation, 255) + + def test_column_charset(self): + create_query = "CREATE TABLE test_column_charset (col1 VARCHAR(50), col2 VARCHAR(50) CHARACTER SET binary, col3 VARCHAR(50) CHARACTER SET latin1);" + insert_query = "INSERT INTO test_column_charset VALUES ('python', 'mysql', 'replication');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + if self.isMariaDB(): + self.assertEqual(event.optional_metadata.column_charset, [45, 63, 8]) + else: + self.assertEqual(event.optional_metadata.column_charset, [255, 63, 8]) + + def test_column_name(self): + create_query = "CREATE TABLE test_column_name (col_int INT, col_varchar VARCHAR(30), col_bool BOOL);" + insert_query = "INSERT INTO test_column_name VALUES (1, 'Hello', true);" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.column_name_list, ['col_int', 'col_varchar', 'col_bool']) + + def test_set_str_value(self): + create_query = "CREATE TABLE test_set_str_value (skills SET('Programming', 'Writing', 'Design'));" + insert_query = "INSERT INTO test_set_str_value VALUES ('Programming,Writing');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.set_str_value_list, [['Programming', 'Writing', 'Design']]) + + def test_enum_str_value(self): + create_query = "CREATE TABLE test_enum_str_value (pet ENUM('Dog', 'Cat'));" + insert_query = "INSERT INTO test_enum_str_value VALUES ('Cat');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.set_enum_str_value_list, [['Dog', 'Cat']]) + + def test_geometry_type(self): + create_query = "CREATE TABLE test_geometry_type (location POINT);" + insert_query = "INSERT INTO test_geometry_type VALUES (Point(37.123, 125.987));" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.geometry_type_list, [1]) + + def test_simple_primary_key(self): + create_query = "CREATE TABLE test_simple_primary_key (c_key1 INT, c_key2 INT, c_not_key INT, PRIMARY KEY(c_key1, c_key2));" + insert_query = "INSERT INTO test_simple_primary_key VALUES (1, 2, 3);" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.simple_primary_key_list, [0, 1]) + + def test_primary_key_with_prefix(self): + create_query = "CREATE TABLE test_primary_key_with_prefix (c_key1 CHAR(100), c_key2 CHAR(10), c_not_key INT, c_key3 CHAR(100), PRIMARY KEY(c_key1(5), c_key2, c_key3(10)));" + insert_query = "INSERT INTO test_primary_key_with_prefix VALUES('1', '2', 3, '4');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + self.assertEqual(event.optional_metadata.primary_keys_with_prefix, {0: 5, 1: 0, 3: 10}) + + def test_enum_and_set_default_charset(self): + create_query = "CREATE TABLE test_enum_and_set_default_charset (pet ENUM('Dog', 'Cat'), skills SET('Programming', 'Writing', 'Design')) CHARACTER SET utf8mb4;" + insert_query = "INSERT INTO test_enum_and_set_default_charset VALUES('Dog', 'Design');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + if self.isMariaDB(): + self.assertEqual(event.optional_metadata.enum_and_set_collation_list, [45, 45]) + else: + self.assertEqual(event.optional_metadata.enum_and_set_collation_list, [255, 255]) + + def test_enum_and_set_column_charset(self): + create_query = "CREATE TABLE test_enum_and_set_column_charset (pet ENUM('Dog', 'Cat') CHARACTER SET utf8mb4, number SET('00', '01', '10', '11') CHARACTER SET binary);" + insert_query = "INSERT INTO test_enum_and_set_column_charset VALUES('Cat', '10');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + if self.isMariaDB(): + self.assertEqual(event.optional_metadata.enum_and_set_collation_list, [45, 63]) + else: + self.assertEqual(event.optional_metadata.enum_and_set_collation_list, [255, 63]) + + def test_visibility(self): + create_query = "CREATE TABLE test_visibility (name VARCHAR(50), secret_key VARCHAR(50) DEFAULT 'qwerty' INVISIBLE);" + insert_query = "INSERT INTO test_visibility VALUES('Audrey');" + + self.execute(create_query) + self.execute(insert_query) + self.execute("COMMIT") + + event = self.stream.fetchone() + self.assertIsInstance(event, TableMapEvent) + if not self.isMariaDB(): + self.assertEqual(event.optional_metadata.visibility_list, [True, False]) + + def tearDown(self): + self.execute("SET GLOBAL binlog_row_metadata='MINIMAL';") + super(TestOptionalMetaData, self).tearDown() + if __name__ == "__main__": import unittest unittest.main()