5
5
6
6
import pymysql
7
7
from pymysql .constants .COMMAND import COM_BINLOG_DUMP , COM_REGISTER_SLAVE
8
- from pymysql .cursors import DictCursor , Cursor
8
+ from pymysql .cursors import Cursor , DictCursor
9
+ from pymysql .connections import Connection , MysqlPacket
9
10
10
11
from .constants .BINLOG import TABLE_MAP_EVENT , ROTATE_EVENT , FORMAT_DESCRIPTION_EVENT
11
12
from .event import (
21
22
from .row_event import (
22
23
UpdateRowsEvent , WriteRowsEvent , DeleteRowsEvent , TableMapEvent )
23
24
from typing import ByteString , Union , Optional , List , Tuple , Dict , Any , Iterator , FrozenSet , Type
24
- from pymysql .connections import Connection
25
25
26
26
try :
27
27
from pymysql .constants .COMMAND import COM_BINLOG_DUMP_GTID
36
36
37
37
38
38
class ReportSlave (object ):
39
- """Represent the values that you may report when connecting as a slave
40
- to a master. SHOW SLAVE HOSTS related"""
39
+ """
40
+ Represent the values that you may report
41
+ when connecting as a slave to a master. SHOW SLAVE HOSTS related.
42
+ """
41
43
42
44
def __init__ (self , value : Union [str , Tuple [str , str , str , int ]]) -> None :
43
45
"""
@@ -74,9 +76,9 @@ def __repr__(self) -> str:
74
76
75
77
def encoded (self , server_id : int , master_id : int = 0 ) -> ByteString :
76
78
"""
77
- server_id: the slave server-id
78
- master_id: usually 0. Appears as "master id" in SHOW SLAVE HOSTS
79
- on the master. Unknown what else it impacts.
79
+ :ivar server_id: int - the slave server-id
80
+ :ivar master_id: int - usually 0. Appears as "master id" in SHOW SLAVE HOSTS on the master.
81
+ Unknown what else it impacts.
80
82
"""
81
83
82
84
# 1 [15] COM_REGISTER_SLAVE
@@ -124,9 +126,10 @@ def encoded(self, server_id: int, master_id: int = 0) -> ByteString:
124
126
125
127
126
128
class BinLogStreamReader (object ):
127
- """Connect to replication stream and read event
128
129
"""
129
- report_slave : Optional [ReportSlave ] = None
130
+ Connect to replication stream and read event
131
+ """
132
+ report_slave : Optional [Union [str , Tuple [str , str , str , int ]]] = None
130
133
131
134
def __init__ (self , connection_settings : Dict , server_id : int ,
132
135
ctl_connection_settings : Optional [Dict ] = None , resume_stream : bool = False ,
@@ -137,7 +140,7 @@ def __init__(self, connection_settings: Dict, server_id: int,
137
140
only_tables : Optional [List [str ]] = None , ignored_tables : Optional [List [str ]] = None ,
138
141
only_schemas : Optional [List [str ]] = None , ignored_schemas : Optional [List [str ]] = None ,
139
142
freeze_schema : bool = False , skip_to_timestamp : Optional [float ] = None ,
140
- report_slave : Optional [ReportSlave ] = None , slave_uuid : Optional [str ] = None ,
143
+ report_slave : Optional [Union [ str , Tuple [ str , str , str , int ]] ] = None , slave_uuid : Optional [str ] = None ,
141
144
pymysql_wrapper : Optional [Connection ] = None ,
142
145
fail_on_table_metadata_unavailable : bool = False ,
143
146
slave_heartbeat : Optional [float ] = None ,
@@ -146,44 +149,42 @@ def __init__(self, connection_settings: Dict, server_id: int,
146
149
ignore_decode_errors : bool = False ) -> None :
147
150
"""
148
151
Attributes:
149
- ctl_connection_settings: Connection settings for cluster holding
152
+ ctl_connection_settings[Dict] : Connection settings for cluster holding
150
153
schema information
151
- resume_stream: Start for event from position or the latest event of
154
+ resume_stream[bool] : Start for event from position or the latest event of
152
155
binlog or from older available event
153
- blocking: When master has finished reading/sending binlog it will
156
+ blocking[bool] : When master has finished reading/sending binlog it will
154
157
send EOF instead of blocking connection.
155
- only_events: Array of allowed events
156
- ignored_events: Array of ignored events
157
- log_file: Set replication start log file
158
- log_pos: Set replication start log pos (resume_stream should be
158
+ only_events[List[str]] : Array of allowed events
159
+ ignored_events[List[str]] : Array of ignored events
160
+ log_file[str] : Set replication start log file
161
+ log_pos[int] : Set replication start log pos (resume_stream should be
159
162
true)
160
- end_log_pos: Set replication end log pos
161
- auto_position: Use master_auto_position gtid to set position
162
- only_tables: An array with the tables you want to watch (only works
163
+ end_log_pos[int] : Set replication end log pos
164
+ auto_position[str] : Use master_auto_position gtid to set position
165
+ only_tables[List[str]] : An array with the tables you want to watch (only works
163
166
in binlog_format ROW)
164
- ignored_tables: An array with the tables you want to skip
165
- only_schemas: An array with the schemas you want to watch
166
- ignored_schemas: An array with the schemas you want to skip
167
- freeze_schema: If true do not support ALTER TABLE. It's faster.
168
- skip_to_timestamp: Ignore all events until reaching specified
169
- timestamp.
170
- report_slave: Report slave in SHOW SLAVE HOSTS.
171
- slave_uuid: Report slave_uuid or replica_uuid in SHOW SLAVE HOSTS(MySQL 8.0.21-) or
167
+ ignored_tables[List[str]]: An array with the tables you want to skip
168
+ only_schemas[List[str]]: An array with the schemas you want to watch
169
+ ignored_schemas[List[str]]: An array with the schemas you want to skip
170
+ freeze_schema[bool]: If true do not support ALTER TABLE. It's faster.
171
+ skip_to_timestamp[float]: Ignore all events until reaching specified timestamp.
172
+ report_slave[ReportSlave]: Report slave in SHOW SLAVE HOSTS.
173
+ slave_uuid[str]: Report slave_uuid or replica_uuid in SHOW SLAVE HOSTS(MySQL 8.0.21-) or
172
174
SHOW REPLICAS(MySQL 8.0.22+) depends on your MySQL version.
173
- fail_on_table_metadata_unavailable: Should raise exception if we
174
- can't get table information on
175
- row_events
176
- slave_heartbeat: (seconds) Should master actively send heartbeat on
175
+ fail_on_table_metadata_unavailable[bool]: Should raise exception if we
176
+ can't get table information on row_events
177
+ slave_heartbeat[float]: (seconds) Should master actively send heartbeat on
177
178
connection. This also reduces traffic in GTID
178
179
replication on replication resumption (in case
179
180
many event to skip in binlog). See
180
181
MASTER_HEARTBEAT_PERIOD in mysql documentation
181
182
for semantics
182
- is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position
183
+ is_mariadb[bool] : Flag to indicate it's a MariaDB server, used with auto_position
183
184
to point to Mariadb specific GTID.
184
- annotate_rows_event: Parameter value to enable annotate rows event in mariadb,
185
+ annotate_rows_event[bool] : Parameter value to enable annotate rows event in mariadb,
185
186
used with 'is_mariadb'
186
- ignore_decode_errors: If true, any decode errors encountered
187
+ ignore_decode_errors[bool] : If true, any decode errors encountered
187
188
when reading column data will be ignored.
188
189
"""
189
190
@@ -230,12 +231,12 @@ def __init__(self, connection_settings: Dict, server_id: int,
230
231
self .is_past_end_log_pos : bool = False
231
232
232
233
if report_slave :
233
- self .report_slave : Optional [ ReportSlave ] = ReportSlave (report_slave )
234
+ self .report_slave : ReportSlave = ReportSlave (report_slave )
234
235
self .slave_uuid : Optional [str ] = slave_uuid
235
236
self .slave_heartbeat : Optional [float ] = slave_heartbeat
236
237
237
238
if pymysql_wrapper :
238
- self .pymysql_wrapper : Optional [ Connection ] = pymysql_wrapper
239
+ self .pymysql_wrapper : Connection = pymysql_wrapper
239
240
else :
240
241
self .pymysql_wrapper : Optional [Union [Connection , Type [Connection ]]] = pymysql .connect
241
242
self .mysql_version : Tuple = (0 , 0 , 0 )
@@ -262,7 +263,9 @@ def __connect_to_ctl(self) -> None:
262
263
self .__connected_ctl : bool = True
263
264
264
265
def __checksum_enabled (self ) -> bool :
265
- """Return True if binlog-checksum = CRC32. Only for MySQL > 5.6"""
266
+ """
267
+ Return True if binlog-checksum = CRC32. Only for MySQL > 5.6
268
+ """
266
269
cur : Cursor = self ._stream_connection .cursor ()
267
270
cur .execute ("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'" )
268
271
result : Optional [Tuple [str , str ]] = cur .fetchone ()
@@ -295,19 +298,19 @@ def __connect_to_stream(self) -> None:
295
298
# flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1)
296
299
# server_id (4) -- server id of this slave
297
300
# log_file (string.EOF) -- filename of the binlog on the master
298
- self ._stream_connection = self .pymysql_wrapper (** self .__connection_settings )
301
+ self ._stream_connection : Connection = self .pymysql_wrapper (** self .__connection_settings )
299
302
300
303
self .__use_checksum : bool = self .__checksum_enabled ()
301
304
302
305
# If checksum is enabled we need to inform the server about the that
303
306
# we support it
304
307
if self .__use_checksum :
305
- cur = self ._stream_connection .cursor ()
308
+ cur : Cursor = self ._stream_connection .cursor ()
306
309
cur .execute ("SET @master_binlog_checksum= @@global.binlog_checksum" )
307
310
cur .close ()
308
311
309
312
if self .slave_uuid :
310
- cur = self ._stream_connection .cursor ()
313
+ cur : Cursor = self ._stream_connection .cursor ()
311
314
cur .execute ("SET @slave_uuid = %s, @replica_uuid = %s" , (self .slave_uuid , self .slave_uuid ))
312
315
cur .close ()
313
316
@@ -339,14 +342,14 @@ def __connect_to_stream(self) -> None:
339
342
340
343
if not self .auto_position :
341
344
if self .is_mariadb :
342
- prelude = self .__set_mariadb_settings ()
345
+ prelude : ByteString = self .__set_mariadb_settings ()
343
346
else :
344
347
# only when log_file and log_pos both provided, the position info is
345
348
# valid, if not, get the current position from master
346
349
if self .log_file is None or self .log_pos is None :
347
350
cur : Cursor = self ._stream_connection .cursor ()
348
351
cur .execute ("SHOW MASTER STATUS" )
349
- master_status : Optional [Tuple [str , int , ... ]] = cur .fetchone ()
352
+ master_status : Optional [Tuple [str , int , Any ]] = cur .fetchone ()
350
353
if master_status is None :
351
354
raise BinLogNotEnabled ()
352
355
self .log_file , self .log_pos = master_status [:2 ]
@@ -507,9 +510,9 @@ def fetchone(self) -> Union[BinLogPacketWrapper, None]:
507
510
508
511
try :
509
512
if pymysql .__version__ < LooseVersion ("0.6" ):
510
- pkt = self ._stream_connection .read_packet ()
513
+ pkt : MysqlPacket = self ._stream_connection .read_packet ()
511
514
else :
512
- pkt = self ._stream_connection ._read_packet ()
515
+ pkt : MysqlPacket = self ._stream_connection ._read_packet ()
513
516
except pymysql .OperationalError as error :
514
517
code , message = error .args
515
518
if code in MYSQL_EXPECTED_ERROR_CODES :
0 commit comments