Skip to content

Distinguish ambiguous column value of None #489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 72 additions & 12 deletions pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
self.__ignored_tables = kwargs["ignored_tables"]
self.__only_schemas = kwargs["only_schemas"]
self.__ignored_schemas = kwargs["ignored_schemas"]
self.__none_sources = {}

# Header
self.table_id = self._read_table_id()
Expand Down Expand Up @@ -130,10 +131,15 @@ def _read_column_data(self, cols_bitmap):
def __read_values_name(
self, column, null_bitmap, null_bitmap_index, cols_bitmap, unsigned, i
):
name = self.table_map[self.table_id].columns[i].name
if BitGet(cols_bitmap, i) == 0:
# This block is only executed when binlog_row_image = MINIMAL.
# When binlog_row_image = FULL, this block does not execute.
self.__none_sources[name] = "cols_bitmap"
return None

if self._is_null(null_bitmap, null_bitmap_index):
self.__none_sources[name] = "null"
return None

if column.type == FIELD_TYPE.TINY:
Expand Down Expand Up @@ -177,17 +183,26 @@ def __read_values_name(
elif column.type == FIELD_TYPE.BLOB:
return self.__read_string(column.length_size, column)
elif column.type == FIELD_TYPE.DATETIME:
return self.__read_datetime()
ret = self.__read_datetime()
if ret is None:
self.__none_sources[name] = "out of datetime range"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it should be a constant this will allow people to code behavior based on that. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@julien-duponchelle Thanks for your review!

Do you mean that setting constant like
OUTDATETIME = "out of datetime range"

and set __none_source like below?
self.__none_source[name] = OUTDATETIME

If this is right, I'm going to set constant by enum and set values with that :)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@julien-duponchelle
I added pymysqlreplication/constants/NONE_SOURCE.py
and changed row_event.py to handle values according to the variables in NONE_SORUCE.py.

Please let me know if there's anything need to change :)

return ret
elif column.type == FIELD_TYPE.TIME:
return self.__read_time()
elif column.type == FIELD_TYPE.DATE:
return self.__read_date()
ret = self.__read_date()
if ret is None:
self.__none_sources[name] = "out of date range"
return ret
elif column.type == FIELD_TYPE.TIMESTAMP:
return datetime.datetime.utcfromtimestamp(self.packet.read_uint32())

# For new date format:
elif column.type == FIELD_TYPE.DATETIME2:
return self.__read_datetime2(column)
ret = self.__read_datetime2(column)
if ret is None:
self.__none_sources[name] = "out of datetime2 range"
return ret
elif column.type == FIELD_TYPE.TIME2:
return self.__read_time2(column)
elif column.type == FIELD_TYPE.TIMESTAMP2:
Expand All @@ -211,11 +226,16 @@ def __read_values_name(
elif column.type == FIELD_TYPE.SET:
bit_mask = self.packet.read_uint_by_size(column.size)
if column.set_values:
return {
ret = {
val
for idx, val in enumerate(column.set_values)
if bit_mask & (1 << idx)
} or None
}
if not ret:
self.__none_sources[column.name] = "empty set"
return None
return ret
self.__none_sources[column.name] = "empty set"
return None
elif column.type == FIELD_TYPE.BIT:
return self.__read_bit(column)
Expand Down Expand Up @@ -462,6 +482,16 @@ def __read_binary_slice(self, binary, start, size, data_length):
mask = (1 << size) - 1
return binary & mask

def _get_none_sources(self, column_data):
result = {}
for column_name, value in column_data.items():
if (column_name is None) or (value is not None):
continue

source = self.__none_sources.get(column_name, "null")
result[column_name] = source
return result

def _dump(self):
super()._dump()
print("Table: %s.%s" % (self.schema, self.table))
Expand Down Expand Up @@ -505,6 +535,8 @@ def _fetch_one_row(self):
row = {}

row["values"] = self._read_column_data(self.columns_present_bitmap)
row["none_sources"] = self._get_none_sources(row["values"])

return row

def _dump(self):
Expand All @@ -513,7 +545,13 @@ def _dump(self):
for row in self.rows:
print("--")
for key in row["values"]:
print("*", key, ":", row["values"][key])
none_source = (
row["none_sources"][key] if key in row["none_sources"] else ""
)
if none_source:
print("*", key, ":", row["values"][key], f"({none_source})")
else:
print("*", key, ":", row["values"][key])


class WriteRowsEvent(RowsEvent):
Expand All @@ -533,6 +571,8 @@ def _fetch_one_row(self):
row = {}

row["values"] = self._read_column_data(self.columns_present_bitmap)
row["none_sources"] = self._get_none_sources(row["values"])

return row

def _dump(self):
Expand All @@ -541,7 +581,13 @@ def _dump(self):
for row in self.rows:
print("--")
for key in row["values"]:
print("*", key, ":", row["values"][key])
none_source = (
row["none_sources"][key] if key in row["none_sources"] else ""
)
if none_source:
print("*", key, ":", row["values"][key], f"({none_source})")
else:
print("*", key, ":", row["values"][key])


class UpdateRowsEvent(RowsEvent):
Expand Down Expand Up @@ -571,8 +617,9 @@ def _fetch_one_row(self):
row = {}

row["before_values"] = self._read_column_data(self.columns_present_bitmap)

row["before_none_sources"] = self._get_none_sources(row["before_values"])
row["after_values"] = self._read_column_data(self.columns_present_bitmap2)
row["after_none_sources"] = self._get_none_sources(row["after_values"])
return row

def _dump(self):
Expand All @@ -581,10 +628,23 @@ def _dump(self):
for row in self.rows:
print("--")
for key in row["before_values"]:
print(
"*%s:%s=>%s"
% (key, row["before_values"][key], row["after_values"][key])
)
if key in row["before_none_sources"]:
before_value_info = "%s(%s)" % (
row["before_values"][key],
row["before_none_sources"][key],
)
else:
before_value_info = row["before_values"][key]

if key in row["after_none_sources"]:
after_value_info = "%s(%s)" % (
row["after_values"][key],
row["after_none_sources"][key],
)
else:
after_value_info = row["after_values"][key]

print(f"*{key}:{before_value_info}=>{after_value_info}")


class OptionalMetaData:
Expand Down
84 changes: 84 additions & 0 deletions pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"TestStatementConnectionSetting",
"TestRowsQueryLogEvents",
"TestOptionalMetaData",
"TestColumnValueNoneSources",
]


Expand Down Expand Up @@ -1736,6 +1737,89 @@ def tearDown(self):
super(TestOptionalMetaData, self).tearDown()


class TestColumnValueNoneSources(base.PyMySQLReplicationVersion8TestCase):
def setUp(self):
super(TestColumnValueNoneSources, self).setUp()
self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
only_events=(TableMapEvent,),
)
if not self.isMySQL8014AndMore():
self.skipTest(
"Mysql version is under 8.0.14 - pass TestColumnValueNoneSources"
)
self.execute("SET GLOBAL binlog_row_metadata='FULL';")

