Skip to content

Commit c40f6f4

Browse files
Developed the Mariadbannotate ros event (julien-duponchelle#412)
1 parent ebc84d9 commit c40f6f4

File tree

7 files changed

+173
-63
lines changed

7 files changed

+173
-63
lines changed

docker-compose.yml

+14
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,17 @@ services:
1515
ports:
1616
- 3307:3307
1717
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307
18+
19+
mariadb-10.6:
20+
image: mariadb:10.6
21+
environment:
22+
MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
23+
ports:
24+
- "3308:3306"
25+
command: |
26+
--server-id=1
27+
--default-authentication-plugin=mysql_native_password
28+
--log-bin=master-bin
29+
--binlog-format=row
30+
--log-slave-updates=on
31+

examples/mariadb_gtid/read_event.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import pymysql
22

33
from pymysqlreplication import BinLogStreamReader, gtid
4-
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent
4+
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent,MariadbAnnotateRowsEvent
55
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
66

77
MARIADB_SETTINGS = {
@@ -65,10 +65,12 @@ def query_server_id(self):
6565
RotateEvent,
6666
WriteRowsEvent,
6767
UpdateRowsEvent,
68-
DeleteRowsEvent
68+
DeleteRowsEvent,
69+
MariadbAnnotateRowsEvent
6970
],
7071
auto_position=gtid,
71-
is_mariadb=True
72+
is_mariadb=True,
73+
annotate_rows_event=True
7274
)
7375

7476
print('Starting reading events from GTID ', gtid)

pymysqlreplication/binlogstream.py

