diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index fa65aa22..1ead555c 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -14,7 +14,8 @@ QueryEvent, RotateEvent, FormatDescriptionEvent, XidEvent, GtidEvent, StopEvent, XAPrepareEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, - HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent) + HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent, + PreviousGtidsEvent) from .exceptions import BinLogNotEnabled from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) @@ -600,7 +601,8 @@ def _allowed_event_list(self, only_events, ignored_events, TableMapEvent, HeartbeatLogEvent, NotImplementedEvent, - MariadbGtidEvent + MariadbGtidEvent, + PreviousGtidsEvent )) if ignored_events is not None: for e in ignored_events: diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index d75c9db8..3c26fc8b 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -90,6 +90,40 @@ def __repr__(self): return '' % self.gtid +class PreviousGtidsEvent(BinLogEvent): + """ + PreviousGtidEvent is contains the Gtids executed in the last binary log file. + Attributes: + n_sid: which size is the gtid_set + sid: 16bytes UUID as a binary + n_intervals: how many intervals are sent + Eg: [4c9e3dfc-9d25-11e9-8d2e-0242ac1cfd7e:1-100, 4c9e3dfc-9d25-11e9-8d2e-0242ac1cfd7e:1-10:20-30] + """ + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super(PreviousGtidsEvent, self).__init__(from_packet, event_size, table_map, + ctl_connection, **kwargs) + + self._n_sid = self.packet.read_int64() + self._gtids = [] + + for _ in range(self._n_sid): + sid = self.packet.read(16) + n_intervals = self.packet.read_uint64() + intervals = [f"{self.packet.read_int64()}-{self.packet.read_uint64()}" for _ in range(n_intervals)] + nibbles = binascii.hexlify(sid).decode('ascii') + gtid = '%s-%s-%s-%s-%s:%s' % ( + nibbles[:8], nibbles[8:12], nibbles[12:16], nibbles[16:20], nibbles[20:], ':'.join(intervals)) + self._gtids.append(gtid) + + self._previous_gtids = ','.join(self._gtids) + + def _dump(self): + print("previous_gtids: %s" % self._previous_gtids) + + def __repr__(self): + return '' % self._previous_gtids + + class MariadbGtidEvent(BinLogEvent): """ GTID change in binlog event in MariaDB @@ -289,7 +323,7 @@ def _read_status_vars_value_for_key(self, key): elif key == Q_TIME_ZONE_CODE: # 0x05 time_zone_len = self.packet.read_uint8() if time_zone_len: - self.time_zone = self.packet.read(time_zone_len) + self.time_zone = self.packet.read(time_zone_len) elif key == Q_CATALOG_NZ_CODE: # 0x06 catalog_len = self.packet.read_uint8() if catalog_len: diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index 94baefdf..a1af014a 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -65,6 +65,7 @@ class BinLogPacketWrapper(object): constants.XID_EVENT: event.XidEvent, constants.INTVAR_EVENT: event.IntvarEvent, constants.GTID_LOG_EVENT: event.GtidEvent, + constants.PREVIOUS_GTIDS_LOG_EVENT: event.PreviousGtidsEvent, constants.STOP_EVENT: event.StopEvent, constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent, constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent, @@ -81,7 +82,6 @@ class BinLogPacketWrapper(object): #5.6 GTID enabled replication events constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent, - constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent, # MariaDB GTID constants.MARIADB_ANNOTATE_ROWS_EVENT: event.NotImplementedEvent, constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent, diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 0db8a264..9b823147 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -22,12 +22,12 @@ class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase): def ignoredEvents(self): - return [GtidEvent] + return [GtidEvent, PreviousGtidsEvent] def test_allowed_event_list(self): - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 16) - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 15) - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 15) + self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 17) + self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 16) + self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 16) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) def test_read_query_event(self): @@ -522,7 +522,7 @@ def test_end_log_pos(self): class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): def ignoredEvents(self): - return [GtidEvent] + return [GtidEvent, PreviousGtidsEvent] def test_insert_multiple_row_event(self): query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" @@ -844,6 +844,7 @@ def test_read_query_event(self): query = "COMMIT;" self.execute(query) + self.assertIsInstance(self.stream.fetchone(), PreviousGtidsEvent) firstevent = self.stream.fetchone() self.assertIsInstance(firstevent, GtidEvent) @@ -893,6 +894,7 @@ def test_position_gtid(self): self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + self.assertIsInstance(self.stream.fetchone(), PreviousGtidsEvent) self.assertIsInstance(self.stream.fetchone(), GtidEvent) event = self.stream.fetchone() diff --git a/pymysqlreplication/tests/test_data_type.py b/pymysqlreplication/tests/test_data_type.py index cd6464c5..a1e4001a 100644 --- a/pymysqlreplication/tests/test_data_type.py +++ b/pymysqlreplication/tests/test_data_type.py @@ -32,7 +32,7 @@ def encode_value(v): class TestDataType(base.PyMySQLReplicationTestCase): def ignoredEvents(self): - return [GtidEvent] + return [GtidEvent, PreviousGtidsEvent] def create_and_insert_value(self, create_query, insert_query): self.execute(create_query)