diff --git a/pymysqlreplication/constants/STATUS_VAR_KEY.py b/pymysqlreplication/constants/STATUS_VAR_KEY.py new file mode 100644 index 00000000..aa0ffe78 --- /dev/null +++ b/pymysqlreplication/constants/STATUS_VAR_KEY.py @@ -0,0 +1,38 @@ +#from enum import IntEnum + +#class StatusVarsKey(IntEnum): +"""List of Query_event_status_vars + + A status variable in query events is a sequence of status KEY-VALUE pairs. + The class variables enumerated below are KEYs. + Each KEY determines the length of corresponding VALUE. + + For further details refer to: + mysql-server: https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/include/statement_events.h#L463-L532 + MySQL Documentation: https://dev.mysql.com/doc/internals/en/query-event.html + + Status variable key names From mysql-server source code, edited by dongwook-chan +""" + +# KEY +Q_FLAGS2_CODE = 0x00 +Q_SQL_MODE_CODE = 0X01 +Q_CATALOG_CODE = 0x02 +Q_AUTO_INCREMENT = 0x03 +Q_CHARSET_CODE = 0x04 +Q_TIME_ZONE_CODE = 0x05 +Q_CATALOG_NZ_CODE = 0x06 +Q_LC_TIME_NAMES_CODE = 0x07 +Q_CHARSET_DATABASE_CODE = 0x08 +Q_TABLE_MAP_FOR_UPDATE_CODE = 0x09 +Q_MASTER_DATA_WRITTEN_CODE = 0x0A +Q_INVOKER = 0x0B +Q_UPDATED_DB_NAMES = 0x0C +Q_MICROSECONDS = 0x0D +Q_COMMIT_TS = 0x0E +Q_COMMIT_TS2 = 0X0F +Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP = 0X10 +Q_DDL_LOGGED_WITH_XID = 0X11 +Q_DEFAULT_COLLATION_FOR_UTF8MB4 = 0X12 +Q_SQL_REQUIRE_PRIMARY_KEY = 0X13 +Q_DEFAULT_TABLE_ENCRYPTION = 0X14 diff --git a/pymysqlreplication/constants/__init__.py b/pymysqlreplication/constants/__init__.py index 0c9d19fd..11bff9d8 100644 --- a/pymysqlreplication/constants/__init__.py +++ b/pymysqlreplication/constants/__init__.py @@ -2,3 +2,4 @@ from .BINLOG import * from .FIELD_TYPE import * +from .STATUS_VAR_KEY import * diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index dbba589d..d5ee1060 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -3,6 +3,7 @@ import binascii import struct import datetime +from pymysqlreplication.constants.STATUS_VAR_KEY import * class BinLogEvent(object): @@ -167,7 +168,13 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.status_vars_length = self.packet.read_uint16() # Payload - self.status_vars = self.packet.read(self.status_vars_length) + status_vars_end_pos = self.packet.read_bytes + self.status_vars_length + while self.packet.read_bytes < status_vars_end_pos: # while 남은 data length가 얼마만큼? OR read_bytes + # read KEY for status variable + status_vars_key = self.packet.read_uint8() + # read VALUE for status variable + self._read_status_vars_value_for_key(status_vars_key) + self.schema = self.packet.read(self.schema_length) self.packet.advance(1) @@ -181,6 +188,80 @@ def _dump(self): print("Execution time: %d" % (self.execution_time)) print("Query: %s" % (self.query)) + + # TODO: check if instance attribute with the same name already exists + # TODO: put all the instace attribute in separate class? called status_vars + # TODO: does length need to be remembered? + # TODO: ref(mysql doc. and mysql-server) for each hunk + def _read_status_vars_value_for_key(self, key): + """parse status variable VALUE for given KEY + + A status variable in query events is a sequence of status KEY-VALUE pairs. + Parsing logic from mysql-server source code edited by dongwook-chan + https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/src/statement_events.cpp#L181-L336 + + Args: + key: key for status variable + """ + if key == Q_FLAGS2_CODE: # 0x00 + self.flags2 = self.packet.read_uint32() + elif key == Q_SQL_MODE_CODE: # 0x01 + self.sql_mode = self.packet.read_uint64() + elif key == Q_CATALOG_CODE: # 0x02 for MySQL 5.0.x + pass + elif key == Q_AUTO_INCREMENT: # 0x03 + self.auto_increment_increment = self.packet.read_uint16() + self.auto_increment_offset = self.packet.read_uint16() + elif key == Q_CHARSET_CODE: # 0x04 + self.character_set_client = self.packet.read_uint16() + self.collation_connection = self.packet.read_uint16() + self.collation_server = self.packet.read_uint16() + elif key == Q_TIME_ZONE_CODE: # 0x05 + time_zone_len = self.packet.read_uint8() + if time_zone_len: + self.time_zone = self.packet.read(time_zone_len) + elif key == Q_CATALOG_NZ_CODE: # 0x06 + catalog_len = self.packet.read_uint8() + if catalog_len: + self.catalog_nz_code = self.packet.read(catalog_len) + elif key == Q_LC_TIME_NAMES_CODE: # 0x07 + self.lc_time_names_number = self.packet.read_uint16() + elif key == Q_CHARSET_DATABASE_CODE: # 0x08 + self.charset_database_number = self.packet.read_uint16() + elif key == Q_TABLE_MAP_FOR_UPDATE_CODE: # 0x09 + self.table_map_for_update = self.packet.read_uint64() + elif key == Q_MASTER_DATA_WRITTEN_CODE: # 0x0A + pass + elif key == Q_INVOKER: # 0x0B + user_len = self.packet.read_uint8() + if user_len: + self.user = self.packet.read(user_len) + host_len = self.packet.read_uint8() + if host_len: + self.host = self.packet.read(host_len) + elif key == Q_UPDATED_DB_NAMES: # 0x0C + mts_accessed_dbs = self.packet.read_uint8() + dbs = [] + for i in range(mts_accessed_dbs): + db = self.packet.read_string() + dbs.append(db) + self.mts_accessed_db_names = dbs + elif key == Q_MICROSECONDS: # 0x0D + self.microseconds = self.packet.read_uint24() + elif key == Q_COMMIT_TS: # 0x0E + pass + elif key == Q_COMMIT_TS2: # 0x0F + pass + elif key == Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP:# 0x10 + self.explicit_defaults_ts = self.packet.read_uint8() + elif key == Q_DDL_LOGGED_WITH_XID: # 0x11 + self.ddl_xid = self.packet.read_uint64() + elif key == Q_DEFAULT_COLLATION_FOR_UTF8MB4: # 0x12 + self.default_collation_for_utf8mb4_number = self.packet.read_uint16() + elif key == Q_SQL_REQUIRE_PRIMARY_KEY: # 0x13 + self.sql_require_primary_key = self.packet.read_uint8() + elif key == Q_DEFAULT_TABLE_ENCRYPTION: # 0x14 + self.default_table_encryption = self.packet.read_uint8() class BeginLoadQueryEvent(BinLogEvent): """ diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index de29f4f7..a5a7c0fd 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -461,3 +461,20 @@ def _read(x): return self.read_binary_json_type(x[0], length) return [_read(x) for x in values_type_offset_inline] + + def read_string(self): + """Read a 'Length Coded String' from the data buffer. + + Read __data_buffer until NULL character (0 = \0 = \x00) + + Returns: + Binary string parsed from __data_buffer + """ + string = b'' + while True: + char = self.read(1) + if char == b'\0': + break + string += char + + return string diff --git a/pymysqlreplication/tests/test_data_type.py b/pymysqlreplication/tests/test_data_type.py index 169e581e..37873444 100644 --- a/pymysqlreplication/tests/test_data_type.py +++ b/pymysqlreplication/tests/test_data_type.py @@ -57,6 +57,26 @@ def create_and_insert_value(self, create_query, insert_query): self.assertIsInstance(event, WriteRowsEvent) return event + def create_table(self, create_query): + """Create table + + Create table in db and return query event. + + Returns: + Query event + """ + + self.execute(create_query) + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + + event = self.stream.fetchone() + + self.assertEqual(event.event_type, QUERY_EVENT) + + return event + def test_decimal(self): create_query = "CREATE TABLE test (test DECIMAL(2,1))" insert_query = "INSERT INTO test VALUES(4.2)" @@ -641,5 +661,24 @@ def test_partition_id(self): self.assertEqual(event.extra_data_type, 1) self.assertEqual(event.partition_id, 3) + def test_status_vars(self): + """Test parse of status variables in query events + + Majority of status variables available depends on the settings of db. + Therefore, this test only tests system variable values independent from settings of db. + Note that if you change default db name 'pymysqlreplication_test', + event.mts_accessed_db_names MUST be asserted against the changed db name. + + Returns: + binary string parsed from __data_buffer + + Raises: + AssertionError: if no + """ + create_query = "CREATE TABLE test (id INTEGER)" + event = self.create_table(create_query) + self.assertEqual(event.catalog_nz_code, b'std') + self.assertEqual(event.mts_accessed_db_names, [b'pymysqlreplication_test']) + if __name__ == "__main__": unittest.main()