Skip to content

Commit b15ba97

Browse files
authored
Merge branch 'main' into feature/partial-update-row-event
2 parents b37ec37 + 5742e9d commit b15ba97

File tree

3 files changed

+163
-9
lines changed

3 files changed

+163
-9
lines changed

Diff for: pymysqlreplication/constants/NONE_SOURCE.py

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""Labels describing why a decoded column value came back as ``None``.

The row-event decoder records one of these strings per column so callers
can tell a genuine SQL NULL apart from a value that could not be
represented or was not present in the row image.
"""

# The column's bit was set in the row's null bitmap.
NULL = "null"

# Temporal values whose reader returned None instead of a date/datetime.
OUT_OF_DATE_RANGE = "out of date range"
OUT_OF_DATETIME_RANGE = "out of datetime range"
OUT_OF_DATETIME2_RANGE = "out of datetime2 range"

# A SET column with no selected members (or no known member list).
EMPTY_SET = "empty set"

# The column was absent from the event's columns-present bitmap.
COLS_BITMAP = "cols bitmap"

Diff for: pymysqlreplication/row_event.py

+73-9
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from .constants import FIELD_TYPE
1111
from .constants import BINLOG
1212
from .constants import CHARSET
13+
from .constants import NONE_SOURCE
1314
from .column import Column
1415
from .table import Table
1516
from .bitmap import BitCount, BitGet
@@ -23,6 +24,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
2324
self.__ignored_tables = kwargs["ignored_tables"]
2425
self.__only_schemas = kwargs["only_schemas"]
2526
self.__ignored_schemas = kwargs["ignored_schemas"]
27+
self.__none_sources = {}
2628

2729
# Header
2830
self.table_id = self._read_table_id()
@@ -176,10 +178,15 @@ def __read_values_name(
176178
unsigned,
177179
i,
178180
):
181+
name = self.table_map[self.table_id].columns[i].name
179182
if BitGet(cols_bitmap, i) == 0:
183+
# This block is only executed when binlog_row_image = MINIMAL.
184+
# When binlog_row_image = FULL, this block does not execute.
185+
self.__none_sources[name] = NONE_SOURCE.COLS_BITMAP
180186
return None
181187

182188
if self._is_null(null_bitmap, null_bitmap_index):
189+
self.__none_sources[name] = NONE_SOURCE.NULL
183190
return None
184191

185192
if column.type == FIELD_TYPE.TINY:
@@ -223,17 +230,26 @@ def __read_values_name(
223230
elif column.type == FIELD_TYPE.BLOB:
224231
return self.__read_string(column.length_size, column)
225232
elif column.type == FIELD_TYPE.DATETIME:
226-
return self.__read_datetime()
233+
ret = self.__read_datetime()
234+
if ret is None:
235+
self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATETIME_RANGE
236+
return ret
227237
elif column.type == FIELD_TYPE.TIME:
228238
return self.__read_time()
229239
elif column.type == FIELD_TYPE.DATE:
230-
return self.__read_date()
240+
ret = self.__read_date()
241+
if ret is None:
242+
self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATE_RANGE
243+
return ret
231244
elif column.type == FIELD_TYPE.TIMESTAMP:
232245
return datetime.datetime.utcfromtimestamp(self.packet.read_uint32())
233246

234247
# For new date format:
235248
elif column.type == FIELD_TYPE.DATETIME2:
236-
return self.__read_datetime2(column)
249+
ret = self.__read_datetime2(column)
250+
if ret is None:
251+
self.__none_sources[name] = NONE_SOURCE.OUT_OF_DATETIME2_RANGE
252+
return ret
237253
elif column.type == FIELD_TYPE.TIME2:
238254
return self.__read_time2(column)
239255
elif column.type == FIELD_TYPE.TIMESTAMP2:
@@ -257,11 +273,16 @@ def __read_values_name(
257273
elif column.type == FIELD_TYPE.SET:
258274
bit_mask = self.packet.read_uint_by_size(column.size)
259275
if column.set_values:
260-
return {
276+
ret = {
261277
val
262278
for idx, val in enumerate(column.set_values)
263279
if bit_mask & (1 << idx)
264-
} or None
280+
}
281+
if not ret:
282+
self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET
283+
return None
284+
return ret
285+
self.__none_sources[column.name] = NONE_SOURCE.EMPTY_SET
265286
return None
266287
elif column.type == FIELD_TYPE.BIT:
267288
return self.__read_bit(column)
@@ -515,6 +536,16 @@ def _json_column_count(self):
515536
count += 1
516537
return count
517538

539+
def _get_none_sources(self, column_data):
    """Return the recorded reason for every ``None`` value in *column_data*.

    :param column_data: mapping of column name to decoded value for one
        row image.
    :return: dict mapping each named column whose value is ``None`` to the
        reason stored in ``self.__none_sources``; columns without a stored
        reason are reported as ``"null"``. Entries keyed by ``None`` and
        columns holding a real value are left out.
    """
    recorded = self.__none_sources
    return {
        col: recorded.get(col, "null")
        for col, val in column_data.items()
        if col is not None and val is None
    }
548+
518549
def _dump(self):
519550
super()._dump()
520551
print(f"Table: {self.schema}.{self.table}")
@@ -557,6 +588,8 @@ def _fetch_one_row(self):
557588
row = {}
558589

559590
row["values"] = self._read_column_data(self.columns_present_bitmap)
591+
row["none_sources"] = self._get_none_sources(row["values"])
592+
560593
return row
561594

562595
def _dump(self):
@@ -565,7 +598,13 @@ def _dump(self):
565598
for row in self.rows:
566599
print("--")
567600
for key in row["values"]:
568-
print(f"* {key} : {row['values'][key]}")
601+
none_source = (
602+
row["none_sources"][key] if key in row["none_sources"] else ""
603+
)
604+
if none_source:
605+
print(f"* {key} : {row['values'][key]} ({none_source})")
606+
else:
607+
print(f"* {key} : {row['values'][key]}")
569608

570609

571610
class WriteRowsEvent(RowsEvent):
@@ -585,6 +624,8 @@ def _fetch_one_row(self):
585624
row = {}
586625

587626
row["values"] = self._read_column_data(self.columns_present_bitmap)
627+
row["none_sources"] = self._get_none_sources(row["values"])
628+
588629
return row
589630

590631
def _dump(self):
@@ -593,7 +634,13 @@ def _dump(self):
593634
for row in self.rows:
594635
print("--")
595636
for key in row["values"]:
596-
print(f"* {key} : {row['values'][key]}")
637+
none_source = (
638+
row["none_sources"][key] if key in row["none_sources"] else ""
639+
)
640+
if none_source:
641+
print(f"* {key} : {row['values'][key]} ({none_source})")
642+
else:
643+
print(f"* {key} : {row['values'][key]}")
597644

598645

599646
class UpdateRowsEvent(RowsEvent):
@@ -623,8 +670,9 @@ def _fetch_one_row(self):
623670
row = {}
624671

625672
row["before_values"] = self._read_column_data(self.columns_present_bitmap)
626-
673+
row["before_none_sources"] = self._get_none_sources(row["before_values"])
627674
row["after_values"] = self._read_column_data(self.columns_present_bitmap2)
675+
row["after_none_sources"] = self._get_none_sources(row["after_values"])
628676
return row
629677

630678
def _dump(self):
@@ -633,7 +681,23 @@ def _dump(self):
633681
for row in self.rows:
634682
print("--")
635683
for key in row["before_values"]:
636-
print(f"*{key}:{row['before_values'][key]}=>{row['after_values'][key]}")
684+
if key in row["before_none_sources"]:
685+
before_value_info = "%s(%s)" % (
686+
row["before_values"][key],
687+
row["before_none_sources"][key],
688+
)
689+
else:
690+
before_value_info = row["before_values"][key]
691+
692+
if key in row["after_none_sources"]:
693+
after_value_info = "%s(%s)" % (
694+
row["after_values"][key],
695+
row["after_none_sources"][key],
696+
)
697+
else:
698+
after_value_info = row["after_values"][key]
699+
700+
print(f"*{key}:{before_value_info}=>{after_value_info}")
637701

638702

639703
class OptionalMetaData:

Diff for: pymysqlreplication/tests/test_basic.py

+84
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
"TestStatementConnectionSetting",
2222
"TestRowsQueryLogEvents",
2323
"TestOptionalMetaData",
24+
"TestColumnValueNoneSources",
2425
]
2526

2627

@@ -1754,6 +1755,89 @@ def tearDown(self):
17541755
super(TestOptionalMetaData, self).tearDown()
17551756

17561757

1758+
class TestColumnValueNoneSources(base.PyMySQLReplicationTestCase):
    """Integration tests for the ``none_sources`` metadata on row events.

    Each decoded row carries, next to its values, a dict naming why a
    column value is ``None``. These tests insert and update rows holding
    NULLs, zero dates and an invalid SET member, then check the reported
    reasons.
    """

    def setUp(self):
        super(TestColumnValueNoneSources, self).setUp()
        self.stream.close()
        self.stream = BinLogStreamReader(
            self.database,
            server_id=1024,
            only_events=(TableMapEvent,),
        )
        if not self.isMySQL8014AndMore():
            self.skipTest(
                "Mysql version is under 8.0.14 - pass TestColumnValueNoneSources"
            )
        # NOTE(review): presumably required so column names are available
        # to the decoder - confirm against the table-map handling.
        self.execute("SET GLOBAL binlog_row_metadata='FULL';")

    def test_get_none(self):
        """An inserted NULL is reported with the ``"null"`` source."""
        self.stream.close()
        self.stream = BinLogStreamReader(
            self.database,
            server_id=1024,
            resume_stream=False,
            only_events=[WriteRowsEvent],
        )
        query = "CREATE TABLE null_operation_update_example (col1 INT, col2 INT);"
        self.execute(query)
        query = (
            "INSERT INTO null_operation_update_example (col1, col2) VALUES (NULL, 1);"
        )
        self.execute(query)
        self.execute("COMMIT")
        write_rows_event = self.stream.fetchone()
        self.assertIsInstance(write_rows_event, WriteRowsEvent)

        # Index directly: a missing or empty dict must fail the test rather
        # than silently skip the assertions.
        none_sources = write_rows_event.rows[0]["none_sources"]
        self.assertEqual(none_sources["col1"], "null")
        # col2 holds a real value, so it must not be listed at all.
        self.assertNotIn("col2", none_sources)

    def test_get_none_invalid(self):
        """Zero dates and an invalid SET member report their own sources."""
        self.execute("SET SESSION SQL_MODE='ALLOW_INVALID_DATES'")
        self.execute(
            "CREATE TABLE test_table (col0 INT, col1 VARCHAR(10), col2 DATETIME, col3 DATE, col4 SET('a', 'b', 'c'))"
        )
        self.execute(
            "INSERT INTO test_table VALUES (NULL, NULL, '0000-00-00 00:00:00', NULL, NULL)"
        )
        self.resetBinLog()
        self.execute(
            "UPDATE test_table SET col1 = NULL, col2 = NULL, col3='0000-00-00', col4='d' WHERE col0 IS NULL"
        )
        self.execute("COMMIT")

        # Skip over the events that precede the row update in the binlog.
        self.assertIsInstance(self.stream.fetchone(), RotateEvent)
        self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
        self.assertIsInstance(self.stream.fetchone(), PreviousGtidsEvent)
        self.assertIsInstance(self.stream.fetchone(), GtidEvent)
        self.assertIsInstance(self.stream.fetchone(), QueryEvent)
        self.assertIsInstance(self.stream.fetchone(), TableMapEvent)

        event = self.stream.fetchone()
        if self.isMySQL56AndMore():
            self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2)
        else:
            self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1)
        self.assertIsInstance(event, UpdateRowsEvent)

        # Direct indexing (not .get + truthiness guard) so an absent or
        # empty mapping is a test failure instead of a silent pass.
        before_none_sources = event.rows[0]["before_none_sources"]
        after_none_sources = event.rows[0]["after_none_sources"]

        # Before image: the row as inserted.
        self.assertEqual(before_none_sources["col0"], "null")
        self.assertEqual(before_none_sources["col1"], "null")
        self.assertEqual(before_none_sources["col2"], "out of datetime2 range")
        self.assertEqual(before_none_sources["col3"], "null")
        self.assertEqual(before_none_sources["col4"], "null")

        # After image: the row as rewritten by the UPDATE.
        self.assertEqual(after_none_sources["col0"], "null")
        self.assertEqual(after_none_sources["col1"], "null")
        self.assertEqual(after_none_sources["col2"], "null")
        self.assertEqual(after_none_sources["col3"], "out of date range")
        self.assertEqual(after_none_sources["col4"], "empty set")
1839+
1840+
17571841
if __name__ == "__main__":
17581842
import unittest
17591843

0 commit comments

Comments
 (0)