Skip to content

Commit 5742e9d

Browse files
authored
Merge pull request #489 from 23-OSSCA-python-mysql-replication/feature/categorize-none-column
Distinguish ambiguous column value of `None`
2 parents 72b53a8 + f1e9df2 commit 5742e9d

File tree

3 files changed

+163
-9
lines changed

3 files changed

+163
-9
lines changed

Diff for: pymysqlreplication/constants/NONE_SOURCE.py

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
# Labels describing why a decoded column value is ``None``.
# The row-event decoder records one of these per column name whenever it
# returns ``None`` for that column.

# The column is flagged as NULL in the row's null bitmap.
NULL = "null"
# A DATE column whose decoder returned None.
OUT_OF_DATE_RANGE = "out of date range"
# A DATETIME column whose decoder returned None.
OUT_OF_DATETIME_RANGE = "out of datetime range"
# A DATETIME2 column whose decoder returned None.
OUT_OF_DATETIME2_RANGE = "out of datetime2 range"
# A SET column with no member selected (or with no known set values).
EMPTY_SET = "empty set"
# The column's bit is 0 in the columns-present bitmap, so the row image does
# not contain it (seen with binlog_row_image = MINIMAL, not FULL).
COLS_BITMAP = "cols bitmap"

Diff for: pymysqlreplication/row_event.py

+73-9
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from .constants import FIELD_TYPE
1111
from .constants import BINLOG
1212
from .constants import CHARSET
13+
from .constants import NONE_SOURCE
1314
from .column import Column
1415
from .table import Table
1516
from .bitmap import BitCount, BitGet
@@ -23,6 +24,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
2324
self.__ignored_tables = kwargs["ignored_tables"]
2425
self.__only_schemas = kwargs["only_schemas"]
2526
self.__ignored_schemas = kwargs["ignored_schemas"]
27+
self.__none_sources = {}
2628

2729
# Header
2830
self.table_id = self._read_table_id()
@@ -143,10 +145,15 @@ def _read_column_data(self, cols_bitmap):
143145
def __read_values_name(
144146
self, column, null_bitmap, null_bitmap_index, cols_bitmap, unsigned, i
145147
):
148+
name = self.table_map[self.table_id].columns[i].name
146149
if BitGet(cols_bitmap, i) == 0:
150+
# This block is only executed when binlog_row_image = MINIMAL.
151+
# When binlog_row_image = FULL, this block does not execute.
152+
self.__none_sources[name] = NONE_SOURCE.COLS_BITMAP
147153
return None
148154

149155
if self._is_null(null_bitmap, null_bitmap_index):
156+
self.__none_sources[name] = NONE_SOURCE.NULL
150157
return None
151158