+75-55
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
QueryEvent, RotateEvent, FormatDescriptionEvent,
1515
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
1616
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
17-
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent)
17+
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
18+
MariadbAnnotateRowsEvent)
1819
from .exceptions import BinLogNotEnabled
1920
from .row_event import (
2021
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)
@@ -141,6 +142,7 @@ def __init__(self, connection_settings, server_id,
141142
fail_on_table_metadata_unavailable=False,
142143
slave_heartbeat=None,
143144
is_mariadb=False,
145+
annotate_rows_event=False,
144146
ignore_decode_errors=False):
145147
"""
146148
Attributes:
@@ -178,6 +180,8 @@ def __init__(self, connection_settings, server_id,
178180
for semantics
179181
is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position
180182
to point to Mariadb specific GTID.
183+
annotate_rows_event: Parameter value to enable annotate rows event in mariadb,
184+
used with 'is_mariadb'
181185
ignore_decode_errors: If true, any decode errors encountered
182186
when reading column data will be ignored.
183187
"""
@@ -219,6 +223,7 @@ def __init__(self, connection_settings, server_id,
219223
self.auto_position = auto_position
220224
self.skip_to_timestamp = skip_to_timestamp
221225
self.is_mariadb = is_mariadb
226+
self.__annotate_rows_event = annotate_rows_event
222227

223228
if end_log_pos:
224229
self.is_past_end_log_pos = False
@@ -331,67 +336,39 @@ def __connect_to_stream(self):
331336
self._register_slave()
332337

333338
if not self.auto_position:
334-
# only when log_file and log_pos both provided, the position info is
335-
# valid, if not, get the current position from master
336-
if self.log_file is None or self.log_pos is None:
337-
cur = self._stream_connection.cursor()
338-
cur.execute("SHOW MASTER STATUS")
339-
master_status = cur.fetchone()
340-
if master_status is None:
341-
raise BinLogNotEnabled()
342-
self.log_file, self.log_pos = master_status[:2]
343-
cur.close()
344-
345-
prelude = struct.pack('<i', len(self.log_file) + 11) \
346-
+ bytes(bytearray([COM_BINLOG_DUMP]))
347-
348-
if self.__resume_stream:
349-
prelude += struct.pack('<I', self.log_pos)
350-
else:
351-
prelude += struct.pack('<I', 4)
352-
353-
flags = 0
354-
if not self.__blocking:
355-
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
356-
prelude += struct.pack('<H', flags)
357-
358-
prelude += struct.pack('<I', self.__server_id)
359-
prelude += self.log_file.encode()
360-
else:
361339
if self.is_mariadb:
362-
# https://mariadb.com/kb/en/5-slave-registration/
363-
cur = self._stream_connection.cursor()
364-
cur.execute("SET @slave_connect_state='%s'" % self.auto_position)
365-
cur.execute("SET @slave_gtid_strict_mode=1")
366-
cur.execute("SET @slave_gtid_ignore_duplicates=0")
367-
cur.close()
368-
369-
# https://mariadb.com/kb/en/com_binlog_dump/
370-
header_size = (
371-
4 + # binlog pos
372-
2 + # binlog flags
373-
4 + # slave server_id,
374-
4 # requested binlog file name , set it to empty
375-
)
376-
377-
prelude = struct.pack('<i', header_size) + bytes(bytearray([COM_BINLOG_DUMP]))
378-
379-
# binlog pos
380-
prelude += struct.pack('<i', 4)
340+
prelude = self.__set_mariadb_settings()
341+
else:
342+
# only when log_file and log_pos both provided, the position info is
343+
# valid, if not, get the current position from master
344+
if self.log_file is None or self.log_pos is None:
345+
cur = self._stream_connection.cursor()
346+
cur.execute("SHOW MASTER STATUS")
347+
master_status = cur.fetchone()
348+
if master_status is None:
349+
raise BinLogNotEnabled()
350+
self.log_file, self.log_pos = master_status[:2]
351+
cur.close()
352+
353+
prelude = struct.pack('<i', len(self.log_file) + 11) \
354+
+ bytes(bytearray([COM_BINLOG_DUMP]))
355+
356+
if self.__resume_stream:
357+
prelude += struct.pack('<I', self.log_pos)
358+
else:
359+
prelude += struct.pack('<I', 4)
381360

382361
flags = 0
362+
383363
if not self.__blocking:
384364
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
385-
386-
# binlog flags
387365
prelude += struct.pack('<H', flags)
388366

389-
# server id (4 bytes)
390367
prelude += struct.pack('<I', self.__server_id)
391-
392-
# empty_binlog_name (4 bytes)
393-
prelude += b'\0\0\0\0'
394-
368+
prelude += self.log_file.encode()
369+
else:
370+
if self.is_mariadb:
371+
prelude = self.__set_mariadb_settings()
395372
else:
396373
# Format for mysql packet master_auto_position
397374
#
@@ -473,6 +450,48 @@ def __connect_to_stream(self):
473450
self._stream_connection._next_seq_id = 1
474451
self.__connected_stream = True
475452

453+
def __set_mariadb_settings(self):
454+
# https://mariadb.com/kb/en/5-slave-registration/
455+
cur = self._stream_connection.cursor()
456+
if self.auto_position != None :
457+
cur.execute("SET @slave_connect_state='%s'" % self.auto_position)
458+
cur.execute("SET @slave_gtid_strict_mode=1")
459+
cur.execute("SET @slave_gtid_ignore_duplicates=0")
460+
cur.close()
461+
462+
# https://mariadb.com/kb/en/com_binlog_dump/
463+
header_size = (
464+
4 + # binlog pos
465+
2 + # binlog flags
466+
4 + # slave server_id,
467+
4 # requested binlog file name , set it to empty
468+
)
469+
470+
prelude = struct.pack('<i', header_size) + bytes(bytearray([COM_BINLOG_DUMP]))
471+
472+
# binlog pos
473+
prelude += struct.pack('<i', 4)
474+
475+
flags = 0
476+
477+
# Enable annotate rows event
478+
if self.__annotate_rows_event:
479+
flags |= 0x02 # BINLOG_SEND_ANNOTATE_ROWS_EVENT
480+
481+
if not self.__blocking:
482+
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
483+
484+
# binlog flags
485+
prelude += struct.pack('<H', flags)
486+
487+
# server id (4 bytes)
488+
prelude += struct.pack('<I', self.__server_id)
489+
490+
# empty_binlog_name (4 bytes)
491+
prelude += b'\0\0\0\0'
492+
493+
return prelude
494+
476495
def fetchone(self):
477496
while True:
478497
if self.end_log_pos and self.is_past_end_log_pos:
@@ -600,7 +619,8 @@ def _allowed_event_list(self, only_events, ignored_events,
600619
TableMapEvent,
601620
HeartbeatLogEvent,
602621
NotImplementedEvent,
603-
MariadbGtidEvent
622+
MariadbGtidEvent,
623+
MariadbAnnotateRowsEvent
604624
))
605625
if ignored_events is not None:
606626
for e in ignored_events:

