Skip to content

Commit 7fd706d

Browse files
authored
Add support for MariaDB GTID (julien-duponchelle#366)
* Add MariaDBGtidEvent class * Add MariaDB specific events
1 parent 728e3d1 commit 7fd706d

File tree

7 files changed

+292
-73
lines changed

7 files changed

+292
-73
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
version: '3'
2+
3+
services:
4+
testdb:
5+
container_name: "testdb"
6+
image: mariadb:10.6
7+
environment:
8+
MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
9+
MARIADB_DATABASE: mydb
10+
MARIADB_USER: replication_user
11+
MARIADB_PASSWORD: secret123passwd
12+
ports:
13+
- "3306:3306"
14+
command: |
15+
--server-id=1
16+
--default-authentication-plugin=mysql_native_password
17+
--log-bin=master-bin
18+
--binlog-format=row
19+
--log-slave-updates=on
20+
--binlog-do-db=mydb
21+
volumes:
22+
- ./queries.sql:/docker-entrypoint-initdb.d/queries.sql
23+
networks:
24+
- mariadb-cluster
25+
26+
networks:
27+
mariadb-cluster:
28+
ipam:
29+
driver: default
30+
config:
31+
- subnet: 172.200.0.0/24

examples/mariadb_gtid/queries.sql

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# configure replication user
2+
grant replication slave on *.* to 'replication_user'@'%';
3+
flush privileges;
4+
5+
# create objects
6+
create table r1 (
7+
i1 int auto_increment primary key,
8+
c1 varchar(10),
9+
d1 datetime default current_timestamp()
10+
);
11+
12+
insert into r1 (c1) values ('#1'),('#2'),('#3'),('#4'),('#5'),('#6'),('#7');
13+
14+
create table r2 (i2 int primary key, d2 datetime) ;
15+
insert into r2 (i2, d2) values (1, now());
16+
insert into r2 (i2, d2) values (2, now());
17+
insert into r2 (i2, d2) values (3, now());
18+
insert into r2 (i2, d2) values (4, now());
19+
20+
update r1 set c1=concat(c1, '-up');
21+
22+
select * from r2;
23+
24+
delete from r1 where i1 < 4;
25+
26+
drop table r2;
27+
28+
alter table r1 add column b1 bool default False;
29+
insert into r1 (c1, b1) values ('#8', True);

examples/mariadb_gtid/read_event.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import pymysql
2+
3+
from pymysqlreplication import BinLogStreamReader, gtid
4+
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent
5+
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
6+
7+
MARIADB_SETTINGS = {
8+
"host": "127.0.0.1",
9+
"port": 3306,
10+
"user": "replication_user",
11+
"passwd": "secret123passwd",
12+
}
13+
14+
15+
class MariaDbGTID:
    """Helper that reads GTID-related system variables from a MariaDB server.

    One pymysql connection is opened in the constructor and reused by every
    query method; the caller is responsible for closing ``self.connection``.
    """

    def __init__(self, conn_config):
        """Open the connection.

        Args:
            conn_config: keyword arguments passed verbatim to
                ``pymysql.connect``.
        """
        self.connection = pymysql.connect(**conn_config)

    def query_single_value(self, sql: str):
        """Execute *sql* and return the first column of the first row.

        Args:
            sql: statement expected to produce a single scalar.

        Returns:
            The value converted with ``str``, or ``None`` when the statement
            returns no row or the value is SQL NULL (instead of raising on a
            missing row or returning the literal string ``"None"``).
        """
        with self.connection.cursor() as cursor:
            cursor.execute(sql)
            row = cursor.fetchone()

        if row is None or row[0] is None:
            return None

        return str(row[0])

    def extract_gtid(self, gtid: str, server_id):
        """Return *gtid* if it was generated by *server_id*, else ``None``.

        A GTID is accepted only when it consists of exactly three
        dash-separated parts and the middle part equals *server_id*.

        Args:
            gtid: GTID text such as ``"0-1-42"``; may be ``None``.
            server_id: server id as ``int`` or ``str``; may be ``None``.

        Returns:
            The unchanged *gtid* string on a match, otherwise ``None``.
        """
        if gtid is None or server_id is None:
            return None

        gtid_parts = gtid.split("-")

        if len(gtid_parts) != 3:
            return None

        # query_server_id() yields an int while the split parts are strings;
        # normalise to str so an int server id can actually match.
        if gtid_parts[1] == str(server_id):
            return gtid

        return None

    def query_gtid_current_pos(self, server_id):
        """Return ``@@gtid_current_pos`` if it belongs to *server_id*.

        Returns ``None`` when the position is not a single three-part GTID
        or was produced by another server (see ``extract_gtid``).
        """
        return self.extract_gtid(self.query_single_value("SELECT @@gtid_current_pos"), server_id)

    def query_server_id(self):
        """Return the server's ``@@server_id`` as an ``int``."""
        return int(self.query_single_value("SELECT @@server_id"))
48+
49+
50+
if __name__ == "__main__":
    # Example: stream row events from a MariaDB server starting at a GTID.
    db = MariaDbGTID(MARIADB_SETTINGS)

    try:
        server_id = db.query_server_id()
        print('Server ID: ', server_id)

        # Start from the initial position. To resume from the server's
        # current position, use db.query_gtid_current_pos(server_id) instead.
        # Named current_gtid (not gtid) so the imported `gtid` module is not
        # shadowed.
        current_gtid = '0-1-1'  # initial pos

        stream = BinLogStreamReader(
            connection_settings=MARIADB_SETTINGS,
            server_id=server_id,
            blocking=False,
            only_events=[
                MariadbGtidEvent,
                RotateEvent,
                WriteRowsEvent,
                UpdateRowsEvent,
                DeleteRowsEvent,
            ],
            auto_position=current_gtid,
            is_mariadb=True,
        )

        try:
            print('Starting reading events from GTID ', current_gtid)
            for binlogevent in stream:
                binlogevent.dump()

                # Remember the most recent GTID so it can be reported (and
                # used as a restart position) once the stream is drained.
                if isinstance(binlogevent, MariadbGtidEvent):
                    current_gtid = binlogevent.gtid

            print('Last encountered GTID: ', current_gtid)
        finally:
            # Close the replication stream even if reading an event fails.
            stream.close()
    finally:
        # The helper's query connection is independent of the stream.
        db.connection.close()

pymysqlreplication/binlogstream.py

Lines changed: 112 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
QueryEvent, RotateEvent, FormatDescriptionEvent,
1515
XidEvent, GtidEvent, StopEvent,
1616
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
17-
HeartbeatLogEvent, NotImplementedEvent)
17+
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent)
1818
from .exceptions import BinLogNotEnabled
1919
from .row_event import (
2020
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)
@@ -139,7 +139,8 @@ def __init__(self, connection_settings, server_id,
139139
report_slave=None, slave_uuid=None,
140140
pymysql_wrapper=None,
141141
fail_on_table_metadata_unavailable=False,
142-
slave_heartbeat=None):
142+
slave_heartbeat=None,
143+
is_mariadb=False):
143144
"""
144145
Attributes:
145146
ctl_connection_settings: Connection settings for cluster holding
@@ -174,6 +175,8 @@ def __init__(self, connection_settings, server_id,
174175
many event to skip in binlog). See
175176
MASTER_HEARTBEAT_PERIOD in mysql documentation
176177
for semantics
178+
is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position
179+
to point to Mariadb specific GTID.
177180
"""
178181

