Skip to content

Commit 8a0280b

Browse files
committed
sync column
1 parent be89380 commit 8a0280b

File tree

3 files changed

+21
-50
lines changed

3 files changed

+21
-50
lines changed

pymysqlreplication/binlogstream.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ def __init__(
182182
ignore_decode_errors=False,
183183
verify_checksum=False,
184184
enable_logging=True,
185-
optional_meta_data=False,
186185
):
187186
"""
188187
Attributes:
@@ -248,7 +247,7 @@ def __init__(
248247
)
249248
self.__ignore_decode_errors = ignore_decode_errors
250249
self.__verify_checksum = verify_checksum
251-
self.__optional_meta_data = optional_meta_data
250+
self.__optional_meta_data = False
252251

253252
# We can't filter on packet level TABLE_MAP and rotate event because
254253
# we need them for handling other operations
@@ -570,6 +569,8 @@ def __check_optional_meta_data(self):
570569
By Applying this, provide properly mapped column information on UPDATE,DELETE,INSERT.
571570
""",
572571
)
572+
else:
573+
self.__optional_meta_data = True
573574

574575
def fetchone(self):
575576
while True:

pymysqlreplication/row_event.py

+15-46
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,17 @@ def __read_values_name(
207207
elif column.type == FIELD_TYPE.YEAR:
208208
return self.packet.read_uint8() + 1900
209209
elif column.type == FIELD_TYPE.ENUM:
210-
self.packet.read_uint_by_size(column.size)
211-
# unsupported
212-
return None
210+
return column.enum_values[self.packet.read_uint_by_size(column.size)]
213211
elif column.type == FIELD_TYPE.SET:
214-
self.packet.read_uint_by_size(column.size)
215-
# unsupported
216-
return None
212+
bit_mask = self.packet.read_uint_by_size(column.size)
213+
return (
214+
set(
215+
val
216+
for idx, val in enumerate(column.set_values)
217+
if bit_mask & 2**idx
218+
)
219+
or None
220+
)
217221
elif column.type == FIELD_TYPE.BIT:
218222
return self.__read_bit(column)
219223
elif column.type == FIELD_TYPE.GEOMETRY:
@@ -463,6 +467,7 @@ def _dump(self):
463467
"Column Name Information Flag: %s"
464468
% self.table_map[self.table_id].column_name_flag
465469
)
470+
print(self.table_map[self.table_id].data)
466471

467472
def _fetch_rows(self):
468473
self.__rows = []
@@ -802,9 +807,6 @@ def _get_optional_meta_data(self):
802807
return optional_metadata
803808

804809
def _sync_column_info(self):
805-
column_schemas = []
806-
if len(self.optional_metadata.column_name_list) == 0:
807-
return
808810
if not self.__optional_meta_data:
809811
# If optional_meta_data is False Do not sync Event Time Column Schemas
810812
return
@@ -815,43 +817,18 @@ def _sync_column_info(self):
815817
set_pos = 0
816818

817819
for column_idx in range(self.column_count):
818-
column_schema = {
819-
"COLUMN_NAME": None,
820-
"COLLATION_NAME": None,
821-
"CHARACTER_SET_NAME": None,
822-
"CHARACTER_OCTET_LENGTH": None,
823-
"DATA_TYPE": None, # not sufficient data
824-
"COLUMN_COMMENT": "", # we don't know this Info from optional metadata info
825-
"COLUMN_TYPE": None, # not sufficient data
826-
"COLUMN_KEY": "",
827-
"ORDINAL_POSITION": None,
828-
}
829820
column_type = self.columns[column_idx].type
830821
column_name = self.optional_metadata.column_name_list[column_idx]
831-
data_type = self._get_field_type_key(column_type)
832822
column_data: Column = self.columns[column_idx]
833823
column_data.name = column_name
834824

835-
column_schema["COLUMN_NAME"] = column_name
836-
column_schema["ORDINAL_POSITION"] = column_idx + 1
837-
838-
if data_type is not None:
839-
data_type = data_type.lower()
840-
column_schema["DATA_TYPE"] = data_type
841-
842-
if "max_length" in column_data.data:
843-
max_length = column_data.max_length
844-
column_schema["CHARACTER_OCTET_LENGTH"] = str(max_length)
845-
846825
if self._is_character_column(column_type, dbms=self.dbms):
847826
charset_id = self.optional_metadata.charset_collation_list[charset_pos]
848827
charset_pos += 1
849828

850829
encode_name, collation_name, charset_name = find_charset(
851830
charset_id, dbms=self.dbms
852831
)
853-
column_schema["COLLATION_NAME"] = collation_name
854-
column_schema["CHARACTER_SET_NAME"] = charset_name
855832

856833
self.columns[column_idx].collation_name = collation_name
857834
self.columns[column_idx].character_set_name = encode_name
@@ -865,8 +842,6 @@ def _sync_column_info(self):
865842
encode_name, collation_name, charset_name = find_charset(
866843
charset_id, dbms=self.dbms
867844
)
868-
column_schema["COLLATION_NAME"] = collation_name
869-
column_schema["CHARACTER_SET_NAME"] = charset_name
870845

871846
self.columns[column_idx].collation_name = collation_name
872847
self.columns[column_idx].character_set_name = encode_name
@@ -875,17 +850,11 @@ def _sync_column_info(self):
875850
enum_column_info = self.optional_metadata.set_enum_str_value_list[
876851
enum_pos
877852
]
878-
enum_values = ",".join(enum_column_info)
879-
enum_format = f"enum({enum_values})"
880-
column_schema["COLUMN_TYPE"] = enum_format
881853
self.columns[column_idx].enum_values = [""] + enum_column_info
882854
enum_pos += 1
883855

884856
if self._is_set_column(column_type):
885857
set_column_info = self.optional_metadata.set_str_value_list[set_pos]
886-
set_values = ",".join(set_column_info)
887-
set_format = f"set({set_values})"
888-
column_schema["COLUMN_TYPE"] = set_format
889858
self.columns[column_idx].set_values = set_column_info
890859
set_pos += 1
891860

@@ -896,12 +865,12 @@ def _sync_column_info(self):
896865
self.columns[column_idx].unsigned = True
897866

898867
if column_idx in self.optional_metadata.simple_primary_key_list:
899-
column_schema["COLUMN_KEY"] = "PRI"
900-
901-
column_schemas.append(column_schema)
868+
self.columns[column_idx].is_primary = True
869+
if self.optional_metadata.visibility_list[column_idx]:
870+
self.columns[column_idx].visibility = True
902871

903872
self.table_obj = Table(
904-
column_schemas, self.table_id, self.schema, self.table, self.columns
873+
self.table_id, self.schema, self.table, self.columns, column_name_flag=True
905874
)
906875

907876
def _convert_include_non_numeric_column(self, signedness_bool_list):

pymysqlreplication/table.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22

33

44
class Table(object):
5-
def __init__(self, table_id, schema, table, columns, primary_key=None):
6-
self.column_name_flag = False
5+
def __init__(
6+
self, table_id, schema, table, columns, primary_key=None, column_name_flag=False
7+
):
78
if primary_key is None:
89
primary_key = [c.data["name"] for c in columns if c.data["is_primary"]]
910
if len(primary_key) == 0:

0 commit comments

Comments
 (0)