diff --git a/examples/mariadb_gtid/docker-compose.yml b/examples/mariadb_gtid/docker-compose.yml new file mode 100644 index 00000000..dd7d5ecc --- /dev/null +++ b/examples/mariadb_gtid/docker-compose.yml @@ -0,0 +1,31 @@ +version: '3' + +services: + testdb: + container_name: "testdb" + image: mariadb:10.6 + environment: + MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1 + MARIADB_DATABASE: mydb + MARIADB_USER: replication_user + MARIADB_PASSWORD: secret123passwd + ports: + - "3306:3306" + command: | + --server-id=1 + --default-authentication-plugin=mysql_native_password + --log-bin=master-bin + --binlog-format=row + --log-slave-updates=on + --binlog-do-db=mydb + volumes: + - ./queries.sql:/docker-entrypoint-initdb.d/queries.sql + networks: + - mariadb-cluster + +networks: + mariadb-cluster: + ipam: + driver: default + config: + - subnet: 172.200.0.0/24 diff --git a/examples/mariadb_gtid/queries.sql b/examples/mariadb_gtid/queries.sql new file mode 100644 index 00000000..1ae7ed55 --- /dev/null +++ b/examples/mariadb_gtid/queries.sql @@ -0,0 +1,29 @@ +# configure replication user +grant replication slave on *.* to 'replication_user'@'%'; +flush privileges; + +# create objects +create table r1 ( + i1 int auto_increment primary key, + c1 varchar(10), + d1 datetime default current_timestamp() +); + +insert into r1 (c1) values ('#1'),('#2'),('#3'),('#4'),('#5'),('#6'),('#7'); + +create table r2 (i2 int primary key, d2 datetime) ; +insert into r2 (i2, d2) values (1, now()); +insert into r2 (i2, d2) values (2, now()); +insert into r2 (i2, d2) values (3, now()); +insert into r2 (i2, d2) values (4, now()); + +update r1 set c1=concat(c1, '-up'); + +select * from r2; + +delete from r1 where i1 < 4; + +drop table r2; + +alter table r1 add column b1 bool default False; +insert into r1 (c1, b1) values ('#8', True); diff --git a/examples/mariadb_gtid/read_event.py b/examples/mariadb_gtid/read_event.py new file mode 100644 index 00000000..cc88a97f --- /dev/null +++ b/examples/mariadb_gtid/read_event.py @@ -0,0 +1,83 @@ +import pymysql + +from pymysqlreplication import BinLogStreamReader, gtid +from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent +from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent + +MARIADB_SETTINGS = { + "host": "127.0.0.1", + "port": 3306, + "user": "replication_user", + "passwd": "secret123passwd", +} + + +class MariaDbGTID: + def __init__(self, conn_config): + self.connection = pymysql.connect(**conn_config) + + def query_single_value(self, sql: str): + res = None + + with self.connection.cursor() as cursor: + cursor.execute(sql) + row = cursor.fetchone() + res = str(row[0]) + + return res + + def extract_gtid(self, gtid: str, server_id: str): + if gtid is None or server_id is None: + return None + + gtid_parts = gtid.split("-") + + if len(gtid_parts) != 3: + return None + + if gtid_parts[1] == server_id: + return gtid + + return None + + def query_gtid_current_pos(self, server_id: str): + return self.extract_gtid(self.query_single_value("SELECT @@gtid_current_pos"), server_id) + + def query_server_id(self): + return int(self.query_single_value("SELECT @@server_id")) + + +if __name__ == "__main__": + db = MariaDbGTID(MARIADB_SETTINGS) + + server_id = db.query_server_id() + print('Server ID: ', server_id) + + # gtid = db.query_gtid_current_pos(server_id) + gtid = '0-1-1' # initial pos + + stream = BinLogStreamReader( + connection_settings=MARIADB_SETTINGS, + server_id=server_id, + blocking=False, + only_events=[ + MariadbGtidEvent, + RotateEvent, + WriteRowsEvent, + UpdateRowsEvent, + DeleteRowsEvent + ], + auto_position=gtid, + is_mariadb=True + ) + + print('Starting reading events from GTID ', gtid) + for binlogevent in stream: + binlogevent.dump() + + if isinstance(binlogevent, MariadbGtidEvent): + gtid = binlogevent.gtid + + print('Last encountered GTID: ', gtid) + + stream.close() diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index f2f29e65..ed69b2a2 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -14,7 +14,7 @@ QueryEvent, RotateEvent, FormatDescriptionEvent, XidEvent, GtidEvent, StopEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, - HeartbeatLogEvent, NotImplementedEvent) + HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent) from .exceptions import BinLogNotEnabled from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) @@ -139,7 +139,8 @@ def __init__(self, connection_settings, server_id, report_slave=None, slave_uuid=None, pymysql_wrapper=None, fail_on_table_metadata_unavailable=False, - slave_heartbeat=None): + slave_heartbeat=None, + is_mariadb=False): """ Attributes: ctl_connection_settings: Connection settings for cluster holding @@ -174,6 +175,8 @@ def __init__(self, connection_settings, server_id, many event to skip in binlog). See MASTER_HEARTBEAT_PERIOD in mysql documentation for semantics + is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position + to point to Mariadb specific GTID. """ self.__connection_settings = connection_settings @@ -211,6 +214,7 @@ def __init__(self, connection_settings, server_id, self.log_file = log_file self.auto_position = auto_position self.skip_to_timestamp = skip_to_timestamp + self.is_mariadb = is_mariadb if end_log_pos: self.is_past_end_log_pos = False @@ -341,77 +345,114 @@ def __connect_to_stream(self): prelude += struct.pack('' % self.gtid +class MariadbGtidEvent(BinLogEvent): + """ + GTID change in binlog event in MariaDB + https://mariadb.com/kb/en/gtid_event/ + """ + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + + super(MariadbGtidEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + + self.server_id = self.packet.server_id + self.gtid_seq_no = self.packet.read_uint64() + self.domain_id = self.packet.read_uint32() + self.flags = self.packet.read_uint8() + self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) + + def _dump(self): + super(MariadbGtidEvent, self)._dump() + print("Flags:", self.flags) + print('GTID:', self.gtid) + + class RotateEvent(BinLogEvent): """Change MySQL bin log file @@ -154,7 +175,7 @@ def _dump(self): class QueryEvent(BinLogEvent): - '''This evenement is trigger when a query is run of the database. + '''This event is trigger when a query is run of the database. Only replicated queries are logged.''' def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): super(QueryEvent, self).__init__(from_packet, event_size, table_map, diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index a5a7c0fd..c4e4dfdc 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -77,10 +77,16 @@ class BinLogPacketWrapper(object): constants.WRITE_ROWS_EVENT_V2: row_event.WriteRowsEvent, constants.DELETE_ROWS_EVENT_V2: row_event.DeleteRowsEvent, constants.TABLE_MAP_EVENT: row_event.TableMapEvent, + #5.6 GTID enabled replication events constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent, - constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent - + constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent, + # MariaDB GTID + constants.MARIADB_ANNOTATE_ROWS_EVENT: event.NotImplementedEvent, + constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent, + constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent, + constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent, + constants.MARIADB_START_ENCRYPTION_EVENT: event.NotImplementedEvent } def __init__(self, from_packet, table_map, ctl_connection, use_checksum,