179182
self.__connection_settings = connection_settings
@@ -211,6 +214,7 @@ def __init__(self, connection_settings, server_id,
211214
self.log_file = log_file
212215
self.auto_position = auto_position
213216
self.skip_to_timestamp = skip_to_timestamp
217+
self.is_mariadb = is_mariadb
214218

215219
if end_log_pos:
216220
self.is_past_end_log_pos = False
@@ -341,77 +345,114 @@ def __connect_to_stream(self):
341345
prelude += struct.pack('<I', self.__server_id)
342346
prelude += self.log_file.encode()
343347
else:
344-
# Format for mysql packet master_auto_position
345-
#
346-
# All fields are little endian
347-
# All fields are unsigned
348-
349-
# Packet length uint 4bytes
350-
# Packet type byte 1byte == 0x1e
351-
# Binlog flags ushort 2bytes == 0 (for retrocompatibilty)
352-
# Server id uint 4bytes
353-
# binlognamesize uint 4bytes
354-
# binlogname str Nbytes N = binlognamesize
355-
# Zeroified
356-
# binlog position uint 4bytes == 4
357-
# payload_size uint 4bytes
358-
359-
# What come next, is the payload, where the slave gtid_executed
360-
# is sent to the master
361-
# n_sid ulong 8bytes == which size is the gtid_set
362-
# | sid uuid 16bytes UUID as a binary
363-
# | n_intervals ulong 8bytes == how many intervals are sent
364-
# | for this gtid
365-
# | | start ulong 8bytes Start position of this interval
366-
# | | stop ulong 8bytes Stop position of this interval
367-
368-
# A gtid set looks like:
369-
# 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10,
370-
# 1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140
371-
#
372-
# In this particular gtid set,
373-
# 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10
374-
# is the first member of the set, it is called a gtid.
375-
# In this gtid, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457 is the sid
376-
# and have two intervals, 1-3 and 8-10, 1 is the start position of
377-
# the first interval 3 is the stop position of the first interval.
378-
379-
gtid_set = GtidSet(self.auto_position)
380-
encoded_data_size = gtid_set.encoded_length
381-
382-
header_size = (2 + # binlog_flags
383-
4 + # server_id
384-
4 + # binlog_name_info_size
385-
4 + # empty binlog name
386-
8 + # binlog_pos_info_size
387-
4) # encoded_data_size
388-
389-
prelude = b'' + struct.pack('<i', header_size + encoded_data_size)\
390-
+ bytes(bytearray([COM_BINLOG_DUMP_GTID]))
348+
if self.is_mariadb:
349+
# https://mariadb.com/kb/en/5-slave-registration/
350+
cur = self._stream_connection.cursor()
391351

392-
flags = 0
393-
if not self.__blocking:
394-
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
395-
flags |= 0x04 # BINLOG_THROUGH_GTID
352+
cur.execute("SET @mariadb_slave_capability=4")
353+
cur.execute("SET @slave_connect_state='%s'" % self.auto_position)
354+
cur.execute("SET @slave_gtid_strict_mode=1")
355+
cur.execute("SET @slave_gtid_ignore_duplicates=0")
356+
cur.close()
396357

397-
# binlog_flags (2 bytes)
398-
# see:
399-
# https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
400-
prelude += struct.pack('<H', flags)
358+
# https://mariadb.com/kb/en/com_binlog_dump/
359+
header_size = (
360+
4 + # binlog pos
361+
2 + # binlog flags
362+
4 + # slave server_id,
363+
4 # requested binlog file name , set it to empty
364+
)
401365

402-
# server_id (4 bytes)
403-
prelude += struct.pack('<I', self.__server_id)
404-
# binlog_name_info_size (4 bytes)
405-
prelude += struct.pack('<I', 3)
406-
# empty_binlog_name (4 bytes)
407-
prelude += b'\0\0\0'
408-
# binlog_pos_info (8 bytes)
409-
prelude += struct.pack('<Q', 4)
410-
411-
# encoded_data_size (4 bytes)
412-
prelude += struct.pack('<I', gtid_set.encoded_length)
413-
# encoded_data
414-
prelude += gtid_set.encoded()
366+
prelude = struct.pack('<i', header_size) + bytes(bytearray([COM_BINLOG_DUMP]))
367+
368+
# binlog pos
369+
prelude += struct.pack('<i', 4)
370+
371+
flags = 0
372+
if not self.__blocking:
373+
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
374+
375+
# binlog flags
376+
prelude += struct.pack('<H', flags)
377+
378+
# server id (4 bytes)
379+
prelude += struct.pack('<I', self.__server_id)
380+
381+
# empty_binlog_name (4 bytes)
382+
prelude += b'\0\0\0\0'
383+
384+
else:
385+
# Format for mysql packet master_auto_position
386+
#
387+
# All fields are little endian
388+
# All fields are unsigned
389+
390+
# Packet length uint 4bytes
391+
# Packet type byte 1byte == 0x1e
392+
# Binlog flags ushort 2bytes == 0 (for retrocompatibilty)
393+
# Server id uint 4bytes
394+
# binlognamesize uint 4bytes
395+
# binlogname str Nbytes N = binlognamesize
396+
# Zeroified
397+
# binlog position uint 4bytes == 4
398+
# payload_size uint 4bytes
399+
400+
# What come next, is the payload, where the slave gtid_executed
401+
# is sent to the master
402+
# n_sid ulong 8bytes == which size is the gtid_set
403+
# | sid uuid 16bytes UUID as a binary
404+
# | n_intervals ulong 8bytes == how many intervals are sent
405+
# | for this gtid
406+
# | | start ulong 8bytes Start position of this interval
407+
# | | stop ulong 8bytes Stop position of this interval
408+
409+
# A gtid set looks like:
410+
# 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10,
411+
# 1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140
412+
#
413+
# In this particular gtid set,
414+
# 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10
415+
# is the first member of the set, it is called a gtid.
416+
# In this gtid, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457 is the sid
417+
# and have two intervals, 1-3 and 8-10, 1 is the start position of
418+
# the first interval 3 is the stop position of the first interval.
419+
420+
gtid_set = GtidSet(self.auto_position)
421+
encoded_data_size = gtid_set.encoded_length
422+
423+
header_size = (2 + # binlog_flags
424+
4 + # server_id
425+
4 + # binlog_name_info_size
426+
4 + # empty binlog name
427+
8 + # binlog_pos_info_size
428+
4) # encoded_data_size
429+
430+
prelude = b'' + struct.pack('<i', header_size + encoded_data_size)\
431+
+ bytes(bytearray([COM_BINLOG_DUMP_GTID]))
432+
433+
flags = 0
434+
if not self.__blocking:
435+
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
436+
flags |= 0x04 # BINLOG_THROUGH_GTID
437+
438+
# binlog_flags (2 bytes)
439+
# see:
440+
# https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
441+
prelude += struct.pack('<H', flags)
442+
443+
# server_id (4 bytes)
444+
prelude += struct.pack('<I', self.__server_id)
445+
# binlog_name_info_size (4 bytes)
446+
prelude += struct.pack('<I', 3)
447+
# empty_binlog_name (4 bytes)
448+
prelude += b'\0\0\0'
449+
# binlog_pos_info (8 bytes)
450+
prelude += struct.pack('<Q', 4)
451+
452+
# encoded_data_size (4 bytes)
453+
prelude += struct.pack('<I', gtid_set.encoded_length)
454+
# encoded_data
455+
prelude += gtid_set.encoded()
415456

416457
if pymysql.__version__ < LooseVersion("0.6"):
417458
self._stream_connection.wfile.write(prelude)
@@ -542,6 +583,7 @@ def _allowed_event_list(self, only_events, ignored_events,
542583
TableMapEvent,
543584
HeartbeatLogEvent,
544585
NotImplementedEvent,
586+
MariadbGtidEvent
545587
))
546588
if ignored_events is not None:
547589
for e in ignored_events:

pymysqlreplication/constants/BINLOG.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,10 @@
4141
INTVAR_INVALID_INT_EVENT = 0x00
4242
INTVAR_LAST_INSERT_ID_EVENT = 0x01
4343
INTVAR_INSERT_ID_EVENT = 0x02
44+
45+
# MariaDB events
46+
MARIADB_ANNOTATE_ROWS_EVENT = 0xa0
47+
MARIADB_BINLOG_CHECKPOINT_EVENT = 0xa1
48+
MARIADB_GTID_EVENT = 0xa2
49+
MARIADB_GTID_GTID_LIST_EVENT = 0xa3
50+
MARIADB_START_ENCRYPTION_EVENT = 0xa4

0 commit comments

Comments
 (0)