Skip to content

Commit dc190cb

Browse files
mirageoasissoulee-devwonkajindavinc71998
authored
Feature : rows query log event (julien-duponchelle#416)
* Implement ROWS_QUERY_LOG_EVENT * Change event count because new event added * Add test for rows_query_log_event * Add TestRowsQueryLogEvents class into __all__ * fix binlog_rows_query_log_events apply option for test * revert port setting * fix super method * comment: add docstring for event class * fix: add SESSION keyword for set binlog_rows_query_log_events * fixed : fixed test_basic.py test_allowed_event_list method --------- Co-authored-by: Soul Lee <[email protected]> Co-authored-by: lre12 <[email protected]> Co-authored-by: davinc71998 <[email protected]>
1 parent 4c2dcf2 commit dc190cb

File tree

4 files changed

+81
-29
lines changed

4 files changed

+81
-29
lines changed

Diff for: pymysqlreplication/binlogstream.py

+17-18
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
# -*- coding: utf-8 -*-
22

3-
import pymysql
43
import struct
54
from distutils.version import LooseVersion
65

6+
import pymysql
77
from pymysql.constants.COMMAND import COM_BINLOG_DUMP, COM_REGISTER_SLAVE
88
from pymysql.cursors import DictCursor
99

10-
from .packet import BinLogPacketWrapper
1110
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, FORMAT_DESCRIPTION_EVENT
12-
from .gtid import GtidSet
1311
from .event import (
1412
QueryEvent, RotateEvent, FormatDescriptionEvent,
1513
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
1614
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
1715
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
18-
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent)
16+
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent)
1917
from .exceptions import BinLogNotEnabled
18+
from .gtid import GtidSet
19+
from .packet import BinLogPacketWrapper
2020
from .row_event import (
2121
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)
2222

@@ -33,7 +33,6 @@
3333

3434

3535
class ReportSlave(object):
36-
3736
"""Represent the values that you may report when connecting as a slave
3837
to a master. SHOW SLAVE HOSTS related"""
3938

@@ -68,7 +67,7 @@ def __init__(self, value):
6867
self.hostname = value
6968

7069
def __repr__(self):
71-
return '<ReportSlave hostname=%s username=%s password=%s port=%d>' %\
70+
return '<ReportSlave hostname=%s username=%s password=%s port=%d>' % \
7271
(self.hostname, self.username, self.password, self.port)
7372

7473
def encoded(self, server_id, master_id=0):
@@ -123,7 +122,6 @@ def encoded(self, server_id, master_id=0):
123122

124123

125124
class BinLogStreamReader(object):
126-
127125
"""Connect to replication stream and read event
128126
"""
129127
report_slave = None
@@ -317,7 +315,7 @@ def __connect_to_stream(self):
317315
4294967))
318316
# If heartbeat is too low, the connection will disconnect before,
319317
# this is also the behavior in mysql
320-
heartbeat = float(min(net_timeout/2., self.slave_heartbeat))
318+
heartbeat = float(min(net_timeout / 2., self.slave_heartbeat))
321319
if heartbeat > 4294967:
322320
heartbeat = 4294967
323321

@@ -353,7 +351,7 @@ def __connect_to_stream(self):
353351
cur.close()
354352

355353
prelude = struct.pack('<i', len(self.log_file) + 11) \
356-
+ bytes(bytearray([COM_BINLOG_DUMP]))
354+
+ bytes(bytearray([COM_BINLOG_DUMP]))
357355

358356
if self.__resume_stream:
359357
prelude += struct.pack('<I', self.log_pos)
@@ -370,7 +368,7 @@ def __connect_to_stream(self):
370368
prelude += self.log_file.encode()
371369
else:
372370
if self.is_mariadb:
373-
prelude = self.__set_mariadb_settings()
371+
prelude = self.__set_mariadb_settings()
374372
else:
375373
# Format for mysql packet master_auto_position
376374
#
@@ -417,8 +415,8 @@ def __connect_to_stream(self):
417415
8 + # binlog_pos_info_size
418416
4) # encoded_data_size
419417

420-
prelude = b'' + struct.pack('<i', header_size + encoded_data_size)\
421-
+ bytes(bytearray([COM_BINLOG_DUMP_GTID]))
418+
prelude = b'' + struct.pack('<i', header_size + encoded_data_size) \
419+
+ bytes(bytearray([COM_BINLOG_DUMP_GTID]))
422420

