Skip to content

Commit 460a702

Browse files
Merge branch '23-OSSCA-python-mysql-replication-feat/footer-crc32'
2 parents edd4ff6 + a2896ac commit 460a702

File tree

4 files changed

+104
-27
lines changed

4 files changed

+104
-27
lines changed

Diff for: pymysqlreplication/binlogstream.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ def __init__(self, connection_settings, server_id,
142142
slave_heartbeat=None,
143143
is_mariadb=False,
144144
annotate_rows_event=False,
145-
ignore_decode_errors=False):
145+
ignore_decode_errors=False,
146+
verify_checksum=False,):
146147
"""
147148
Attributes:
148149
ctl_connection_settings: Connection settings for cluster holding
@@ -184,6 +185,7 @@ def __init__(self, connection_settings, server_id,
184185
used with 'is_mariadb'
185186
ignore_decode_errors: If true, any decode errors encountered
186187
when reading column data will be ignored.
188+
verify_checksum: If true, verify each event read from the binary log by comparing its CRC32 checksum footer; has no effect when event checksums are not in use.
187189
"""
188190

189191
self.__connection_settings = connection_settings
@@ -206,6 +208,7 @@ def __init__(self, connection_settings, server_id,
206208
only_events, ignored_events, filter_non_implemented_events)
207209
self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
208210
self.__ignore_decode_errors = ignore_decode_errors
211+
self.__verify_checksum = verify_checksum
209212

210213
# We can't filter on packet level TABLE_MAP and rotate event because
211214
# we need them for handling other operations
@@ -535,7 +538,8 @@ def fetchone(self):
535538
self.__ignored_schemas,
536539
self.__freeze_schema,
537540
self.__fail_on_table_metadata_unavailable,
538-
self.__ignore_decode_errors)
541+
self.__ignore_decode_errors,
542+
self.__verify_checksum,)
539543

540544
if binlog_event.event_type == ROTATE_EVENT:
541545
self.log_pos = binlog_event.event.position

Diff for: pymysqlreplication/event.py

+40-22
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import struct
55
import datetime
66
import decimal
7+
import zlib
8+
79
from pymysqlreplication.constants.STATUS_VAR_KEY import *
810
from pymysqlreplication.exceptions import StatusVariableMismatch
911
from typing import Union, Optional
@@ -18,7 +20,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
1820
ignored_schemas=None,
1921
freeze_schema=False,
2022
fail_on_table_metadata_unavailable=False,
21-
ignore_decode_errors=False):
23+
ignore_decode_errors=False,
24+
verify_checksum=False,):
2225
self.packet = from_packet
2326
self.table_map = table_map
2427
self.event_type = self.packet.event_type
@@ -28,17 +31,32 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
2831
self.mysql_version = mysql_version
2932
self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
3033
self._ignore_decode_errors = ignore_decode_errors
34+
self._verify_checksum = verify_checksum
35+
self._is_event_valid = None
3136
# The event has been fully processed; if processed is false
3237
# the event will be skipped
3338
self._processed = True
3439
self.complete = True
40+
self._verify_event()
3541

3642
def _read_table_id(self):
3743
# Table ID is 6 byte
3844
# pad little-endian number
3945
table_id = self.packet.read(6) + b"\x00\x00"
4046
return struct.unpack('<Q', table_id)[0]
4147

