Skip to content

Commit a0c7388

Browse files
Add ‘MariadbGtidListEvent‘ class (#447)
* Add MariadbGtidListEvent class for parsing GTID List events * Add test code for MariadbGTIDListEvent class * fix - fixed how packets are read
1 parent f524638 commit a0c7388

File tree

4 files changed

+76
-7
lines changed

4 files changed

+76
-7
lines changed

pymysqlreplication/binlogstream.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
1414
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
1515
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
16-
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent)
16+
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent,
17+
MariadbGtidListEvent)
1718
from .exceptions import BinLogNotEnabled
1819
from .gtid import GtidSet
1920
from .packet import BinLogPacketWrapper
@@ -624,6 +625,7 @@ def _allowed_event_list(self, only_events, ignored_events,
624625
MariadbAnnotateRowsEvent,
625626
RandEvent,
626627
MariadbStartEncryptionEvent,
628+
MariadbGtidListEvent
627629
))
628630
if ignored_events is not None:
629631
for e in ignored_events:

pymysqlreplication/event.py

+35-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,41 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
126126

127127
def _dump(self):
128128
super()._dump()
129-
print("SQL statement :", self.sql_statement)
129+
print("SQL statement :", self.sql_statement)
130+
131+
class MariadbGtidListEvent(BinLogEvent):
132+
"""
133+
GTID List event
134+
https://mariadb.com/kb/en/gtid_list_event/
135+
136+
Attributes:
137+
gtid_length: Number of GTIDs
138+
gtid_list: list of 'MariadbGtidObejct'
139+
140+
'MariadbGtidObejct' Attributes:
141+
domain_id: Replication Domain ID
142+
server_id: Server_ID
143+
gtid_seq_no: GTID sequence
144+
gtid: 'domain_id'+ 'server_id' + 'gtid_seq_no'
145+
"""
146+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
147+
148+
super(MariadbGtidListEvent, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
149+
150+
class MariadbGtidObejct(BinLogEvent):
151+
"""
152+
Information class of elements in GTID list
153+
"""
154+
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
155+
super(MariadbGtidObejct, self).__init__(from_packet, event_size, table_map, ctl_connection, **kwargs)
156+
self.domain_id = self.packet.read_uint32()
157+
self.server_id = self.packet.read_uint32()
158+
self.gtid_seq_no = self.packet.read_uint64()
159+
self.gtid = "%d-%d-%d" % (self.domain_id, self.server_id, self.gtid_seq_no)
160+
161+
162+
self.gtid_length = self.packet.read_uint32()
163+
self.gtid_list = [MariadbGtidObejct(from_packet, event_size, table_map, ctl_connection, **kwargs) for i in range(self.gtid_length)]
130164

131165

132166
class RotateEvent(BinLogEvent):

pymysqlreplication/packet.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class BinLogPacketWrapper(object):
8989
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent,
9090
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent,
9191
constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent,
92-
constants.MARIADB_GTID_GTID_LIST_EVENT: event.NotImplementedEvent,
92+
constants.MARIADB_GTID_GTID_LIST_EVENT: event.MariadbGtidListEvent,
9393
constants.MARIADB_START_ENCRYPTION_EVENT: event.MariadbStartEncryptionEvent
9494
}
9595

pymysqlreplication/tests/test_basic.py

+37-4
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ def ignoredEvents(self):
2727
return [GtidEvent]
2828

2929
def test_allowed_event_list(self):
30-
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 20)
31-
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 19)
32-
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 19)
30+
self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 21)
31+
self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 20)
32+
self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 20)
3333
self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1)
3434

3535
def test_read_query_event(self):
@@ -1074,7 +1074,40 @@ def test_start_encryption_event(self):
10741074
self.assertEqual(schema, 1)
10751075
self.assertEqual(key_version, key_version_from_key_file)
10761076
self.assertEqual(type(nonce), bytes)
1077-
self.assertEqual(len(nonce), 12)
1077+
self.assertEqual(len(nonce), 12)
1078+
1079+
def test_gtid_list_event(self):
1080+
# set max_binlog_size to create new binlog file
1081+
query = 'SET GLOBAL max_binlog_size=4096'
1082+
self.execute(query)
1083+
# parse only Maradb GTID list event
1084+
self.stream.close()
1085+
self.stream = BinLogStreamReader(
1086+
self.database,
1087+
server_id=1024,
1088+
blocking=False,
1089+
only_events=[MariadbGtidListEvent],
1090+
is_mariadb=True,
1091+
)
1092+
1093+
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
1094+
self.execute(query)
1095+
query = "INSERT INTO test (data) VALUES('Hello World')"
1096+
1097+
for cnt in range(0,15):
1098+
self.execute(query)
1099+
self.execute("COMMIT")
1100+
1101+
# 'mariadb gtid list event' of first binlog file
1102+
event = self.stream.fetchone()
1103+
self.assertEqual(event.event_type,163)
1104+
self.assertIsInstance(event,MariadbGtidListEvent)
1105+
1106+
# 'mariadb gtid list event' of second binlog file
1107+
event = self.stream.fetchone()
1108+
self.assertEqual(event.event_type,163)
1109+
self.assertEqual(event.gtid_list[0].gtid, '0-1-15')
1110+
10781111

10791112

10801113
class TestStatementConnectionSetting(base.PyMySQLReplicationTestCase):

0 commit comments

Comments
 (0)