Skip to content

Commit 73e2eeb

Browse files
mjs1995heehehesean-k1starcat37mikaniz
authored
Developed UserVarEvent and Added Statement-Based Logging Test (#466)
Co-authored-by: heehehe <[email protected]> Co-authored-by: sean-k1 <[email protected]> Co-authored-by: starcat37 <[email protected]> Co-authored-by: minakiz <[email protected]>
1 parent c09ef8c commit 73e2eeb

File tree

4 files changed

+367
-41
lines changed

4 files changed

+367
-41
lines changed

Diff for: pymysqlreplication/binlogstream.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
1515
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
1616
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent,
17-
MariadbGtidListEvent, MariadbBinLogCheckPointEvent)
17+
MariadbGtidListEvent, MariadbBinLogCheckPointEvent, UserVarEvent)
1818
from .exceptions import BinLogNotEnabled
1919
from .gtid import GtidSet
2020
from .packet import BinLogPacketWrapper
@@ -626,7 +626,8 @@ def _allowed_event_list(self, only_events, ignored_events,
626626
RandEvent,
627627
MariadbStartEncryptionEvent,
628628
MariadbGtidListEvent,
629-
MariadbBinLogCheckPointEvent
629+
MariadbBinLogCheckPointEvent,
630+
UserVarEvent
630631
))
631632
if ignored_events is not None:
632633
for e in ignored_events:

Diff for: pymysqlreplication/event.py

+145-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import binascii
44
import struct
55
import datetime
6+
import decimal
67
from pymysqlreplication.constants.STATUS_VAR_KEY import *
78
from pymysqlreplication.exceptions import StatusVariableMismatch
9+
from typing import Union, Optional
810

911

1012
class BinLogEvent(object):
@@ -51,7 +53,6 @@ def _dump(self):
5153
"""Core data dumped for the event"""
5254
pass
5355

