14
14
QueryEvent , RotateEvent , FormatDescriptionEvent ,
15
15
XidEvent , GtidEvent , StopEvent , XAPrepareEvent ,
16
16
BeginLoadQueryEvent , ExecuteLoadQueryEvent ,
17
- HeartbeatLogEvent , NotImplementedEvent ,
18
- MariadbGtidEvent , RandEvent )
17
+ HeartbeatLogEvent , NotImplementedEvent , MariadbGtidEvent ,
18
+ MariadbAnnotateRowsEvent , RandEvent )
19
19
from .exceptions import BinLogNotEnabled
20
20
from .row_event import (
21
21
UpdateRowsEvent , WriteRowsEvent , DeleteRowsEvent , TableMapEvent )
@@ -142,6 +142,7 @@ def __init__(self, connection_settings, server_id,
142
142
fail_on_table_metadata_unavailable = False ,
143
143
slave_heartbeat = None ,
144
144
is_mariadb = False ,
145
+ annotate_rows_event = False ,
145
146
ignore_decode_errors = False ):
146
147
"""
147
148
Attributes:
@@ -167,7 +168,8 @@ def __init__(self, connection_settings, server_id,
167
168
skip_to_timestamp: Ignore all events until reaching specified
168
169
timestamp.
169
170
report_slave: Report slave in SHOW SLAVE HOSTS.
170
- slave_uuid: Report slave_uuid in SHOW SLAVE HOSTS.
171
+ slave_uuid: Report slave_uuid or replica_uuid in SHOW SLAVE HOSTS(MySQL 8.0.21-) or
172
+ SHOW REPLICAS(MySQL 8.0.22+) depends on your MySQL version.
171
173
fail_on_table_metadata_unavailable: Should raise exception if we
172
174
can't get table information on
173
175
row_events
@@ -179,6 +181,8 @@ def __init__(self, connection_settings, server_id,
179
181
for semantics
180
182
is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position
181
183
to point to Mariadb specific GTID.
184
+ annotate_rows_event: Parameter value to enable annotate rows event in mariadb,
185
+ used with 'is_mariadb'
182
186
ignore_decode_errors: If true, any decode errors encountered
183
187
when reading column data will be ignored.
184
188
"""
@@ -220,6 +224,7 @@ def __init__(self, connection_settings, server_id,
220
224
self .auto_position = auto_position
221
225
self .skip_to_timestamp = skip_to_timestamp
222
226
self .is_mariadb = is_mariadb
227
+ self .__annotate_rows_event = annotate_rows_event
223
228
224
229
if end_log_pos :
225
230
self .is_past_end_log_pos = False
@@ -302,7 +307,7 @@ def __connect_to_stream(self):
302
307
303
308
if self .slave_uuid :
304
309
cur = self ._stream_connection .cursor ()
305
- cur .execute ("set @slave_uuid= '%s'" % self .slave_uuid )
310
+ cur .execute ("SET @slave_uuid = %s, @replica_uuid = %s" , ( self .slave_uuid , self . slave_uuid ) )
306
311
cur .close ()
307
312
308
313
if self .slave_heartbeat :
@@ -332,67 +337,39 @@ def __connect_to_stream(self):
332
337
self ._register_slave ()
333
338
334
339
if not self .auto_position :
335
- # only when log_file and log_pos both provided, the position info is
336
- # valid, if not, get the current position from master
337
- if self .log_file is None or self .log_pos is None :
338
- cur = self ._stream_connection .cursor ()
339
- cur .execute ("SHOW MASTER STATUS" )
340
- master_status = cur .fetchone ()
341
- if master_status is None :
342
- raise BinLogNotEnabled ()
343
- self .log_file , self .log_pos = master_status [:2 ]
344
- cur .close ()
345
-
346
- prelude = struct .pack ('<i' , len (self .log_file ) + 11 ) \
347
- + bytes (bytearray ([COM_BINLOG_DUMP ]))
348
-
349
- if self .__resume_stream :
350
- prelude += struct .pack ('<I' , self .log_pos )
351
- else :
352
- prelude += struct .pack ('<I' , 4 )
353
-
354
- flags = 0
355
- if not self .__blocking :
356
- flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
357
- prelude += struct .pack ('<H' , flags )
358
-
359
- prelude += struct .pack ('<I' , self .__server_id )
360
- prelude += self .log_file .encode ()
361
- else :
362
340
if self .is_mariadb :
363
- # https://mariadb.com/kb/en/5-slave-registration/
364
- cur = self ._stream_connection .cursor ()
365
- cur .execute ("SET @slave_connect_state='%s'" % self .auto_position )
366
- cur .execute ("SET @slave_gtid_strict_mode=1" )
367
- cur .execute ("SET @slave_gtid_ignore_duplicates=0" )
368
- cur .close ()
369
-
370
- # https://mariadb.com/kb/en/com_binlog_dump/
371
- header_size = (
372
- 4 + # binlog pos
373
- 2 + # binlog flags
374
- 4 + # slave server_id,
375
- 4 # requested binlog file name , set it to empty
376
- )
377
-
378
- prelude = struct .pack ('<i' , header_size ) + bytes (bytearray ([COM_BINLOG_DUMP ]))
379
-
380
- # binlog pos
381
- prelude += struct .pack ('<i' , 4 )
341
+ prelude = self .__set_mariadb_settings ()
342
+ else :
343
+ # only when log_file and log_pos both provided, the position info is
344
+ # valid, if not, get the current position from master
345
+ if self .log_file is None or self .log_pos is None :
346
+ cur = self ._stream_connection .cursor ()
347
+ cur .execute ("SHOW MASTER STATUS" )
348
+ master_status = cur .fetchone ()
349
+ if master_status is None :
350
+ raise BinLogNotEnabled ()
351
+ self .log_file , self .log_pos = master_status [:2 ]
352
+ cur .close ()
353
+
354
+ prelude = struct .pack ('<i' , len (self .log_file ) + 11 ) \
355
+ + bytes (bytearray ([COM_BINLOG_DUMP ]))
356
+
357
+ if self .__resume_stream :
358
+ prelude += struct .pack ('<I' , self .log_pos )
359
+ else :
360
+ prelude += struct .pack ('<I' , 4 )
382
361
383
362
flags = 0
363
+
384
364
if not self .__blocking :
385
365
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
386
-
387
- # binlog flags
388
366
prelude += struct .pack ('<H' , flags )
389
367
390
- # server id (4 bytes)
391
368
prelude += struct .pack ('<I' , self .__server_id )
392
-
393
- # empty_binlog_name (4 bytes)
394
- prelude += b' \0 \0 \0 \0 '
395
-
369
+ prelude += self . log_file . encode ()
370
+ else :
371
+ if self . is_mariadb :
372
+ prelude = self . __set_mariadb_settings ()
396
373
else :
397
374
# Format for mysql packet master_auto_position
398
375
#
@@ -474,6 +451,48 @@ def __connect_to_stream(self):
474
451
self ._stream_connection ._next_seq_id = 1
475
452
self .__connected_stream = True
476
453
454
+ def __set_mariadb_settings (self ):
455
+ # https://mariadb.com/kb/en/5-slave-registration/
456
+ cur = self ._stream_connection .cursor ()
457
+ if self .auto_position != None :
458
+ cur .execute ("SET @slave_connect_state='%s'" % self .auto_position )
459
+ cur .execute ("SET @slave_gtid_strict_mode=1" )
460
+ cur .execute ("SET @slave_gtid_ignore_duplicates=0" )
461
+ cur .close ()
462
+
463
+ # https://mariadb.com/kb/en/com_binlog_dump/
464
+ header_size = (
465
+ 4 + # binlog pos
466
+ 2 + # binlog flags
467
+ 4 + # slave server_id,
468
+ 4 # requested binlog file name , set it to empty
469
+ )
470
+
471
+ prelude = struct .pack ('<i' , header_size ) + bytes (bytearray ([COM_BINLOG_DUMP ]))
472
+
473
+ # binlog pos
474
+ prelude += struct .pack ('<i' , 4 )
475
+
476
+ flags = 0
477
+
478
+ # Enable annotate rows event
479
+ if self .__annotate_rows_event :
480
+ flags |= 0x02 # BINLOG_SEND_ANNOTATE_ROWS_EVENT
481
+
482
+ if not self .__blocking :
483
+ flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
484
+
485
+ # binlog flags
486
+ prelude += struct .pack ('<H' , flags )
487
+
488
+ # server id (4 bytes)
489
+ prelude += struct .pack ('<I' , self .__server_id )
490
+
491
+ # empty_binlog_name (4 bytes)
492
+ prelude += b'\0 \0 \0 \0 '
493
+
494
+ return prelude
495
+
477
496
def fetchone (self ):
478
497
while True :
479
498
if self .end_log_pos and self .is_past_end_log_pos :
@@ -602,6 +621,7 @@ def _allowed_event_list(self, only_events, ignored_events,
602
621
HeartbeatLogEvent ,
603
622
NotImplementedEvent ,
604
623
MariadbGtidEvent ,
624
+ MariadbAnnotateRowsEvent ,
605
625
RandEvent
606
626
))
607
627
if ignored_events is not None :
0 commit comments