Skip to content

Add support for MariaDB GTID #366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions examples/mariadb_gtid/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Docker Compose setup for the MariaDB GTID example: one MariaDB 10.6
# container ("testdb") with binary logging switched on.
version: '3'

services:
testdb:
container_name: "testdb"
image: mariadb:10.6
# Root has an empty password; a dedicated database and user are created.
# The user name and password here are the ones read_event.py connects with.
environment:
MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
MARIADB_DATABASE: mydb
MARIADB_USER: replication_user
MARIADB_PASSWORD: secret123passwd
# host:container port mapping, so the server is reachable on local port 3306.
ports:
- "3306:3306"
# Server options: server id 1, row-format binary log named "master-bin",
# binlog restricted to the "mydb" database.
# NOTE(review): --default-authentication-plugin is a MySQL option name;
# confirm the MariaDB image accepts it.
command: |
--server-id=1
--default-authentication-plugin=mysql_native_password
--log-bin=master-bin
--binlog-format=row
--log-slave-updates=on
--binlog-do-db=mydb
# NOTE(review): presumably the image's entrypoint executes files placed in
# /docker-entrypoint-initdb.d on first start — confirm against the image docs.
volumes:
- ./queries.sql:/docker-entrypoint-initdb.d/queries.sql
networks:
- mariadb-cluster

# Dedicated network with a fixed /24 subnet for the example container.
networks:
mariadb-cluster:
ipam:
driver: default
config:
- subnet: 172.200.0.0/24
29 changes: 29 additions & 0 deletions examples/mariadb_gtid/queries.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Seed script for the MariaDB GTID example.
# Grants the replication privilege, then runs a fixed sequence of DDL and DML
# statements (presumably so the binary log contains a variety of events for
# the example reader to dump — the statement order is intentional).

# configure replication user
grant replication slave on *.* to 'replication_user'@'%';
flush privileges;

# create objects
# r1: auto-increment primary key, short text column, timestamp defaulting to now
create table r1 (
i1 int auto_increment primary key,
c1 varchar(10),
d1 datetime default current_timestamp()
);

# one multi-row insert: seven rows, i1 assigned by auto_increment
insert into r1 (c1) values ('#1'),('#2'),('#3'),('#4'),('#5'),('#6'),('#7');

# r2: explicit primary key values, filled with four single-row inserts
create table r2 (i2 int primary key, d2 datetime) ;
insert into r2 (i2, d2) values (1, now());
insert into r2 (i2, d2) values (2, now());
insert into r2 (i2, d2) values (3, now());
insert into r2 (i2, d2) values (4, now());

# no WHERE clause: appends '-up' to c1 in every r1 row
update r1 set c1=concat(c1, '-up');

# read-only statement; changes no data
select * from r2;

# removes the r1 rows whose key is below 4
delete from r1 where i1 < 4;

drop table r2;

# schema change on r1, followed by an insert that sets the new column
alter table r1 add column b1 bool default False;
insert into r1 (c1, b1) values ('#8', True);
83 changes: 83 additions & 0 deletions examples/mariadb_gtid/read_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import pymysql

from pymysqlreplication import BinLogStreamReader, gtid
from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent

# Connection parameters for the example MariaDB server. The same mapping is
# used both for the helper's pymysql connection and for BinLogStreamReader.
MARIADB_SETTINGS = dict(
    host="127.0.0.1",
    port=3306,
    user="replication_user",
    passwd="secret123passwd",
)


