|
17 | 17 | from pymysqlreplication.constants.BINLOG import *
|
18 | 18 | from pymysqlreplication.row_event import *
|
19 | 19 |
|
20 |
| -__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader"] |
| 20 | +__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader","TestMariadbBinlogStreaReader"] |
21 | 21 |
|
22 | 22 |
|
23 | 23 | class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
|
@@ -1002,6 +1002,25 @@ def test_parsing(self):
|
1002 | 1002 | gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac:1-:1")
|
1003 | 1003 | gtid = Gtid("57b70f4e-20d3-11e5-a393-4a63946f7eac::1")
|
1004 | 1004 |
|
| 1005 | +class TestMariadbBinlogStreaReader(base.PyMySQLReplicationTestCase): |
| 1006 | + def setUp(self): |
| 1007 | + super(TestMariadbBinlogStreaReader,self).setUp() |
| 1008 | + if not self.isMariaDB(): |
| 1009 | + raise unittest.SkipTest("Skipping test: Not a MariaDB instance") |
| 1010 | + |
| 1011 | + def test_gtid_list_event(self): |
| 1012 | + event = self.stream.fetchone() |
| 1013 | + self.assertEqual(event.position, 4) |
| 1014 | + |
| 1015 | + #FormatDescriptionEvent |
| 1016 | + event = self.stream.fetchone() |
| 1017 | + self.assertEqual(event.event_type,15) |
| 1018 | + self.assertIsInstance(event,FormatDescriptionEvent) |
| 1019 | + |
| 1020 | + #MariadbAnnotateRowsEvent |
| 1021 | + event = self.stream.fetchone() |
| 1022 | + self.assertEqual(event.event_type,163) |
| 1023 | + self.assertIsInstance(event,MariadbGtidListEvent) |
1005 | 1024 |
|
1006 | 1025 | if __name__ == "__main__":
|
1007 | 1026 | import unittest
|
|
0 commit comments