48+
def _verify_event(self):
49+
if not self._verify_checksum:
50+
return
51+
52+
self.packet.rewind(1)
53+
data = self.packet.read(19 + self.event_size)
54+
footer = self.packet.read(4)
55+
byte_data = zlib.crc32(data).to_bytes(4, byteorder='little')
56+
self._is_event_valid = True if byte_data == footer else False
57+
self.packet.read_bytes -= (19 + self.event_size + 4)
58+
self.packet.rewind(20)
59+
4260
def dump(self):
4361
print("=== %s ===" % (self.__class__.__name__))
4462
print("Date: %s" % (datetime.datetime.utcfromtimestamp(self.timestamp)
@@ -57,7 +75,7 @@ class GtidEvent(BinLogEvent):
5775
"""
5876
GTID change in binlog event
5977
60-
For more information: `[GTID] <https://mariadb.com/kb/en/gtid/>`_ `[see also] <https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Gtid__event.html>`_
78+
For more information: `[GTID] <https://mariadb.com/kb/en/gtid/>`_ `[see also] <https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Gtid__event.html>`_
6179
6280
:ivar commit_flag: 1byte - 00000001 = Transaction may have changes logged with SBR.
6381
In 5.6, 5.7.0-5.7.18, and 8.0.0-8.0.1, this flag is always set. Starting in 5.7.19 and 8.0.2, this flag is cleared if the transaction only contains row events. It is set if any part of the transaction is written in statement format.
@@ -152,8 +170,8 @@ def _dump(self):
152170

153171
class MariadbAnnotateRowsEvent(BinLogEvent):
154172
"""
155-
Annotate rows event
156-
If you want to check this binlog, change the value of the flag(line 382 of the 'binlogstream.py') option to 2
173+
Annotate rows event
174+
If you want to check this binlog, change the value of the flag(line 382 of the 'binlogstream.py') option to 2
157175
https://mariadb.com/kb/en/annotate_rows_event/
158176
159177
:ivar sql_statement: str - The SQL statement
@@ -229,7 +247,7 @@ class XAPrepareEvent(BinLogEvent):
229247
Like Xid_event, it contains XID of the **prepared** transaction.
230248
231249
For more information: `[see details] <https://dev.mysql.com/doc/refman/8.0/en/xa-statements.html>`_.
232-
250+
233251
:ivar one_phase: current XA transaction commit method
234252
:ivar xid_format_id: a number that identifies the format used by the gtrid and bqual values
235253
:ivar xid: serialized XID representation of XA transaction (xid_gtrid + xid_bqual)
@@ -260,14 +278,14 @@ def _dump(self):
260278
class FormatDescriptionEvent(BinLogEvent):
261279
"""
262280
Represents a Format Description Event in the MySQL binary log.
263-
281+
264282
This event is written at the start of a binary log file for binlog version 4.
265283
It provides the necessary information to decode subsequent events in the file.
266284
267285
:ivar binlog_version: int - Version of the binary log format.
268286
:ivar mysql_version_str: str - Server's MySQL version in string format.
269287
"""
270-
288+
271289
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
272290
super().__init__(from_packet, event_size, table_map,
273291
ctl_connection, **kwargs)
@@ -289,7 +307,7 @@ class XidEvent(BinLogEvent):
289307
"""
290308
A COMMIT event generated when COMMIT of a transaction that modifies one or more tables of an XA-capable storage engine occurs.
291309
292-
For more information: `[see details] <https://mariadb.com/kb/en/xid_event/>`_.
310+
For more information: `[see details] <https://mariadb.com/kb/en/xid_event/>`_.
293311
294312
:ivar xid: uint - Transaction ID for 2 Phase Commit.
295313
"""
@@ -312,18 +330,18 @@ class HeartbeatLogEvent(BinLogEvent):
312330
313331
`[see MASTER_HEARTBEAT_PERIOD] <https://dev.mysql.com/doc/refman/8.0/en/change-master-to.html>`_.
314332
315-
A Mysql server also does it for each skipped events in the log.
316-
This is because to make the slave bump its position so that
333+
A Mysql server also does it for each skipped events in the log.
334+
This is because to make the slave bump its position so that
317335
if a disconnection occurs, the slave will only reconnect from the last skipped position. (Baloo's idea)
318336
319337
(see Binlog_sender::send_events in sql/rpl_binlog_sender.cc)
320338
321339
Warning:
322-
That makes 106 bytes of data for skipped event in the binlog.
323-
*this is also the case with GTID replication*.
324-
To mitigate such behavior, you are expected to keep the binlog small
325-
(see max_binlog_size, defaults to 1G).
326-
In any case, the timestamp is 0 (as in 1970-01-01T00:00:00).
340+
That makes 106 bytes of data for skipped event in the binlog.
341+
*this is also the case with GTID replication*.
342+
To mitigate such behavior, you are expected to keep the binlog small
343+
(see max_binlog_size, defaults to 1G).
344+
In any case, the timestamp is 0 (as in 1970-01-01T00:00:00).
327345
328346
:ivar ident: Name of the current binlog
329347
"""
@@ -411,7 +429,7 @@ def _read_status_vars_value_for_key(self, key):
411429
elif key == Q_TIME_ZONE_CODE: # 0x05
412430
time_zone_len = self.packet.read_uint8()
413431
if time_zone_len:
414-
self.time_zone = self.packet.read(time_zone_len)
432+
self.time_zone = self.packet.read(time_zone_len)
415433
elif key == Q_CATALOG_NZ_CODE: # 0x06
416434
catalog_len = self.packet.read_uint8()
417435
if catalog_len:
@@ -545,8 +563,8 @@ class IntvarEvent(BinLogEvent):
545563
"""
546564
Stores the value of auto-increment variables.
547565
This event will be created just before a QueryEvent.
548-
549-
:ivar type: int - 1 byte identifying the type of variable stored.
566+
567+
:ivar type: int - 1 byte identifying the type of variable stored.
550568
Can be either LAST_INSERT_ID_EVENT (1) or INSERT_ID_EVENT (2).
551569
:ivar value: int - The value of the variable
552570
"""
@@ -743,10 +761,10 @@ def _dump(self) -> None:
743761

744762
class MariadbStartEncryptionEvent(BinLogEvent):
745763
"""
746-
Since MariaDB 10.1.7,
747-
the START_ENCRYPTION event is written to every binary log file
748-
if encrypt_binlog is set to ON. Prior to enabling this setting,
749-
additional configuration steps are required in MariaDB.
764+
Since MariaDB 10.1.7,
765+
the START_ENCRYPTION event is written to every binary log file
766+
if encrypt_binlog is set to ON. Prior to enabling this setting,
767+
additional configuration steps are required in MariaDB.
750768
(Link: https://mariadb.com/kb/en/encrypting-binary-logs/)
751769
752770
This event is written just once, after the Format Description event

Diff for: pymysqlreplication/packet.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ def __init__(self, from_packet, table_map,
105105
ignored_schemas,
106106
freeze_schema,
107107
fail_on_table_metadata_unavailable,
108-
ignore_decode_errors):
108+
ignore_decode_errors,
109+
verify_checksum,):
109110
# -1 because we ignore the ok byte
110111
self.read_bytes = 0
111112
# Used when we want to override a value in the data buffer
@@ -135,6 +136,7 @@ def __init__(self, from_packet, table_map,
135136
if use_checksum:
136137
event_size_without_header = self.event_size - 23
137138
else:
139+
verify_checksum = False
138140
event_size_without_header = self.event_size - 19
139141

140142
self.event = None
@@ -151,7 +153,8 @@ def __init__(self, from_packet, table_map,
151153
ignored_schemas=ignored_schemas,
152154
freeze_schema=freeze_schema,
153155
fail_on_table_metadata_unavailable=fail_on_table_metadata_unavailable,
154-
ignore_decode_errors=ignore_decode_errors)
156+
ignore_decode_errors=ignore_decode_errors,
157+
verify_checksum=verify_checksum)
155158
if self.event._processed == False:
156159
self.event = None
157160

Diff for: pymysqlreplication/tests/test_basic.py

+53-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818
from pymysqlreplication.event import *
1919
from pymysqlreplication.constants.BINLOG import *
2020
from pymysqlreplication.row_event import *
21+
from pymysqlreplication.packet import BinLogPacketWrapper
22+
from pymysql.protocol import MysqlPacket
2123

22-
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting", "TestRowsQueryLogEvents"]
24+
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings",
25+
"TestGtidBinLogStreamReader", "TestMariadbBinlogStreamReader", "TestStatementConnectionSetting",
26+
"TestRowsQueryLogEvents"]
2327

2428

2529
class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
@@ -522,6 +526,54 @@ def test_end_log_pos(self):
522526
self.assertEqual(last_log_pos, 888)
523527
self.assertEqual(last_event_type, TABLE_MAP_EVENT)
524528

529+
def test_event_validation(self):
530+
def create_binlog_packet_wrapper(pkt):
531+
return BinLogPacketWrapper(pkt, self.stream.table_map,
532+
self.stream._ctl_connection, self.stream.mysql_version,
533+
self.stream._BinLogStreamReader__use_checksum,
534+
self.stream._BinLogStreamReader__allowed_events_in_packet,
535+
self.stream._BinLogStreamReader__only_tables,
536+
self.stream._BinLogStreamReader__ignored_tables,
537+
self.stream._BinLogStreamReader__only_schemas,
538+
self.stream._BinLogStreamReader__ignored_schemas,
539+
self.stream._BinLogStreamReader__freeze_schema,
540+
self.stream._BinLogStreamReader__fail_on_table_metadata_unavailable,
541+
self.stream._BinLogStreamReader__ignore_decode_errors,
542+
self.stream._BinLogStreamReader__verify_checksum,)
543+
self.stream.close()
544+
self.stream = BinLogStreamReader(
545+
self.database,
546+
server_id=1024,
547+
blocking=False,
548+
verify_checksum=True
549+
)
550+
# The event data below is taken from the example in the official MariaDB documentation:
551+
# https://mariadb.com/kb/en/query_event/#example-with-crc32
552+
correct_event_data = (
553+
# OK value
554+
b"\x00"
555+
# Header
556+
b"q\x17(Z\x02\x8c'\x00\x00U\x00\x00\x00\x01\t\x00\x00\x00\x00"
557+
# Content
558+
b"f\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1a\x00"
559+
b"\x00\x00\x00\x00\x00\x01\x00\x00\x00P\x00\x00"
560+
b"\x00\x00\x06\x03std\x04\x08\x00\x08\x00\x08\x00\x00"
561+
b"TRUNCATE TABLE test.t4"
562+
# CRC 32, 4 Bytes
563+
b"Ji\x9e\xed"
564+
)
565+
# Assume bits were flipped while the data was being transmitted: q (0b01110001) -> U (0b01010101)
566+
modified_byte = b"U"
567+
wrong_event_data = correct_event_data[:1] + modified_byte + correct_event_data[2:]
568+
569+
packet = MysqlPacket(correct_event_data, 0)
570+
wrong_packet = MysqlPacket(wrong_event_data, 0)
571+
self.stream.fetchone() # for '_ctl_connection' parameter
572+
binlog_event = create_binlog_packet_wrapper(packet)
573+
wrong_event = create_binlog_packet_wrapper(wrong_packet)
574+
self.assertEqual(binlog_event.event._is_event_valid, True)
575+
self.assertNotEqual(wrong_event.event._is_event_valid, True)
576+
525577

526578
class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):
527579
def ignoredEvents(self):

0 commit comments

Comments
 (0)