Skip to content

Commit ee4df93

Browse files
Merge branch 'feature/previous-gtid-event' of github.com:23-OSSCA-python-mysql-replication/python-mysql-replication into 23-OSSCA-python-mysql-replication-feature/previous-gtid-event
2 parents 460a702 + 4b19633 commit ee4df93

File tree

5 files changed

+46
-9
lines changed

5 files changed

+46
-9
lines changed

Diff for: pymysqlreplication/binlogstream.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
1515
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
1616
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent,
17-
MariadbGtidListEvent, MariadbBinLogCheckPointEvent, UserVarEvent)
17+
MariadbGtidListEvent, MariadbBinLogCheckPointEvent, UserVarEvent,
18+
PreviousGtidsEvent)
1819
from .exceptions import BinLogNotEnabled
1920
from .gtid import GtidSet
2021
from .packet import BinLogPacketWrapper
@@ -183,7 +184,7 @@ def __init__(self, connection_settings, server_id,
183184
to point to Mariadb specific GTID.
184185
annotate_rows_event: Parameter value to enable annotate rows event in mariadb,
185186
used with 'is_mariadb'
186-
ignore_decode_errors: If true, any decode errors encountered
187+
ignore_decode_errors: If true, any decode errors encountered
187188
when reading column data will be ignored.
188189
verify_checksum: If true, verify events read from the binary log by examining checksums.
189190
"""
@@ -478,7 +479,7 @@ def __set_mariadb_settings(self):
478479

479480
flags = 0
480481

481-
# Enable annotate rows event
482+
# Enable annotate rows event
482483
if self.__annotate_rows_event:
483484
flags |= 0x02 # BINLOG_SEND_ANNOTATE_ROWS_EVENT
484485

@@ -631,7 +632,8 @@ def _allowed_event_list(self, only_events, ignored_events,
631632
MariadbStartEncryptionEvent,
632633
MariadbGtidListEvent,
633634
MariadbBinLogCheckPointEvent,
634-
UserVarEvent
635+
UserVarEvent,
636+
PreviousGtidsEvent
635637
))
636638
if ignored_events is not None:
637639
for e in ignored_events:

Diff for: pymysqlreplication/event.py

+34
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,40 @@ def __repr__(self):
121121
return '<GtidEvent "%s">' % self.gtid
122122

123123

124+
class PreviousGtidsEvent(BinLogEvent):
125+
"""
126+
PreviousGtidEvent is contains the Gtids executed in the last binary log file.
127+
Attributes:
128+
n_sid: which size is the gtid_set
129+
sid: 16bytes UUID as a binary
130+
n_intervals: how many intervals are sent
131+
Eg: [4c9e3dfc-9d25-11e9-8d2e-0242ac1cfd7e:1-100, 4c9e3dfc-9d25-11e9-8d2e-0242ac1cfd7e:1-10:20-30]
132+
"""
133+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
134+
super(PreviousGtidsEvent, self).__init__(from_packet, event_size, table_map,
135+
ctl_connection, **kwargs)
136+
137+
self._n_sid = self.packet.read_int64()
138+
self._gtids = []
139+
140+
for _ in range(self._n_sid):
141+
sid = self.packet.read(16)
142+
n_intervals = self.packet.read_uint64()
143+
intervals = [f"{self.packet.read_int64()}-{self.packet.read_uint64()}" for _ in range(n_intervals)]
144+
nibbles = binascii.hexlify(sid).decode('ascii')
145+
gtid = '%s-%s-%s-%s-%s:%s' % (
146+
nibbles[:8], nibbles[8:12], nibbles[12:16], nibbles[16:20], nibbles[20:], ':'.join(intervals))
147+
self._gtids.append(gtid)
148+
149+
self._previous_gtids = ','.join(self._gtids)
150+
151+
def _dump(self):
152+
print("previous_gtids: %s" % self._previous_gtids)
153+
154+
def __repr__(self):
155+
return '<PreviousGtidsEvent "%s">' % self._previous_gtids
156+
157+
124158
class MariadbGtidEvent(BinLogEvent):
125159
"""
126160
GTID(Global Transaction Identifier) change in binlog event in MariaDB

Diff for: pymysqlreplication/packet.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class BinLogPacketWrapper(object):
6565
constants.XID_EVENT: event.XidEvent,
6666
constants.INTVAR_EVENT: event.IntvarEvent,
6767
constants.GTID_LOG_EVENT: event.GtidEvent,
68+
constants.PREVIOUS_GTIDS_LOG_EVENT: event.PreviousGtidsEvent,
6869
constants.STOP_EVENT: event.StopEvent,
6970
constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
7071
constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
@@ -84,8 +85,6 @@ class BinLogPacketWrapper(object):
8485

8586
#5.6 GTID enabled replication events
8687
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
87-
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
88-
constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent,
8988
# MariaDB GTID
9089
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent,
9190
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.MariadbBinLogCheckPointEvent,

Diff for: pymysqlreplication/tests/test_basic.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
3030
def ignoredEvents(self):
31-
return [GtidEvent]
31+
return [GtidEvent, PreviousGtidsEvent]
3232

3333
def test_allowed_event_list(self):
3434
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 23)
@@ -577,7 +577,7 @@ def create_binlog_packet_wrapper(pkt):
577577

578578
class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
579579
def ignoredEvents(self):
580-
return [GtidEvent]
580+
return [GtidEvent, PreviousGtidsEvent]
581581

582582
def test_insert_multiple_row_event(self):
583583
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
@@ -902,6 +902,7 @@ def test_read_query_event(self):
902902
query = "COMMIT;"
903903
self.execute(query)
904904

905+
self.assertIsInstance(self.stream.fetchone(), PreviousGtidsEvent)
905906
firstevent = self.stream.fetchone()
906907
self.assertIsInstance(firstevent, GtidEvent)
907908

@@ -951,6 +952,7 @@ def test_position_gtid(self):
951952

952953
self.assertIsInstance(self.stream.fetchone(), RotateEvent)
953954
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
955+
self.assertIsInstance(self.stream.fetchone(), PreviousGtidsEvent)
954956
self.assertIsInstance(self.stream.fetchone(), GtidEvent)
955957
event = self.stream.fetchone()
956958

Diff for: pymysqlreplication/tests/test_data_type.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def encode_value(v):
3232

3333
class TestDataType(base.PyMySQLReplicationTestCase):
3434
def ignoredEvents(self):
35-
return [GtidEvent]
35+
return [GtidEvent, PreviousGtidsEvent]
3636

3737
def create_and_insert_value(self, create_query, insert_query):
3838
self.execute(create_query)

0 commit comments

Comments
 (0)