class MariaDbGTID:
    """Helper that reads GTID-related server variables over a pymysql connection.

    The connection is opened in ``__init__`` and kept on ``self.connection``;
    the caller is responsible for closing it.
    """

    def __init__(self, conn_config):
        """Open the connection.

        Args:
            conn_config: Mapping of keyword arguments forwarded unchanged to
                ``pymysql.connect``.
        """
        self.connection = pymysql.connect(**conn_config)

    def query_single_value(self, sql: str):
        """Execute *sql* and return the first column of the first row.

        Args:
            sql: Statement to execute.

        Returns:
            The value converted with ``str()``, or ``None`` when the query
            produced no row or the value itself is NULL.
        """
        with self.connection.cursor() as cursor:
            cursor.execute(sql)
            row = cursor.fetchone()

        # Without this guard an empty result raises TypeError on row[0] and a
        # NULL value is returned as the misleading string "None".
        if row is None or row[0] is None:
            return None

        return str(row[0])

    def extract_gtid(self, gtid: str, server_id):
        """Return *gtid* if it is a three-part GTID issued by *server_id*.

        Args:
            gtid: Candidate GTID string; it is split on ``"-"`` and must have
                exactly three parts, the second being the server id.
            server_id: Server id to match, given either as ``int`` (what
                ``query_server_id`` returns) or as ``str``.

        Returns:
            *gtid* unchanged when its middle part equals *server_id*,
            otherwise ``None`` (also for ``None`` arguments or a string that
            does not have exactly three parts).
        """
        if gtid is None or server_id is None:
            return None

        gtid_parts = gtid.split("-")

        if len(gtid_parts) != 3:
            return None

        # The split parts are strings while query_server_id() yields an int;
        # normalise to str so that an int server id can actually match.
        if gtid_parts[1] == str(server_id):
            return gtid

        return None

    def query_gtid_current_pos(self, server_id):
        """Read ``@@gtid_current_pos`` and filter it through ``extract_gtid``.

        Args:
            server_id: Server id the returned GTID must belong to.

        Returns:
            The GTID string, or ``None`` when the variable does not hold a
            single three-part GTID for *server_id*.
        """
        return self.extract_gtid(self.query_single_value("SELECT @@gtid_current_pos"), server_id)

    def query_server_id(self):
        """Return the server's ``@@server_id`` as an ``int``."""
        return int(self.query_single_value("SELECT @@server_id"))


def main():
    """Dump binlog events from the example MariaDB server, starting at a GTID.

    Connects with MARIADB_SETTINGS, prints the server id, streams the selected
    event types without blocking and finally prints the last GTID seen.
    The stream and the helper connection are closed even if reading fails.
    """
    db = MariaDbGTID(MARIADB_SETTINGS)
    try:
        server_id = db.query_server_id()
        print('Server ID: ', server_id)

        # Fixed initial position. To start from the server's current position
        # instead, use db.query_gtid_current_pos(server_id).
        # Named start_gtid so the imported `gtid` module is not shadowed.
        start_gtid = '0-1-1'

        # NOTE(review): the reader registers with the server's own id here;
        # confirm whether a distinct replica id should be used instead.
        stream = BinLogStreamReader(
            connection_settings=MARIADB_SETTINGS,
            server_id=server_id,
            blocking=False,
            only_events=[
                MariadbGtidEvent,
                RotateEvent,
                WriteRowsEvent,
                UpdateRowsEvent,
                DeleteRowsEvent,
            ],
            auto_position=start_gtid,
            is_mariadb=True,
        )
        try:
            print('Starting reading events from GTID ', start_gtid)
            last_gtid = start_gtid
            for binlogevent in stream:
                binlogevent.dump()

                # Remember the most recent GTID so it can be reported at the end.
                if isinstance(binlogevent, MariadbGtidEvent):
                    last_gtid = binlogevent.gtid

            print('Last encountered GTID: ', last_gtid)
        finally:
            stream.close()
    finally:
        db.connection.close()


if __name__ == "__main__":
    main()
182 changes: 112 additions & 70 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
QueryEvent, RotateEvent, FormatDescriptionEvent,
XidEvent, GtidEvent, StopEvent,
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
HeartbeatLogEvent, NotImplementedEvent)
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent)
from .exceptions import BinLogNotEnabled
from .row_event import (
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)
Expand Down Expand Up @@ -139,7 +139,8 @@ def __init__(self, connection_settings, server_id,
report_slave=None, slave_uuid=None,
pymysql_wrapper=None,
fail_on_table_metadata_unavailable=False,
slave_heartbeat=None):
slave_heartbeat=None,
is_mariadb=False):
"""
Attributes:
ctl_connection_settings: Connection settings for cluster holding
Expand Down Expand Up @@ -174,6 +175,8 @@ def __init__(self, connection_settings, server_id,
many event to skip in binlog). See
MASTER_HEARTBEAT_PERIOD in mysql documentation
for semantics
is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position
to point to Mariadb specific GTID.
"""

