|
19 | 19 | from pymysqlreplication.constants.BINLOG import *
|
20 | 20 | from pymysqlreplication.row_event import *
|
21 | 21 |
|
22 |
| -__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader","TestMariadbBinlogStreamReader"] |
| 22 | +__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting"] |
23 | 23 |
|
24 | 24 |
|
25 | 25 | class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
|
26 | 26 | def ignoredEvents(self):
|
27 | 27 | return [GtidEvent]
|
28 | 28 |
|
29 | 29 | def test_allowed_event_list(self):
|
30 |
| - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 17) |
31 |
| - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 16) |
32 |
| - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 16) |
| 30 | + self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 18) |
| 31 | + self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 17) |
| 32 | + self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 17) |
33 | 33 | self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1)
|
34 | 34 |
|
35 | 35 | def test_read_query_event(self):
|
@@ -1036,6 +1036,37 @@ def test_annotate_rows_event(self):
|
1036 | 1036 | self.assertEqual(event.sql_statement,insert_query)
|
1037 | 1037 | self.assertIsInstance(event,MariadbAnnotateRowsEvent)
|
1038 | 1038 |
|
| 1039 | +class TestStatementConnectionSetting(base.PyMySQLReplicationTestCase): |
| 1040 | + def setUp(self): |
| 1041 | + super(TestStatementConnectionSetting, self).setUp() |
| 1042 | + self.stream.close() |
| 1043 | + self.stream = BinLogStreamReader( |
| 1044 | + self.database, |
| 1045 | + server_id=1024, |
| 1046 | + only_events=(RandEvent, QueryEvent), |
| 1047 | + fail_on_table_metadata_unavailable=True |
| 1048 | + ) |
| 1049 | + self.execute("SET @@binlog_format='STATEMENT'") |
| 1050 | + |
| 1051 | + def test_rand_event(self): |
| 1052 | + self.execute("CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data INT NOT NULL, PRIMARY KEY (id))") |
| 1053 | + self.execute("INSERT INTO test (data) VALUES(RAND())") |
| 1054 | + self.execute("COMMIT") |
| 1055 | + |
| 1056 | + self.assertEqual(self.bin_log_format(), "STATEMENT") |
| 1057 | + self.assertIsInstance(self.stream.fetchone(), QueryEvent) |
| 1058 | + self.assertIsInstance(self.stream.fetchone(), QueryEvent) |
| 1059 | + |
| 1060 | + expect_rand_event = self.stream.fetchone() |
| 1061 | + self.assertIsInstance(expect_rand_event, RandEvent) |
| 1062 | + self.assertEqual(type(expect_rand_event.seed1), int) |
| 1063 | + self.assertEqual(type(expect_rand_event.seed2), int) |
| 1064 | + |
| 1065 | + def tearDown(self): |
| 1066 | + self.execute("SET @@binlog_format='ROW'") |
| 1067 | + self.assertEqual(self.bin_log_format(), "ROW") |
| 1068 | + super(TestStatementConnectionSetting, self).tearDown() |
| 1069 | + |
1039 | 1070 |
|
1040 | 1071 | if __name__ == "__main__":
|
1041 | 1072 | import unittest
|
|
0 commit comments