
Commit b78b922

cherry pick from column_schema delete

1 parent 36926e1 commit b78b922

6 files changed: +25 -139 lines changed

pymysqlreplication/binlogstream.py (-6)

@@ -176,7 +176,6 @@ def __init__(
         report_slave=None,
         slave_uuid=None,
         pymysql_wrapper=None,
-        fail_on_table_metadata_unavailable=False,
         slave_heartbeat=None,
         is_mariadb=False,
         annotate_rows_event=False,
@@ -211,9 +210,6 @@ def __init__(
         report_slave: Report slave in SHOW SLAVE HOSTS.
         slave_uuid: Report slave_uuid or replica_uuid in SHOW SLAVE HOSTS(MySQL 8.0.21-) or
                     SHOW REPLICAS(MySQL 8.0.22+) depends on your MySQL version.
-        fail_on_table_metadata_unavailable: Should raise exception if we
-                                            can't get table information on
-                                            row_events
         slave_heartbeat: (seconds) Should master actively send heartbeat on
                          connection. This also reduces traffic in GTID
                          replication on replication resumption (in case
@@ -250,7 +246,6 @@ def __init__(
         self.__allowed_events = self._allowed_event_list(
             only_events, ignored_events, filter_non_implemented_events
         )
-        self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
         self.__ignore_decode_errors = ignore_decode_errors
         self.__verify_checksum = verify_checksum
         self.__optional_meta_data = optional_meta_data
@@ -619,7 +614,6 @@ def fetchone(self):
            self.__only_schemas,
            self.__ignored_schemas,
            self.__freeze_schema,
-            self.__fail_on_table_metadata_unavailable,
            self.__ignore_decode_errors,
            self.__verify_checksum,
            self.__optional_meta_data,

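With fail_on_table_metadata_unavailable removed from the constructor and its docstring, callers simply drop the keyword. A minimal usage sketch under stated assumptions: the connection settings are placeholders, a reachable MySQL source is assumed, and, since the INFORMATION_SCHEMA lookup is gone, column names presumably depend on the server writing them into the binlog (e.g. binlog_row_metadata=FULL).

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent

# Hypothetical connection settings; replace with your own environment.
mysql_settings = {"host": "127.0.0.1", "port": 3306, "user": "repl", "passwd": "secret"}

stream = BinLogStreamReader(
    connection_settings=mysql_settings,
    server_id=100,
    only_events=[WriteRowsEvent],
    blocking=False,
    # fail_on_table_metadata_unavailable=True  # no longer accepted after this change
)

for event in stream:
    event.dump()  # prints the event header plus the per-event _dump() output

stream.close()
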
pymysqlreplication/column.py (+8 -35)

@@ -9,27 +9,17 @@ class Column(object):
     """Definition of a column"""

     def __init__(self, *args, **kwargs):
-        if len(args) == 3:
+        if len(args) == 2:
             self.__parse_column_definition(*args)
         else:
             self.__dict__.update(kwargs)

-    def __parse_column_definition(self, column_type, column_schema, packet):
+    def __parse_column_definition(self, column_type, packet):
         self.type = column_type
-        self.name = column_schema["COLUMN_NAME"]
-        self.collation_name = column_schema["COLLATION_NAME"]
-        self.character_set_name = column_schema["CHARACTER_SET_NAME"]
-        self.comment = column_schema["COLUMN_COMMENT"]
-        self.unsigned = column_schema["COLUMN_TYPE"].find("unsigned") != -1
-        self.zerofill = column_schema["COLUMN_TYPE"].find("zerofill") != -1
-        self.type_is_bool = False
-        self.is_primary = column_schema["COLUMN_KEY"] == "PRI"
-
-        # Check for fixed-length binary type. When that's the case then we need
-        # to zero-pad the values to full length at read time.
-        self.fixed_binary_length = None
-        if column_schema["DATA_TYPE"] == "binary":
-            self.fixed_binary_length = column_schema["CHARACTER_OCTET_LENGTH"]
+        self.name = None
+        self.unsigned = False
+        self.is_primary = False
+        self.character_set_name = None

         if self.type == FIELD_TYPE.VARCHAR:
             self.max_length = struct.unpack("<H", packet.read(2))[0]
@@ -43,13 +33,8 @@ def __parse_column_definition(self, column_type, column_schema, packet):
             self.fsp = packet.read_uint8()
         elif self.type == FIELD_TYPE.TIME2:
             self.fsp = packet.read_uint8()
-        elif (
-            self.type == FIELD_TYPE.TINY
-            and column_schema["COLUMN_TYPE"] == "tinyint(1)"
-        ):
-            self.type_is_bool = True
         elif self.type == FIELD_TYPE.VAR_STRING or self.type == FIELD_TYPE.STRING:
-            self.__read_string_metadata(packet, column_schema)
+            self.__read_string_metadata(packet)
         elif self.type == FIELD_TYPE.BLOB:
             self.length_size = packet.read_uint8()
         elif self.type == FIELD_TYPE.GEOMETRY:
@@ -65,27 +50,15 @@ def __parse_column_definition(self, column_type, column_schema, packet):
             self.bits = (bytes * 8) + bits
             self.bytes = int((self.bits + 7) / 8)

-    def __read_string_metadata(self, packet, column_schema):
+    def __read_string_metadata(self, packet):
         metadata = (packet.read_uint8() << 8) + packet.read_uint8()
         real_type = metadata >> 8
         if real_type == FIELD_TYPE.SET or real_type == FIELD_TYPE.ENUM:
             self.type = real_type
             self.size = metadata & 0x00FF
-            self.__read_enum_metadata(column_schema)
         else:
             self.max_length = (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00FF)

-    def __read_enum_metadata(self, column_schema):
-        enums = column_schema["COLUMN_TYPE"]
-        if self.type == FIELD_TYPE.ENUM:
-            self.enum_values = [""] + enums.replace("enum(", "").replace(
-                ")", ""
-            ).replace("'", "").split(",")
-        else:
-            self.set_values = (
-                enums.replace("set(", "").replace(")", "").replace("'", "").split(",")
-            )
-
     def __eq__(self, other):
         return self.data == other.data

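The column.py hunk above changes Column to a two-argument parse path: the binlog column type plus the packet, with no column_schema dict from INFORMATION_SCHEMA. A small sketch of that path; DummyPacket is a hypothetical stand-in for the real binlog packet reader, fed the two metadata bytes a VARCHAR column carries.

import struct

from pymysqlreplication.column import Column
from pymysqlreplication.constants import FIELD_TYPE

class DummyPacket:
    """Hypothetical stand-in that replays pre-captured metadata bytes."""
    def __init__(self, data):
        self._data = data
        self._pos = 0

    def read(self, size):
        chunk = self._data[self._pos:self._pos + size]
        self._pos += size
        return chunk

    def read_uint8(self):
        return self.read(1)[0]

# VARCHAR metadata: two little-endian bytes holding max_length.
packet = DummyPacket(struct.pack("<H", 255))
col = Column(FIELD_TYPE.VARCHAR, packet)  # previously Column(column_type, column_schema, packet)
print(col.max_length)  # 255
print(col.name)        # None; names now arrive via optional TABLE_MAP metadata
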
pymysqlreplication/event.py (-2)

@@ -24,7 +24,6 @@ def __init__(
         only_schemas=None,
         ignored_schemas=None,
         freeze_schema=False,
-        fail_on_table_metadata_unavailable=False,
         ignore_decode_errors=False,
         verify_checksum=False,
         optional_meta_data=False,
@@ -36,7 +35,6 @@ def __init__(
         self.event_size = event_size
         self._ctl_connection = ctl_connection
         self.mysql_version = mysql_version
-        self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
         self._ignore_decode_errors = ignore_decode_errors
         self._verify_checksum = verify_checksum
         self._is_event_valid = None

pymysqlreplication/packet.py (-2)

@@ -104,7 +104,6 @@ def __init__(
         only_schemas,
         ignored_schemas,
         freeze_schema,
-        fail_on_table_metadata_unavailable,
         ignore_decode_errors,
         verify_checksum,
         optional_meta_data,
@@ -158,7 +157,6 @@ def __init__(
            only_schemas=only_schemas,
            ignored_schemas=ignored_schemas,
            freeze_schema=freeze_schema,
-            fail_on_table_metadata_unavailable=fail_on_table_metadata_unavailable,
            ignore_decode_errors=ignore_decode_errors,
            verify_checksum=verify_checksum,
            optional_meta_data=optional_meta_data,

pymysqlreplication/row_event.py (+15 -90)

@@ -8,7 +8,6 @@
 from enum import Enum

 from .event import BinLogEvent
-from .exceptions import TableMetadataUnavailableError
 from .constants import FIELD_TYPE
 from .constants import BINLOG
 from .constants import CHARSET
@@ -92,14 +91,6 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
         # Body
         self.number_of_columns = self.packet.read_length_coded_binary()
         self.columns = self.table_map[self.table_id].columns
-        column_schemas = self.table_map[self.table_id].column_schemas
-
-        if (
-            len(column_schemas) == 0
-        ):  # could not read the table metadata, probably already dropped
-            self.complete = False
-            if self._fail_on_table_metadata_unavailable:
-                raise TableMetadataUnavailableError(self.table)

     @staticmethod
     def _is_null(null_bitmap, position):
@@ -124,19 +115,13 @@ def _read_column_data(self, cols_bitmap):
             column = self.columns[i]
             name = self.table_map[self.table_id].columns[i].name
             unsigned = self.table_map[self.table_id].columns[i].unsigned
-            zerofill = self.table_map[self.table_id].columns[i].zerofill
-            fixed_binary_length = (
-                self.table_map[self.table_id].columns[i].fixed_binary_length
-            )

             values[name] = self.__read_values_name(
                 column,
                 null_bitmap,
                 null_bitmap_index,
                 cols_bitmap,
                 unsigned,
-                zerofill,
-                fixed_binary_length,
                 i,
             )

@@ -146,15 +131,7 @@ def _read_column_data(self, cols_bitmap):
         return values

     def __read_values_name(
-        self,
-        column,
-        null_bitmap,
-        null_bitmap_index,
-        cols_bitmap,
-        unsigned,
-        zerofill,
-        fixed_binary_length,
-        i,
+        self, column, null_bitmap, null_bitmap_index, cols_bitmap, unsigned, i
     ):
         if BitGet(cols_bitmap, i) == 0:
             return None
@@ -165,32 +142,24 @@ def __read_values_name(
         if column.type == FIELD_TYPE.TINY:
             if unsigned:
                 ret = struct.unpack("<B", self.packet.read(1))[0]
-                if zerofill:
-                    ret = format(ret, "03d")
                 return ret
             else:
                 return struct.unpack("<b", self.packet.read(1))[0]
         elif column.type == FIELD_TYPE.SHORT:
             if unsigned:
                 ret = struct.unpack("<H", self.packet.read(2))[0]
-                if zerofill:
-                    ret = format(ret, "05d")
                 return ret
             else:
                 return struct.unpack("<h", self.packet.read(2))[0]
         elif column.type == FIELD_TYPE.LONG:
             if unsigned:
                 ret = struct.unpack("<I", self.packet.read(4))[0]
-                if zerofill:
-                    ret = format(ret, "010d")
                 return ret
             else:
                 return struct.unpack("<i", self.packet.read(4))[0]
         elif column.type == FIELD_TYPE.INT24:
             if unsigned:
                 ret = self.packet.read_uint24()
-                if zerofill:
-                    ret = format(ret, "08d")
                 return ret
             else:
                 return self.packet.read_int24()
@@ -205,12 +174,6 @@ def __read_values_name(
                 else self.__read_string(1, column)
             )

-            if fixed_binary_length and len(ret) < fixed_binary_length:
-                # Fixed-length binary fields are stored in the binlog
-                # without trailing zeros and must be padded with zeros up
-                # to the specified length at read time.
-                nr_pad = fixed_binary_length - len(ret)
-                ret += b"\x00" * nr_pad
             return ret
         elif column.type == FIELD_TYPE.NEWDECIMAL:
             return self.__read_new_decimal(column)
@@ -238,8 +201,6 @@ def __read_values_name(
         elif column.type == FIELD_TYPE.LONGLONG:
             if unsigned:
                 ret = self.packet.read_uint64()
-                if zerofill:
-                    ret = format(ret, "020d")
                 return ret
             else:
                 return self.packet.read_int64()
@@ -498,6 +459,10 @@ def _dump(self):
         print("Table: %s.%s" % (self.schema, self.table))
         print("Affected columns: %d" % self.number_of_columns)
         print("Changed rows: %d" % (len(self.rows)))
+        print(
+            "Column Name Information Flag: %s"
+            % self.table_map[self.table_id].column_name_flag
+        )

     def _fetch_rows(self):
         self.__rows = []
@@ -537,6 +502,7 @@ def _fetch_one_row(self):
     def _dump(self):
         super()._dump()
         print("Values:")
+        print(self.table.data)
         for row in self.rows:
             print("--")
             for key in row["values"]:
@@ -702,61 +668,20 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
         self.column_count = self.packet.read_length_coded_binary()

         self.columns = []
-
-        if self.table_id in table_map:
-            self.column_schemas = table_map[self.table_id].column_schemas
-        elif self.__optional_meta_data:
-            self.column_schemas = []
-        else:
-            self.column_schemas = self._ctl_connection._get_table_information(
-                self.schema, self.table
-            )
-
         self.dbms = self._ctl_connection._get_dbms()
-
-        ordinal_pos_loc = 0
-        if self.column_count != 0:
-            # Read columns meta data
-            column_types = bytearray(self.packet.read(self.column_count))
-            self.packet.read_length_coded_binary()
-            for i in range(0, len(column_types)):
-                column_type = column_types[i]
-                try:
-                    column_schema = self.column_schemas[ordinal_pos_loc]
-
-                    # only acknowledge the column definition if the iteration matches with ordinal position of
-                    # the column. this helps in maintaining support for restricted columnar access
-                    if i != (column_schema["ORDINAL_POSITION"] - 1):
-                        # raise IndexError to follow the workflow of dropping columns which are not matching the
-                        # underlying table schema
-                        raise IndexError
-
-                    ordinal_pos_loc += 1
-                except IndexError:
-                    # this is a dirty hack to prevent row events containing columns which have been dropped prior
-                    # to pymysqlreplication start, but replayed from binlog from blowing up the service.
-                    # TODO: this does not address the issue if the column other than the last one is dropped
-                    column_schema = {
-                        "COLUMN_NAME": "__dropped_col_{i}__".format(i=i),
-                        "COLLATION_NAME": None,
-                        "CHARACTER_SET_NAME": None,
-                        "CHARACTER_OCTET_LENGTH": None,
-                        "DATA_TYPE": "BLOB",
-                        "COLUMN_COMMENT": None,
-                        "COLUMN_TYPE": "BLOB",  # we don't know what it is, so let's not do anything with it.
-                        "COLUMN_KEY": "",
-                    }
-                col = Column(column_type, column_schema, from_packet)
-                self.columns.append(col)
-
-        self.table_obj = Table(
-            self.column_schemas, self.table_id, self.schema, self.table, self.columns
-        )
+        # Read columns meta data
+        column_types = bytearray(self.packet.read(self.column_count))
+        self.packet.read_length_coded_binary()
+        for i in range(0, len(column_types)):
+            column_type = column_types[i]
+            col = Column(column_type, from_packet)
+            self.columns.append(col)

         # ith column is nullable if (i - 1)th bit is set to True, not nullable otherwise
         ## Refer to definition of and call to row.event._is_null() to interpret bitmap corresponding to columns
         self.null_bitmask = self.packet.read((self.column_count + 7) / 8)
-        # optional meta Data
+        self.table_obj = Table(self.table_id, self.schema, self.table, self.columns)
+        table_map[self.table_id] = self.table_obj
         self.optional_metadata = self._get_optional_meta_data()

     # We exclude 'CHAR' and 'INTERVAL' as they map to 'TINY' and 'ENUM' respectively

pymysqlreplication/table.py (+2 -4)

@@ -2,9 +2,8 @@


 class Table(object):
-    def __init__(
-        self, table_id, schema, table, columns, primary_key=None, column_name_flag=False
-    ):
+    def __init__(self, table_id, schema, table, columns, primary_key=None):
+        self.column_name_flag = False
         if primary_key is None:
             primary_key = [c.data["name"] for c in columns if c.data["is_primary"]]
         if len(primary_key) == 0:
@@ -16,7 +15,6 @@ def __init__(

         self.__dict__.update(
             {
-                "column_schemas": column_schemas,
                 "table_id": table_id,
                 "schema": schema,
                 "table": table,

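On the table.py side, Table loses the column_schemas field and the column_name_flag constructor argument; the flag now starts as False and is presumably set later once optional metadata supplies column names. A sketch of the new signature with hypothetical values:

from pymysqlreplication.column import Column
from pymysqlreplication.constants import FIELD_TYPE
from pymysqlreplication.table import Table

# Columns built without schema information carry only placeholder attributes.
columns = [Column(type=FIELD_TYPE.LONG, name=None, unsigned=False, is_primary=False)]

# New signature: no column_schemas, no column_name_flag argument.
tbl = Table(table_id=42, schema="shop", table="orders", columns=columns, primary_key=["id"])
print(tbl.column_name_flag)  # False until column names become available
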