diff --git a/pymysqlreplication/constants/NONE_SOURCE.py b/pymysqlreplication/constants/NONE_SOURCE.py new file mode 100644 index 00000000..68391003 --- /dev/null +++ b/pymysqlreplication/constants/NONE_SOURCE.py @@ -0,0 +1,6 @@ +NULL = "null" +OUT_OF_DATE_RANGE = "out of date range" +OUT_OF_DATETIME_RANGE = "out of datetime range" +OUT_OF_DATETIME2_RANGE = "out of datetime2 range" +EMPTY_SET = "empty set" +COLS_BITMAP = "cols bitmap" diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 25b30ec4..2c632b6b 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -10,6 +10,7 @@ from .constants import FIELD_TYPE from .constants import BINLOG from .constants import CHARSET +from .constants import NONE_SOURCE from .column import Column from .table import Table from .bitmap import BitCount, BitGet @@ -23,6 +24,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.__ignored_tables = kwargs["ignored_tables"] self.__only_schemas = kwargs["only_schemas"] self.__ignored_schemas = kwargs["ignored_schemas"] + self.__none_sources = {} # Header self.table_id = self._read_table_id() @@ -119,7 +121,13 @@ def _read_column_data(self, cols_bitmap): column = self.columns[i] name = self.table_map[self.table_id].columns[i].name unsigned = self.table_map[self.table_id].columns[i].unsigned - + if not name: + # If you are using mysql 5.7 or mysql 8, but binlog_row_metadata = "MINIMAL", + # we do not know the column information. + # If you know column information, + # mysql 5.7 version Users Use Under 1.0 version + # mysql 8.0 version Users Set binlog_row_metadata = "FULL" + name = "UNKNOWN_COL" + str(i) values[name] = self.__read_values_name( column, null_bitmap, @@ -137,10 +145,15 @@ def _read_column_data(self, cols_bitmap): def __read_values_name( self, column, null_bitmap, null_bitmap_index, cols_bitmap, unsigned, i ): + name = self.table_map[self.table_id].columns[i].name if BitGet(cols_bitmap, i) == 0: + # This block is only executed when binlog_row_image = MINIMAL. + # When binlog_row_image = FULL, this block does not execute. + self.__none_sources[name] = NONE_SOURCE.COLS_BITMAP return None if self._is_null(null_bitmap, null_bitmap_index): + self.__none_sources[name] = NONE_SOURCE.NULL return None if column.type == FIELD_TYPE.TINY: @@ -184,17 +197,26 @@ def __read_values_name( elif column.type == FIELD_TYPE.BLOB: return self.__read_string(column.length_size, column) elif column.type == FIELD_TYPE.DATETIME: - return self.__read_datetime() + ret = self.__read_datetime() + if ret is None: + self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATETIME_RANGE + return ret elif column.type == FIELD_TYPE.TIME: return self.__read_time() elif column.type == FIELD_TYPE.DATE: - return self.__read_date() + ret = self.__read_date() + if ret is None: + self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATE_RANGE + return ret elif column.type == FIELD_TYPE.TIMESTAMP: return datetime.datetime.utcfromtimestamp(self.packet.read_uint32()) # For new date format: elif column.type == FIELD_TYPE.DATETIME2: - return self.__read_datetime2(column) + ret = self.__read_datetime2(column) + if ret is None: + self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATETIME2_RANGE + return ret elif column.type == FIELD_TYPE.TIME2: return self.__read_time2(column) elif column.type == FIELD_TYPE.TIMESTAMP2: @@ -218,11 +240,16 @@ def __read_values_name( elif column.type == FIELD_TYPE.SET: bit_mask = self.packet.read_uint_by_size(column.size) if column.set_values: - return { + ret = { val for idx, val in enumerate(column.set_values) if bit_mask & (1 << idx) - } or None + } + if not ret: + self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET + return None + return ret + self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET return None elif column.type == FIELD_TYPE.BIT: return self.__read_bit(column) @@ -469,6 +496,16 @@ def __read_binary_slice(self, binary, start, size, data_length): mask = (1 << size) - 1 return binary & mask + def _get_none_sources(self, column_data): + result = {} + for column_name, value in column_data.items(): + if (column_name is None) or (value is not None): + continue + + source = self.__none_sources.get(column_name, "null") + result[column_name] = source + return result + def _dump(self): super()._dump() print(f"Table: {self.schema}.{self.table}") @@ -511,6 +548,8 @@ def _fetch_one_row(self): row = {} row["values"] = self._read_column_data(self.columns_present_bitmap) + row["none_sources"] = self._get_none_sources(row["values"]) + return row def _dump(self): @@ -519,7 +558,13 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - print(f"* {key} : {row['values'][key]}") + none_source = ( + row["none_sources"][key] if key in row["none_sources"] else "" + ) + if none_source: + print(f"* {key} : {row['values'][key]} ({none_source})") + else: + print(f"* {key} : {row['values'][key]}") class WriteRowsEvent(RowsEvent): @@ -539,6 +584,8 @@ def _fetch_one_row(self): row = {} row["values"] = self._read_column_data(self.columns_present_bitmap) + row["none_sources"] = self._get_none_sources(row["values"]) + return row def _dump(self): @@ -547,7 +594,13 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - print(f"* {key} : {row['values'][key]}") + none_source = ( + row["none_sources"][key] if key in row["none_sources"] else "" + ) + if none_source: + print(f"* {key} : row['values'][key] ({none_source})") + else: + print(f"* {key} : {row['values'][key]}") class UpdateRowsEvent(RowsEvent): @@ -577,8 +630,9 @@ def _fetch_one_row(self): row = {} row["before_values"] = self._read_column_data(self.columns_present_bitmap) - + row["before_none_sources"] = self._get_none_sources(row["before_values"]) row["after_values"] = self._read_column_data(self.columns_present_bitmap2) + row["after_none_sources"] = self._get_none_sources(row["after_values"]) return row def _dump(self): @@ -587,7 +641,23 @@ def _dump(self): for row in self.rows: print("--") for key in row["before_values"]: - print(f"*{key}:{row['before_values'][key]}=>{row['after_values'][key]}") + if key in row["before_none_sources"]: + before_value_info = "%s(%s)" % ( + row["before_values"][key], + row["before_none_sources"][key], + ) + else: + before_value_info = row["before_values"][key] + + if key in row["after_none_sources"]: + after_value_info = "%s(%s)" % ( + row["after_values"][key], + row["after_none_sources"][key], + ) + else: + after_value_info = row["after_values"][key] + + print(f"*{key}:{before_value_info}=>{after_value_info}") class OptionalMetaData: diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 80bfe657..c1861a17 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -21,6 +21,7 @@ "TestStatementConnectionSetting", "TestRowsQueryLogEvents", "TestOptionalMetaData", + "TestColumnValueNoneSources", ] @@ -1754,6 +1755,89 @@ def tearDown(self): super(TestOptionalMetaData, self).tearDown() +class TestColumnValueNoneSources(base.PyMySQLReplicationTestCase): + def setUp(self): + super(TestColumnValueNoneSources, self).setUp() + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + only_events=(TableMapEvent,), + ) + if not self.isMySQL8014AndMore(): + self.skipTest( + "Mysql version is under 8.0.14 - pass TestColumnValueNoneSources" + ) + self.execute("SET GLOBAL binlog_row_metadata='FULL';") + + def test_get_none(self): + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + resume_stream=False, + only_events=[WriteRowsEvent], + ) + query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" + self.execute(query) + query = ( + "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" + ) + self.execute(query) + self.execute("COMMIT") + write_rows_event = self.stream.fetchone() + self.assertIsInstance(write_rows_event, WriteRowsEvent) + + none_sources = write_rows_event.rows[0].get("none_sources") + if none_sources: + self.assertEqual(none_sources["col1"], "null") + + def test_get_none_invalid(self): + self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") + self.execute( + "CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))" + ) + self.execute( + "INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)" + ) + self.resetBinLog() + self.execute( + "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL" + ) + self.execute("COMMIT") + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + self.assertIsInstance(self.stream.fetchone(), PreviousGtidsEvent) + self.assertIsInstance(self.stream.fetchone(), GtidEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), TableMapEvent) + + event = self.stream.fetchone() + if self.isMySQL56AndMore(): + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) + else: + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) + self.assertIsInstance(event, UpdateRowsEvent) + + before_none_sources = event.rows[0].get("before_none_sources") + after_none_sources = event.rows[0].get("after_none_sources") + + if before_none_sources: + self.assertEqual(before_none_sources["col0"], "null") + self.assertEqual(before_none_sources["col1"], "null") + self.assertEqual(before_none_sources["col2"], "out of datetime2 range") + self.assertEqual(before_none_sources["col3"], "null") + self.assertEqual(before_none_sources["col4"], "null") + + if after_none_sources: + self.assertEqual(after_none_sources["col0"], "null") + self.assertEqual(after_none_sources["col1"], "null") + self.assertEqual(after_none_sources["col2"], "null") + self.assertEqual(after_none_sources["col3"], "out of date range") + self.assertEqual(after_none_sources["col4"], "empty set") + + if __name__ == "__main__": import unittest