From 5c1846c799c9dcc99680611909ba0fd5a828ace5 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 9 Sep 2023 20:13:35 +0900 Subject: [PATCH 01/15] feat: add category_of_none --- pymysqlreplication/row_event.py | 54 ++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index fcd138d3..ada72031 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -459,6 +459,24 @@ def __read_binary_slice(self, binary, start, size, data_length): mask = ((1 << size) - 1) return binary & mask + def _categorize_none(self, column_data): + result = {} + for column_name, value in column_data.items(): + if value is not None: + continue + + category = "null" + + column_type = [col.type for col in self.columns if col.name == column_name][0] + if column_type in (FIELD_TYPE.DATETIME, FIELD_TYPE.DATE, FIELD_TYPE.DATETIME2): + category = "out of datetime range" + elif column_type == FIELD_TYPE.SET: + category = "empty set" + + result[column_name] = category + + return result + def _dump(self): super()._dump() print("Table: %s.%s" % (self.schema, self.table)) @@ -498,6 +516,8 @@ def _fetch_one_row(self): row = {} row["values"] = self._read_column_data(self.columns_present_bitmap) + row["category_of_none"] = self._categorize_none(row["values"]) + return row def _dump(self): @@ -506,8 +526,8 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - print("*", key, ":", row["values"][key]) - + print("*", key, ":", row["values"][key], + "(%s)" % row["category_of_none"][key] if key in row["category_of_none"] else "") class WriteRowsEvent(RowsEvent): """This event is triggered when a row in database is added @@ -526,6 +546,8 @@ def _fetch_one_row(self): row = {} row["values"] = self._read_column_data(self.columns_present_bitmap) + row["category_of_none"] = self._categorize_none(row["values"]) + return row def _dump(self): @@ -534,7 +556,8 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - print("*", key, ":", row["values"][key]) + print("*", key, ":", row["values"][key], + "(%s)" % row["category_of_none"][key] if key in row["category_of_none"] else "") class UpdateRowsEvent(RowsEvent): @@ -561,9 +584,15 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) def _fetch_one_row(self): row = {} - row["before_values"] = self._read_column_data(self.columns_present_bitmap) + changes = {} + updated_columns = [] + column_types = {} + row["before_values"] = self._read_column_data(self.columns_present_bitmap) row["after_values"] = self._read_column_data(self.columns_present_bitmap2) + row["before_category_of_none"] = self._categorize_none(row["before_values"]) + row["after_category_of_none"] = self._categorize_none(row["after_values"]) + return row def _dump(self): @@ -573,10 +602,21 @@ def _dump(self): for row in self.rows: print("--") for key in row["before_values"]: - print("*%s:%s=>%s" % (key, - row["before_values"][key], - row["after_values"][key])) + if key in row["before_category_of_none"]: + before_value_info = "%s(%s)" % (row["before_values"][key], + row["before_category_of_none"][key]) + else: + before_value_info = row["before_values"][key] + + if key in row["after_category_of_none"]: + after_value_info = "%s(%s)" % (row["after_values"][key], + row["after_category_of_none"][key]) + else: + after_value_info = row["after_values"][key] + print("*%s:%s=>%s" % (key, + before_value_info, + after_value_info)) class TableMapEvent(BinLogEvent): """This event describes the structure of a table. From 7d5da6774c321a6f890b42841d1cc8b7c54fc4a8 Mon Sep 17 00:00:00 2001 From: sean Date: Thu, 14 Sep 2023 16:54:09 +0900 Subject: [PATCH 02/15] categorize test when null value --- pymysqlreplication/tests/test_basic.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index f03b5663..e01d1839 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -574,6 +574,24 @@ def create_binlog_packet_wrapper(pkt): self.assertEqual(binlog_event.event._is_event_valid, True) self.assertNotEqual(wrong_event.event._is_event_valid, True) + def test_categorize_none(self): + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + resume_stream=False, + only_events = [WriteRowsEvent] + ) + query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" + self.execute(query) + query = "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" + self.execute(query) + self.execute("COMMIT") + write_rows_event = self.stream.fetchone() + self.assertIsInstance(write_rows_event, WriteRowsEvent) + self.assertEqual(write_rows_event.rows[0]['category_of_none']['col1'], 'null') + + class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): def ignoredEvents(self): From 6f5939eab5c8094b168416a65806d8a224f1614b Mon Sep 17 00:00:00 2001 From: mjs Date: Thu, 14 Sep 2023 20:43:44 +0900 Subject: [PATCH 03/15] feat: add self.none_sources --- pymysqlreplication/row_event.py | 48 ++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index ada72031..fd6a1a78 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -24,6 +24,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.__ignored_tables = kwargs["ignored_tables"] self.__only_schemas = kwargs["only_schemas"] self.__ignored_schemas = kwargs["ignored_schemas"] + self.none_sources = {} #Header self.table_id = self._read_table_id() @@ -123,11 +124,14 @@ def _read_column_data(self, cols_bitmap): def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap, unsigned, zerofill, fixed_binary_length, i): + name = self.table_map[self.table_id].columns[i].name if BitGet(cols_bitmap, i) == 0: + self.none_sources[name] = 'cols_bitmap' return None if self._is_null(null_bitmap, null_bitmap_index): + self.none_sources[name] = 'null' return None if column.type == FIELD_TYPE.TINY: @@ -182,18 +186,27 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap elif column.type == FIELD_TYPE.BLOB: return self.__read_string(column.length_size, column) elif column.type == FIELD_TYPE.DATETIME: - return self.__read_datetime() + ret = self.__read_datetime() + if ret is None: + self.none_sources[name] = 'out of datetime range' + return ret elif column.type == FIELD_TYPE.TIME: return self.__read_time() elif column.type == FIELD_TYPE.DATE: - return self.__read_date() + ret = self.__read_date() + if ret is None: + self.none_sources[name] = 'out of date range' + return ret elif column.type == FIELD_TYPE.TIMESTAMP: return datetime.datetime.fromtimestamp( self.packet.read_uint32()) # For new date format: elif column.type == FIELD_TYPE.DATETIME2: - return self.__read_datetime2(column) + ret = self.__read_datetime2(column) + if ret is None: + self.none_sources[name] = 'out of datetime2 range' + return ret elif column.type == FIELD_TYPE.TIME2: return self.__read_time2(column) elif column.type == FIELD_TYPE.TIMESTAMP2: @@ -217,10 +230,14 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap # We read set columns as a bitmap telling us which options # are enabled bit_mask = self.packet.read_uint_by_size(column.size) - return set( + set_value = set( val for idx, val in enumerate(column.set_values) if bit_mask & 2 ** idx - ) or None + ) + if not set_value: + self.none_sources[column.name] = "empty set" + return None + return set_value elif column.type == FIELD_TYPE.BIT: return self.__read_bit(column) @@ -465,16 +482,8 @@ def _categorize_none(self, column_data): if value is not None: continue - category = "null" - - column_type = [col.type for col in self.columns if col.name == column_name][0] - if column_type in (FIELD_TYPE.DATETIME, FIELD_TYPE.DATE, FIELD_TYPE.DATETIME2): - category = "out of datetime range" - elif column_type == FIELD_TYPE.SET: - category = "empty set" - + category = self.none_sources.get(column_name, "null") result[column_name] = category - return result def _dump(self): @@ -584,15 +593,10 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) def _fetch_one_row(self): row = {} - changes = {} - updated_columns = [] - column_types = {} - row["before_values"] = self._read_column_data(self.columns_present_bitmap) - row["after_values"] = self._read_column_data(self.columns_present_bitmap2) - row["before_category_of_none"] = self._categorize_none(row["before_values"]) - row["after_category_of_none"] = self._categorize_none(row["after_values"]) - + row['before_category_of_none'] = self._categorize_none(row["before_values"]) + row['after_values'] = self._read_column_data(self.columns_present_bitmap2) + row['after_category_of_none'] = self._categorize_none(row["after_values"]) return row def _dump(self): From a74e3ff957f2786920e155570818d0e5d4ea8aa9 Mon Sep 17 00:00:00 2001 From: mjs Date: Sat, 16 Sep 2023 13:08:15 +0900 Subject: [PATCH 04/15] feat: add categorize test when invalid mode --- pymysqlreplication/tests/test_basic.py | 29 ++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index e01d1839..e8818ae0 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -591,6 +591,35 @@ def test_categorize_none(self): self.assertIsInstance(write_rows_event, WriteRowsEvent) self.assertEqual(write_rows_event.rows[0]['category_of_none']['col1'], 'null') + def test_categorize_none_invalid(self): + self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") + self.execute("CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))") + self.execute("INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)") + self.resetBinLog() + self.execute("UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00',col4 = 'd' WHERE col0 IS NULL") + self.execute("COMMIT") + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), TableMapEvent) + + event = self.stream.fetchone() + if self.isMySQL56AndMore(): + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) + else: + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) + self.assertIsInstance(event, UpdateRowsEvent) + self.assertEqual(event.rows[0]["before_category_of_none"]["col0"], 'null') + self.assertEqual(event.rows[0]["before_category_of_none"]["col1"], 'null') + self.assertEqual(event.rows[0]["before_category_of_none"]["col2"], 'out of datetime2 range') + self.assertEqual(event.rows[0]["before_category_of_none"]["col3"], 'null') + self.assertEqual(event.rows[0]["before_category_of_none"]["col4"], 'null') + self.assertEqual(event.rows[0]["after_category_of_none"]["col0"], 'null') + self.assertEqual(event.rows[0]["after_category_of_none"]["col1"], 'null') + self.assertEqual(event.rows[0]["after_category_of_none"]["col2"], 'null') + self.assertEqual(event.rows[0]["after_category_of_none"]["col3"], 'out of date range') + self.assertEqual(event.rows[0]["after_category_of_none"]["col4"], 'empty set') class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): From b5076730d4077583f89bc2c4ab5985ce3f4723f2 Mon Sep 17 00:00:00 2001 From: starcat37 Date: Mon, 18 Sep 2023 09:19:49 +0900 Subject: [PATCH 05/15] docs: add binlog_row_image comment in read_values_name --- pymysqlreplication/row_event.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index fd6a1a78..780ed04a 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -127,6 +127,8 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap name = self.table_map[self.table_id].columns[i].name if BitGet(cols_bitmap, i) == 0: + # This block is only executed when binlog_row_image = MINIMAL. + # When binlog_row_image = FULL, this block does not execute. self.none_sources[name] = 'cols_bitmap' return None From c9733f9cc3dd685a5bc135786d8f98e186c7377f Mon Sep 17 00:00:00 2001 From: starcat37 Date: Wed, 20 Sep 2023 18:39:52 +0900 Subject: [PATCH 06/15] refactor: change the name of variables and functions --- pymysqlreplication/row_event.py | 28 +++++++++++++------------- pymysqlreplication/tests/test_basic.py | 28 +++++++++++++------------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 780ed04a..42b9eb97 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -478,14 +478,14 @@ def __read_binary_slice(self, binary, start, size, data_length): mask = ((1 << size) - 1) return binary & mask - def _categorize_none(self, column_data): + def _get_none_sources(self, column_data): result = {} for column_name, value in column_data.items(): if value is not None: continue - category = self.none_sources.get(column_name, "null") - result[column_name] = category + source = self.none_sources.get(column_name, "null") + result[column_name] = source return result def _dump(self): @@ -527,7 +527,7 @@ def _fetch_one_row(self): row = {} row["values"] = self._read_column_data(self.columns_present_bitmap) - row["category_of_none"] = self._categorize_none(row["values"]) + row["none_sources"] = self._get_none_sources(row["values"]) return row @@ -538,7 +538,7 @@ def _dump(self): print("--") for key in row["values"]: print("*", key, ":", row["values"][key], - "(%s)" % row["category_of_none"][key] if key in row["category_of_none"] else "") + "(%s)" % row["none_sources"][key] if key in row["none_sources"] else "") class WriteRowsEvent(RowsEvent): """This event is triggered when a row in database is added @@ -557,7 +557,7 @@ def _fetch_one_row(self): row = {} row["values"] = self._read_column_data(self.columns_present_bitmap) - row["category_of_none"] = self._categorize_none(row["values"]) + row["none_sources"] = self._get_none_sources(row["values"]) return row @@ -568,7 +568,7 @@ def _dump(self): print("--") for key in row["values"]: print("*", key, ":", row["values"][key], - "(%s)" % row["category_of_none"][key] if key in row["category_of_none"] else "") + "(%s)" % row["none_sources"][key] if key in row["none_sources"] else "") class UpdateRowsEvent(RowsEvent): @@ -596,9 +596,9 @@ def _fetch_one_row(self): row = {} row["before_values"] = self._read_column_data(self.columns_present_bitmap) - row['before_category_of_none'] = self._categorize_none(row["before_values"]) - row['after_values'] = self._read_column_data(self.columns_present_bitmap2) - row['after_category_of_none'] = self._categorize_none(row["after_values"]) + row["before_none_source"] = self._get_none_sources(row["before_values"]) + row["after_values"] = self._read_column_data(self.columns_present_bitmap2) + row["after_none_source"] = self._get_none_sources(row["after_values"]) return row def _dump(self): @@ -608,15 +608,15 @@ def _dump(self): for row in self.rows: print("--") for key in row["before_values"]: - if key in row["before_category_of_none"]: + if key in row["before_none_source"]: before_value_info = "%s(%s)" % (row["before_values"][key], - row["before_category_of_none"][key]) + row["before_none_source"][key]) else: before_value_info = row["before_values"][key] - if key in row["after_category_of_none"]: + if key in row["after_none_source"]: after_value_info = "%s(%s)" % (row["after_values"][key], - row["after_category_of_none"][key]) + row["after_none_source"][key]) else: after_value_info = row["after_values"][key] diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index e8818ae0..ce89aea4 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -574,13 +574,13 @@ def create_binlog_packet_wrapper(pkt): self.assertEqual(binlog_event.event._is_event_valid, True) self.assertNotEqual(wrong_event.event._is_event_valid, True) - def test_categorize_none(self): + def test_get_none(self): self.stream.close() self.stream = BinLogStreamReader( self.database, server_id=1024, resume_stream=False, - only_events = [WriteRowsEvent] + only_events=[WriteRowsEvent] ) query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" self.execute(query) @@ -589,9 +589,9 @@ def test_categorize_none(self): self.execute("COMMIT") write_rows_event = self.stream.fetchone() self.assertIsInstance(write_rows_event, WriteRowsEvent) - self.assertEqual(write_rows_event.rows[0]['category_of_none']['col1'], 'null') + self.assertEqual(write_rows_event.rows[0]['none_sources']['col1'], 'null') - def test_categorize_none_invalid(self): + def test_get_none_invalid(self): self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") self.execute("CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))") self.execute("INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)") @@ -610,16 +610,16 @@ def test_categorize_none_invalid(self): else: self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) self.assertIsInstance(event, UpdateRowsEvent) - self.assertEqual(event.rows[0]["before_category_of_none"]["col0"], 'null') - self.assertEqual(event.rows[0]["before_category_of_none"]["col1"], 'null') - self.assertEqual(event.rows[0]["before_category_of_none"]["col2"], 'out of datetime2 range') - self.assertEqual(event.rows[0]["before_category_of_none"]["col3"], 'null') - self.assertEqual(event.rows[0]["before_category_of_none"]["col4"], 'null') - self.assertEqual(event.rows[0]["after_category_of_none"]["col0"], 'null') - self.assertEqual(event.rows[0]["after_category_of_none"]["col1"], 'null') - self.assertEqual(event.rows[0]["after_category_of_none"]["col2"], 'null') - self.assertEqual(event.rows[0]["after_category_of_none"]["col3"], 'out of date range') - self.assertEqual(event.rows[0]["after_category_of_none"]["col4"], 'empty set') + self.assertEqual(event.rows[0]["before_none_source"]["col0"], 'null') + self.assertEqual(event.rows[0]["before_none_source"]["col1"], 'null') + self.assertEqual(event.rows[0]["before_none_source"]["col2"], 'out of datetime2 range') + self.assertEqual(event.rows[0]["before_none_source"]["col3"], 'null') + self.assertEqual(event.rows[0]["before_none_source"]["col4"], 'null') + self.assertEqual(event.rows[0]["after_none_source"]["col0"], 'null') + self.assertEqual(event.rows[0]["after_none_source"]["col1"], 'null') + self.assertEqual(event.rows[0]["after_none_source"]["col2"], 'null') + self.assertEqual(event.rows[0]["after_none_source"]["col3"], 'out of date range') + self.assertEqual(event.rows[0]["after_none_source"]["col4"], 'empty set') class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): From edb52420e8fba369acfa82d857018cfe51679c30 Mon Sep 17 00:00:00 2001 From: mikaniz Date: Thu, 21 Sep 2023 23:18:28 +0900 Subject: [PATCH 07/15] refactor: change the name of variables and the string formatting --- pymysqlreplication/row_event.py | 40 ++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 42b9eb97..e16086fd 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -24,7 +24,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) self.__ignored_tables = kwargs["ignored_tables"] self.__only_schemas = kwargs["only_schemas"] self.__ignored_schemas = kwargs["ignored_schemas"] - self.none_sources = {} + self.__none_sources = {} #Header self.table_id = self._read_table_id() @@ -129,11 +129,11 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap if BitGet(cols_bitmap, i) == 0: # This block is only executed when binlog_row_image = MINIMAL. # When binlog_row_image = FULL, this block does not execute. - self.none_sources[name] = 'cols_bitmap' + self.__none_sources[name] = 'cols_bitmap' return None if self._is_null(null_bitmap, null_bitmap_index): - self.none_sources[name] = 'null' + self.__none_sources[name] = 'null' return None if column.type == FIELD_TYPE.TINY: @@ -190,14 +190,14 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap elif column.type == FIELD_TYPE.DATETIME: ret = self.__read_datetime() if ret is None: - self.none_sources[name] = 'out of datetime range' + self.__none_sources[name] = 'out of datetime range' return ret elif column.type == FIELD_TYPE.TIME: return self.__read_time() elif column.type == FIELD_TYPE.DATE: ret = self.__read_date() if ret is None: - self.none_sources[name] = 'out of date range' + self.__none_sources[name] = 'out of date range' return ret elif column.type == FIELD_TYPE.TIMESTAMP: return datetime.datetime.fromtimestamp( @@ -207,7 +207,7 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap elif column.type == FIELD_TYPE.DATETIME2: ret = self.__read_datetime2(column) if ret is None: - self.none_sources[name] = 'out of datetime2 range' + self.__none_sources[name] = 'out of datetime2 range' return ret elif column.type == FIELD_TYPE.TIME2: return self.__read_time2(column) @@ -232,14 +232,14 @@ def __read_values_name(self, column, null_bitmap, null_bitmap_index, cols_bitmap # We read set columns as a bitmap telling us which options # are enabled bit_mask = self.packet.read_uint_by_size(column.size) - set_value = set( + ret = set( val for idx, val in enumerate(column.set_values) if bit_mask & 2 ** idx ) - if not set_value: - self.none_sources[column.name] = "empty set" + if not ret: + self.__none_sources[column.name] = "empty set" return None - return set_value + return ret elif column.type == FIELD_TYPE.BIT: return self.__read_bit(column) @@ -484,7 +484,7 @@ def _get_none_sources(self, column_data): if value is not None: continue - source = self.none_sources.get(column_name, "null") + source = self.__none_sources.get(column_name, "null") result[column_name] = source return result @@ -537,8 +537,11 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - print("*", key, ":", row["values"][key], - "(%s)" % row["none_sources"][key] if key in row["none_sources"] else "") + none_source = row["none_sources"][key] if key in row["none_sources"] else "" + if none_source: + print("*", key, ":", row["values"][key], f"({none_source})") + else: + print("*", key, ":", row["values"][key]) class WriteRowsEvent(RowsEvent): """This event is triggered when a row in database is added @@ -567,8 +570,11 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - print("*", key, ":", row["values"][key], - "(%s)" % row["none_sources"][key] if key in row["none_sources"] else "") + none_source = row["none_sources"][key] if key in row["none_sources"] else "" + if none_source: + print("*", key, ":", row["values"][key], f"({none_source})") + else: + print("*", key, ":", row["values"][key]) class UpdateRowsEvent(RowsEvent): @@ -620,9 +626,7 @@ def _dump(self): else: after_value_info = row["after_values"][key] - print("*%s:%s=>%s" % (key, - before_value_info, - after_value_info)) + print(f"*{key}:{before_value_info}=>{after_value_info}") class TableMapEvent(BinLogEvent): """This event describes the structure of a table. From 5107c77a25e001c71e7299cc2e01bf3a99b82d10 Mon Sep 17 00:00:00 2001 From: heehehe Date: Fri, 22 Sep 2023 17:11:22 +0900 Subject: [PATCH 08/15] refactor: modify scripts by black --- pymysqlreplication/row_event.py | 31 +++++++++++------- pymysqlreplication/tests/test_basic.py | 44 ++++++++++++++++---------- 2 files changed, 48 insertions(+), 27 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 0198ec56..9fa78bab 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -137,11 +137,11 @@ def __read_values_name( if BitGet(cols_bitmap, i) == 0: # This block is only executed when binlog_row_image = MINIMAL. # When binlog_row_image = FULL, this block does not execute. - self.__none_sources[name] = 'cols_bitmap' + self.__none_sources[name] = "cols_bitmap" return None if self._is_null(null_bitmap, null_bitmap_index): - self.__none_sources[name] = 'null' + self.__none_sources[name] = "null" return None if column.type == FIELD_TYPE.TINY: @@ -187,14 +187,14 @@ def __read_values_name( elif column.type == FIELD_TYPE.DATETIME: ret = self.__read_datetime() if ret is None: - self.__none_sources[name] = 'out of datetime range' + self.__none_sources[name] = "out of datetime range" return ret elif column.type == FIELD_TYPE.TIME: return self.__read_time() elif column.type == FIELD_TYPE.DATE: ret = self.__read_date() if ret is None: - self.__none_sources[name] = 'out of date range' + self.__none_sources[name] = "out of date range" return ret elif column.type == FIELD_TYPE.TIMESTAMP: return datetime.datetime.utcfromtimestamp(self.packet.read_uint32()) @@ -203,7 +203,7 @@ def __read_values_name( elif column.type == FIELD_TYPE.DATETIME2: ret = self.__read_datetime2(column) if ret is None: - self.__none_sources[name] = 'out of datetime2 range' + self.__none_sources[name] = "out of datetime2 range" return ret elif column.type == FIELD_TYPE.TIME2: return self.__read_time2(column) @@ -547,12 +547,15 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - none_source = row["none_sources"][key] if key in row["none_sources"] else "" + none_source = ( + row["none_sources"][key] if key in row["none_sources"] else "" + ) if none_source: print("*", key, ":", row["values"][key], f"({none_source})") else: print("*", key, ":", row["values"][key]) + class WriteRowsEvent(RowsEvent): """This event is triggered when a row in database is added @@ -580,7 +583,9 @@ def _dump(self): for row in self.rows: print("--") for key in row["values"]: - none_source = row["none_sources"][key] if key in row["none_sources"] else "" + none_source = ( + row["none_sources"][key] if key in row["none_sources"] else "" + ) if none_source: print("*", key, ":", row["values"][key], f"({none_source})") else: @@ -626,14 +631,18 @@ def _dump(self): print("--") for key in row["before_values"]: if key in row["before_none_source"]: - before_value_info = "%s(%s)" % (row["before_values"][key], - row["before_none_source"][key]) + before_value_info = "%s(%s)" % ( + row["before_values"][key], + row["before_none_source"][key], + ) else: before_value_info = row["before_values"][key] if key in row["after_none_source"]: - after_value_info = "%s(%s)" % (row["after_values"][key], - row["after_none_source"][key]) + after_value_info = "%s(%s)" % ( + row["after_values"][key], + row["after_none_source"][key], + ) else: after_value_info = row["after_values"][key] diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 31312ee3..2bc74834 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -609,23 +609,31 @@ def test_get_none(self): self.database, server_id=1024, resume_stream=False, - only_events=[WriteRowsEvent] + only_events=[WriteRowsEvent], ) query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" self.execute(query) - query = "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" + query = ( + "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" + ) self.execute(query) self.execute("COMMIT") write_rows_event = self.stream.fetchone() self.assertIsInstance(write_rows_event, WriteRowsEvent) - self.assertEqual(write_rows_event.rows[0]['none_sources']['col1'], 'null') + self.assertEqual(write_rows_event.rows[0]["none_sources"]["col1"], "null") def test_get_none_invalid(self): self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") - self.execute("CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))") - self.execute("INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)") + self.execute( + "CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))" + ) + self.execute( + "INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)" + ) self.resetBinLog() - self.execute("UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00',col4 = 'd' WHERE col0 IS NULL") + self.execute( + "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00',col4 = 'd' WHERE col0 IS NULL" + ) self.execute("COMMIT") self.assertIsInstance(self.stream.fetchone(), RotateEvent) @@ -639,16 +647,20 @@ def test_get_none_invalid(self): else: self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) self.assertIsInstance(event, UpdateRowsEvent) - self.assertEqual(event.rows[0]["before_none_source"]["col0"], 'null') - self.assertEqual(event.rows[0]["before_none_source"]["col1"], 'null') - self.assertEqual(event.rows[0]["before_none_source"]["col2"], 'out of datetime2 range') - self.assertEqual(event.rows[0]["before_none_source"]["col3"], 'null') - self.assertEqual(event.rows[0]["before_none_source"]["col4"], 'null') - self.assertEqual(event.rows[0]["after_none_source"]["col0"], 'null') - self.assertEqual(event.rows[0]["after_none_source"]["col1"], 'null') - self.assertEqual(event.rows[0]["after_none_source"]["col2"], 'null') - self.assertEqual(event.rows[0]["after_none_source"]["col3"], 'out of date range') - self.assertEqual(event.rows[0]["after_none_source"]["col4"], 'empty set') + self.assertEqual(event.rows[0]["before_none_source"]["col0"], "null") + self.assertEqual(event.rows[0]["before_none_source"]["col1"], "null") + self.assertEqual( + event.rows[0]["before_none_source"]["col2"], "out of datetime2 range" + ) + self.assertEqual(event.rows[0]["before_none_source"]["col3"], "null") + self.assertEqual(event.rows[0]["before_none_source"]["col4"], "null") + self.assertEqual(event.rows[0]["after_none_source"]["col0"], "null") + self.assertEqual(event.rows[0]["after_none_source"]["col1"], "null") + self.assertEqual(event.rows[0]["after_none_source"]["col2"], "null") + self.assertEqual( + event.rows[0]["after_none_source"]["col3"], "out of date range" + ) + self.assertEqual(event.rows[0]["after_none_source"]["col4"], "empty set") class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): From 26f036c9e0fc4f48ef3cfb6d146150acc004c895 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 23 Sep 2023 08:04:40 +0900 Subject: [PATCH 09/15] fix: check when none sources exist Since column schema has deleted, we cannot get any none sources when optional_meta_data is False. --- pymysqlreplication/row_event.py | 12 ++++----- pymysqlreplication/tests/test_basic.py | 34 ++++++++++++++------------ 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 9fa78bab..769f7f93 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -619,9 +619,9 @@ def _fetch_one_row(self): row = {} row["before_values"] = self._read_column_data(self.columns_present_bitmap) - row["before_none_source"] = self._get_none_sources(row["before_values"]) + row["before_none_sources"] = self._get_none_sources(row["before_values"]) row["after_values"] = self._read_column_data(self.columns_present_bitmap2) - row["after_none_source"] = self._get_none_sources(row["after_values"]) + row["after_none_sources"] = self._get_none_sources(row["after_values"]) return row def _dump(self): @@ -630,18 +630,18 @@ def _dump(self): for row in self.rows: print("--") for key in row["before_values"]: - if key in row["before_none_source"]: + if key in row["before_none_sources"]: before_value_info = "%s(%s)" % ( row["before_values"][key], - row["before_none_source"][key], + row["before_none_sources"][key], ) else: before_value_info = row["before_values"][key] - if key in row["after_none_source"]: + if key in row["after_none_sources"]: after_value_info = "%s(%s)" % ( row["after_values"][key], - row["after_none_source"][key], + row["after_none_sources"][key], ) else: after_value_info = row["after_values"][key] diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 2bc74834..1019d71c 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -620,7 +620,9 @@ def test_get_none(self): self.execute("COMMIT") write_rows_event = self.stream.fetchone() self.assertIsInstance(write_rows_event, WriteRowsEvent) - self.assertEqual(write_rows_event.rows[0]["none_sources"]["col1"], "null") + + if write_rows_event.rows[0].get("none_sources"): + self.assertEqual(write_rows_event.rows[0]["none_sources"]["col1"], "null") def test_get_none_invalid(self): self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") @@ -632,7 +634,7 @@ def test_get_none_invalid(self): ) self.resetBinLog() self.execute( - "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00',col4 = 'd' WHERE col0 IS NULL" + "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL" ) self.execute("COMMIT") @@ -647,20 +649,20 @@ def test_get_none_invalid(self): else: self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) self.assertIsInstance(event, UpdateRowsEvent) - self.assertEqual(event.rows[0]["before_none_source"]["col0"], "null") - self.assertEqual(event.rows[0]["before_none_source"]["col1"], "null") - self.assertEqual( - event.rows[0]["before_none_source"]["col2"], "out of datetime2 range" - ) - self.assertEqual(event.rows[0]["before_none_source"]["col3"], "null") - self.assertEqual(event.rows[0]["before_none_source"]["col4"], "null") - self.assertEqual(event.rows[0]["after_none_source"]["col0"], "null") - self.assertEqual(event.rows[0]["after_none_source"]["col1"], "null") - self.assertEqual(event.rows[0]["after_none_source"]["col2"], "null") - self.assertEqual( - event.rows[0]["after_none_source"]["col3"], "out of date range" - ) - self.assertEqual(event.rows[0]["after_none_source"]["col4"], "empty set") + + if event.rows[0].get("before_none_sources"): + self.assertEqual(event.rows[0]["before_none_sources"]["col0"], "null") + self.assertEqual(event.rows[0]["before_none_sources"]["col1"], "null") + self.assertEqual(event.rows[0]["before_none_sources"]["col2"], "out of datetime2 range") + self.assertEqual(event.rows[0]["before_none_sources"]["col3"], "null") + self.assertEqual(event.rows[0]["before_none_sources"]["col4"], "null") + + if event.rows[0].get("after_none_sources"): + self.assertEqual(event.rows[0]["after_none_sources"]["col0"], "null") + self.assertEqual(event.rows[0]["after_none_sources"]["col1"], "null") + self.assertEqual(event.rows[0]["after_none_sources"]["col2"], "null") + self.assertEqual(event.rows[0]["after_none_sources"]["col3"], "out of date range") + self.assertEqual(event.rows[0]["after_none_sources"]["col4"], "empty set") class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): From ab1c5fbd3df4e7c6437be60a9afccffaab3a8a1c Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 23 Sep 2023 08:14:05 +0900 Subject: [PATCH 10/15] fix: do not add none_sources when column_name is None --- pymysqlreplication/row_event.py | 2 +- pymysqlreplication/tests/test_basic.py | 31 ++++++++++++++------------ 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 769f7f93..cb091b4d 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -487,7 +487,7 @@ def __read_binary_slice(self, binary, start, size, data_length): def _get_none_sources(self, column_data): result = {} for column_name, value in column_data.items(): - if value is not None: + if (column_name is None) or (value is not None): continue source = self.__none_sources.get(column_name, "null") diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 1019d71c..1786e2e8 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -649,20 +649,23 @@ def test_get_none_invalid(self): else: self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) self.assertIsInstance(event, UpdateRowsEvent) - - if event.rows[0].get("before_none_sources"): - self.assertEqual(event.rows[0]["before_none_sources"]["col0"], "null") - self.assertEqual(event.rows[0]["before_none_sources"]["col1"], "null") - self.assertEqual(event.rows[0]["before_none_sources"]["col2"], "out of datetime2 range") - self.assertEqual(event.rows[0]["before_none_sources"]["col3"], "null") - self.assertEqual(event.rows[0]["before_none_sources"]["col4"], "null") - - if event.rows[0].get("after_none_sources"): - self.assertEqual(event.rows[0]["after_none_sources"]["col0"], "null") - self.assertEqual(event.rows[0]["after_none_sources"]["col1"], "null") - self.assertEqual(event.rows[0]["after_none_sources"]["col2"], "null") - self.assertEqual(event.rows[0]["after_none_sources"]["col3"], "out of date range") - self.assertEqual(event.rows[0]["after_none_sources"]["col4"], "empty set") + + before_none_sources = event.rows[0].get("before_none_sources") + after_none_sources = event.rows[0].get("after_none_sources") + + if before_none_sources: + self.assertEqual(before_none_sources["col0"], "null") + self.assertEqual(before_none_sources["col1"], "null") + self.assertEqual(before_none_sources["col2"], "out of datetime2 range") + self.assertEqual(before_none_sources["col3"], "null") + self.assertEqual(before_none_sources["col4"], "null") + + if after_none_sources: + self.assertEqual(after_none_sources["col0"], "null") + self.assertEqual(after_none_sources["col1"], "null") + self.assertEqual(after_none_sources["col2"], "null") + self.assertEqual(after_none_sources["col3"], "out of date range") + self.assertEqual(after_none_sources["col4"], "empty set") class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): From 6a506ebaa3c5e4a3788dcbc8b34c983ca7336996 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 23 Sep 2023 08:42:33 +0900 Subject: [PATCH 11/15] refactor: get none_sources as variable --- pymysqlreplication/tests/test_basic.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 1786e2e8..ed50073a 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -621,8 +621,9 @@ def test_get_none(self): write_rows_event = self.stream.fetchone() self.assertIsInstance(write_rows_event, WriteRowsEvent) - if write_rows_event.rows[0].get("none_sources"): - self.assertEqual(write_rows_event.rows[0]["none_sources"]["col1"], "null") + none_sources = write_rows_event.rows[0].get("none_sources") + if none_sources: + self.assertEqual(none_sources["col1"], "null") def test_get_none_invalid(self): self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") From 154bf8882f5415e54a0555d1401ebc88c28b65df Mon Sep 17 00:00:00 2001 From: heehehe Date: Sun, 24 Sep 2023 00:38:01 +0900 Subject: [PATCH 12/15] feat: add TestColumnValueNoneSources testcase --- pymysqlreplication/tests/test_basic.py | 145 ++++++++++++++----------- 1 file changed, 80 insertions(+), 65 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index ed50073a..77b7c7e2 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -30,6 +30,7 @@ "TestStatementConnectionSetting", "TestRowsQueryLogEvents", "TestOptionalMetaData", + "TestColumnValueNoneSources", ] @@ -603,71 +604,6 @@ def create_binlog_packet_wrapper(pkt): self.assertEqual(binlog_event.event._is_event_valid, True) self.assertNotEqual(wrong_event.event._is_event_valid, True) - def test_get_none(self): - self.stream.close() - self.stream = BinLogStreamReader( - self.database, - server_id=1024, - resume_stream=False, - only_events=[WriteRowsEvent], - ) - query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" - self.execute(query) - query = ( - "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" - ) - self.execute(query) - self.execute("COMMIT") - write_rows_event = self.stream.fetchone() - self.assertIsInstance(write_rows_event, WriteRowsEvent) - - none_sources = write_rows_event.rows[0].get("none_sources") - if none_sources: - self.assertEqual(none_sources["col1"], "null") - - def test_get_none_invalid(self): - self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") - self.execute( - "CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))" - ) - self.execute( - "INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)" - ) - self.resetBinLog() - self.execute( - "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL" - ) - self.execute("COMMIT") - - self.assertIsInstance(self.stream.fetchone(), RotateEvent) - self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) - self.assertIsInstance(self.stream.fetchone(), QueryEvent) - self.assertIsInstance(self.stream.fetchone(), TableMapEvent) - - event = self.stream.fetchone() - if self.isMySQL56AndMore(): - self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) - else: - self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) - self.assertIsInstance(event, UpdateRowsEvent) - - before_none_sources = event.rows[0].get("before_none_sources") - after_none_sources = event.rows[0].get("after_none_sources") - - if before_none_sources: - self.assertEqual(before_none_sources["col0"], "null") - self.assertEqual(before_none_sources["col1"], "null") - self.assertEqual(before_none_sources["col2"], "out of datetime2 range") - self.assertEqual(before_none_sources["col3"], "null") - self.assertEqual(before_none_sources["col4"], "null") - - if after_none_sources: - self.assertEqual(after_none_sources["col0"], "null") - self.assertEqual(after_none_sources["col1"], "null") - self.assertEqual(after_none_sources["col2"], "null") - self.assertEqual(after_none_sources["col3"], "out of date range") - self.assertEqual(after_none_sources["col4"], "empty set") - class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase): def setUp(self): @@ -1808,6 +1744,85 @@ def tearDown(self): super(TestOptionalMetaData, self).tearDown() +class TestColumnValueNoneSources(base.PyMySQLReplicationTestCase): + def setUp(self): + super(TestColumnValueNoneSources, self).setUp() + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + only_events=(TableMapEvent,), + ) + if not self.isMySQL8014AndMore(): + self.skipTest("Mysql version is under 8.0.14 - pass TestOptionalMetaData") + self.execute("SET GLOBAL binlog_row_metadata='FULL';") + + def test_get_none(self): + self.stream.close() + self.stream = BinLogStreamReader( + self.database, + server_id=1024, + resume_stream=False, + only_events=[WriteRowsEvent], + ) + query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);" + self.execute(query) + query = ( + "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);" + ) + self.execute(query) + self.execute("COMMIT") + write_rows_event = self.stream.fetchone() + self.assertIsInstance(write_rows_event, WriteRowsEvent) + + none_sources = write_rows_event.rows[0].get("none_sources") + if none_sources: + self.assertEqual(none_sources["col1"], "null") + + def test_get_none_invalid(self): + self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'") + self.execute( + "CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))" + ) + self.execute( + "INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)" + ) + self.resetBinLog() + self.execute( + "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL" + ) + self.execute("COMMIT") + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + self.assertIsInstance(self.stream.fetchone(), TableMapEvent) + + event = self.stream.fetchone() + if self.isMySQL56AndMore(): + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) + else: + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) + self.assertIsInstance(event, UpdateRowsEvent) + + before_none_sources = event.rows[0].get("before_none_sources") + after_none_sources = event.rows[0].get("after_none_sources") + + if before_none_sources: + self.assertEqual(before_none_sources["col0"], "null") + self.assertEqual(before_none_sources["col1"], "null") + self.assertEqual(before_none_sources["col2"], "out of datetime2 range") + self.assertEqual(before_none_sources["col3"], "null") + self.assertEqual(before_none_sources["col4"], "null") + + if after_none_sources: + self.assertEqual(after_none_sources["col0"], "null") + self.assertEqual(after_none_sources["col1"], "null") + self.assertEqual(after_none_sources["col2"], "null") + self.assertEqual(after_none_sources["col3"], "out of date range") + self.assertEqual(after_none_sources["col4"], "empty set") + + if __name__ == "__main__": import unittest From 4c797a14cc95cd3b0456c7ceccbb965996ebc617 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sun, 24 Sep 2023 00:42:55 +0900 Subject: [PATCH 13/15] feat: inherit PyMySQLReplicationVersion8TestCase in TestColumnValueNoneSources --- pymysqlreplication/tests/test_basic.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 050776bf..cb5688cc 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1737,7 +1737,7 @@ def tearDown(self): super(TestOptionalMetaData, self).tearDown() -class TestColumnValueNoneSources(base.PyMySQLReplicationTestCase): +class TestColumnValueNoneSources(base.PyMySQLReplicationVersion8TestCase): def setUp(self): super(TestColumnValueNoneSources, self).setUp() self.stream.close() @@ -1747,7 +1747,9 @@ def setUp(self): only_events=(TableMapEvent,), ) if not self.isMySQL8014AndMore(): - self.skipTest("Mysql version is under 8.0.14 - pass TestOptionalMetaData") + self.skipTest( + "Mysql version is under 8.0.14 - pass TestColumnValueNoneSources" + ) self.execute("SET GLOBAL binlog_row_metadata='FULL';") def test_get_none(self): @@ -1788,6 +1790,8 @@ def test_get_none_invalid(self): self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + self.assertIsInstance(self.stream.fetchone(), PreviousGtidsEvent) + self.assertIsInstance(self.stream.fetchone(), GtidEvent) self.assertIsInstance(self.stream.fetchone(), QueryEvent) self.assertIsInstance(self.stream.fetchone(), TableMapEvent) From be89cc511bca9f87ad514d9250564159a497e671 Mon Sep 17 00:00:00 2001 From: heehehe Date: Fri, 29 Sep 2023 23:07:22 +0900 Subject: [PATCH 14/15] feat: add constants/NONE_SOURCE.py --- pymysqlreplication/constants/NONE_SOURCE.py | 6 ++++++ pymysqlreplication/row_event.py | 17 +++++++++-------- 2 files changed, 15 insertions(+), 8 deletions(-) create mode 100644 pymysqlreplication/constants/NONE_SOURCE.py diff --git a/pymysqlreplication/constants/NONE_SOURCE.py b/pymysqlreplication/constants/NONE_SOURCE.py new file mode 100644 index 00000000..68391003 --- /dev/null +++ b/pymysqlreplication/constants/NONE_SOURCE.py @@ -0,0 +1,6 @@ +NULL = "null" +OUT_OF_DATE_RANGE = "out of date range" +OUT_OF_DATETIME_RANGE = "out of datetime range" +OUT_OF_DATETIME2_RANGE = "out of datetime2 range" +EMPTY_SET = "empty set" +COLS_BITMAP = "cols bitmap" diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index d3fd2ed0..83fd2d25 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -9,6 +9,7 @@ from .constants import FIELD_TYPE from .constants import BINLOG from .constants import CHARSET +from .constants import NONE_SOURCE from .column import Column from .table import Table from .bitmap import BitCount, BitGet @@ -135,11 +136,11 @@ def __read_values_name( if BitGet(cols_bitmap, i) == 0: # This block is only executed when binlog_row_image = MINIMAL. # When binlog_row_image = FULL, this block does not execute. - self.__none_sources[name] = "cols_bitmap" + self.__none_sources[name] = NONE_SOURCE.COLS_BITMAP return None if self._is_null(null_bitmap, null_bitmap_index): - self.__none_sources[name] = "null" + self.__none_sources[name] = NONE_SOURCE.NULL return None if column.type == FIELD_TYPE.TINY: @@ -185,14 +186,14 @@ def __read_values_name( elif column.type == FIELD_TYPE.DATETIME: ret = self.__read_datetime() if ret is None: - self.__none_sources[name] = "out of datetime range" + self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATETIME_RANGE return ret elif column.type == FIELD_TYPE.TIME: return self.__read_time() elif column.type == FIELD_TYPE.DATE: ret = self.__read_date() if ret is None: - self.__none_sources[name] = "out of date range" + self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATE_RANGE return ret elif column.type == FIELD_TYPE.TIMESTAMP: return datetime.datetime.utcfromtimestamp(self.packet.read_uint32()) @@ -201,7 +202,7 @@ def __read_values_name( elif column.type == FIELD_TYPE.DATETIME2: ret = self.__read_datetime2(column) if ret is None: - self.__none_sources[name] = "out of datetime2 range" + self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATETIME2_RANGE return ret elif column.type == FIELD_TYPE.TIME2: return self.__read_time2(column) @@ -232,10 +233,10 @@ def __read_values_name( if bit_mask & (1 << idx) } if not ret: - self.__none_sources[column.name] = "empty set" + self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET return None return ret - self.__none_sources[column.name] = "empty set" + self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET return None elif column.type == FIELD_TYPE.BIT: return self.__read_bit(column) @@ -244,7 +245,7 @@ def __read_values_name( elif column.type == FIELD_TYPE.JSON: return self.packet.read_binary_json(column.length_size) else: - raise NotImplementedError("Unknown MySQL column type: %d" % (column.type)) + raise NotImplementedError("Unknown MySQL column type: %d" % column.type) def __add_fsp_to_time(self, time, column): """Read and add the fractional part of time From f1e9df25c5a82b6b8104458486eae519d1ffe777 Mon Sep 17 00:00:00 2001 From: heehehe Date: Sat, 14 Oct 2023 21:34:58 +0900 Subject: [PATCH 15/15] fix: remove deprecated test class --- pymysqlreplication/tests/test_basic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 8b1bec37..c1861a17 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -1755,7 +1755,7 @@ def tearDown(self): super(TestOptionalMetaData, self).tearDown() -class TestColumnValueNoneSources(base.PyMySQLReplicationVersion8TestCase): +class TestColumnValueNoneSources(base.PyMySQLReplicationTestCase): def setUp(self): super(TestColumnValueNoneSources, self).setUp() self.stream.close()