@@ -290,7 +290,6 @@ def close(self):
290
290
if self .__connected_ctl :
291
291
# break reference cycle between stream reader and underlying
292
292
# mysql connection object
293
- self ._ctl_connection ._get_table_information = None
294
293
self ._ctl_connection .close ()
295
294
self .__connected_ctl = False
296
295
@@ -301,9 +300,9 @@ def __connect_to_ctl(self):
301
300
self ._ctl_connection_settings ["cursorclass" ] = DictCursor
302
301
self ._ctl_connection_settings ["autocommit" ] = True
303
302
self ._ctl_connection = self .pymysql_wrapper (** self ._ctl_connection_settings )
304
- self ._ctl_connection ._get_table_information = self .__get_table_information
305
303
self ._ctl_connection ._get_dbms = self .__get_dbms
306
304
self .__connected_ctl = True
305
+ self .__check_optional_meta_data ()
307
306
308
307
def __checksum_enabled (self ):
309
308
"""Return True if binlog-checksum = CRC32. Only for MySQL > 5.6"""
@@ -548,6 +547,28 @@ def __set_mariadb_settings(self):
548
547
549
548
return prelude
550
549
550
+ def __check_optional_meta_data (self ):
551
+ cur = self ._ctl_connection .cursor ()
552
+ cur .execute ("SHOW VARIABLES LIKE 'BINLOG_ROW_METADATA';" )
553
+ value = cur .fetchone ()
554
+ if value is None : # BinLog Variable Not exist It means Not Supported Version
555
+ logging .log (
556
+ logging .WARN ,
557
+ """
558
+ Before using MARIADB 10.5.0 and MYSQL 8.0.14 versions,
559
+ use python-mysql-replication version Before 1.0 version """ ,
560
+ )
561
+ else :
562
+ value = value .get ("Value" , "" )
563
+ if value .upper () != "FULL" :
564
+ logging .log (
565
+ logging .WARN ,
566
+ """
567
+ Setting The Variable Value BINLOG_ROW_METADATA = FULL
568
+ By Applying this, provide properly mapped column information on UPDATE,DELETE,INSERT.
569
+ """ ,
570
+ )
571
+
551
572
def fetchone (self ):
552
573
while True :
553
574
if self .end_log_pos and self .is_past_end_log_pos :
@@ -709,38 +730,6 @@ def _allowed_event_list(
709
730
pass
710
731
return frozenset (events )
711
732
712
- def __get_table_information (self , schema , table ):
713
- for i in range (1 , 3 ):
714
- try :
715
- if not self .__connected_ctl :
716
- self .__connect_to_ctl ()
717
-
718
- cur = self ._ctl_connection .cursor ()
719
- cur .execute (
720
- """
721
- SELECT
722
- COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
723
- COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION,
724
- DATA_TYPE, CHARACTER_OCTET_LENGTH
725
- FROM
726
- information_schema.columns
727
- WHERE
728
- table_schema = %s AND table_name = %s
729
- """ ,
730
- (schema , table ),
731
- )
732
- result = sorted (cur .fetchall (), key = lambda x : x ["ORDINAL_POSITION" ])
733
- cur .close ()
734
-
735
- return result
736
- except pymysql .OperationalError as error :
737
- code , message = error .args
738
- if code in MYSQL_EXPECTED_ERROR_CODES :
739
- self .__connected_ctl = False
740
- continue
741
- else :
742
- raise error
743
-
744
733
def __get_dbms (self ):
745
734
if not self .__connected_ctl :
746
735
self .__connect_to_ctl ()
0 commit comments