Distinguish ambiguous column value of None #489

Merged
6 changes: 6 additions & 0 deletions pymysqlreplication/constants/NONE_SOURCE.py
@@ -0,0 +1,6 @@
NULL = "null"
OUT_OF_DATE_RANGE = "out of date range"
OUT_OF_DATETIME_RANGE = "out of datetime range"
OUT_OF_DATETIME2_RANGE = "out of datetime2 range"
EMPTY_SET = "empty set"
COLS_BITMAP = "cols bitmap"
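These labels are attached to each row by the row_event.py changes in the next file. As a quick reference, a minimal sketch (not part of this diff) of how a consumer might compare a reported label against the constants; the is_explicit_null helper is hypothetical:

from pymysqlreplication.constants import NONE_SOURCE

def is_explicit_null(none_sources, column_name):
    # True only when the column was written as SQL NULL, as opposed to being
    # omitted from a MINIMAL row image or falling outside a supported range.
    return none_sources.get(column_name) == NONE_SOURCE.NULL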
82 changes: 73 additions & 9 deletions pymysqlreplication/row_event.py
@@ -10,6 +10,7 @@
from .constants import FIELD_TYPE
from .constants import BINLOG
from .constants import CHARSET
from .constants import NONE_SOURCE
from .column import Column
from .table import Table
from .bitmap import BitCount, BitGet
@@ -23,6 +24,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
self.__ignored_tables = kwargs["ignored_tables"]
self.__only_schemas = kwargs["only_schemas"]
self.__ignored_schemas = kwargs["ignored_schemas"]
self.__none_sources = {}

# Header
self.table_id = self._read_table_id()
@@ -143,10 +145,15 @@ def _read_column_data(self, cols_bitmap):
def __read_values_name(
self, column, null_bitmap, null_bitmap_index, cols_bitmap, unsigned, i
):
name = self.table_map[self.table_id].columns[i].name
if BitGet(cols_bitmap, i) == 0:
# This branch runs only when binlog_row_image = MINIMAL;
# with binlog_row_image = FULL every column is present in the bitmap.
self.__none_sources[name] = NONE_SOURCE.COLS_BITMAP
return None

if self._is_null(null_bitmap, null_bitmap_index):
self.__none_sources[name] = NONE_SOURCE.NULL
return None

if column.type == FIELD_TYPE.TINY:
@@ -190,17 +197,26 @@ def __read_values_name
elif column.type == FIELD_TYPE.BLOB:
return self.__read_string(column.length_size, column)
elif column.type == FIELD_TYPE.DATETIME:
return self.__read_datetime()
ret = self.__read_datetime()
if ret is None:
self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATETIME_RANGE
return ret
elif column.type == FIELD_TYPE.TIME:
return self.__read_time()
elif column.type == FIELD_TYPE.DATE:
return self.__read_date()
ret = self.__read_date()
if ret is None:
self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATE_RANGE
return ret
elif column.type == FIELD_TYPE.TIMESTAMP:
return datetime.datetime.utcfromtimestamp(self.packet.read_uint32())

# For new date format:
elif column.type == FIELD_TYPE.DATETIME2:
return self.__read_datetime2(column)
ret = self.__read_datetime2(column)
if ret is None:
self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATETIME2_RANGE
return ret
elif column.type == FIELD_TYPE.TIME2:
return self.__read_time2(column)
elif column.type == FIELD_TYPE.TIMESTAMP2:
@@ -224,11 +240,16 @@ def __read_values_name
elif column.type == FIELD_TYPE.SET:
bit_mask = self.packet.read_uint_by_size(column.size)
if column.set_values:
return {
ret = {
val
for idx, val in enumerate(column.set_values)
if bit_mask & (1 << idx)
} or None
}
if not ret:
self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET
return None
return ret
self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET
return None
elif column.type == FIELD_TYPE.BIT:
return self.__read_bit(column)
@@ -475,6 +496,16 @@ def __read_binary_slice(self, binary, start, size, data_length):
mask = (1 << size) - 1
return binary & mask

def _get_none_sources(self, column_data):
result = {}
for column_name, value in column_data.items():
if (column_name is None) or (value is not None):
continue

source = self.__none_sources.get(column_name, NONE_SOURCE.NULL)
result[column_name] = source
return result

def _dump(self):
super()._dump()
print(f"Table: {self.schema}.{self.table}")
@@ -517,6 +548,8 @@ def _fetch_one_row(self):
row = {}

row["values"] = self._read_column_data(self.columns_present_bitmap)
row["none_sources"] = self._get_none_sources(row["values"])

return row

def _dump(self):
@@ -525,7 +558,13 @@ def _dump(self):
for row in self.rows:
print("--")
for key in row["values"]:
print(f"* {key} : {row['values'][key]}")
none_source = row["none_sources"].get(key, "")
if none_source:
print(f"* {key} : {row['values'][key]} ({none_source})")
else:
print(f"* {key} : {row['values'][key]}")


class WriteRowsEvent(RowsEvent):
@@ -545,6 +584,8 @@ def _fetch_one_row(self):
row = {}

row["values"] = self._read_column_data(self.columns_present_bitmap)
row["none_sources"] = self._get_none_sources(row["values"])

return row

def _dump(self):
@@ -553,7 +594,13 @@ def _dump(self):
for row in self.rows:
print("--")
for key in row["values"]:
print(f"* {key} : {row['values'][key]}")
none_source = row["none_sources"].get(key, "")
if none_source:
print(f"* {key} : {row['values'][key]} ({none_source})")
else:
print(f"* {key} : {row['values'][key]}")


class UpdateRowsEvent(RowsEvent):
@@ -583,8 +630,9 @@ def _fetch_one_row(self):
row = {}

row["before_values"] = self._read_column_data(self.columns_present_bitmap)

row["before_none_sources"] = self._get_none_sources(row["before_values"])
row["after_values"] = self._read_column_data(self.columns_present_bitmap2)
row["after_none_sources"] = self._get_none_sources(row["after_values"])
return row

def _dump(self):
@@ -593,7 +641,23 @@ def _dump(self):
for row in self.rows:
print("--")
for key in row["before_values"]:
print(f"*{key}:{row['before_values'][key]}=>{row['after_values'][key]}")
if key in row["before_none_sources"]:
before_value_info = "%s(%s)" % (
row["before_values"][key],
row["before_none_sources"][key],
)
else:
before_value_info = row["before_values"][key]

if key in row["after_none_sources"]:
after_value_info = "%s(%s)" % (
row["after_values"][key],
row["after_none_sources"][key],
)
else:
after_value_info = row["after_values"][key]

print(f"*{key}:{before_value_info}=>{after_value_info}")


class OptionalMetaData:
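The row_event.py changes above attach the new mapping to every fetched row. A minimal consumer sketch of that payload, assuming connection settings that are purely illustrative placeholders rather than values from this PR:

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent

# Placeholder connection settings; adjust for a real server.
mysql_settings = {"host": "127.0.0.1", "port": 3306, "user": "root", "passwd": ""}

stream = BinLogStreamReader(
    connection_settings=mysql_settings,
    server_id=100,
    only_events=[WriteRowsEvent, UpdateRowsEvent],
)

for event in stream:
    for row in event.rows:
        if isinstance(event, WriteRowsEvent):
            # "none_sources" maps column name -> reason its value is None,
            # e.g. "null", "out of date range", "empty set", "cols bitmap".
            for column, reason in row["none_sources"].items():
                print(column, reason)
        else:
            # UpdateRowsEvent carries the same mapping for both row images.
            print(row["before_none_sources"], row["after_none_sources"])

stream.close()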
84 changes: 84 additions & 0 deletions pymysqlreplication/tests/test_basic.py
@@ -21,6 +21,7 @@
"TestStatementConnectionSetting",
"TestRowsQueryLogEvents",
"TestOptionalMetaData",
"TestColumnValueNoneSources",
]


@@ -1754,6 +1755,89 @@ def tearDown(self):
super(TestOptionalMetaData, self).tearDown()


class TestColumnValueNoneSources(base.PyMySQLReplicationTestCase):
def setUp(self):
super(TestColumnValueNoneSources, self).setUp()
self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
only_events=(TableMapEvent,),
)
if not self.isMySQL8014AndMore():
self.skipTest(
"Mysql version is under 8.0.14 - pass TestColumnValueNoneSources"
)
self.execute("SET GLOBAL binlog_row_metadata='FULL';")

def test_get_none(self):
self.stream.close()
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
resume_stream=False,
only_events=[WriteRowsEvent],
)
query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);"
self.execute(query)
query = (
"INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);"
)
self.execute(query)
self.execute("COMMIT")
write_rows_event = self.stream.fetchone()
self.assertIsInstance(write_rows_event, WriteRowsEvent)

none_sources = write_rows_event.rows[0].get("none_sources")
if none_sources:
self.assertEqual(none_sources["col1"], "null")

def test_get_none_invalid(self):
self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'")
self.execute(
"CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))"
)
self.execute(
"INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)"
)
self.resetBinLog()
self.execute(
"UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL"
)
self.execute("COMMIT")

self.assertIsInstance(self.stream.fetchone(), RotateEvent)
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
self.assertIsInstance(self.stream.fetchone(), PreviousGtidsEvent)
self.assertIsInstance(self.stream.fetchone(), GtidEvent)
self.assertIsInstance(self.stream.fetchone(), QueryEvent)
self.assertIsInstance(self.stream.fetchone(), TableMapEvent)

event = self.stream.fetchone()
if self.isMySQL56AndMore():
self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2)
else:
self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1)
self.assertIsInstance(event, UpdateRowsEvent)

before_none_sources = event.rows[0].get("before_none_sources")
after_none_sources = event.rows[0].get("after_none_sources")

if before_none_sources:
self.assertEqual(before_none_sources["col0"], "null")
self.assertEqual(before_none_sources["col1"], "null")
self.assertEqual(before_none_sources["col2"], "out of datetime2 range")
self.assertEqual(before_none_sources["col3"], "null")
self.assertEqual(before_none_sources["col4"], "null")

if after_none_sources:
self.assertEqual(after_none_sources["col0"], "null")
self.assertEqual(after_none_sources["col1"], "null")
self.assertEqual(after_none_sources["col2"], "null")
self.assertEqual(after_none_sources["col3"], "out of date range")
self.assertEqual(after_none_sources["col4"], "empty set")


if __name__ == "__main__":
import unittest
