diff --git a/docker-compose.yml b/docker-compose.yml index d6449d2c..a0412268 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,3 +15,16 @@ services: ports: - 3307:3307 command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307 + + mariadb-10.6: + image: mariadb:10.6 + environment: + MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1 + ports: + - "3308:3306" + command: | + --server-id=1 + --default-authentication-plugin=mysql_native_password + --log-bin=master-bin + --binlog-format=row + --log-slave-updates=on diff --git a/examples/mariadb_gtid/read_event.py b/examples/mariadb_gtid/read_event.py index cc88a97f..2bd75a90 100644 --- a/examples/mariadb_gtid/read_event.py +++ b/examples/mariadb_gtid/read_event.py @@ -1,7 +1,7 @@ import pymysql from pymysqlreplication import BinLogStreamReader, gtid -from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent +from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent, MariadbGtidListEvent from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent MARIADB_SETTINGS = { @@ -65,7 +65,8 @@ def query_server_id(self): RotateEvent, WriteRowsEvent, UpdateRowsEvent, - DeleteRowsEvent + DeleteRowsEvent, + MariadbGtidListEvent ], auto_position=gtid, is_mariadb=True diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index fa65aa22..74e664e8 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -14,7 +14,8 @@ QueryEvent, RotateEvent, FormatDescriptionEvent, XidEvent, GtidEvent, StopEvent, XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, - HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent) + HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent, + MariadbGtidListEvent) from .exceptions import BinLogNotEnabled from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) @@ -600,7 +601,8 @@ def _allowed_event_list(self, only_events, ignored_events, TableMapEvent, HeartbeatLogEvent, NotImplementedEvent, - MariadbGtidEvent + MariadbGtidEvent, + MariadbGtidListEvent )) if ignored_events is not None: for e in ignored_events: diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index f4cbe1c6..ade9e998 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -89,6 +89,45 @@ def _dump(self): def __repr__(self): return '' % self.gtid +class MariadbGtidListEvent(BinLogEvent): + """ + GTID List event + https://mariadb.com/kb/en/gtid_list_event/ + + Attributes: + gtid_length: Number of GTIDs + gtid_list: list of 'MariadbGtidObejct' + + 'MariadbGtidObejct' Attributes: + domain_id: Replication Domain ID + server_id: Server_ID + gtid_seq_no: GTID sequence + gtid: 'domain_id'+ 'server_id' + 'gtid_seq_no' + """ + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + + super(MariadbGtidListEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + + class MariadbGtidObejct(BinLogEvent): + """ + Information class of elements in GTID list + """ + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super(MariadbGtidObejct, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + self.domain_id = self.packet.read_uint32() + self.server_id = self.packet.server_id + self.gtid_seq_no = self.packet.read_uint64() + self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) + + + self.gtid_length = self.packet.read_uint32() + self.gtid_list = [MariadbGtidObejct(from_packet, event_size, table_map, ctl_connection, **kwargs) for i in range(self.gtid_length)] + + + def _dump(self): + super(MariadbGtidListEvent, self)._dump() + print("GTID length:",self.gtid_length) + print("GTID list: " + ",".join(list(map(lambda x: x.gtid,self.gtid_list)))) class MariadbGtidEvent(BinLogEvent): """ diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index 94baefdf..4edf12ea 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -86,7 +86,7 @@ class BinLogPacketWrapper(object): constants.MARIADB_ANNOTATE_ROWS_EVENT: event.NotImplementedEvent, constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent, constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent, - constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent, + constants.MARIADB_GTID_GTID_LIST_EVENT: event.MariadbGtidListEvent, constants.MARIADB_START_ENCRYPTION_EVENT: event.NotImplementedEvent } diff --git a/pymysqlreplication/tests/base.py b/pymysqlreplication/tests/base.py index 037c6d9d..bf8f8f3c 100644 --- a/pymysqlreplication/tests/base.py +++ b/pymysqlreplication/tests/base.py @@ -67,7 +67,7 @@ def isMySQL80AndMore(self): def isMariaDB(self): if self.__is_mariaDB is None: - self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone() + self.__is_mariaDB = "MariaDB" in self.execute("SELECT VERSION()").fetchone()[0] return self.__is_mariaDB @property @@ -121,3 +121,32 @@ def bin_log_basename(self): bin_log_basename = cursor.fetchone()[0] bin_log_basename = bin_log_basename.split("/")[-1] return bin_log_basename + + +class PyMySQLReplicationMariaDbTestCase(PyMySQLReplicationTestCase): + def setUp(self): + # default + self.database = { + "host": "localhost", + "user": "root", + "passwd": "", + "port": 3308, + "use_unicode": True, + "charset": "utf8", + "db": "pymysqlreplication_test" + } + + self.conn_control = None + db = copy.copy(self.database) + db["db"] = None + self.connect_conn_control(db) + self.execute("DROP DATABASE IF EXISTS pymysqlreplication_test") + self.execute("CREATE DATABASE pymysqlreplication_test") + db = copy.copy(self.database) + self.connect_conn_control(db) + self.stream = None + self.resetBinLog() + + + + \ No newline at end of file diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index c109acad..ec1caaf8 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -17,7 +17,7 @@ from pymysqlreplication.constants.BINLOG import * from pymysqlreplication.row_event import * -__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader"] +__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader","TestMariadbBinlogStreaReader"] class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase): @@ -1002,6 +1002,21 @@ def test_parsing(self): gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1") gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1") +class TestMariadbBinlogStreaReader(base.PyMySQLReplicationMariaDbTestCase): + + def test_gtid_list_event(self): + event = self.stream.fetchone() + self.assertEqual(event.position, 4) + + #FormatDescriptionEvent + event = self.stream.fetchone() + self.assertEqual(event.event_type,15) + self.assertIsInstance(event,FormatDescriptionEvent) + + #MariadbAnnotateRowsEvent + event = self.stream.fetchone() + self.assertEqual(event.event_type,163) + self.assertIsInstance(event,MariadbGtidListEvent) if __name__ == "__main__": import unittest