423421
flags = 0
424422
if not self.__blocking:
@@ -455,7 +453,7 @@ def __connect_to_stream(self):
455453
def __set_mariadb_settings(self):
456454
# https://mariadb.com/kb/en/5-slave-registration/
457455
cur = self._stream_connection.cursor()
458-
if self.auto_position != None :
456+
if self.auto_position != None:
459457
cur.execute("SET @slave_connect_state='%s'" % self.auto_position)
460458
cur.execute("SET @slave_gtid_strict_mode=1")
461459
cur.execute("SET @slave_gtid_ignore_duplicates=0")
@@ -466,7 +464,7 @@ def __set_mariadb_settings(self):
466464
4 + # binlog pos
467465
2 + # binlog flags
468466
4 + # slave server_id,
469-
4 # requested binlog file name , set it to empty
467+
4 # requested binlog file name , set it to empty
470468
)
471469

472470
prelude = struct.pack('<i', header_size) + bytes(bytearray([COM_BINLOG_DUMP]))
@@ -478,11 +476,11 @@ def __set_mariadb_settings(self):
478476

479477
# Enable annotate rows event
480478
if self.__annotate_rows_event:
481-
flags |= 0x02 # BINLOG_SEND_ANNOTATE_ROWS_EVENT
479+
flags |= 0x02 # BINLOG_SEND_ANNOTATE_ROWS_EVENT
482480

483481
if not self.__blocking:
484482
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
485-
483+
486484
# binlog flags
487485
prelude += struct.pack('<H', flags)
488486

@@ -622,10 +620,11 @@ def _allowed_event_list(self, only_events, ignored_events,
622620
HeartbeatLogEvent,
623621
NotImplementedEvent,
624622
MariadbGtidEvent,
623+
RowsQueryLogEvent,
625624
MariadbAnnotateRowsEvent,
626625
RandEvent,
627-
MariadbStartEncryptionEvent
628-
))
626+
MariadbStartEncryptionEvent,
627+
))
629628
if ignored_events is not None:
630629
for e in ignored_events:
631630
events.remove(e)

Diff for: pymysqlreplication/event.py

+23
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,7 @@ def _dump(self):
454454
print("type: %d" % (self.type))
455455
print("Value: %d" % (self.value))
456456

457+
457458
class RandEvent(BinLogEvent):
458459
"""
459460
RandEvent is generated every time a statement uses the RAND() function.
@@ -488,6 +489,7 @@ def _dump(self):
488489
print("seed1: %d" % (self.seed1))
489490
print("seed2: %d" % (self.seed2))
490491

492+
491493
class MariadbStartEncryptionEvent(BinLogEvent):
492494
"""
493495
Since MariaDB 10.1.7,
@@ -517,6 +519,27 @@ def _dump(self):
517519
print(f"Nonce: {self.nonce}")
518520

519521

522+
class RowsQueryLogEvent(BinLogEvent):
523+
"""
524+
Record original query for the row events in Row-Based Replication
525+
526+
More details are available in the MySQL Knowledge Base:
527+
https://dev.mysql.com/doc/dev/mysql-server/latest/classRows__query__log__event.html
528+
529+
:ivar query_length: uint - Length of the SQL statement
530+
:ivar query: str - The executed SQL statement
531+
"""
532+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
533+
super(RowsQueryLogEvent, self).__init__(from_packet, event_size, table_map,
534+
ctl_connection, **kwargs)
535+
self.query_length = self.packet.read_uint8()
536+
self.query = self.packet.read(self.query_length).decode('utf-8')
537+
def dump(self):
538+
print("=== %s ===" % (self.__class__.__name__))
539+
print("Query length: %d" % self.query_length)
540+
print("Query: %s" % self.query)
541+
542+
520543
class NotImplementedEvent(BinLogEvent):
521544
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
522545
super().__init__(

Diff for: pymysqlreplication/packet.py

+2
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class BinLogPacketWrapper(object):
7070
constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
7171
constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
7272
constants.XA_PREPARE_EVENT: event.XAPrepareEvent,
73+
constants.ROWS_QUERY_LOG_EVENT: event.RowsQueryLogEvent,
7374
constants.RAND_EVENT: event.RandEvent,
7475
# row_event
7576
constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
@@ -82,6 +83,7 @@ class BinLogPacketWrapper(object):
8283

8384
#5.6 GTID enabled replication events
8485
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
86+
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
8587
constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent,
8688
# MariaDB GTID
8789
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent,

Diff for: pymysqlreplication/tests/test_basic.py

+39-11
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
# -*- coding: utf-8 -*-
2+
import copy
3+
import io
24
import os
5+
import sys
6+
import time
37

48
import pymysql
5-
import copy
6-
import time
7-
import sys
8-
import io
9+
910
if sys.version_info < (2, 7):
1011
import unittest2 as unittest
1112
else:
@@ -15,22 +16,20 @@
1516
from pymysqlreplication import BinLogStreamReader
1617
from pymysqlreplication.gtid import GtidSet, Gtid
1718
from pymysqlreplication.event import *
18-
from pymysqlreplication.exceptions import TableMetadataUnavailableError
1919
from pymysqlreplication.constants.BINLOG import *
2020
from pymysqlreplication.row_event import *
21-
from pathlib import Path
2221

23-
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting"]
22+
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", "TestRowsQueryLogEvents"]
2423

2524

2625
class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
2726
def ignoredEvents(self):
2827
return [GtidEvent]
2928

3029
def test_allowed_event_list(self):
31-
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 19)
32-
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 18)
33-
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 18)
30+
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 20)
31+
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 19)
32+
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 19)
3433
self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1)
3534

