Skip to content

Mysql 8.0.14 version support Table map Event optional metaData , Sync Column #446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
82087ab
feat: optional meta data
sean-k1 Aug 11, 2023
4cca2b1
feat: optional meta data implement
sean-k1 Aug 14, 2023
33b7458
feat: numeric_count , count parsing add
sean-k1 Aug 14, 2023
aa39b69
refactor : python code lint
sean-k1 Aug 14, 2023
32abf97
Fix optional meta data over-read
dongwook-chan Aug 14, 2023
7a6e38a
feat: fix bug signedness order
sean-k1 Aug 15, 2023
bf05806
refactor : name changed
sean-k1 Aug 15, 2023
1c5f8e4
feat: fix bug if column length > =8
sean-k1 Aug 15, 2023
cc47798
feat : dump log add
sean-k1 Aug 15, 2023
1677a0b
refactor : read tlv format
sean-k1 Aug 15, 2023
3f68928
feat: add _read_primary_keys_with_prefix & updates
mjs1995 Aug 17, 2023
f44c82c
refactor: field_type -> MetadataFieldType
mjs1995 Aug 17, 2023
a36c949
refactor: fix to store optional_metadata as a variable
mikaniz Aug 19, 2023
79fa897
Test: Add test_set_str_value, test_enum_str_value
starcat37 Aug 19, 2023
819550a
fix: fix dump location of optional_metadata
mikaniz Aug 19, 2023
183f926
Test: Add primary keys simple, prefix
mjs1995 Aug 19, 2023
6a6eee8
Test: Add signedness
heehehe Aug 19, 2023
bb498dd
fix: fix enum_and_set_default_column_charset_list from dict to list
mikaniz Aug 19, 2023
986b42a
Test: add percona:8.0.14 for optional metadata
heehehe Aug 19, 2023
fb8b6ee
fix: percona version change to 8.0
heehehe Aug 19, 2023
4b91f46
test: add isMySQL80AndMore in optional metadata test
heehehe Aug 19, 2023
ab6d4eb
test: add isMySQL8014AndMore
heehehe Aug 19, 2023
fb4a29d
fix: fix binlog_row_metadata setting location
mikaniz Aug 19, 2023
206c4ee
test: add column_charset
mikaniz Aug 19, 2023
4a46f8b
Test: add default_charset
heehehe Aug 19, 2023
bf723f9
test: add mysql8 for optional metadata
mjs1995 Aug 19, 2023
2fd6f59
Test: add column_name and refactor other tests
heehehe Aug 19, 2023
45110e3
Merge branch 'feature/optional-meta-data' of github.com:23-OSSCA-pyth…
heehehe Aug 19, 2023
d7c2c73
fix: modify isMySQL8014AndMore
heehehe Aug 21, 2023
68fa675
chore: fix conflicts
heehehe Aug 21, 2023
0abac0d
refactoring , parsing column
sean-k1 Aug 21, 2023
7f17682
fix: test pass
sean-k1 Aug 21, 2023
53e0e3e
fix: json name error
sean-k1 Aug 21, 2023
a51d806
feat : sync column
sean-k1 Aug 21, 2023
386f3b8
feat: sync column data first version
sean-k1 Aug 21, 2023
bce50ee
feat: add _get_field_type_key
mjs1995 Aug 22, 2023
ea7a805
feat : add charset
sean-k1 Aug 23, 2023
a274e76
feat: get column_schema info from column info
sean-k1 Aug 23, 2023
3b64531
schema default value change
sean-k1 Aug 23, 2023
4a54c00
refactor : code convention
sean-k1 Aug 24, 2023
e066e08
restore column schema from optional meta data
sean-k1 Aug 24, 2023
bdad54e
delete print
sean-k1 Aug 24, 2023
8732ce1
enum refactoring
sean-k1 Aug 24, 2023
73cad3f
refactor delete unused code
sean-k1 Aug 24, 2023
91615e0
fix: reverse_field_type & remove None from dump
mjs1995 Aug 24, 2023
b01de68
feat: add mariadb charset
heehehe Aug 24, 2023
62766db
fix: modify for conflicts
heehehe Aug 24, 2023
2f7ea60
fix : decode error ignore unknown decode type
sean-k1 Aug 25, 2023
59402f9
only optional metadata info
sean-k1 Aug 25, 2023
9c1ec39
Revert "only optional metadata info "
sean-k1 Aug 25, 2023
2a4f768
ignore decode error
sean-k1 Aug 25, 2023
c99603f
force encode utf-8 type
sean-k1 Aug 25, 2023
9fed78c
error packet goes on
sean-k1 Aug 25, 2023
ca130cc
remove: delete mysql8-related settings
heehehe Aug 25, 2023
1b38ce4
test: add geometry, enum_and_set, visibility testcases
heehehe Aug 25, 2023
acf2d08
fix : Adding the missing column.
sean-k1 Aug 25, 2023
4dcb905
fix: charset error
sean-k1 Aug 25, 2023
0e4496c
refactor: variable name and protect None
sean-k1 Aug 26, 2023
0f41520
test: test add when table dropped
sean-k1 Aug 26, 2023
63e7a9a
fix : restore column schema from optional meta data
sean-k1 Aug 26, 2023
96a60eb
fix: protect when database column schema length and column_count are …
sean-k1 Aug 26, 2023
e5d532d
refactor: add charset_list.csv for adding charsets
heehehe Aug 26, 2023
f612114
Merge branch 'feature/optional-meta-data' of github.com:23-OSSCA-pyth…
heehehe Aug 26, 2023
c3c64d8
fix: modify .extract_charset_list.sh typo
heehehe Aug 26, 2023
f150e56
test add : when alter drop column case
sean-k1 Aug 26, 2023
d99b8ff
test add column comment dropped
sean-k1 Aug 26, 2023
8328ce0
fix: test case drop table example
sean-k1 Aug 26, 2023
e2e40e0
fix: resolve conflicts with a19a5a5
heehehe Aug 27, 2023
3bc4e08
fix: modify process of reading charset_list.csv
heehehe Aug 28, 2023
b882122
setting BinlogStream class parameter optional_meta_data
sean-k1 Sep 4, 2023
725f035
Merge remote-tracking branch 'origin/feature/optional-meta-data' into…
sean-k1 Sep 4, 2023
ee27a5f
Revert "setting BinlogStream class parameter optional_meta_data "
sean-k1 Sep 4, 2023
a2c6817
Merge branch 'main' into feature/optional-meta-data
sean-k1 Sep 4, 2023
d0e73e0
Revert "Revert "setting BinlogStream class parameter optional_meta_da…
sean-k1 Sep 4, 2023
4aa1b27
resolve conflict test case error
sean-k1 Sep 4, 2023
9b332f3
Merge branch 'main' into feature/optional-meta-data
sean-k1 Sep 13, 2023
e2f5b26
resolve conflict missing
sean-k1 Sep 13, 2023
ba3bbf1
black lint
sean-k1 Sep 13, 2023
9ae0f82
remove testcase
sean-k1 Sep 15, 2023
4f94af2
remove set, enum
sean-k1 Sep 16, 2023
b0633b7
fix: testcase
sean-k1 Sep 16, 2023
0a0e79f
table init changed
sean-k1 Sep 16, 2023
3a75370
remove ununsed variable
sean-k1 Sep 16, 2023
36926e1
check possible optional metadata version And delete get Table informa…
sean-k1 Sep 18, 2023
b78b922
cherry pick from column_schema delete
sean-k1 Sep 18, 2023
be89380
Column Values add
sean-k1 Sep 18, 2023
8a0280b
sync column
sean-k1 Sep 18, 2023
8949087
TestCase resolved
sean-k1 Sep 18, 2023
985d1d6
delete print debug and print optional meta data conditional
sean-k1 Sep 18, 2023
b9a2b66
docker test 8.0
sean-k1 Sep 18, 2023
16122b2
visibility None case
sean-k1 Sep 18, 2023
b02b58e
for visibility test
sean-k1 Sep 18, 2023
91eacbc
Column read String Mysql 5version
sean-k1 Sep 18, 2023
84c0a32
Revert "docker test 8.0"
sean-k1 Sep 18, 2023
bc1ee8c
fix : testcase 8 version
sean-k1 Sep 18, 2023
3e839c7
Add when column name list length 0
sean-k1 Sep 18, 2023
30b5fe2
mysql 8.0.23 env Test
sean-k1 Sep 18, 2023
6a03460
Revert "mysql 8.0.23 env Test"
sean-k1 Sep 18, 2023
9c781b5
enum catch out of index
sean-k1 Sep 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 28 additions & 40 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ def __init__(
report_slave=None,
slave_uuid=None,
pymysql_wrapper=None,
fail_on_table_metadata_unavailable=False,
slave_heartbeat=None,
is_mariadb=False,
annotate_rows_event=False,
Expand Down Expand Up @@ -210,9 +209,6 @@ def __init__(
report_slave: Report slave in SHOW SLAVE HOSTS.
slave_uuid: Report slave_uuid or replica_uuid in SHOW SLAVE HOSTS(MySQL 8.0.21-) or
SHOW REPLICAS(MySQL 8.0.22+) depends on your MySQL version.
fail_on_table_metadata_unavailable: Should raise exception if we
can't get table information on
row_events
slave_heartbeat: (seconds) Should master actively send heartbeat on
connection. This also reduces traffic in GTID
replication on replication resumption (in case
Expand Down Expand Up @@ -249,9 +245,9 @@ def __init__(
self.__allowed_events = self._allowed_event_list(
only_events, ignored_events, filter_non_implemented_events
)
self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
self.__ignore_decode_errors = ignore_decode_errors
self.__verify_checksum = verify_checksum
self.__optional_meta_data = False

# We can't filter on packet level TABLE_MAP and rotate event because
# we need them for handling other operations
Expand Down Expand Up @@ -295,7 +291,6 @@ def close(self):
if self.__connected_ctl:
# break reference cycle between stream reader and underlying
# mysql connection object
self._ctl_connection._get_table_information = None
self._ctl_connection.close()
self.__connected_ctl = False

Expand All @@ -306,9 +301,9 @@ def __connect_to_ctl(self):
self._ctl_connection_settings["cursorclass"] = DictCursor
self._ctl_connection_settings["autocommit"] = True
self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings)
self._ctl_connection._get_table_information = self.__get_table_information
self._ctl_connection._get_dbms = self.__get_dbms
self.__connected_ctl = True
self.__check_optional_meta_data()

def __checksum_enabled(self):
"""Return True if binlog-checksum = CRC32. Only for MySQL > 5.6"""
Expand Down Expand Up @@ -553,6 +548,30 @@ def __set_mariadb_settings(self):

return prelude

def __check_optional_meta_data(self):
cur = self._ctl_connection.cursor()
cur.execute("SHOW VARIABLES LIKE 'BINLOG_ROW_METADATA';")
value = cur.fetchone()
if value is None: # BinLog Variable Not exist It means Not Supported Version
logging.log(
logging.WARN,
"""
Before using MARIADB 10.5.0 and MYSQL 8.0.14 versions,
use python-mysql-replication version Before 1.0 version """,
)
else:
value = value.get("Value", "")
if value.upper() != "FULL":
logging.log(
logging.WARN,
"""
Setting The Variable Value BINLOG_ROW_METADATA = FULL
By Applying this, provide properly mapped column information on UPDATE,DELETE,INSERT.
""",
)
else:
self.__optional_meta_data = True

def fetchone(self):
while True:
if self.end_log_pos and self.is_past_end_log_pos:
Expand Down Expand Up @@ -596,9 +615,9 @@ def fetchone(self):
self.__only_schemas,
self.__ignored_schemas,
self.__freeze_schema,
self.__fail_on_table_metadata_unavailable,
self.__ignore_decode_errors,
self.__verify_checksum,
self.__optional_meta_data,
)

if binlog_event.event_type == ROTATE_EVENT:
Expand Down Expand Up @@ -715,44 +734,13 @@ def _allowed_event_list(
pass
return frozenset(events)

def __get_table_information(self, schema, table):
for i in range(1, 3):
try:
if not self.__connected_ctl:
self.__connect_to_ctl()

cur = self._ctl_connection.cursor()
cur.execute(
"""
SELECT
COLUMN_NAME, COLLATION_NAME, CHARACTER_SET_NAME,
COLUMN_COMMENT, COLUMN_TYPE, COLUMN_KEY, ORDINAL_POSITION,
DATA_TYPE, CHARACTER_OCTET_LENGTH
FROM
information_schema.columns
WHERE
table_schema = %s AND table_name = %s
""",
(schema, table),
)
result = sorted(cur.fetchall(), key=lambda x: x["ORDINAL_POSITION"])
cur.close()

return result
except pymysql.OperationalError as error:
code, message = error.args
if code in MYSQL_EXPECTED_ERROR_CODES:
self.__connected_ctl = False
continue
else:
raise error

def __get_dbms(self):
if not self.__connected_ctl:
self.__connect_to_ctl()

cur = self._ctl_connection.cursor()
cur.execute("SELECT VERSION();")

version_info = cur.fetchone().get("VERSION()", "")

if "MariaDB" in version_info:
Expand Down
48 changes: 13 additions & 35 deletions pymysqlreplication/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,22 @@ class Column(object):
"""Definition of a column"""

def __init__(self, *args, **kwargs):
if len(args) == 3:
if len(args) == 2:
self.__parse_column_definition(*args)
else:
self.__dict__.update(kwargs)

def __parse_column_definition(self, column_type, column_schema, packet):
def __parse_column_definition(self, column_type, packet):
self.type = column_type
self.name = column_schema["COLUMN_NAME"]
self.collation_name = column_schema["COLLATION_NAME"]
self.character_set_name = column_schema["CHARACTER_SET_NAME"]
self.comment = column_schema["COLUMN_COMMENT"]
self.unsigned = column_schema["COLUMN_TYPE"].find("unsigned") != -1
self.zerofill = column_schema["COLUMN_TYPE"].find("zerofill") != -1
self.type_is_bool = False
self.is_primary = column_schema["COLUMN_KEY"] == "PRI"

# Check for fixed-length binary type. When that's the case then we need
# to zero-pad the values to full length at read time.
self.fixed_binary_length = None
if column_schema["DATA_TYPE"] == "binary":
self.fixed_binary_length = column_schema["CHARACTER_OCTET_LENGTH"]
self.name = None
self.unsigned = False
self.is_primary = False
self.charset_id = None
self.character_set_name = None
self.collation_name = None
self.enum_values = None
self.set_values = None
self.visibility = False

if self.type == FIELD_TYPE.VARCHAR:
self.max_length = struct.unpack("<H", packet.read(2))[0]
Expand All @@ -43,13 +38,8 @@ def __parse_column_definition(self, column_type, column_schema, packet):
self.fsp = packet.read_uint8()
elif self.type == FIELD_TYPE.TIME2:
self.fsp = packet.read_uint8()
elif (
self.type == FIELD_TYPE.TINY
and column_schema["COLUMN_TYPE"] == "tinyint(1)"
):
self.type_is_bool = True
elif self.type == FIELD_TYPE.VAR_STRING or self.type == FIELD_TYPE.STRING:
self.__read_string_metadata(packet, column_schema)
self.__read_string_metadata(packet)
elif self.type == FIELD_TYPE.BLOB:
self.length_size = packet.read_uint8()
elif self.type == FIELD_TYPE.GEOMETRY:
Expand All @@ -65,27 +55,15 @@ def __parse_column_definition(self, column_type, column_schema, packet):
self.bits = (bytes * 8) + bits
self.bytes = int((self.bits + 7) / 8)

def __read_string_metadata(self, packet, column_schema):
def __read_string_metadata(self, packet):
metadata = (packet.read_uint8() << 8) + packet.read_uint8()
real_type = metadata >> 8
if real_type == FIELD_TYPE.SET or real_type == FIELD_TYPE.ENUM:
self.type = real_type
self.size = metadata & 0x00FF
self.__read_enum_metadata(column_schema)
else:
self.max_length = (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00FF)

def __read_enum_metadata(self, column_schema):
enums = column_schema["COLUMN_TYPE"]
if self.type == FIELD_TYPE.ENUM:
self.enum_values = [""] + enums.replace("enum(", "").replace(
")", ""
).replace("'", "").split(",")
else:
self.set_values = (
enums.replace("set(", "").replace(")", "").replace("'", "").split(",")
)

def __eq__(self, other):
return self.data == other.data

Expand Down
24 changes: 24 additions & 0 deletions pymysqlreplication/constants/.extract_charset_list.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env bash

usage(){
echo "Usage: bash .extract_charset_list.sh (mysql|mariadb) >> charset_list.csv"
}

dbms=$1
if [ -z "$dbms" ]; then
usage
exit 1
fi

SQL_QUERY="SELECT id, character_set_name, collation_name, is_default
FROM information_schema.collations ORDER BY id;"

mysql -N -s -e "$SQL_QUERY" | python3 -c "import sys
dbms = sys.argv[1]
for line in sys.stdin:
_id, name, collation, is_default = line.split(chr(9))
if _id == 'NULL':
continue
is_default = True if is_default.strip() == 'Yes' else False
print(f'{_id},{name},{collation},{is_default},{dbms}')
" "$dbms"
2 changes: 2 additions & 0 deletions pymysqlreplication/constants/BINLOG.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@
INTVAR_INSERT_ID_EVENT = 0x02

# MariaDB events

MARIADB_ANNOTATE_ROWS_EVENT = 0xA0
MARIADB_BINLOG_CHECKPOINT_EVENT = 0xA1
MARIADB_GTID_EVENT = 0xA2
MARIADB_GTID_GTID_LIST_EVENT = 0xA3
MARIADB_START_ENCRYPTION_EVENT = 0xA4


# Common-Footer
BINLOG_CHECKSUM_LEN = 4
67 changes: 67 additions & 0 deletions pymysqlreplication/constants/CHARSET.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from collections import defaultdict
import os


class Charset:
def __init__(self, id, name, collation, is_default=False, dbms="mysql"):
self.id, self.name, self.collation = id, name, collation
self.is_default = is_default
self.dbms = dbms

def __repr__(self):
return (
f"Charset(id={self.id}, name={self.name!r}, collation={self.collation!r})"
)

@property
def encoding(self):
name = self.name
if name in ("utf8mb4", "utf8mb3"):
return "utf8"
if name == "latin1":
return "cp1252"
if name == "koi8r":
return "koi8_r"
if name == "koi8u":
return "koi8_u"
return name

@property
def is_binary(self):
return self.id == 63


class Charsets:
def __init__(self):
self._by_id = defaultdict(dict) # key: mysql / mariadb
self._by_name = defaultdict(dict) # key: mysql / mariadb

def add(self, _charset):
self._by_id[_charset.dbms][_charset.id] = _charset
if _charset.is_default:
self._by_name[_charset.dbms][_charset.name] = _charset

def by_id(self, id, dbms="mysql"):
return self._by_id.get(dbms, {}).get(id)

def by_name(self, name, dbms="mysql"):
if name == "utf8":
name = "utf8mb4"
return self._by_name.get(dbms, {}).get(name.lower())


charsets = Charsets()
charset_by_name = charsets.by_name
charset_by_id = charsets.by_id

with open(
os.path.join(os.path.dirname(os.path.abspath(__file__)), "charset_list.csv"), "r"
) as f:
f.readline() # pass header
for line in f:
lines = line.rstrip("\n").split(",")
if len(lines) != 5:
continue

_id, _name, _collation, _is_default, _dbms = lines
charsets.add(Charset(_id, _name, _collation, _is_default, _dbms))
Loading