5
5
import sys
6
6
from pymysql .cursors import Cursor
7
7
import pymysql
8
+ from pymysql import Connection
9
+ from typing import Optional
8
10
9
11
from pymysqlreplication import BinLogStreamReader
10
12
@@ -22,7 +24,7 @@ def ignoredEvents(self) -> list:
22
24
23
25
def setUp (self ) -> None :
24
26
# default
25
- self .database = {
27
+ self .database : dict = {
26
28
"host" : os .environ .get ("MYSQL_5_7" ) or "localhost" ,
27
29
"user" : "root" ,
28
30
"passwd" : "" ,
@@ -32,18 +34,18 @@ def setUp(self) -> None:
32
34
"db" : "pymysqlreplication_test" ,
33
35
}
34
36
35
- self .conn_control = None
37
+ self .conn_control : Optional [ Connection ] = None
36
38
db = copy .copy (self .database )
37
39
db ["db" ] = None
38
40
self .connect_conn_control (db )
39
41
self .execute ("DROP DATABASE IF EXISTS pymysqlreplication_test" )
40
42
self .execute ("CREATE DATABASE pymysqlreplication_test" )
41
43
db = copy .copy (self .database )
42
44
self .connect_conn_control (db )
43
- self .stream = None
45
+ self .stream : Optional [ BinLogStreamReader ] = None
44
46
self .resetBinLog ()
45
47
self .isMySQL56AndMore ()
46
- self .__is_mariaDB = None
48
+ self .__is_mariaDB : Optional [ bool ] = None
47
49
48
50
def getMySQLVersion (self ) -> str :
49
51
"""Return the MySQL version of the server
@@ -67,7 +69,7 @@ def isMySQL80AndMore(self) -> bool:
67
69
68
70
def isMariaDB (self ) -> bool :
69
71
if self .__is_mariaDB is None :
70
- self .__is_mariaDB = (
72
+ self .__is_mariaDB : bool = (
71
73
"MariaDB" in self .execute ("SELECT VERSION()" ).fetchone ()[0 ]
72
74
)
73
75
return self .__is_mariaDB
@@ -81,13 +83,13 @@ def supportsGTID(self) -> bool:
81
83
def connect_conn_control (self , db ) -> None :
82
84
if self .conn_control is not None :
83
85
self .conn_control .close ()
84
- self .conn_control = pymysql .connect (** db )
86
+ self .conn_control : Connection = pymysql .connect (** db )
85
87
86
88
def tearDown (self ) -> None :
87
89
self .conn_control .close ()
88
- self .conn_control = None
90
+ self .conn_control : Optional [ Connection ] = None
89
91
self .stream .close ()
90
- self .stream = None
92
+ self .stream : Optional [ BinLogStreamReader ] = None
91
93
92
94
def execute (self , query : str ) -> Cursor :
93
95
c = self .conn_control .cursor ()
@@ -103,7 +105,7 @@ def resetBinLog(self) -> None:
103
105
self .execute ("RESET MASTER" )
104
106
if self .stream is not None :
105
107
self .stream .close ()
106
- self .stream = BinLogStreamReader (
108
+ self .stream : BinLogStreamReader = BinLogStreamReader (
107
109
self .database , server_id = 1024 , ignored_events = self .ignoredEvents ()
108
110
)
109
111
@@ -139,15 +141,15 @@ def setUp(self) -> None:
139
141
"db" : "pymysqlreplication_test" ,
140
142
}
141
143
142
- self .conn_control = None
144
+ self .conn_control : Optional [ Connection ] = None
143
145
db = copy .copy (self .database )
144
146
db ["db" ] = None
145
147
self .connect_conn_control (db )
146
148
self .execute ("DROP DATABASE IF EXISTS pymysqlreplication_test" )
147
149
self .execute ("CREATE DATABASE pymysqlreplication_test" )
148
150
db = copy .copy (self .database )
149
151
self .connect_conn_control (db )
150
- self .stream = None
152
+ self .stream : Optional [ BinLogStreamReader ] = None
151
153
self .resetBinLog ()
152
154
153
155
def bin_log_basename (self ) -> str :
0 commit comments