diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 4f8213d5..393524ee 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -13,7 +13,8 @@ XidEvent, GtidEvent, StopEvent, XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent, - MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent) + MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent, + MariadbGtidListEvent) from .exceptions import BinLogNotEnabled from .gtid import GtidSet from .packet import BinLogPacketWrapper @@ -624,6 +625,7 @@ def _allowed_event_list(self, only_events, ignored_events, MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, + MariadbGtidListEvent )) if ignored_events is not None: for e in ignored_events: diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 12c285c8..1de1e681 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -126,7 +126,41 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) def _dump(self): super()._dump() - print("SQL statement :", self.sql_statement) + print("SQL statement :", self.sql_statement) + +class MariadbGtidListEvent(BinLogEvent): + """ + GTID List event + https://mariadb.com/kb/en/gtid_list_event/ + + Attributes: + gtid_length: Number of GTIDs + gtid_list: list of 'MariadbGtidObejct' + + 'MariadbGtidObejct' Attributes: + domain_id: Replication Domain ID + server_id: Server_ID + gtid_seq_no: GTID sequence + gtid: 'domain_id'+ 'server_id' + 'gtid_seq_no' + """ + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + + super(MariadbGtidListEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + + class MariadbGtidObejct(BinLogEvent): + """ + Information class of elements in GTID list + """ + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super(MariadbGtidObejct, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs) + self.domain_id = self.packet.read_uint32() + self.server_id = self.packet.read_uint32() + self.gtid_seq_no = self.packet.read_uint64() + self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no) + + + self.gtid_length = self.packet.read_uint32() + self.gtid_list = [MariadbGtidObejct(from_packet, event_size, table_map, ctl_connection, **kwargs) for i in range(self.gtid_length)] class RotateEvent(BinLogEvent): diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index 33d53200..04d60581 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -89,7 +89,7 @@ class BinLogPacketWrapper(object): constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent, constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent, constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent, - constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent, + constants.MARIADB_GTID_GTID_LIST_EVENT: event.MariadbGtidListEvent, constants.MARIADB_START_ENCRYPTION_EVENT: event.MariadbStartEncryptionEvent } diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 32b237e0..d9195843 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -27,9 +27,9 @@ def ignoredEvents(self): return [GtidEvent] def test_allowed_event_list(self): - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 20) - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 19) - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 19) + self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 21) + self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 20) + self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 20) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) def test_read_query_event(self): @@ -1075,7 +1075,40 @@ def test_start_encryption_event(self): self.assertEqual(schema, 1) self.assertEqual(key_version, key_version_from_key_file) self.assertEqual(type(nonce), bytes) - self.assertEqual(len(nonce), 12) + self.assertEqual(len(nonce), 12) + + def test_gtid_list_event(self): + # set max_binlog_size to create new binlog file + query = 'SET GLOBAL max_binlog_size=4096' + self.execute(query) + # parse only Maradb GTID list event + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + blocking=False, + only_events=[MariadbGtidListEvent], + is_mariadb=True, + ) + + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + query = "INSERT INTO test (data) VALUES('Hello World')" + + for cnt in range(0,15): + self.execute(query) + self.execute("COMMIT") + + # 'mariadb gtid list event' of first binlog file + event = self.stream.fetchone() + self.assertEqual(event.event_type,163) + self.assertIsInstance(event,MariadbGtidListEvent) + + # 'mariadb gtid list event' of second binlog file + event = self.stream.fetchone() + self.assertEqual(event.event_type,163) + self.assertEqual(event.gtid_list[0].gtid, '0-1-15') + class TestStatementConnectionSetting(base.PyMySQLReplicationTestCase):