pymysqlreplication/event.py

+18
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,24 @@ def _dump(self):
111111
print('GTID:', self.gtid)
112112

113113

114+
class MariadbAnnotateRowsEvent(BinLogEvent):
115+
"""
116+
Annotate rows event
117+
If you want to check this binlog, change the value of the flag(line 382 of the 'binlogstream.py') option to 2
118+
https://mariadb.com/kb/en/annotate_rows_event/
119+
120+
Attributes:
121+
sql_statement: The SQL statement
122+
"""
123+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
124+
super(MariadbAnnotateRowsEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
125+
self.sql_statement = self.packet.read(event_size)
126+
127+
def _dump(self):
128+
super(MariadbAnnotateRowsEvent, self)._dump()
129+
print("SQL statement :", self.sql_statement)
130+
131+
114132
class RotateEvent(BinLogEvent):
115133
"""Change MySQL bin log file
116134

pymysqlreplication/packet.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class BinLogPacketWrapper(object):
8383
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
8484
constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent,
8585
# MariaDB GTID
86-
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.NotImplementedEvent,
86+
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent,
8787
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent,
8888
constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent,
8989
constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent,

pymysqlreplication/tests/base.py

+26
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,29 @@ def bin_log_basename(self):
121121
bin_log_basename = cursor.fetchone()[0]
122122
bin_log_basename = bin_log_basename.split("/")[-1]
123123
return bin_log_basename
124+
125+
126+
class PyMySQLReplicationMariaDbTestCase(PyMySQLReplicationTestCase):
127+
def setUp(self):
128+
# default
129+
self.database = {
130+
"host": "localhost",
131+
"user": "root",
132+
"passwd": "",
133+
"port": 3308,
134+
"use_unicode": True,
135+
"charset": "utf8",
136+
"db": "pymysqlreplication_test"
137+
}
138+
139+
self.conn_control = None
140+
db = copy.copy(self.database)
141+
db["db"] = None
142+
self.connect_conn_control(db)
143+
self.execute("DROP DATABASE IF EXISTS pymysqlreplication_test")
144+
self.execute("CREATE DATABASE pymysqlreplication_test")
145+
db = copy.copy(self.database)
146+
self.connect_conn_control(db)
147+
self.stream = None
148+
self.resetBinLog()
149+

pymysqlreplication/tests/test_basic.py

+34-4
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@
1717
from pymysqlreplication.constants.BINLOG import *
1818
from pymysqlreplication.row_event import *
1919

20-
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader"]
20+
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader","TestMariadbBinlogStreamReader"]
2121

2222

2323
class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
2424
def ignoredEvents(self):
2525
return [GtidEvent]
2626

2727
def test_allowed_event_list(self):
28-
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 16)
29-
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 15)
30-
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 15)
28+
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 17)
29+
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 16)
30+
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 16)
3131
self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1)
3232

3333
def test_read_query_event(self):
@@ -1002,6 +1002,36 @@ def test_parsing(self):
10021002
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1")
10031003
gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1")
10041004

1005+
class TestMariadbBinlogStreamReader(base.PyMySQLReplicationMariaDbTestCase):
1006+
1007+
def test_annotate_rows_event(self):
1008+
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
1009+
self.execute(query)
1010+
# Insert first event
1011+
query = "BEGIN;"
1012+
self.execute(query)
1013+
insert_query = b"INSERT INTO test (id, data) VALUES(1, 'Hello')"
1014+
self.execute(insert_query)
1015+
query = "COMMIT;"
1016+
self.execute(query)
1017+
1018+
self.stream.close()
1019+
self.stream = BinLogStreamReader(
1020+
self.database,
1021+
server_id=1024,
1022+
blocking=False,
1023+
only_events=[MariadbAnnotateRowsEvent],
1024+
is_mariadb=True,
1025+
annotate_rows_event=True,
1026+
)
1027+
1028+
event = self.stream.fetchone()
1029+
#Check event type 160,MariadbAnnotateRowsEvent
1030+
self.assertEqual(event.event_type,160)
1031+
#Check self.sql_statement
1032+
self.assertEqual(event.sql_statement,insert_query)
1033+
self.assertIsInstance(event,MariadbAnnotateRowsEvent)
1034+
10051035

10061036
if __name__ == "__main__":
10071037
import unittest

0 commit comments

Comments
 (0)