4
4
5
5
from pymysqlreplication import constants , event , row_event
6
6
7
- from typing import List , Tuple , Dict , Optional , Union
8
- from pymysql .connections import MysqlPacket
7
+ from typing import List , Tuple , Dict , Optional , Union , FrozenSet
8
+ from pymysql .connections import MysqlPacket , Connection
9
9
10
10
# Constants from PyMYSQL source code
11
11
NULL_COLUMN = 251
@@ -83,26 +83,26 @@ class BinLogPacketWrapper(object):
83
83
}
84
84
85
85
def __init__ (self ,
86
- from_packet ,
87
- table_map ,
88
- ctl_connection ,
89
- mysql_version ,
90
- use_checksum ,
91
- allowed_events ,
92
- only_tables ,
93
- ignored_tables ,
94
- only_schemas ,
95
- ignored_schemas ,
96
- freeze_schema ,
97
- fail_on_table_metadata_unavailable ,
98
- ignore_decode_errors ) :
86
+ from_packet : MysqlPacket ,
87
+ table_map : dict ,
88
+ ctl_connection : Connection ,
89
+ mysql_version : Tuple [ int , int , int ] ,
90
+ use_checksum : bool ,
91
+ allowed_events : FrozenSet [ event . BinLogEvent ] ,
92
+ only_tables : Optional [ List [ str ]] ,
93
+ ignored_tables : Optional [ List [ str ]] ,
94
+ only_schemas : Optional [ List [ str ]] ,
95
+ ignored_schemas : Optional [ List [ str ]] ,
96
+ freeze_schema : bool ,
97
+ fail_on_table_metadata_unavailable : bool ,
98
+ ignore_decode_errors : bool ) -> None :
99
99
# -1 because we ignore the ok byte
100
- self .read_bytes : int = 0
100
+ self .read_bytes = 0
101
101
# Used when we want to override a value in the data buffer
102
- self .__data_buffer : bytes = b''
102
+ self .__data_buffer = b''
103
103
104
- self .packet = from_packet
105
- self .charset = ctl_connection .charset
104
+ self .packet : MysqlPacket = from_packet
105
+ self .charset : str = ctl_connection .charset
106
106
107
107
# OK value
108
108
# timestamp
@@ -113,13 +113,13 @@ def __init__(self,
113
113
unpack = struct .unpack ('<cIBIIIH' , self .packet .read (20 ))
114
114
115
115
# Header
116
- self .timestamp = unpack [1 ]
117
- self .event_type = unpack [2 ]
118
- self .server_id = unpack [3 ]
119
- self .event_size = unpack [4 ]
116
+ self .timestamp : int = unpack [1 ]
117
+ self .event_type : int = unpack [2 ]
118
+ self .server_id : int = unpack [3 ]
119
+ self .event_size : int = unpack [4 ]
120
120
# position of the next event
121
- self .log_pos = unpack [5 ]
122
- self .flags = unpack [6 ]
121
+ self .log_pos : int = unpack [5 ]
122
+ self .flags : int = unpack [6 ]
123
123
124
124
# MySQL 5.6 and more if binlog-checksum = CRC32
125
125
if use_checksum :
@@ -132,16 +132,20 @@ def __init__(self,
132
132
133
133
if event_class not in allowed_events :
134
134
return
135
- self .event = event_class (self , event_size_without_header , table_map ,
136
- ctl_connection ,
137
- mysql_version = mysql_version ,
138
- only_tables = only_tables ,
139
- ignored_tables = ignored_tables ,
140
- only_schemas = only_schemas ,
141
- ignored_schemas = ignored_schemas ,
142
- freeze_schema = freeze_schema ,
143
- fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable ,
144
- ignore_decode_errors = ignore_decode_errors )
135
+ self .event : event .BinLogEvent = event_class (
136
+ self ,
137
+ event_size_without_header ,
138
+ table_map ,
139
+ ctl_connection ,
140
+ mysql_version = mysql_version ,
141
+ only_tables = only_tables ,
142
+ ignored_tables = ignored_tables ,
143
+ only_schemas = only_schemas ,
144
+ ignored_schemas = ignored_schemas ,
145
+ freeze_schema = freeze_schema ,
146
+ fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable ,
147
+ ignore_decode_errors = ignore_decode_errors
148
+ )
145
149
if self .event ._processed == False :
146
150
self .event = None
147
151
@@ -157,15 +161,15 @@ def read(self, size: int) -> Union[int, bytes]:
157
161
return data + self .packet .read (size - len (data ))
158
162
return self .packet .read (size )
159
163
160
- def unread (self , data : str ) :
164
+ def unread (self , data : Union [ int , bytes ]) -> None :
161
165
"""
162
166
Push again data in data buffer.
163
167
Use to extract a bit from a value and ensure that the rest of the code reads data normally
164
168
"""
165
169
self .read_bytes -= len (data )
166
170
self .__data_buffer += data
167
171
168
- def advance (self , size : int ):
172
+ def advance (self , size : int ) -> None :
169
173
size = int (size )
170
174
self .read_bytes += size
171
175
buffer_len = len (self .__data_buffer )
@@ -213,7 +217,7 @@ def __getattr__(self, key):
213
217
raise AttributeError ("%s instance has no attribute '%s'" %
214
218
(self .__class__ , key ))
215
219
216
- def read_int_be_by_size (self , size : int ):
220
+ def read_int_be_by_size (self , size : int ) -> int :
217
221
"""
218
222
Read a big endian integer values based on byte number
219
223
"""
@@ -359,7 +363,10 @@ def read_binary_json(self, size: int) -> Optional[str]:
359
363
360
364
return self .read_binary_json_type (t , length )
361
365
362
- def read_binary_json_type (self , t : int , length : int ) -> Optional [Union [bool , str ]]:
366
+ def read_binary_json_type (self , t : int , length : int ) \
367
+ -> Optional [Union [
368
+ Dict [Union [int , bytes ], Union [bool , str , None ]],
369
+ List [int ], bool , int ]]:
363
370
large = (t in (JSONB_TYPE_LARGE_OBJECT , JSONB_TYPE_LARGE_ARRAY ))
364
371
if t in (JSONB_TYPE_SMALL_OBJECT , JSONB_TYPE_LARGE_OBJECT ):
365
372
return self .read_binary_json_object (length - 1 , large )
@@ -392,7 +399,7 @@ def read_binary_json_type(self, t: int, length: int) -> Optional[Union[bool, str
392
399
393
400
raise ValueError ('Json type %d is not handled' % t )
394
401
395
- def read_binary_json_type_inlined (self , t : bytes , large : bool ) -> Optional [Union [bool , str ]]:
402
+ def read_binary_json_type_inlined (self , t : bytes , large : bool ) -> Optional [Union [bool , int ]]:
396
403
if t == JSONB_TYPE_LITERAL :
397
404
value = self .read_uint32 () if large else self .read_uint16 ()
398
405
if value == JSONB_LITERAL_NULL :
@@ -412,7 +419,8 @@ def read_binary_json_type_inlined(self, t: bytes, large: bool) -> Optional[Union
412
419
413
420
raise ValueError ('Json type %d is not handled' % t )
414
421
415
- def read_binary_json_object (self , length : int , large : bool ) -> Dict [str , str ]:
422
+ def read_binary_json_object (self , length : int , large : bool ) \
423
+ -> Dict [Union [int , bytes ], Union [bool , str , None ]]:
416
424
if large :
417
425
elements = self .read_uint32 ()
418
426
size = self .read_uint32 ()
@@ -450,7 +458,7 @@ def read_binary_json_object(self, length: int, large: bool) -> Dict[str, str]:
450
458
451
459
return out
452
460
453
- def read_binary_json_array (self , length : int , large : bool ) -> List :
461
+ def read_binary_json_array (self , length : int , large : bool ) -> List [ int ] :
454
462
if large :
455
463
elements = self .read_uint32 ()
456
464
size = self .read_uint32 ()
@@ -465,7 +473,7 @@ def read_binary_json_array(self, length: int, large: bool) -> List:
465
473
read_offset_or_inline (self , large )
466
474
for _ in range (elements )]
467
475
468
- def _read (x ) :
476
+ def _read (x : Tuple [ int , Optional [ int ], Optional [ Union [ bool , str ]]]) -> int :
469
477
if x [1 ] is None :
470
478
return x [2 ]
471
479
return self .read_binary_json_type (x [0 ], length )
0 commit comments