self.__connection_settings = connection_settings
Expand Down Expand Up @@ -211,6 +214,7 @@ def __init__(self, connection_settings, server_id,
self.log_file = log_file
self.auto_position = auto_position
self.skip_to_timestamp = skip_to_timestamp
self.is_mariadb = is_mariadb

if end_log_pos:
self.is_past_end_log_pos = False
Expand Down Expand Up @@ -341,77 +345,114 @@ def __connect_to_stream(self):
prelude += struct.pack('<I', self.__server_id)
prelude += self.log_file.encode()
else:
# Format for mysql packet master_auto_position
#
# All fields are little endian
# All fields are unsigned

# Packet length uint 4bytes
# Packet type byte 1byte == 0x1e
# Binlog flags ushort 2bytes == 0 (for retrocompatibility)
# Server id uint 4bytes
# binlognamesize uint 4bytes
# binlogname str Nbytes N = binlognamesize
# Zeroified
# binlog position uint 4bytes == 4
# payload_size uint 4bytes

# What come next, is the payload, where the slave gtid_executed
# is sent to the master
# n_sid ulong 8bytes == which size is the gtid_set
# | sid uuid 16bytes UUID as a binary
# | n_intervals ulong 8bytes == how many intervals are sent
# | for this gtid
# | | start ulong 8bytes Start position of this interval
# | | stop ulong 8bytes Stop position of this interval

# A gtid set looks like:
# 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10,
# 1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140
#
# In this particular gtid set,
# 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10
# is the first member of the set, it is called a gtid.
# In this gtid, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457 is the sid
# and have two intervals, 1-3 and 8-10, 1 is the start position of
# the first interval 3 is the stop position of the first interval.

gtid_set = GtidSet(self.auto_position)
encoded_data_size = gtid_set.encoded_length

header_size = (2 + # binlog_flags
4 + # server_id
4 + # binlog_name_info_size
4 + # empty binlog name
8 + # binlog_pos_info_size
4) # encoded_data_size

prelude = b'' + struct.pack('<i', header_size + encoded_data_size)\
+ bytes(bytearray([COM_BINLOG_DUMP_GTID]))
if self.is_mariadb:
# https://mariadb.com/kb/en/5-slave-registration/
cur = self._stream_connection.cursor()

flags = 0
if not self.__blocking:
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
flags |= 0x04 # BINLOG_THROUGH_GTID
cur.execute("SET @mariadb_slave_capability=4")
cur.execute("SET @slave_connect_state='%s'" % self.auto_position)
cur.execute("SET @slave_gtid_strict_mode=1")
cur.execute("SET @slave_gtid_ignore_duplicates=0")
cur.close()

# binlog_flags (2 bytes)
# see:
# https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
prelude += struct.pack('<H', flags)
# https://mariadb.com/kb/en/com_binlog_dump/
header_size = (
4 + # binlog pos
2 + # binlog flags
4 + # slave server_id,
4 # requested binlog file name , set it to empty
)

# server_id (4 bytes)
prelude += struct.pack('<I', self.__server_id)
# binlog_name_info_size (4 bytes)
prelude += struct.pack('<I', 3)
# empty_binlog_name (4 bytes)
prelude += b'\0\0\0'
# binlog_pos_info (8 bytes)
prelude += struct.pack('<Q', 4)

# encoded_data_size (4 bytes)
prelude += struct.pack('<I', gtid_set.encoded_length)
# encoded_data
prelude += gtid_set.encoded()
prelude = struct.pack('<i', header_size) + bytes(bytearray([COM_BINLOG_DUMP]))

# binlog pos
prelude += struct.pack('<i', 4)

flags = 0
if not self.__blocking:
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK

# binlog flags
prelude += struct.pack('<H', flags)

# server id (4 bytes)
prelude += struct.pack('<I', self.__server_id)

# empty_binlog_name (4 bytes)
prelude += b'\0\0\0\0'

else:
# Format for mysql packet master_auto_position
#
# All fields are little endian
# All fields are unsigned

# Packet length uint 4bytes
# Packet type byte 1byte == 0x1e
# Binlog flags ushort 2bytes == 0 (for retrocompatibility)
# Server id uint 4bytes
# binlognamesize uint 4bytes
# binlogname str Nbytes N = binlognamesize
# Zeroified
# binlog position uint 4bytes == 4
# payload_size uint 4bytes

# What come next, is the payload, where the slave gtid_executed
# is sent to the master
# n_sid ulong 8bytes == which size is the gtid_set
# | sid uuid 16bytes UUID as a binary
# | n_intervals ulong 8bytes == how many intervals are sent
# | for this gtid
# | | start ulong 8bytes Start position of this interval
# | | stop ulong 8bytes Stop position of this interval

# A gtid set looks like:
# 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10,
# 1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140
#
# In this particular gtid set,
# 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10
# is the first member of the set, it is called a gtid.
# In this gtid, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457 is the sid
# and have two intervals, 1-3 and 8-10, 1 is the start position of
# the first interval 3 is the stop position of the first interval.

gtid_set = GtidSet(self.auto_position)
encoded_data_size = gtid_set.encoded_length

header_size = (2 + # binlog_flags
4 + # server_id
4 + # binlog_name_info_size
4 + # empty binlog name
8 + # binlog_pos_info_size
4) # encoded_data_size

prelude = b'' + struct.pack('<i', header_size + encoded_data_size)\
+ bytes(bytearray([COM_BINLOG_DUMP_GTID]))

flags = 0
if not self.__blocking:
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
flags |= 0x04 # BINLOG_THROUGH_GTID

# binlog_flags (2 bytes)
# see:
# https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
prelude += struct.pack('<H', flags)

# server_id (4 bytes)
prelude += struct.pack('<I', self.__server_id)
# binlog_name_info_size (4 bytes)
prelude += struct.pack('<I', 3)
# empty_binlog_name (4 bytes)
prelude += b'\0\0\0'
# binlog_pos_info (8 bytes)
prelude += struct.pack('<Q', 4)

# encoded_data_size (4 bytes)
prelude += struct.pack('<I', gtid_set.encoded_length)
# encoded_data
prelude += gtid_set.encoded()

if pymysql.__version__ < LooseVersion("0.6"):
self._stream_connection.wfile.write(prelude)
Expand Down Expand Up @@ -542,6 +583,7 @@ def _allowed_event_list(self, only_events, ignored_events,
TableMapEvent,
HeartbeatLogEvent,
NotImplementedEvent,
MariadbGtidEvent
))
if ignored_events is not None:
for e in ignored_events:
Expand Down
7 changes: 7 additions & 0 deletions pymysqlreplication/constants/BINLOG.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,10 @@
INTVAR_INVALID_INT_EVENT = 0x00
INTVAR_LAST_INSERT_ID_EVENT = 0x01
INTVAR_INSERT_ID_EVENT = 0x02

# MariaDB events
# Event type codes occupying the consecutive range 0xa0-0xa4.
# NOTE(review): MARIADB_GTID_GTID_LIST_EVENT repeats "GTID"; presumably it is
# the GTID list event — confirm, and keep the name importable if it is renamed.
MARIADB_ANNOTATE_ROWS_EVENT = 0xa0
MARIADB_BINLOG_CHECKPOINT_EVENT = 0xa1
MARIADB_GTID_EVENT = 0xa2
MARIADB_GTID_GTID_LIST_EVENT = 0xa3
MARIADB_START_ENCRYPTION_EVENT = 0xa4
Loading