18
18
from pymysqlreplication .exceptions import TableMetadataUnavailableError
19
19
from pymysqlreplication .constants .BINLOG import *
20
20
from pymysqlreplication .row_event import *
21
+ from pathlib import Path
21
22
22
23
__all__ = ["TestBasicBinLogStreamReader" , "TestMultipleRowBinLogStreamReader" , "TestCTLConnectionSettings" , "TestGtidBinLogStreamReader" , "TestMariadbBinlogStreamReader" , "TestStatementConnectionSetting" ]
23
24
@@ -27,9 +28,9 @@ def ignoredEvents(self):
27
28
return [GtidEvent ]
28
29
29
30
def test_allowed_event_list (self ):
30
- self .assertEqual (len (self .stream ._allowed_event_list (None , None , False )), 18 )
31
- self .assertEqual (len (self .stream ._allowed_event_list (None , None , True )), 17 )
32
- self .assertEqual (len (self .stream ._allowed_event_list (None , [RotateEvent ], False )), 17 )
31
+ self .assertEqual (len (self .stream ._allowed_event_list (None , None , False )), 19 )
32
+ self .assertEqual (len (self .stream ._allowed_event_list (None , None , True )), 18 )
33
+ self .assertEqual (len (self .stream ._allowed_event_list (None , [RotateEvent ], False )), 18 )
33
34
self .assertEqual (len (self .stream ._allowed_event_list ([RotateEvent ], None , False )), 1 )
34
35
35
36
def test_read_query_event (self ):
@@ -1036,6 +1037,42 @@ def test_annotate_rows_event(self):
1036
1037
self .assertEqual (event .sql_statement ,insert_query )
1037
1038
self .assertIsInstance (event ,MariadbAnnotateRowsEvent )
1038
1039
1040
+ def test_start_encryption_event (self ):
1041
+ query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
1042
+ self .execute (query )
1043
+ query = "INSERT INTO test (data) VALUES('Hello World')"
1044
+ self .execute (query )
1045
+ self .execute ("COMMIT" )
1046
+
1047
+ self .assertIsInstance (self .stream .fetchone (), RotateEvent )
1048
+ self .assertIsInstance (self .stream .fetchone (), FormatDescriptionEvent )
1049
+
1050
+ start_encryption_event = self .stream .fetchone ()
1051
+ self .assertIsInstance (start_encryption_event , MariadbStartEncryptionEvent )
1052
+
1053
+ schema = start_encryption_event .schema
1054
+ key_version = start_encryption_event .key_version
1055
+ nonce = start_encryption_event .nonce
1056
+
1057
+ from pathlib import Path
1058
+
1059
+ encryption_key_file_path = Path (__file__ ).parent .parent .parent
1060
+
1061
+ try :
1062
+ with open (f"{ encryption_key_file_path } /.mariadb/no_encryption_key.key" , "r" ) as key_file :
1063
+ first_line = key_file .readline ()
1064
+ key_version_from_key_file = int (first_line .split (";" )[0 ])
1065
+ except Exception as e :
1066
+ self .fail ("raised unexpected exception: {exception}" .format (exception = e ))
1067
+ finally :
1068
+ self .resetBinLog ()
1069
+
1070
+ # schema is always 1
1071
+ self .assertEqual (schema , 1 )
1072
+ self .assertEqual (key_version , key_version_from_key_file )
1073
+ self .assertEqual (type (nonce ), bytes )
1074
+ self .assertEqual (len (nonce ), 12 )
1075
+
1039
1076
class TestStatementConnectionSetting (base .PyMySQLReplicationTestCase ):
1040
1077
def setUp (self ):
1041
1078
super (TestStatementConnectionSetting , self ).setUp ()
@@ -1065,8 +1102,8 @@ def test_rand_event(self):
1065
1102
def tearDown (self ):
1066
1103
self .execute ("SET @@binlog_format='ROW'" )
1067
1104
self .assertEqual (self .bin_log_format (), "ROW" )
1068
- super (TestStatementConnectionSetting , self ).tearDown ()
1069
-
1105
+ super (TestStatementConnectionSetting , self ).tearDown ()
1106
+
1070
1107
1071
1108
if __name__ == "__main__" :
1072
1109
import unittest
0 commit comments