Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit bf73d97

Browse files
committedSep 2, 2023
Merge branch 'main' of https://github.com/23-OSSCA-python-mysql-replication/python-mysql-replication into feature/typing-packet-gtid-binlogstream
2 parents b9b241b + 19fe16c commit bf73d97

File tree

7 files changed

+645
-127
lines changed

7 files changed

+645
-127
lines changed
 

‎README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ Featured
6363

6464
[Streaming Changes in a Database with Amazon Kinesis](https://aws.amazon.com/blogs/database/streaming-changes-in-a-database-with-amazon-kinesis/) (by Emmanuel Espina, Amazon Web Services)
6565

66+
[Near Zero Downtime Migration from MySQL to DynamoDB](https://aws.amazon.com/ko/blogs/big-data/near-zero-downtime-migration-from-mysql-to-dynamodb/) (by YongSeong Lee, Amazon Web Services)
6667

6768
Projects using this library
6869
===========================

‎pymysqlreplication/binlogstream.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
1717
HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
1818
MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent, RowsQueryLogEvent,
19-
MariadbGtidListEvent, MariadbBinLogCheckPointEvent)
19+
MariadbGtidListEvent, MariadbBinLogCheckPointEvent, UserVarEvent,
20+
PreviousGtidsEvent)
2021
from .exceptions import BinLogNotEnabled
2122
from .gtid import GtidSet
2223
from .packet import BinLogPacketWrapper
@@ -148,7 +149,8 @@ def __init__(self, connection_settings: Dict, server_id: int,
148149
slave_heartbeat: Optional[float] = None,
149150
is_mariadb: bool = False,
150151
annotate_rows_event: bool = False,
151-
ignore_decode_errors: bool = False) -> None:
152+
ignore_decode_errors: bool = False,
153+
verify_checksum = False) -> None:
152154
"""
153155
Attributes:
154156
ctl_connection_settings[Dict]: Connection settings for cluster holding
@@ -188,6 +190,7 @@ def __init__(self, connection_settings: Dict, server_id: int,
188190
used with 'is_mariadb'
189191
ignore_decode_errors[bool]: If true, any decode errors encountered
190192
when reading column data will be ignored.
193+
verify_checksum[bool]: If true, verify events read from the binary log by examining checksums.
191194
"""
192195

193196
self.__connection_settings: Dict = connection_settings
@@ -210,6 +213,7 @@ def __init__(self, connection_settings: Dict, server_id: int,
210213
only_events, ignored_events, filter_non_implemented_events)
211214
self.__fail_on_table_metadata_unavailable: bool = fail_on_table_metadata_unavailable
212215
self.__ignore_decode_errors: bool = ignore_decode_errors
216+
self.__verify_checksum: bool = verify_checksum
213217

214218
# We can't filter on packet level TABLE_MAP and rotate event because
215219
# we need them for handling other operations
@@ -541,7 +545,8 @@ def fetchone(self) -> Union[BinLogPacketWrapper, None]:
541545
self.__ignored_schemas,
542546
self.__freeze_schema,
543547
self.__fail_on_table_metadata_unavailable,
544-
self.__ignore_decode_errors)
548+
self.__ignore_decode_errors,
549+
self.__verify_checksum,)
545550

546551
if binlog_event.event_type == ROTATE_EVENT:
547552
self.log_pos = binlog_event.event.position
@@ -632,7 +637,9 @@ def _allowed_event_list(self, only_events: Optional[List[str]], ignored_events:
632637
RandEvent,
633638
MariadbStartEncryptionEvent,
634639
MariadbGtidListEvent,
635-
MariadbBinLogCheckPointEvent
640+
MariadbBinLogCheckPointEvent,
641+
UserVarEvent,
642+
PreviousGtidsEvent
636643
))
637644
if ignored_events is not None:
638645
for e in ignored_events:

‎pymysqlreplication/event.py

Lines changed: 330 additions & 75 deletions
Large diffs are not rendered by default.