def test_get_none(self):
self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
resume_stream=False,
only_events=[WriteRowsEvent],
)
query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);"
self.execute(query)
query = (
"INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);"
)
self.execute(query)
self.execute("COMMIT")
write_rows_event = self.stream.fetchone()
self.assertIsInstance(write_rows_event, WriteRowsEvent)

none_sources = write_rows_event.rows[0].get("none_sources")
if none_sources:
self.assertEqual(none_sources["col1"], "null")

def test_get_none_invalid(self):
self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'")
self.execute(
"CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))"
)
self.execute(
"INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)"
)
self.resetBinLog()
self.execute(
"UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL"
)
self.execute("COMMIT")

self.assertIsInstance(self.stream.fetchone(), RotateEvent)
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
self.assertIsInstance(self.stream.fetchone(), PreviousGtidsEvent)
self.assertIsInstance(self.stream.fetchone(), GtidEvent)
self.assertIsInstance(self.stream.fetchone(), QueryEvent)
self.assertIsInstance(self.stream.fetchone(), TableMapEvent)

event = self.stream.fetchone()
if self.isMySQL56AndMore():
self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2)
else:
self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1)
self.assertIsInstance(event, UpdateRowsEvent)

before_none_sources = event.rows[0].get("before_none_sources")
after_none_sources = event.rows[0].get("after_none_sources")

if before_none_sources:
self.assertEqual(before_none_sources["col0"], "null")
self.assertEqual(before_none_sources["col1"], "null")
self.assertEqual(before_none_sources["col2"], "out of datetime2 range")
self.assertEqual(before_none_sources["col3"], "null")
self.assertEqual(before_none_sources["col4"], "null")

if after_none_sources:
self.assertEqual(after_none_sources["col0"], "null")
self.assertEqual(after_none_sources["col1"], "null")
self.assertEqual(after_none_sources["col2"], "null")
self.assertEqual(after_none_sources["col3"], "out of date range")
self.assertEqual(after_none_sources["col4"], "empty set")


if __name__ == "__main__":
import unittest

Expand Down