Skip to content

Commit f70f05b

Browse files
Parse status variables in query event (julien-duponchelle#360)
* Parse status variables in query event packet Status variables contain useful information: 1. List of databases affected by the query 2. Runtime value of system variables (SQL_MODE, AUTOCOMMIT, CHARSET_SERVER, etc.) 3. timezone of the master For further information please refer to; [Syntax of system variables field in query event](https://dev.mysql.com/doc/internals/en/query-event.html#q-microseconds) [Definition of enumeration for system variable keys](https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/include/statement_events.h#L463-L532) [Semantics of system variable values](https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/include/statement_events.h#L156-L448)
1 parent 84f2cda commit f70f05b

File tree

5 files changed

+177
-1
lines changed

5 files changed

+177
-1
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#from enum import IntEnum
2+
3+
#class StatusVarsKey(IntEnum):
4+
"""List of Query_event_status_vars
5+
6+
A status variable in query events is a sequence of status KEY-VALUE pairs.
7+
The class variables enumerated below are KEYs.
8+
Each KEY determines the length of corresponding VALUE.
9+
10+
For further details refer to:
11+
mysql-server: https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/include/statement_events.h#L463-L532
12+
MySQL Documentation: https://dev.mysql.com/doc/internals/en/query-event.html
13+
14+
Status variable key names From mysql-server source code, edited by dongwook-chan
15+
"""
16+
17+
# KEY
18+
Q_FLAGS2_CODE = 0x00
19+
Q_SQL_MODE_CODE = 0X01
20+
Q_CATALOG_CODE = 0x02
21+
Q_AUTO_INCREMENT = 0x03
22+
Q_CHARSET_CODE = 0x04
23+
Q_TIME_ZONE_CODE = 0x05
24+
Q_CATALOG_NZ_CODE = 0x06
25+
Q_LC_TIME_NAMES_CODE = 0x07
26+
Q_CHARSET_DATABASE_CODE = 0x08
27+
Q_TABLE_MAP_FOR_UPDATE_CODE = 0x09
28+
Q_MASTER_DATA_WRITTEN_CODE = 0x0A
29+
Q_INVOKER = 0x0B
30+
Q_UPDATED_DB_NAMES = 0x0C
31+
Q_MICROSECONDS = 0x0D
32+
Q_COMMIT_TS = 0x0E
33+
Q_COMMIT_TS2 = 0X0F
34+
Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP = 0X10
35+
Q_DDL_LOGGED_WITH_XID = 0X11
36+
Q_DEFAULT_COLLATION_FOR_UTF8MB4 = 0X12
37+
Q_SQL_REQUIRE_PRIMARY_KEY = 0X13
38+
Q_DEFAULT_TABLE_ENCRYPTION = 0X14

pymysqlreplication/constants/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22

33
from .BINLOG import *
44
from .FIELD_TYPE import *
5+
from .STATUS_VAR_KEY import *

pymysqlreplication/event.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import binascii
44
import struct
55
import datetime
6+
from pymysqlreplication.constants.STATUS_VAR_KEY import *
67

78

89
class BinLogEvent(object):
@@ -167,7 +168,13 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
167168
self.status_vars_length = self.packet.read_uint16()
168169

169170
# Payload
170-
self.status_vars = self.packet.read(self.status_vars_length)
171+
status_vars_end_pos = self.packet.read_bytes + self.status_vars_length
172+
while self.packet.read_bytes < status_vars_end_pos: # while 남은 data length가 얼마만큼? OR read_bytes
173+
# read KEY for status variable
174+
status_vars_key = self.packet.read_uint8()
175+
# read VALUE for status variable
176+
self._read_status_vars_value_for_key(status_vars_key)
177+
171178
self.schema = self.packet.read(self.schema_length)
172179
self.packet.advance(1)
173180

@@ -181,6 +188,80 @@ def _dump(self):
181188
print("Execution time: %d" % (self.execution_time))
182189
print("Query: %s" % (self.query))
183190

191+
192+
# TODO: check if instance attribute with the same name already exists
193+
# TODO: put all the instace attribute in separate class? called status_vars
194+
# TODO: does length need to be remembered?
195+
# TODO: ref(mysql doc. and mysql-server) for each hunk
196+
def _read_status_vars_value_for_key(self, key):
197+
"""parse status variable VALUE for given KEY
198+
199+
A status variable in query events is a sequence of status KEY-VALUE pairs.
200+
Parsing logic from mysql-server source code edited by dongwook-chan
201+
https://github.com/mysql/mysql-server/blob/beb865a960b9a8a16cf999c323e46c5b0c67f21f/libbinlogevents/src/statement_events.cpp#L181-L336
202+
203+
Args:
204+
key: key for status variable
205+
"""
206+
if key == Q_FLAGS2_CODE: # 0x00
207+
self.flags2 = self.packet.read_uint32()
208+
elif key == Q_SQL_MODE_CODE: # 0x01
209+
self.sql_mode = self.packet.read_uint64()
210+
elif key == Q_CATALOG_CODE: # 0x02 for MySQL 5.0.x
211+
pass
212+
elif key == Q_AUTO_INCREMENT: # 0x03
213+
self.auto_increment_increment = self.packet.read_uint16()
214+
self.auto_increment_offset = self.packet.read_uint16()
215+
elif key == Q_CHARSET_CODE: # 0x04
216+
self.character_set_client = self.packet.read_uint16()
217+
self.collation_connection = self.packet.read_uint16()
218+
self.collation_server = self.packet.read_uint16()
219+
elif key == Q_TIME_ZONE_CODE: # 0x05
220+
time_zone_len = self.packet.read_uint8()
221+
if time_zone_len:
222+
self.time_zone = self.packet.read(time_zone_len)
223+
elif key == Q_CATALOG_NZ_CODE: # 0x06
224+
catalog_len = self.packet.read_uint8()
225+
if catalog_len:
226+
self.catalog_nz_code = self.packet.read(catalog_len)
227+
elif key == Q_LC_TIME_NAMES_CODE: # 0x07
228+
self.lc_time_names_number = self.packet.read_uint16()
229+
elif key == Q_CHARSET_DATABASE_CODE: # 0x08
230+
self.charset_database_number = self.packet.read_uint16()
231+
elif key == Q_TABLE_MAP_FOR_UPDATE_CODE: # 0x09
232+
self.table_map_for_update = self.packet.read_uint64()
233+
elif key == Q_MASTER_DATA_WRITTEN_CODE: # 0x0A
234+
pass
235+
elif key == Q_INVOKER: # 0x0B
236+
user_len = self.packet.read_uint8()
237+
if user_len:
238+
self.user = self.packet.read(user_len)
239+
host_len = self.packet.read_uint8()
240+
if host_len:
241+
self.host = self.packet.read(host_len)
242+
elif key == Q_UPDATED_DB_NAMES: # 0x0C
243+
mts_accessed_dbs = self.packet.read_uint8()
244+
dbs = []
245+
for i in range(mts_accessed_dbs):
246+
db = self.packet.read_string()
247+
dbs.append(db)
248+
self.mts_accessed_db_names = dbs
249+
elif key == Q_MICROSECONDS: # 0x0D
250+
self.microseconds = self.packet.read_uint24()
251+
elif key == Q_COMMIT_TS: # 0x0E
252+
pass
253+
elif key == Q_COMMIT_TS2: # 0x0F
254+
pass
255+
elif key == Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP:# 0x10
256+
self.explicit_defaults_ts = self.packet.read_uint8()
257+
elif key == Q_DDL_LOGGED_WITH_XID: # 0x11
258+
self.ddl_xid = self.packet.read_uint64()
259+
elif key == Q_DEFAULT_COLLATION_FOR_UTF8MB4: # 0x12
260+
self.default_collation_for_utf8mb4_number = self.packet.read_uint16()
261+
elif key == Q_SQL_REQUIRE_PRIMARY_KEY: # 0x13
262+
self.sql_require_primary_key = self.packet.read_uint8()
263+
elif key == Q_DEFAULT_TABLE_ENCRYPTION: # 0x14
264+
self.default_table_encryption = self.packet.read_uint8()
184265

185266
class BeginLoadQueryEvent(BinLogEvent):
186267
"""

pymysqlreplication/packet.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,3 +461,20 @@ def _read(x):
461461
return self.read_binary_json_type(x[0], length)
462462

463463
return [_read(x) for x in values_type_offset_inline]
464+
465+
def read_string(self):
466+
"""Read a 'Length Coded String' from the data buffer.
467+
468+
Read __data_buffer until NULL character (0 = \0 = \x00)
469+
470+
Returns:
471+
Binary string parsed from __data_buffer
472+
"""
473+
string = b''
474+
while True:
475+
char = self.read(1)
476+
if char == b'\0':
477+
break
478+
string += char
479+
480+
return string

pymysqlreplication/tests/test_data_type.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,26 @@ def create_and_insert_value(self, create_query, insert_query):
5757
self.assertIsInstance(event, WriteRowsEvent)
5858
return event
5959

60+
def create_table(self, create_query):
61+
"""Create table
62+
63+
Create table in db and return query event.
64+
65+
Returns:
66+
Query event
67+
"""
68+
69+
self.execute(create_query)
70+
71+
self.assertIsInstance(self.stream.fetchone(), RotateEvent)
72+
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
73+
74+
event = self.stream.fetchone()
75+
76+
self.assertEqual(event.event_type, QUERY_EVENT)
77+
78+
return event
79+
6080
def test_decimal(self):
6181
create_query = "CREATE TABLE test (test DECIMAL(2,1))"
6282
insert_query = "INSERT INTO test VALUES(4.2)"
@@ -641,5 +661,24 @@ def test_partition_id(self):
641661
self.assertEqual(event.extra_data_type, 1)
642662
self.assertEqual(event.partition_id, 3)
643663

664+
def test_status_vars(self):
665+
"""Test parse of status variables in query events
666+
667+
Majority of status variables available depends on the settings of db.
668+
Therefore, this test only tests system variable values independent from settings of db.
669+
Note that if you change default db name 'pymysqlreplication_test',
670+
event.mts_accessed_db_names MUST be asserted against the changed db name.
671+
672+
Returns:
673+
binary string parsed from __data_buffer
674+
675+
Raises:
676+
AssertionError: if no
677+
"""
678+
create_query = "CREATE TABLE test (id INTEGER)"
679+
event = self.create_table(create_query)
680+
self.assertEqual(event.catalog_nz_code, b'std')
681+
self.assertEqual(event.mts_accessed_db_names, [b'pymysqlreplication_test'])
682+
644683
if __name__ == "__main__":
645684
unittest.main()

0 commit comments

Comments
 (0)