1
1
# -*- coding: utf-8 -*-
2
2
3
- import pymysql
4
3
import copy
5
-
6
- from pymysql .cursors import Cursor
7
-
8
- from pymysqlreplication import BinLogStreamReader
9
4
import os
10
5
import sys
6
+ import typing
7
+ import pymysql
8
+
9
+ from pymysqlreplication import BinLogStreamReader
11
10
12
11
if sys .version_info < (2 , 7 ):
13
12
import unittest2 as unittest
14
13
else :
15
14
import unittest
16
15
16
+ if typing .TYPE_CHECKING :
17
+ from pymysql .cursors import Cursor
18
+
17
19
base = unittest .TestCase
18
20
19
21
@@ -30,7 +32,7 @@ def setUp(self) -> None:
30
32
"port" : 3306 ,
31
33
"use_unicode" : True ,
32
34
"charset" : "utf8" ,
33
- "db" : "pymysqlreplication_test"
35
+ "db" : "pymysqlreplication_test" ,
34
36
}
35
37
36
38
self .conn_control = None
@@ -50,25 +52,27 @@ def getMySQLVersion(self) -> str:
50
52
"""Return the MySQL version of the server
51
53
If version is 5.6.10-log the result is 5.6.10
52
54
"""
53
- return self .execute ("SELECT VERSION()" ).fetchone ()[0 ].split ('-' )[0 ]
55
+ return self .execute ("SELECT VERSION()" ).fetchone ()[0 ].split ("-" )[0 ]
54
56
55
57
def isMySQL56AndMore (self ) -> bool :
56
- version = float (self .getMySQLVersion ().rsplit ('.' , 1 )[0 ])
58
+ version = float (self .getMySQLVersion ().rsplit ("." , 1 )[0 ])
57
59
if version >= 5.6 :
58
60
return True
59
61
return False
60
62
61
63
def isMySQL57 (self ) -> bool :
62
- version = float (self .getMySQLVersion ().rsplit ('.' , 1 )[0 ])
64
+ version = float (self .getMySQLVersion ().rsplit ("." , 1 )[0 ])
63
65
return version == 5.7
64
66
65
67
def isMySQL80AndMore (self ) -> bool :
66
- version = float (self .getMySQLVersion ().rsplit ('.' , 1 )[0 ])
68
+ version = float (self .getMySQLVersion ().rsplit ("." , 1 )[0 ])
67
69
return version >= 8.0
68
70
69
71
def isMariaDB (self ) -> bool :
70
72
if self .__is_mariaDB is None :
71
- self .__is_mariaDB = "MariaDB" in self .execute ("SELECT VERSION()" ).fetchone ()[0 ]
73
+ self .__is_mariaDB = (
74
+ "MariaDB" in self .execute ("SELECT VERSION()" ).fetchone ()[0 ]
75
+ )
72
76
return self .__is_mariaDB
73
77
74
78
@property
@@ -92,7 +96,7 @@ def execute(self, query: str) -> Cursor:
92
96
c = self .conn_control .cursor ()
93
97
c .execute (query )
94
98
return c
95
-
99
+
96
100
def execute_with_args (self , query : str , args ) -> Cursor :
97
101
c = self .conn_control .cursor ()
98
102
c .execute (query , args )
@@ -102,12 +106,13 @@ def resetBinLog(self) -> None:
102
106
self .execute ("RESET MASTER" )
103
107
if self .stream is not None :
104
108
self .stream .close ()
105
- self .stream = BinLogStreamReader (self .database , server_id = 1024 ,
106
- ignored_events = self .ignoredEvents ())
109
+ self .stream = BinLogStreamReader (
110
+ self .database , server_id = 1024 , ignored_events = self .ignoredEvents ()
111
+ )
107
112
108
113
def set_sql_mode (self ) -> None :
109
114
"""set sql_mode to test with same sql_mode (mysql 5.7 sql_mode default is changed)"""
110
- version = float (self .getMySQLVersion ().rsplit ('.' , 1 )[0 ])
115
+ version = float (self .getMySQLVersion ().rsplit ("." , 1 )[0 ])
111
116
if version == 5.7 :
112
117
self .execute ("SET @@sql_mode='NO_ENGINE_SUBSTITUTION'" )
113
118
@@ -118,7 +123,7 @@ def bin_log_format(self):
118
123
return result [0 ]
119
124
120
125
def bin_log_basename (self ) -> str :
121
- cursor : Cursor = self .execute (' SELECT @@log_bin_basename' )
126
+ cursor : Cursor = self .execute (" SELECT @@log_bin_basename" )
122
127
bin_log_basename = cursor .fetchone ()[0 ]
123
128
bin_log_basename = bin_log_basename .split ("/" )[- 1 ]
124
129
return bin_log_basename
@@ -134,7 +139,7 @@ def setUp(self) -> None:
134
139
"port" : int (os .environ .get ("MARIADB_10_6_PORT" ) or 3308 ),
135
140
"use_unicode" : True ,
136
141
"charset" : "utf8" ,
137
- "db" : "pymysqlreplication_test"
142
+ "db" : "pymysqlreplication_test" ,
138
143
}
139
144
140
145
self .conn_control = None
@@ -147,9 +152,9 @@ def setUp(self) -> None:
147
152
self .connect_conn_control (db )
148
153
self .stream = None
149
154
self .resetBinLog ()
150
-
155
+
151
156
def bin_log_basename (self ) -> str :
152
- cursor : Cursor = self .execute (' SELECT @@log_bin_basename' )
157
+ cursor : Cursor = self .execute (" SELECT @@log_bin_basename" )
153
158
bin_log_basename = cursor .fetchone ()[0 ]
154
159
bin_log_basename = bin_log_basename .split ("/" )[- 1 ]
155
160
return bin_log_basename
0 commit comments