3635
def test_read_query_event(self):
@@ -523,6 +522,7 @@ def test_end_log_pos(self):
523522
self.assertEqual(last_log_pos, 888)
524523
self.assertEqual(last_event_type, TABLE_MAP_EVENT)
525524

525+
526526
class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
527527
def ignoredEvents(self):
528528
return [GtidEvent]
@@ -763,6 +763,7 @@ def test_alter_column(self):
763763
self.assertEqual(event.rows[0]["values"]["data"], 'A value')
764764
self.stream.fetchone() # insert with three values
765765

766+
766767
class TestCTLConnectionSettings(base.PyMySQLReplicationTestCase):
767768

768769
def setUp(self):
@@ -821,6 +822,7 @@ def test_separate_ctl_settings_no_error(self):
821822
finally:
822823
self.resetBinLog()
823824

825+
824826
class TestGtidBinLogStreamReader(base.PyMySQLReplicationTestCase):
825827
def setUp(self):
826828
super().setUp()
@@ -940,6 +942,7 @@ def test_gtidset_representation_payload(self):
940942

941943
self.assertEqual(str(myset), str(parsedset))
942944

945+
943946
class GtidTests(unittest.TestCase):
944947
def test_ordering(self):
945948
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-56")
@@ -1007,6 +1010,7 @@ def test_parsing(self):
10071010
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1")
10081011
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1")
10091012

1013+
10101014
class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase):
10111015

10121016
def test_annotate_rows_event(self):
@@ -1072,7 +1076,8 @@ def test_start_encryption_event(self):
10721076
self.assertEqual(key_version, key_version_from_key_file)
10731077
self.assertEqual(type(nonce), bytes)
10741078
self.assertEqual(len(nonce), 12)
1075-
1079+
1080+
10761081
class TestStatementConnectionSetting(base.PyMySQLReplicationTestCase):
10771082
def setUp(self):
10781083
super().setUp()
@@ -1105,6 +1110,29 @@ def tearDown(self):
11051110
super().tearDown()
11061111

11071112

1113+
class TestRowsQueryLogEvents(base.PyMySQLReplicationTestCase):
1114+
def setUp(self):
1115+
super(TestRowsQueryLogEvents, self).setUp()
1116+
self.execute("SET SESSION binlog_rows_query_log_events=1")
1117+
1118+
def tearDown(self):
1119+
self.execute("SET SESSION binlog_rows_query_log_events=0")
1120+
super(TestRowsQueryLogEvents, self).tearDown()
1121+
1122+
def test_rows_query_log_event(self):
1123+
self.stream.close()
1124+
self.stream = BinLogStreamReader(
1125+
self.database,
1126+
server_id=1024,
1127+
only_events=[RowsQueryLogEvent],
1128+
)
1129+
self.execute("CREATE TABLE IF NOT EXISTS test (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255))")
1130+
self.execute("INSERT INTO test (name) VALUES ('Soul Lee')")
1131+
self.execute("COMMIT")
1132+
event = self.stream.fetchone()
1133+
self.assertIsInstance(event, RowsQueryLogEvent)
1134+
1135+
11081136
if __name__ == "__main__":
11091137
import unittest
11101138
unittest.main()

0 commit comments

Comments
 (0)