Skip to content

Feature : rows query log event #416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

35 changes: 17 additions & 18 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
# -*- coding: utf-8 -*-

import pymysql
import struct
from distutils.version import LooseVersion

import pymysql
from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE
from pymysql.cursors import DictCursor

from .packet import BinLogPacketWrapper
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, FORMAT_DESCRIPTION_EVENT
from .gtid import GtidSet
from .event import (
QueryEvent, RotateEvent, FormatDescriptionEvent,
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent)
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent)
from .exceptions import BinLogNotEnabled
from .gtid import GtidSet
from .packet import BinLogPacketWrapper
from .row_event import (
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)

Expand All @@ -33,7 +33,6 @@


class ReportSlave(object):

"""Represent the values that you may report when connecting as a slave
to a master. SHOW SLAVE HOSTS related"""

Expand Down Expand Up @@ -68,7 +67,7 @@ def __init__(self, value):
self.hostname = value

def __repr__(self):
return '<ReportSlave hostname=%s username=%s password=%s port=%d>' %\
return '<ReportSlave hostname=%s username=%s password=%s port=%d>' % \
(self.hostname, self.username, self.password, self.port)

def encoded(self, server_id, master_id=0):
Expand Down Expand Up @@ -123,7 +122,6 @@ def encoded(self, server_id, master_id=0):


class BinLogStreamReader(object):

"""Connect to replication stream and read event
"""
report_slave = None
Expand Down Expand Up @@ -316,7 +314,7 @@ def __connect_to_stream(self):
4294967))
# If heartbeat is too low, the connection will disconnect before,
# this is also the behavior in mysql
heartbeat = float(min(net_timeout/2., self.slave_heartbeat))
heartbeat = float(min(net_timeout / 2., self.slave_heartbeat))
if heartbeat > 4294967:
heartbeat = 4294967

Expand Down Expand Up @@ -352,7 +350,7 @@ def __connect_to_stream(self):
cur.close()

prelude = struct.pack('<i', len(self.log_file) + 11) \
+ bytes(bytearray([COM_BINLOG_DUMP]))
+ bytes(bytearray([COM_BINLOG_DUMP]))

if self.__resume_stream:
prelude += struct.pack('<I', self.log_pos)
Expand All @@ -369,7 +367,7 @@ def __connect_to_stream(self):
prelude += self.log_file.encode()
else:
if self.is_mariadb:
prelude = self.__set_mariadb_settings()
prelude = self.__set_mariadb_settings()
else:
# Format for mysql packet master_auto_position
#
Expand Down Expand Up @@ -416,8 +414,8 @@ def __connect_to_stream(self):
8 + # binlog_pos_info_size
4) # encoded_data_size

prelude = b'' + struct.pack('<i', header_size + encoded_data_size)\
+ bytes(bytearray([COM_BINLOG_DUMP_GTID]))
prelude = b'' + struct.pack('<i', header_size + encoded_data_size) \
+ bytes(bytearray([COM_BINLOG_DUMP_GTID]))

flags = 0
if not self.__blocking:
Expand Down Expand Up @@ -454,7 +452,7 @@ def __connect_to_stream(self):
def __set_mariadb_settings(self):
# https://mariadb.com/kb/en/5-slave-registration/
cur = self._stream_connection.cursor()
if self.auto_position != None :
if self.auto_position != None:
cur.execute("SET @slave_connect_state='%s'" % self.auto_position)
cur.execute("SET @slave_gtid_strict_mode=1")
cur.execute("SET @slave_gtid_ignore_duplicates=0")
Expand All @@ -465,7 +463,7 @@ def __set_mariadb_settings(self):
4 + # binlog pos
2 + # binlog flags
4 + # slave server_id,
4 # requested binlog file name , set it to empty
4 # requested binlog file name , set it to empty
)

prelude = struct.pack('<i', header_size) + bytes(bytearray([COM_BINLOG_DUMP]))
Expand All @@ -477,11 +475,11 @@ def __set_mariadb_settings(self):

# Enable annotate rows event
if self.__annotate_rows_event:
flags |= 0x02 # BINLOG_SEND_ANNOTATE_ROWS_EVENT
flags |= 0x02 # BINLOG_SEND_ANNOTATE_ROWS_EVENT

if not self.__blocking:
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK

# binlog flags
prelude += struct.pack('<H', flags)

Expand Down Expand Up @@ -621,10 +619,11 @@ def _allowed_event_list(self, only_events, ignored_events,
HeartbeatLogEvent,
NotImplementedEvent,
MariadbGtidEvent,
RowsQueryLogEvent,
MariadbAnnotateRowsEvent,
RandEvent,
MariadbStartEncryptionEvent
))
MariadbStartEncryptionEvent,
))
if ignored_events is not None:
for e in ignored_events:
events.remove(e)
Expand Down
23 changes: 23 additions & 0 deletions pymysqlreplication/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ def _dump(self):
print("type: %d" % (self.type))
print("Value: %d" % (self.value))


