diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 82b89974..020994c9 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -108,6 +108,43 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.last_committed = struct.unpack(" 8.0: + return True + return version == 8.0 and version_detail >= 10 + def isMySQL80AndMore(self): version = float(self.getMySQLVersion().rsplit(".", 1)[0]) return version >= 8.0 diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 3eadd7d4..8c3835c0 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -14,7 +14,6 @@ from pymysql.protocol import MysqlPacket import pytest - __all__ = [ "TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", @@ -1473,6 +1472,42 @@ def test_rows_query_log_event(self): self.assertIsInstance(event, RowsQueryLogEvent) +class TestGtidEvent(base.PyMySQLReplicationTestCase): + def setUp(self): + super(TestGtidEvent, self).setUp() + self.execute("SET SESSION binlog_rows_query_log_events=1") + + def tearDown(self): + self.execute("SET SESSION binlog_rows_query_log_events=0") + super(TestGtidEvent, self).tearDown() + + def test_gtid_event(self): + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + only_events=[GtidEvent, FormatDescriptionEvent], + ) + if not self.isMySQL801AndMore(): + self.skipTest("Mysql version is under 8.0.1") + self.execute( + "CREATE TABLE IF NOT EXISTS test (id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255))" + ) + format_description_event = self.stream.fetchone() + gtid_event = self.stream.fetchone() + self.assertIsInstance(format_description_event, FormatDescriptionEvent) + self.assertIsInstance(gtid_event, GtidEvent) + self.assertIsInstance(gtid_event.event_type, int) + self.assertIsInstance(gtid_event.sid, bytes) + self.assertIsInstance(gtid_event.gno, int) + self.assertIsInstance(gtid_event.lt_type, int) + self.assertIsInstance(gtid_event.last_committed, int) + self.assertIsInstance(gtid_event.sequence_number, int) + self.assertEqual(gtid_event.sequence_number, 1) + self.assertIsInstance(gtid_event.immediate_commit_timestamp, bytes) + self.assertIsInstance(gtid_event.original_commit_timestamp, bytes) + + class TestLatin1(base.PyMySQLReplicationTestCase): def setUp(self): super().setUp(charset="latin1")