152159
if column.type == FIELD_TYPE.TINY:
@@ -190,17 +197,26 @@ def __read_values_name(
190197
elif column.type == FIELD_TYPE.BLOB:
191198
return self.__read_string(column.length_size, column)
192199
elif column.type == FIELD_TYPE.DATETIME:
193-
return self.__read_datetime()
200+
ret = self.__read_datetime()
201+
if ret is None:
202+
self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATETIME_RANGE
203+
return ret
194204
elif column.type == FIELD_TYPE.TIME:
195205
return self.__read_time()
196206
elif column.type == FIELD_TYPE.DATE:
197-
return self.__read_date()
207+
ret = self.__read_date()
208+
if ret is None:
209+
self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATE_RANGE
210+
return ret
198211
elif column.type == FIELD_TYPE.TIMESTAMP:
199212
return datetime.datetime.utcfromtimestamp(self.packet.read_uint32())
200213

201214
# For new date format:
202215
elif column.type == FIELD_TYPE.DATETIME2:
203-
return self.__read_datetime2(column)
216+
ret = self.__read_datetime2(column)
217+
if ret is None:
218+
self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATETIME2_RANGE
219+
return ret
204220
elif column.type == FIELD_TYPE.TIME2:
205221
return self.__read_time2(column)
206222
elif column.type == FIELD_TYPE.TIMESTAMP2:
@@ -224,11 +240,16 @@ def __read_values_name(
224240
elif column.type == FIELD_TYPE.SET:
225241
bit_mask = self.packet.read_uint_by_size(column.size)
226242
if column.set_values:
227-
return {
243+
ret = {
228244
val
229245
for idx, val in enumerate(column.set_values)
230246
if bit_mask & (1 << idx)
231-
} or None
247+
}
248+
if not ret:
249+
self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET
250+
return None
251+
return ret
252+
self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET
232253
return None
233254
elif column.type == FIELD_TYPE.BIT:
234255
return self.__read_bit(column)
@@ -475,6 +496,16 @@ def __read_binary_slice(self, binary, start, size, data_length):
475496
mask = (1 << size) - 1
476497
return binary & mask
477498

def _get_none_sources(self, column_data):
    """Map every ``None``-valued column of one row image to its source label.

    :param column_data: dict of column name -> decoded value for a row image.
    :return: dict of column name -> source label, containing only the
        columns whose value is ``None``. A column with no recorded source
        is reported as "null". Entries keyed by ``None`` are skipped.
    """
    return {
        column_name: self.__none_sources.get(column_name, "null")
        for column_name, value in column_data.items()
        # Identity check on purpose: falsy values such as 0 or "" are real data.
        if column_name is not None and value is None
    }

478509
def _dump(self):
479510
super()._dump()
480511
print(f"Table: {self.schema}.{self.table}")
@@ -517,6 +548,8 @@ def _fetch_one_row(self):
517548
row = {}
518549

519550
row["values"] = self._read_column_data(self.columns_present_bitmap)
551+
row["none_sources"] = self._get_none_sources(row["values"])
552+
520553
return row
521554

522555
def _dump(self):
@@ -525,7 +558,13 @@ def _dump(self):
525558
for row in self.rows:
526559
print("--")
527560
for key in row["values"]:
528-
print(f"* {key} : {row['values'][key]}")
561+
none_source = (
562+
row["none_sources"][key] if key in row["none_sources"] else ""
563+
)
564+
if none_source:
565+
print(f"* {key} : {row['values'][key]} ({none_source})")
566+
else:
567+
print(f"* {key} : {row['values'][key]}")
529568

530569

531570
class WriteRowsEvent(RowsEvent):
@@ -545,6 +584,8 @@ def _fetch_one_row(self):
545584
row = {}
546585

547586
row["values"] = self._read_column_data(self.columns_present_bitmap)
587+
row["none_sources"] = self._get_none_sources(row["values"])
588+
548589
return row
549590

550591
def _dump(self):
@@ -553,7 +594,13 @@ def _dump(self):
553594
for row in self.rows:
554595
print("--")
555596
for key in row["values"]:
556-
print(f"* {key} : {row['values'][key]}")
597+
none_source = (
598+
row["none_sources"][key] if key in row["none_sources"] else ""
599+
)
600+
if none_source:
601+
print(f"* {key} : {row['values'][key]} ({none_source})")  # value, then why it is None
602+
else:
603+
print(f"* {key} : {row['values'][key]}")
557604

558605

559606
class UpdateRowsEvent(RowsEvent):
@@ -583,8 +630,9 @@ def _fetch_one_row(self):
583630
row = {}
584631

585632
row["before_values"] = self._read_column_data(self.columns_present_bitmap)
586-
633+
row["before_none_sources"] = self._get_none_sources(row["before_values"])
587634
row["after_values"] = self._read_column_data(self.columns_present_bitmap2)
635+
row["after_none_sources"] = self._get_none_sources(row["after_values"])
588636
return row
589637

590638
def _dump(self):
@@ -593,7 +641,23 @@ def _dump(self):
593641
for row in self.rows:
594642
print("--")
595643
for key in row["before_values"]:
596-
print(f"*{key}:{row['before_values'][key]}=>{row['after_values'][key]}")
644+
if key in row["before_none_sources"]:
645+
before_value_info = "%s(%s)" % (
646+
row["before_values"][key],
647+
row["before_none_sources"][key],
648+
)
649+
else:
650+
before_value_info = row["before_values"][key]
651+
652+
if key in row["after_none_sources"]:
653+
after_value_info = "%s(%s)" % (
654+
row["after_values"][key],
655+
row["after_none_sources"][key],
656+
)
657+
else:
658+
after_value_info = row["after_values"][key]
659+
660+
print(f"*{key}:{before_value_info}=>{after_value_info}")
597661

598662

599663
class OptionalMetaData:

Diff for: pymysqlreplication/tests/test_basic.py

+84
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
"TestStatementConnectionSetting",
2222
"TestRowsQueryLogEvents",
2323
"TestOptionalMetaData",
24+
"TestColumnValueNoneSources",
2425
]
2526

2627

@@ -1754,6 +1755,89 @@ def tearDown(self):
17541755
super(TestOptionalMetaData, self).tearDown()
17551756

17561757

class TestColumnValueNoneSources(base.PyMySQLReplicationTestCase):
    """Check that row events report why a column value was decoded as None."""

    def setUp(self):
        super(TestColumnValueNoneSources, self).setUp()
        self.stream.close()
        self.stream = BinLogStreamReader(
            self.database,
            server_id=1024,
            only_events=(TableMapEvent,),
        )
        if not self.isMySQL8014AndMore():
            self.skipTest(
                "Mysql version is under 8.0.14 - pass TestColumnValueNoneSources"
            )
        self.execute("SET GLOBAL binlog_row_metadata='FULL';")

    def test_get_none(self):
        """An inserted SQL NULL is reported with the "null" source."""
        self.stream.close()
        self.stream = BinLogStreamReader(
            self.database,
            server_id=1024,
            resume_stream=False,
            only_events=[WriteRowsEvent],
        )
        query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);"
        self.execute(query)
        query = (
            "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);"
        )
        self.execute(query)
        self.execute("COMMIT")
        write_rows_event = self.stream.fetchone()
        self.assertIsInstance(write_rows_event, WriteRowsEvent)

        # Assert unconditionally: a missing or empty mapping must fail the
        # test instead of silently skipping the check. Only col1 is None, so
        # it is the only column expected in the mapping.
        none_sources = write_rows_event.rows[0].get("none_sources")
        self.assertEqual(none_sources, {"col1": "null"})

    def test_get_none_invalid(self):
        """Zero dates and an invalid SET member are told apart from NULL."""
        self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'")
        self.execute(
            "CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))"
        )
        self.execute(
            "INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)"
        )
        self.resetBinLog()
        self.execute(
            "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL"
        )
        self.execute("COMMIT")

        self.assertIsInstance(self.stream.fetchone(), RotateEvent)
        self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
        self.assertIsInstance(self.stream.fetchone(), PreviousGtidsEvent)
        self.assertIsInstance(self.stream.fetchone(), GtidEvent)
        self.assertIsInstance(self.stream.fetchone(), QueryEvent)
        self.assertIsInstance(self.stream.fetchone(), TableMapEvent)

        event = self.stream.fetchone()
        if self.isMySQL56AndMore():
            self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2)
        else:
            self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1)
        self.assertIsInstance(event, UpdateRowsEvent)

        before_none_sources = event.rows[0].get("before_none_sources")
        after_none_sources = event.rows[0].get("after_none_sources")

        # Compare whole mappings without an ``if`` guard so that an absent
        # or empty mapping is reported as a failure.
        self.assertEqual(
            before_none_sources,
            {
                "col0": "null",
                "col1": "null",
                "col2": "out of datetime2 range",
                "col3": "null",
                "col4": "null",
            },
        )
        self.assertEqual(
            after_none_sources,
            {
                "col0": "null",
                "col1": "null",
                "col2": "null",
                "col3": "out of date range",
                "col4": "empty set",
            },
        )


17571841
if __name__ == "__main__":
17581842
import unittest
17591843

0 commit comments

Comments
 (0)