class RandEvent(BinLogEvent):
"""
RandEvent is generated every time a statement uses the RAND() function.
Expand Down Expand Up @@ -488,6 +489,7 @@ def _dump(self):
print("seed1: %d" % (self.seed1))
print("seed2: %d" % (self.seed2))


class MariadbStartEncryptionEvent(BinLogEvent):
"""
Since MariaDB 10.1.7,
Expand Down Expand Up @@ -517,6 +519,27 @@ def _dump(self):
print(f"Nonce: {self.nonce}")


class RowsQueryLogEvent(BinLogEvent):
"""
Record original query for the row events in Row-Based Replication

More details are available in the MySQL Knowledge Base:
https://dev.mysql.com/doc/dev/mysql-server/latest/classRows__query__log__event.html

:ivar query_length: uint - Length of the SQL statement
:ivar query: str - The executed SQL statement
"""
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(RowsQueryLogEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
self.query_length = self.packet.read_uint8()
self.query = self.packet.read(self.query_length).decode('utf-8')
def dump(self):
print("=== %s ===" % (self.__class__.__name__))
print("Query length: %d" % self.query_length)
print("Query: %s" % self.query)


class NotImplementedEvent(BinLogEvent):
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(NotImplementedEvent, self).__init__(
Expand Down
2 changes: 2 additions & 0 deletions pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class BinLogPacketWrapper(object):
constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
constants.XA_PREPARE_EVENT: event.XAPrepareEvent,
constants.ROWS_QUERY_LOG_EVENT: event.RowsQueryLogEvent,
constants.RAND_EVENT: event.RandEvent,
# row_event
constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
Expand All @@ -82,6 +83,7 @@ class BinLogPacketWrapper(object):

#5.6 GTID enabled replication events
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent,
# MariaDB GTID
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent,
Expand Down
50 changes: 39 additions & 11 deletions pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# -*- coding: utf-8 -*-
import copy
import io
import os
import sys
import time

import pymysql
import copy
import time
import sys
import io

if sys.version_info < (2, 7):
import unittest2 as unittest
else:
Expand All @@ -15,22 +16,20 @@
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.gtid import GtidSet, Gtid
from pymysqlreplication.event import *
from pymysqlreplication.exceptions import TableMetadataUnavailableError
from pymysqlreplication.constants.BINLOG import *
from pymysqlreplication.row_event import *
from pathlib import Path

__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting"]
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", "TestRowsQueryLogEvents"]


class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
def ignoredEvents(self):
return [GtidEvent]

def test_allowed_event_list(self):
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 19)
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 18)
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 18)
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 20)
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 19)
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 19)
self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1)

def test_read_query_event(self):
Expand Down Expand Up @@ -523,6 +522,7 @@ def test_end_log_pos(self):
self.assertEqual(last_log_pos, 888)
self.assertEqual(last_event_type, TABLE_MAP_EVENT)


class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
def ignoredEvents(self):
return [GtidEvent]
Expand Down Expand Up @@ -763,6 +763,7 @@ def test_alter_column(self):
self.assertEqual(event.rows[0]["values"]["data"], 'A value')
self.stream.fetchone() # insert with three values


class TestCTLConnectionSettings(base.PyMySQLReplicationTestCase):

def setUp(self):
Expand Down Expand Up @@ -821,6 +822,7 @@ def test_separate_ctl_settings_no_error(self):
finally:
self.resetBinLog()


class TestGtidBinLogStreamReader(base.PyMySQLReplicationTestCase):
def setUp(self):
super(TestGtidBinLogStreamReader, self).setUp()
Expand Down Expand Up @@ -940,6 +942,7 @@ def test_gtidset_representation_payload(self):

self.assertEqual(str(myset), str(parsedset))


class GtidTests(unittest.TestCase):
def test_ordering(self):
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56")
Expand Down Expand Up @@ -1007,6 +1010,7 @@ def test_parsing(self):
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1")
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1")


class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase):

def test_annotate_rows_event(self):
Expand Down Expand Up @@ -1072,7 +1076,8 @@ def test_start_encryption_event(self):
self.assertEqual(key_version, key_version_from_key_file)
self.assertEqual(type(nonce), bytes)
self.assertEqual(len(nonce), 12)



class TestStatementConnectionSetting(base.PyMySQLReplicationTestCase):
def setUp(self):
super(TestStatementConnectionSetting, self).setUp()
Expand Down Expand Up @@ -1105,6 +1110,29 @@ def tearDown(self):
super(TestStatementConnectionSetting, self).tearDown()


class TestRowsQueryLogEvents(base.PyMySQLReplicationTestCase):
def setUp(self):
super(TestRowsQueryLogEvents, self).setUp()
self.execute("SET SESSION binlog_rows_query_log_events=1")

def tearDown(self):
self.execute("SET SESSION binlog_rows_query_log_events=0")
super(TestRowsQueryLogEvents, self).tearDown()

def test_rows_query_log_event(self):
self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
only_events=[RowsQueryLogEvent],
)
self.execute("CREATE TABLE IF NOT EXISTS test (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255))")
self.execute("INSERT INTO test (name) VALUES ('Soul Lee')")
self.execute("COMMIT")
event = self.stream.fetchone()
self.assertIsInstance(event, RowsQueryLogEvent)


if __name__ == "__main__":
import unittest
unittest.main()