‎pymysqlreplication/packet.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,15 @@ class BinLogPacketWrapper(object):
6565
constants.XID_EVENT: event.XidEvent,
6666
constants.INTVAR_EVENT: event.IntvarEvent,
6767
constants.GTID_LOG_EVENT: event.GtidEvent,
68+
constants.PREVIOUS_GTIDS_LOG_EVENT: event.PreviousGtidsEvent,
6869
constants.STOP_EVENT: event.StopEvent,
6970
constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
7071
constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
7172
constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
7273
constants.XA_PREPARE_EVENT: event.XAPrepareEvent,
7374
constants.ROWS_QUERY_LOG_EVENT: event.RowsQueryLogEvent,
7475
constants.RAND_EVENT: event.RandEvent,
76+
constants.USER_VAR_EVENT: event.UserVarEvent,
7577
# row_event
7678
constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
7779
constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent,
@@ -83,11 +85,8 @@ class BinLogPacketWrapper(object):
8385

8486
#5.6 GTID enabled replication events
8587
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
86-
constants.ANONYMOUS_GTID_LOG_EVENT: event.NotImplementedEvent,
87-
constants.PREVIOUS_GTIDS_LOG_EVENT: event.NotImplementedEvent,
8888
# MariaDB GTID
8989
constants.MARIADB_ANNOTATE_ROWS_EVENT: event.MariadbAnnotateRowsEvent,
90-
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.NotImplementedEvent,
9190
constants.MARIADB_BINLOG_CHECKPOINT_EVENT: event.MariadbBinLogCheckPointEvent,
9291
constants.MARIADB_GTID_EVENT: event.MariadbGtidEvent,
9392
constants.MARIADB_GTID_GTID_LIST_EVENT: event.MariadbGtidListEvent,
@@ -105,7 +104,8 @@ def __init__(self, from_packet, table_map,
105104
ignored_schemas,
106105
freeze_schema,
107106
fail_on_table_metadata_unavailable,
108-
ignore_decode_errors):
107+
ignore_decode_errors,
108+
verify_checksum,):
109109
# -1 because we ignore the ok byte
110110
self.read_bytes = 0
111111
# Used when we want to override a value in the data buffer
@@ -135,6 +135,7 @@ def __init__(self, from_packet, table_map,
135135
if use_checksum:
136136
event_size_without_header = self.event_size - 23
137137
else:
138+
verify_checksum = False
138139
event_size_without_header = self.event_size - 19
139140

140141
self.event = None
@@ -151,7 +152,8 @@ def __init__(self, from_packet, table_map,
151152
ignored_schemas=ignored_schemas,
152153
freeze_schema=freeze_schema,
153154
fail_on_table_metadata_unavailable=fail_on_table_metadata_unavailable,
154-
ignore_decode_errors=ignore_decode_errors)
155+
ignore_decode_errors=ignore_decode_errors,
156+
verify_checksum=verify_checksum)
155157
if self.event._processed == False:
156158
self.event = None
157159

‎pymysqlreplication/tests/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ class PyMySQLReplicationTestCase(base):
1818
def ignoredEvents(self):
1919
return []
2020

21-
def setUp(self):
21+
def setUp(self, charset="utf8"):
2222
# default
2323
self.database = {
2424
"host": os.environ.get("MYSQL_5_7") or "localhost",
2525
"user": "root",
2626
"passwd": "",
2727
"port": 3306,
2828
"use_unicode": True,
29-
"charset": "utf8",
29+
"charset": charset,
3030
"db": "pymysqlreplication_test"
3131
}
3232

‎pymysqlreplication/tests/test_basic.py

Lines changed: 293 additions & 40 deletions
Large diffs are not rendered by default.

‎pymysqlreplication/tests/test_data_type.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def encode_value(v):
3232

3333
class TestDataType(base.PyMySQLReplicationTestCase):
3434
def ignoredEvents(self):
35-
return [GtidEvent]
35+
return [GtidEvent, PreviousGtidsEvent]
3636

3737
def create_and_insert_value(self, create_query, insert_query):
3838
self.execute(create_query)

0 commit comments

Comments
 (0)
Please sign in to comment.