Skip to content

Commit 0c0805b

Browse files
sean-k1dongwook-chanmjs1995mikanizstarcat37
authored
Mysql 8.0.14 version support Table map Event optional metaData (#446)
* feat: optional meta data * feat: optional meta data implement * feat: numeric_count , count parsing add * refactor : python code lint * Fix optional meta data over-read Previously, `get_optional_meta_data` attempt to read trailing 4 bytes reserved for Common-Footer. Changes: - Added `bytes_to_read` to indicate remaining bytes (including 4 bytes) - Updated `get_optional_meta_data` to read everything but 4 bytes * feat: fix bug signedness order * refactor : name changed * feat: fix bug if column length > =8 * feat : dump log add * refactor : read tlv format * feat: add _read_primary_keys_with_prefix & updates * refactor: field_type -> MetadataFieldType * refactor: fix to store optional_metadata as a variable * Test: Add test_set_str_value, test_enum_str_value * fix: fix dump location of optional_metadata * Test: Add primary keys simple, prefix * Test: Add signedness * fix: fix enum_and_set_default_column_charset_list from dict to list * Test: add percona:8.0.14 for optional metadata * fix: percona version change to 8.0 * test: add isMySQL80AndMore in optional metadata test * test: add isMySQL8014AndMore * fix: fix binlog_row_metadata setting location * test: add column_charset * Test: add default_charset * test: add mysql8 for optional metadata * Test: add column_name and refactor other tests * fix: modify isMySQL8014AndMore * refactoring , parsing column * fix: test pass * fix: json name error * feat : sync column * feat: sync column data first version * feat: add _get_field_type_key * feat : add charset * feat: get column_schema info from column info * schema default value change * refactor : code convention * restore column schema from optional meta data * delete print * enum refactoring * refactor delete unused code * fix: reverse_field_type & remove None from dump * feat: add mariadb charset * fix : decode error ignore unknown decode type * only optional metadata info * Revert "only optional metadata info " This reverts commit 59402f9. * ignore decode error * force encode utf-8 type * error packet goes on * remove: delete mysql8-related settings * test: add geometry, enum_and_set, visibility testcases * fix : Adding the missing column. * fix: charset error * refactor: variable name and protect None -refactor variable name -bug fix None lower() * test: test add when table dropped * fix : restore column schema from optional meta data * fix: protect when database column schema length and column_count are same * refactor: add charset_list.csv for adding charsets * fix: modify .extract_charset_list.sh typo * test add : when alter drop column case * test add column comment dropped * fix: test case drop table example * fix: modify process of reading charset_list.csv * setting BinlogStream class parameter optional_meta_data * Revert "setting BinlogStream class parameter optional_meta_data " This reverts commit b882122. * Revert "Revert "setting BinlogStream class parameter optional_meta_data "" resolve conflict This reverts commit ee27a5f. * resolve conflict test case error * resolve conflict missing resolve conflict missing * black lint * remove testcase * remove set, enum * fix: testcase fix : testcase fix testcase fix: testcase * table init changed table init changed * remove ununsed variable * check possible optional metadata version And delete get Table information * cherry pick from column_schema delete * Column Values add * sync column * TestCase resolved test case resolved Test Case resolved * delete print debug and print optional meta data conditional * docker test 8.0 * visibility None case * for visibility test * Column read String Mysql 5version * Revert "docker test 8.0" This reverts commit b9a2b66. * fix : testcase 8 version fix testcase fix test case testcase fix * Add when column name list length 0 It measns Now BINLOG_ROW_METADATA = FULL but still remain BINLOG_ROW_METADATA Mode = MINIMAL in Binlog * mysql 8.0.23 env Test * Revert "mysql 8.0.23 env Test" This reverts commit 30b5fe2. * enum catch out of index enum when Mysql 5.7 case error enum and set BINLOG_IMAGE = MINIMAL erase print --------- Co-authored-by: dongwook-chan <[email protected]> Co-authored-by: mjs <[email protected]> Co-authored-by: mikaniz <[email protected]> Co-authored-by: starcat37 <[email protected]> Co-authored-by: heehehe <[email protected]>
1 parent 6aef850 commit 0c0805b

13 files changed

+1291
-577
lines changed

pymysqlreplication/binlogstream.py

+28-40
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,6 @@ def __init__(
176176
report_slave=None,
177177
slave_uuid=None,
178178
pymysql_wrapper=None,
179-
fail_on_table_metadata_unavailable=False,
180179
slave_heartbeat=None,
181180
is_mariadb=False,
182181
annotate_rows_event=False,
@@ -210,9 +209,6 @@ def __init__(
210209
report_slave: Report slave in SHOW SLAVE HOSTS.
211210
slave_uuid: Report slave_uuid or replica_uuid in SHOW SLAVE HOSTS(MySQL 8.0.21-) or
212211
SHOW REPLICAS(MySQL 8.0.22+) depends on your MySQL version.
213-
fail_on_table_metadata_unavailable: Should raise exception if we
214-
can't get table information on
215-
row_events
216212
slave_heartbeat: (seconds) Should master actively send heartbeat on
217213
connection. This also reduces traffic in GTID
218214
replication on replication resumption (in case
@@ -249,9 +245,9 @@ def __init__(
249245
self.__allowed_events = self._allowed_event_list(
250246
only_events, ignored_events, filter_non_implemented_events
251247
)
252-
self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
253248
self.__ignore_decode_errors = ignore_decode_errors
254249
self.__verify_checksum = verify_checksum
250+
self.__optional_meta_data = False
255251

256252
# We can't filter on packet level TABLE_MAP and rotate event because
257253
# we need them for handling other operations
@@ -295,7 +291,6 @@ def close(self):
295291
if self.__connected_ctl:
296292
# break reference cycle between stream reader and underlying
297293
# mysql connection object
298-
self._ctl_connection._get_table_information = None
299294
self._ctl_connection.close()
300295
self.__connected_ctl = False
301296

@@ -306,9 +301,9 @@ def __connect_to_ctl(self):
306301
self._ctl_connection_settings["cursorclass"] = DictCursor
307302
self._ctl_connection_settings["autocommit"] = True
308303
self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings)
309-
self._ctl_connection._get_table_information = self.__get_table_information
310304
self._ctl_connection._get_dbms = self.__get_dbms
311305
self.__connected_ctl = True
306+
self.__check_optional_meta_data()
312307

313308
def __checksum_enabled(self):
314309
"""Return True if binlog-checksum = CRC32. Only for MySQL > 5.6"""
@@ -553,6 +548,30 @@ def __set_mariadb_settings(self):
553548

554549
return prelude
555550

551+
def __check_optional_meta_data(self):
552+
cur = self._ctl_connection.cursor()
553+
cur.execute("SHOW VARIABLES LIKE 'BINLOG_ROW_METADATA';")
554+
value = cur.fetchone()
555+
if value is None: # BinLog Variable Not exist It means Not Supported Version
556+
logging.log(
557+
logging.WARN,
558+
"""
559+
Before using MARIADB 10.5.0 and MYSQL 8.0.14 versions,
560+
use python-mysql-replication version Before 1.0 version """,
561+
)
562+
else:
563+
value = value.get("Value", "")
564+
if value.upper() != "FULL":
565+
logging.log(
566+
logging.WARN,
567+
"""
568+
Setting The Variable Value BINLOG_ROW_METADATA = FULL
569+
By Applying this, provide properly mapped column information on UPDATE,DELETE,INSERT.
570+
""",
571+
)
572+
else:
573+
self.__optional_meta_data = True
574+
556575
def fetchone(self):
557576
while True:
558577
if self.end_log_pos and self.is_past_end_log_pos:
@@ -596,9 +615,9 @@ def fetchone(self):
596615
self.__only_schemas,
597616
self.__ignored_schemas,
598617
self.__freeze_schema,
599-
self.__fail_on_table_metadata_unavailable,
600618
self.__ignore_decode_errors,
601619
self.__verify_checksum,
620+
self.__optional_meta_data,
602621
)
603622

604623
if binlog_event.event_type == ROTATE_EVENT:
@@ -715,44 +734,13 @@ def _allowed_event_list(
715734
pass
716735
return frozenset(events)
717736

718-
def __get_table_information(self, schema, table):
719-
for i in range(1, 3):
720-
try:
721-
if not self.__connected_ctl:
722-
self.__connect_to_ctl()
723-
724-
cur = self._ctl_connection.cursor()
725-
cur.execute(
726-
"""
727-
SELECT
728-
COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
729-
COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION,
730-
DATA_TYPE, CHARACTER_OCTET_LENGTH
731-
FROM
732-
information_schema.columns
733-
WHERE
734-
table_schema = %s AND table_name = %s
735-
""",
736-
(schema, table),
737-
)
738-
result = sorted(cur.fetchall(), key=lambda x: x["ORDINAL_POSITION"])
739-
cur.close()
740-
741-
return result
742-
except pymysql.OperationalError as error:
743-
code, message = error.args
744-
if code in MYSQL_EXPECTED_ERROR_CODES:
745-
self.__connected_ctl = False
746-
continue
747-
else:
748-
raise error
749-
750737
def __get_dbms(self):
751738
if not self.__connected_ctl:
752739
self.__connect_to_ctl()
753740

754741
cur = self._ctl_connection.cursor()
755742
cur.execute("SELECT VERSION();")
743+
756744
version_info = cur.fetchone().get("VERSION()", "")
757745

758746
if "MariaDB" in version_info:

pymysqlreplication/column.py

+13-35
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,22 @@ class Column(object):
99
"""Definition of a column"""
1010

1111
def __init__(self, *args, **kwargs):
12-
if len(args) == 3:
12+
if len(args) == 2:
1313
self.__parse_column_definition(*args)
1414
else:
1515
self.__dict__.update(kwargs)
1616

17-
def __parse_column_definition(self, column_type, column_schema, packet):
17+
def __parse_column_definition(self, column_type, packet):
1818
self.type = column_type
19-
self.name = column_schema["COLUMN_NAME"]
20-
self.collation_name = column_schema["COLLATION_NAME"]
21-
self.character_set_name = column_schema["CHARACTER_SET_NAME"]
22-
self.comment = column_schema["COLUMN_COMMENT"]
23-
self.unsigned = column_schema["COLUMN_TYPE"].find("unsigned") != -1
24-
self.zerofill = column_schema["COLUMN_TYPE"].find("zerofill") != -1
25-
self.type_is_bool = False
26-
self.is_primary = column_schema["COLUMN_KEY"] == "PRI"
27-
28-
# Check for fixed-length binary type. When that's the case then we need
29-
# to zero-pad the values to full length at read time.
30-
self.fixed_binary_length = None
31-
if column_schema["DATA_TYPE"] == "binary":
32-
self.fixed_binary_length = column_schema["CHARACTER_OCTET_LENGTH"]
19+
self.name = None
20+
self.unsigned = False
21+
self.is_primary = False
22+
self.charset_id = None
23+
self.character_set_name = None
24+
self.collation_name = None
25+
self.enum_values = None
26+
self.set_values = None
27+
self.visibility = False
3328

3429
if self.type == FIELD_TYPE.VARCHAR:
3530
self.max_length = struct.unpack("<H", packet.read(2))[0]
@@ -43,13 +38,8 @@ def __parse_column_definition(self, column_type, column_schema, packet):
4338
self.fsp = packet.read_uint8()
4439
elif self.type == FIELD_TYPE.TIME2:
4540
self.fsp = packet.read_uint8()
46-
elif (
47-
self.type == FIELD_TYPE.TINY
48-
and column_schema["COLUMN_TYPE"] == "tinyint(1)"
49-
):
50-
self.type_is_bool = True
5141
elif self.type == FIELD_TYPE.VAR_STRING or self.type == FIELD_TYPE.STRING:
52-
self.__read_string_metadata(packet, column_schema)
42+
self.__read_string_metadata(packet)
5343
elif self.type == FIELD_TYPE.BLOB:
5444
self.length_size = packet.read_uint8()
5545
elif self.type == FIELD_TYPE.GEOMETRY:
@@ -65,27 +55,15 @@ def __parse_column_definition(self, column_type, column_schema, packet):
6555
self.bits = (bytes * 8) + bits
6656
self.bytes = int((self.bits + 7) / 8)
6757

68-
def __read_string_metadata(self, packet, column_schema):
58+
def __read_string_metadata(self, packet):
6959
metadata = (packet.read_uint8() << 8) + packet.read_uint8()
7060
real_type = metadata >> 8
7161
if real_type == FIELD_TYPE.SET or real_type == FIELD_TYPE.ENUM:
7262
self.type = real_type
7363
self.size = metadata & 0x00FF
74-
self.__read_enum_metadata(column_schema)
7564
else:
7665
self.max_length = (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00FF)
7766

78-
def __read_enum_metadata(self, column_schema):
79-
enums = column_schema["COLUMN_TYPE"]
80-
if self.type == FIELD_TYPE.ENUM:
81-
self.enum_values = [""] + enums.replace("enum(", "").replace(
82-
")", ""
83-
).replace("'", "").split(",")
84-
else:
85-
self.set_values = (
86-
enums.replace("set(", "").replace(")", "").replace("'", "").split(",")
87-
)
88-
8967
def __eq__(self, other):
9068
return self.data == other.data
9169

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#!/usr/bin/env bash
2+
3+
usage(){
4+
echo "Usage: bash .extract_charset_list.sh (mysql|mariadb) >> charset_list.csv"
5+
}
6+
7+
dbms=$1
8+
if [ -z "$dbms" ]; then
9+
usage
10+
exit 1
11+
fi
12+
13+
SQL_QUERY="SELECT id, character_set_name, collation_name, is_default
14+
FROM information_schema.collations ORDER BY id;"
15+
16+
mysql -N -s -e "$SQL_QUERY" | python3 -c "import sys
17+
dbms = sys.argv[1]
18+
for line in sys.stdin:
19+
_id, name, collation, is_default = line.split(chr(9))
20+
if _id == 'NULL':
21+
continue
22+
is_default = True if is_default.strip() == 'Yes' else False
23+
print(f'{_id},{name},{collation},{is_default},{dbms}')
24+
" "$dbms"

pymysqlreplication/constants/BINLOG.py

+2
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,13 @@
4444
INTVAR_INSERT_ID_EVENT = 0x02
4545

4646
# MariaDB events
47+
4748
MARIADB_ANNOTATE_ROWS_EVENT = 0xA0
4849
MARIADB_BINLOG_CHECKPOINT_EVENT = 0xA1
4950
MARIADB_GTID_EVENT = 0xA2
5051
MARIADB_GTID_GTID_LIST_EVENT = 0xA3
5152
MARIADB_START_ENCRYPTION_EVENT = 0xA4
5253

54+
5355
# Common-Footer
5456
BINLOG_CHECKSUM_LEN = 4
+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from collections import defaultdict
2+
import os
3+
4+
5+
class Charset:
6+
def __init__(self, id, name, collation, is_default=False, dbms="mysql"):
7+
self.id, self.name, self.collation = id, name, collation
8+
self.is_default = is_default
9+
self.dbms = dbms
10+
11+
def __repr__(self):
12+
return (
13+
f"Charset(id={self.id}, name={self.name!r}, collation={self.collation!r})"
14+
)
15+
16+
@property
17+
def encoding(self):
18+
name = self.name
19+
if name in ("utf8mb4", "utf8mb3"):
20+
return "utf8"
21+
if name == "latin1":
22+
return "cp1252"
23+
if name == "koi8r":
24+
return "koi8_r"
25+
if name == "koi8u":
26+
return "koi8_u"
27+
return name
28+
29+
@property
30+
def is_binary(self):
31+
return self.id == 63
32+
33+
34+
class Charsets:
35+
def __init__(self):
36+
self._by_id = defaultdict(dict) # key: mysql / mariadb
37+
self._by_name = defaultdict(dict) # key: mysql / mariadb
38+
39+
def add(self, _charset):
40+
self._by_id[_charset.dbms][_charset.id] = _charset
41+
if _charset.is_default:
42+
self._by_name[_charset.dbms][_charset.name] = _charset
43+
44+
def by_id(self, id, dbms="mysql"):
45+
return self._by_id.get(dbms, {}).get(id)
46+
47+
def by_name(self, name, dbms="mysql"):
48+
if name == "utf8":
49+
name = "utf8mb4"
50+
return self._by_name.get(dbms, {}).get(name.lower())
51+
52+
53+
charsets = Charsets()
54+
charset_by_name = charsets.by_name
55+
charset_by_id = charsets.by_id
56+
57+
with open(
58+
os.path.join(os.path.dirname(os.path.abspath(__file__)), "charset_list.csv"), "r"
59+
) as f:
60+
f.readline() # pass header
61+
for line in f:
62+
lines = line.rstrip("\n").split(",")
63+
if len(lines) != 5:
64+
continue
65+
66+
_id, _name, _collation, _is_default, _dbms = lines
67+
charsets.add(Charset(_id, _name, _collation, _is_default, _dbms))

0 commit comments

Comments
 (0)