5
5
import binascii
6
6
from copy import deepcopy
7
7
from io import BytesIO
8
+ from typing import List , Optional , Tuple , Union , Set
8
9
9
- def overlap (i1 , i2 ):
10
+
11
+ def overlap (i1 : Tuple [int , int ], i2 : Tuple [int , int ]) -> bool :
10
12
return i1 [0 ] < i2 [1 ] and i1 [1 ] > i2 [0 ]
11
13
12
- def contains (i1 , i2 ) :
14
+ def contains (i1 : Tuple [ int , int ], i2 : Tuple [ int , int ]) -> bool :
13
15
return i2 [0 ] >= i1 [0 ] and i2 [1 ] <= i1 [1 ]
14
16
15
17
class Gtid (object ):
16
- """A mysql GTID is composed of a server-id and a set of right-open
18
+ """
19
+ A mysql GTID is composed of a server-id and a set of right-open
17
20
intervals [a,b), and represent all transactions x that happened on
18
21
server SID such as
19
22
@@ -49,7 +52,7 @@ class Gtid(object):
49
52
Exception: Adding a Gtid with a different SID.
50
53
"""
51
54
@staticmethod
52
- def parse_interval (interval ) :
55
+ def parse_interval (interval : str ) -> Tuple [ int , int ] :
53
56
"""
54
57
We parse a human-generated string here. So our end value b
55
58
is incremented to conform to the internal representation format.
@@ -65,8 +68,9 @@ def parse_interval(interval):
65
68
return (a , b + 1 )
66
69
67
70
@staticmethod
68
- def parse (gtid ):
69
- """Parse a GTID from mysql textual format.
71
+ def parse (gtid : str ) -> Tuple [str , List [Tuple [int , int ]]]:
72
+ """
73
+ Parse a GTID from mysql textual format.
70
74
71
75
Raises:
72
76
- ValueError: if GTID format is incorrect.
@@ -84,15 +88,15 @@ def parse(gtid):
84
88
85
89
return (sid , intervals_parsed )
86
90
87
- def __add_interval (self , itvl ) :
91
+ def __add_interval (self , itvl : Tuple [ int , int ]) -> None :
88
92
"""
89
93
Use the internal representation format and add it
90
94
to our intervals, merging if required.
91
95
92
96
Raises:
93
97
Exception: if Malformated interval or Overlapping interval
94
98
"""
95
- new = []
99
+ new : List [ Tuple [ int , int ]] = []
96
100
97
101
if itvl [0 ] > itvl [1 ]:
98
102
raise Exception ('Malformed interval %s' % (itvl ,))
@@ -114,11 +118,13 @@ def __add_interval(self, itvl):
114
118
115
119
self .intervals = sorted (new + [itvl ])
116
120
117
- def __sub_interval (self , itvl ):
118
- """Using the internal representation, remove an interval
121
+ def __sub_interval (self , itvl : Tuple [int , int ]) -> None :
122
+ """
123
+ Using the internal representation, remove an interval
119
124
120
- Raises: Exception if itvl malformated"""
121
- new = []
125
+ Raises: Exception if itvl malformated
126
+ """
127
+ new : List [Tuple [int , int ]] = []
122
128
123
129
if itvl [0 ] > itvl [1 ]:
124
130
raise Exception ('Malformed interval %s' % (itvl ,))
@@ -139,8 +145,9 @@ def __sub_interval(self, itvl):
139
145
140
146
self .intervals = new
141
147
142
- def __contains__ (self , other ):
143
- """Test if other is contained within self.
148
+ def __contains__ (self , other : 'Gtid' ) -> bool :
149
+ """
150
+ Test if other is contained within self.
144
151
First we compare sid they must be equals.
145
152
146
153
Then we search if intervals from other are contained within
@@ -152,22 +159,22 @@ def __contains__(self, other):
152
159
return all (any (contains (me , them ) for me in self .intervals )
153
160
for them in other .intervals )
154
161
155
- def __init__ (self , gtid , sid = None , intervals = []):
156
- if sid :
157
- intervals = intervals
158
- else :
162
+ def __init__ (self , gtid : str , sid : Optional [str ] = None , intervals : Optional [List [Tuple [int , int ]]] = None ) -> None :
163
+ if sid is None :
159
164
sid , intervals = Gtid .parse (gtid )
160
165
161
166
self .sid = sid
162
167
self .intervals = []
163
168
for itvl in intervals :
164
169
self .__add_interval (itvl )
165
170
166
- def __add__ (self , other ):
167
- """Include the transactions of this gtid.
171
+ def __add__ (self , other : 'Gtid' ) -> 'Gtid' :
172
+ """
173
+ Include the transactions of this gtid.
168
174
169
175
Raises:
170
- Exception: if the attempted merge has different SID"""
176
+ Exception: if the attempted merge has different SID
177
+ """
171
178
if self .sid != other .sid :
172
179
raise Exception ('Attempt to merge different SID'
173
180
'%s != %s' % (self .sid , other .sid ))
@@ -179,9 +186,10 @@ def __add__(self, other):
179
186
180
187
return result
181
188
182
- def __sub__ (self , other ):
183
- """Remove intervals. Do not raise, if different SID simply
184
- ignore"""
189
+ def __sub__ (self , other : 'Gtid' ) -> 'Gtid' :
190
+ """
191
+ Remove intervals. Do not raise, if different SID simply ignore
192
+ """
185
193
result = deepcopy (self )
186
194
if self .sid != other .sid :
187
195
return result
@@ -191,27 +199,30 @@ def __sub__(self, other):
191
199
192
200
return result
193
201
194
- def __str__ (self ):
195
- """We represent the human value here - a single number
196
- for one transaction, or a closed interval (decrementing b)"""
202
+ def __str__ (self ) -> str :
203
+ """
204
+ We represent the human value here - a single number
205
+ for one transaction, or a closed interval (decrementing b)
206
+ """
197
207
return '%s:%s' % (self .sid ,
198
208
':' .join (('%d-%d' % (x [0 ], x [1 ]- 1 )) if x [0 ] + 1 != x [1 ]
199
209
else str (x [0 ])
200
210
for x in self .intervals ))
201
211
202
- def __repr__ (self ):
212
+ def __repr__ (self ) -> str :
203
213
return '<Gtid "%s">' % self
204
214
205
215
@property
206
- def encoded_length (self ):
216
+ def encoded_length (self ) -> int :
207
217
return (16 + # sid
208
218
8 + # n_intervals
209
219
2 * # stop/start
210
220
8 * # stop/start mark encoded as int64
211
221
len (self .intervals ))
212
222
213
- def encode (self ):
214
- """Encode a Gtid in binary
223
+ def encode (self ) -> bytes :
224
+ """
225
+ Encode a Gtid in binary
215
226
Bytes are in **little endian**.
216
227
217
228
Format:
@@ -251,8 +262,9 @@ def encode(self):
251
262
return buffer
252
263
253
264
@classmethod
254
- def decode (cls , payload ):
255
- """Decode from binary a Gtid
265
+ def decode (cls , payload : BytesIO ) -> 'Gtid' :
266
+ """
267
+ Decode from binary a Gtid
256
268
257
269
:param BytesIO payload to decode
258
270
"""
@@ -281,35 +293,35 @@ def decode(cls, payload):
281
293
else '%d' % x
282
294
for x in intervals ])))
283
295
284
- def __eq__ (self , other ) :
296
+ def __eq__ (self , other : 'Gtid' ) -> bool :
285
297
if other .sid != self .sid :
286
298
return False
287
299
return self .intervals == other .intervals
288
300
289
- def __lt__ (self , other ) :
301
+ def __lt__ (self , other : 'Gtid' ) -> bool :
290
302
if other .sid != self .sid :
291
303
return self .sid < other .sid
292
304
return self .intervals < other .intervals
293
305
294
- def __le__ (self , other ) :
306
+ def __le__ (self , other : 'Gtid' ) -> bool :
295
307
if other .sid != self .sid :
296
308
return self .sid <= other .sid
297
309
return self .intervals <= other .intervals
298
310
299
- def __gt__ (self , other ) :
311
+ def __gt__ (self , other : 'Gtid' ) -> bool :
300
312
if other .sid != self .sid :
301
313
return self .sid > other .sid
302
314
return self .intervals > other .intervals
303
315
304
- def __ge__ (self , other ) :
316
+ def __ge__ (self , other : 'Gtid' ) -> bool :
305
317
if other .sid != self .sid :
306
318
return self .sid >= other .sid
307
319
return self .intervals >= other .intervals
308
320
309
321
310
322
class GtidSet (object ):
311
323
"""Represents a set of Gtid"""
312
- def __init__ (self , gtid_set ) :
324
+ def __init__ (self , gtid_set : Optional [ Union [ None , str , Set [ Gtid ], List [ Gtid ], Gtid ]] = None ) -> None :
313
325
"""
314
326
Construct a GtidSet initial state depends of the nature of `gtid_set` param.
315
327
@@ -325,21 +337,21 @@ def __init__(self, gtid_set):
325
337
- Exception: if Gtid interval are either malformated or overlapping
326
338
"""
327
339
328
- def _to_gtid (element ) :
340
+ def _to_gtid (element : str ) -> Gtid :
329
341
if isinstance (element , Gtid ):
330
342
return element
331
343
return Gtid (element .strip (' \n ' ))
332
344
333
345
if not gtid_set :
334
- self .gtids = []
346
+ self .gtids : List [ Gtid ] = []
335
347
elif isinstance (gtid_set , (list , set )):
336
- self .gtids = [_to_gtid (x ) for x in gtid_set ]
348
+ self .gtids : List [ Gtid ] = [_to_gtid (x ) for x in gtid_set ]
337
349
else :
338
- self .gtids = [Gtid (x .strip (' \n ' )) for x in gtid_set .split (',' )]
350
+ self .gtids : List [ Gtid ] = [Gtid (x .strip (' \n ' )) for x in gtid_set .split (',' )]
339
351
340
- def merge_gtid (self , gtid ) :
352
+ def merge_gtid (self , gtid : Gtid ) -> None :
341
353
"""Insert a Gtid in current GtidSet."""
342
- new_gtids = []
354
+ new_gtids : List [ Gtid ] = []
343
355
for existing in self .gtids :
344
356
if existing .sid == gtid .sid :
345
357
new_gtids .append (existing + gtid )
@@ -349,7 +361,7 @@ def merge_gtid(self, gtid):
349
361
new_gtids .append (gtid )
350
362
self .gtids = new_gtids
351
363
352
- def __contains__ (self , other ) :
364
+ def __contains__ (self , other : Union [ 'GtidSet' , Gtid ]) -> bool :
353
365
"""
354
366
Test if self contains other, could be a GtidSet or a Gtid.
355
367
@@ -363,7 +375,7 @@ def __contains__(self, other):
363
375
return any (other in x for x in self .gtids )
364
376
raise NotImplementedError
365
377
366
- def __add__ (self , other ) :
378
+ def __add__ (self , other : Union [ 'GtidSet' , Gtid ]) -> 'GtidSet' :
367
379
"""
368
380
Merge current instance with an other GtidSet or with a Gtid alone.
369
381
@@ -384,22 +396,23 @@ def __add__(self, other):
384
396
385
397
raise NotImplementedError
386
398
387
- def __str__ (self ):
399
+ def __str__ (self ) -> str :
388
400
"""
389
401
Returns a comma separated string of gtids.
390
402
"""
391
403
return ',' .join (str (x ) for x in self .gtids )
392
404
393
- def __repr__ (self ):
405
+ def __repr__ (self ) -> str :
394
406
return '<GtidSet %r>' % self .gtids
395
407
396
408
@property
397
- def encoded_length (self ):
409
+ def encoded_length (self ) -> int :
398
410
return (8 + # n_sids
399
411
sum (x .encoded_length for x in self .gtids ))
400
412
401
- def encoded (self ):
402
- """Encode a GtidSet in binary
413
+ def encoded (self ) -> bytes :
414
+ """
415
+ Encode a GtidSet in binary
403
416
Bytes are in **little endian**.
404
417
405
418
- `n_sid`: u64 is the number of Gtid to read
@@ -421,8 +434,9 @@ def encoded(self):
421
434
encode = encoded
422
435
423
436
@classmethod
424
- def decode (cls , payload ):
425
- """Decode a GtidSet from binary.
437
+ def decode (cls , payload : BytesIO ) -> 'GtidSet' :
438
+ """
439
+ Decode a GtidSet from binary.
426
440
427
441
:param BytesIO payload to decode
428
442
"""
@@ -432,5 +446,5 @@ def decode(cls, payload):
432
446
433
447
return cls ([Gtid .decode (payload ) for _ in range (0 , n_sid )])
434
448
435
- def __eq__ (self , other ) :
449
+ def __eq__ (self , other : 'GtidSet' ) -> bool :
436
450
return self .gtids == other .gtids
0 commit comments