54-
5556
class GtidEvent(BinLogEvent):
5657
"""GTID change in binlog event
5758
"""
@@ -519,7 +520,6 @@ class RandEvent(BinLogEvent):
519520
:ivar seed1: int - value for the first seed
520521
:ivar seed2: int - value for the second seed
521522
"""
522-
523523
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
524524
super().__init__(from_packet, event_size, table_map,
525525
ctl_connection, **kwargs)
@@ -542,6 +542,149 @@ def _dump(self):
542542
print("seed1: %d" % (self.seed1))
543543
print("seed2: %d" % (self.seed2))
544544

545+
class UserVarEvent(BinLogEvent):
546+
"""
547+
UserVarEvent is generated every time a statement uses a user variable.
548+
Indicates the value to use for the user variable in the next statement.
549+
550+
:ivar name_len: int - Length of user variable
551+
:ivar name: str - User variable name
552+
:ivar value: str - Value of the user variable
553+
:ivar type: int - Type of the user variable
554+
:ivar charset: int - The number of the character set for the user variable
555+
:ivar is_null: int - Non-zero if the variable value is the SQL NULL value, 0 otherwise
556+
:ivar flags: int - Extra flags associated with the user variable
557+
"""
558+
559+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
560+
super(UserVarEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
561+
562+
# Payload
563+
self.name_len: int = self.packet.read_uint32()
564+
self.name: str = self.packet.read(self.name_len).decode()
565+
self.is_null: int = self.packet.read_uint8()
566+
self.type_to_codes_and_method: dict = {
567+
0x00: ['STRING_RESULT', self._read_string],
568+
0x01: ['REAL_RESULT', self._read_real],
569+
0x02: ['INT_RESULT', self._read_int],
570+
0x03: ['ROW_RESULT', self._read_default],
571+
0x04: ['DECIMAL_RESULT', self._read_decimal]
572+
}
573+
574+
self.value: Optional[Union[str, float, int, decimal.Decimal]] = None
575+
self.flags: Optional[int] = None
576+
self.temp_value_buffer: Union[bytes, memoryview] = b''
577+
578+
if not self.is_null:
579+
self.type: int = self.packet.read_uint8()
580+
self.charset: int = self.packet.read_uint32()
581+
self.value_len: int = self.packet.read_uint32()
582+
self.temp_value_buffer: Union[bytes, memoryview] = self.packet.read(self.value_len)
583+
self.flags: int = self.packet.read_uint8()
584+
self._set_value_from_temp_buffer()
585+
else:
586+
self.type, self.charset, self.value_len, self.value, self.flags = None, None, None, None, None
587+
588+
def _set_value_from_temp_buffer(self):
589+
"""
590+
Set the value from the temporary buffer based on the type code.
591+
"""
592+
if self.temp_value_buffer:
593+
type_code, read_method = self.type_to_codes_and_method.get(self.type, ["UNKNOWN_RESULT", self._read_default])
594+
if type_code == 'INT_RESULT':
595+
self.value = read_method(self.temp_value_buffer, self.flags)
596+
else:
597+
self.value = read_method(self.temp_value_buffer)
598+
599+
def _read_string(self, buffer: bytes) -> str:
600+
"""
601+
Read string data.
602+
"""
603+
return buffer.decode()
604+
605+
def _read_real(self, buffer: bytes) -> float:
606+
"""
607+
Read real data.
608+
"""
609+
return struct.unpack('<d', buffer)[0]
610+
611+
def _read_int(self, buffer: bytes, flags: int) -> int:
612+
"""
613+
Read integer data.
614+
"""
615+
fmt = '<Q' if flags == 1 else '<q'
616+
return struct.unpack(fmt, buffer)[0]
617+
618+
def _read_decimal(self, buffer: bytes) -> decimal.Decimal:
619+
"""
620+
Read decimal data.
621+
"""
622+
self.precision = self.temp_value_buffer[0]
623+
self.decimals = self.temp_value_buffer[1]
624+
raw_decimal = self.temp_value_buffer[2:]
625+
return self._parse_decimal_from_bytes(raw_decimal, self.precision, self.decimals)
626+
627+
def _read_default(self) -> bytes:
628+
"""
629+
Read default data.
630+
Used when the type is None.
631+
"""
632+
return self.packet.read(self.value_len)
633+
634+
@staticmethod
635+
def _parse_decimal_from_bytes(raw_decimal: bytes, precision: int, decimals: int) -> decimal.Decimal:
636+
"""
637+
Parse decimal from bytes.
638+
"""
639+
digits_per_integer = 9
640+
compressed_bytes = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4]
641+
integral = precision - decimals
642+
643+
uncomp_integral, comp_integral = divmod(integral, digits_per_integer)
644+
uncomp_fractional, comp_fractional = divmod(decimals, digits_per_integer)
645+
646+
res = "-" if not raw_decimal[0] & 0x80 else ""
647+
mask = -1 if res == "-" else 0
648+
raw_decimal = bytearray([raw_decimal[0] ^ 0x80]) + raw_decimal[1:]
649+
650+
def decode_decimal_decompress_value(comp_indx, data, mask):
651+
size = compressed_bytes[comp_indx]
652+
if size > 0:
653+
databuff = bytearray(data[:size])
654+
for i in range(size):
655+
databuff[i] = (databuff[i] ^ mask) & 0xFF
656+
return size, int.from_bytes(databuff, byteorder='big')
657+
return 0, 0
658+
659+
pointer, value = decode_decimal_decompress_value(comp_integral, raw_decimal, mask)
660+
res += str(value)
661+
662+
for _ in range(uncomp_integral):
663+
value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask
664+
res += '%09d' % value
665+
pointer += 4
666+
667+
res += "."
668+
669+
for _ in range(uncomp_fractional):
670+
value = struct.unpack('>i', raw_decimal[pointer:pointer+4])[0] ^ mask
671+
res += '%09d' % value
672+
pointer += 4
673+
674+
size, value = decode_decimal_decompress_value(comp_fractional, raw_decimal[pointer:], mask)
675+
if size > 0:
676+
res += '%0*d' % (comp_fractional, value)
677+
return decimal.Decimal(res)
678+
679+
def _dump(self) -> None:
680+
super(UserVarEvent, self)._dump()
681+
print("User variable name: %s" % self.name)
682+
print("Is NULL: %s" % ("Yes" if self.is_null else "No"))
683+
if not self.is_null:
684+
print("Type: %s" % self.type_to_codes_and_method.get(self.type, ['UNKNOWN_TYPE'])[0])
685+
print("Charset: %s" % self.charset)
686+
print("Value: %s" % self.value)
687+
print("Flags: %s" % self.flags)
545688

546689
class MariadbStartEncryptionEvent(BinLogEvent):
547690
"""

Diff for: pymysqlreplication/packet.py

+1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class BinLogPacketWrapper(object):
7272
constants.XA_PREPARE_EVENT: event.XAPrepareEvent,
7373
constants.ROWS_QUERY_LOG_EVENT: event.RowsQueryLogEvent,
7474
constants.RAND_EVENT: event.RandEvent,
75+
constants.USER_VAR_EVENT: event.UserVarEvent,
7576
# row_event
7677
constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
7778
constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent,

0 commit comments

Comments
 (0)