Skip to content

Commit 7e007c9

Browse files
Merge pull request #395 from paulvic/paul/decode-errors
Add support for ignoring string decode errors
2 parents 7c141d3 + 00e6742 commit 7e007c9

File tree

6 files changed

+54
-6
lines changed

6 files changed

+54
-6
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ def __init__(self, connection_settings, server_id,
140140
pymysql_wrapper=None,
141141
fail_on_table_metadata_unavailable=False,
142142
slave_heartbeat=None,
143-
is_mariadb=False):
143+
is_mariadb=False,
144+
ignore_decode_errors=False):
144145
"""
145146
Attributes:
146147
ctl_connection_settings: Connection settings for cluster holding
@@ -177,6 +178,8 @@ def __init__(self, connection_settings, server_id,
177178
for semantics
178179
is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position
179180
to point to Mariadb specific GTID.
181+
ignore_decode_errors: If true, any decode errors encountered
182+
when reading column data will be ignored.
180183
"""
181184

182185
self.__connection_settings = connection_settings
@@ -198,6 +201,7 @@ def __init__(self, connection_settings, server_id,
198201
self.__allowed_events = self._allowed_event_list(
199202
only_events, ignored_events, filter_non_implemented_events)
200203
self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
204+
self.__ignore_decode_errors = ignore_decode_errors
201205

202206
# We can't filter on packet level TABLE_MAP and rotate event because
203207
# we need them for handling other operations
@@ -502,7 +506,8 @@ def fetchone(self):
502506
self.__only_schemas,
503507
self.__ignored_schemas,
504508
self.__freeze_schema,
505-
self.__fail_on_table_metadata_unavailable)
509+
self.__fail_on_table_metadata_unavailable,
510+
self.__ignore_decode_errors)
506511

507512
if binlog_event.event_type == ROTATE_EVENT:
508513
self.log_pos = binlog_event.event.position

pymysqlreplication/event.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,16 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection,
1313
only_schemas=None,
1414
ignored_schemas=None,
1515
freeze_schema=False,
16-
fail_on_table_metadata_unavailable=False):
16+
fail_on_table_metadata_unavailable=False,
17+
ignore_decode_errors=False):
1718
self.packet = from_packet
1819
self.table_map = table_map
1920
self.event_type = self.packet.event_type
2021
self.timestamp = self.packet.timestamp
2122
self.event_size = event_size
2223
self._ctl_connection = ctl_connection
2324
self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
25+
self._ignore_decode_errors = ignore_decode_errors
2426
# The event have been fully processed, if processed is false
2527
# the event will be skipped
2628
self._processed = True

pymysqlreplication/packet.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
9696
only_schemas,
9797
ignored_schemas,
9898
freeze_schema,
99-
fail_on_table_metadata_unavailable):
99+
fail_on_table_metadata_unavailable,
100+
ignore_decode_errors):
100101
# -1 because we ignore the ok byte
101102
self.read_bytes = 0
102103
# Used when we want to override a value in the data buffer
@@ -140,7 +141,8 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
140141
only_schemas=only_schemas,
141142
ignored_schemas=ignored_schemas,
142143
freeze_schema=freeze_schema,
143-
fail_on_table_metadata_unavailable=fail_on_table_metadata_unavailable)
144+
fail_on_table_metadata_unavailable=fail_on_table_metadata_unavailable,
145+
ignore_decode_errors=ignore_decode_errors)
144146
if self.event._processed == False:
145147
self.event = None
146148

pymysqlreplication/row_event.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,8 @@ def __read_string(self, size, column):
247247
string = self.packet.read_length_coded_pascal_string(size)
248248
if column.character_set_name is not None:
249249
encoding = self.charset_to_encoding(column.character_set_name)
250-
string = string.decode(encoding)
250+
decode_errors = "ignore" if self._ignore_decode_errors else "strict"
251+
string = string.decode(encoding, decode_errors)
251252
return string
252253

253254
def __read_bit(self, column):

pymysqlreplication/tests/base.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ def execute(self, query):
8585
c = self.conn_control.cursor()
8686
c.execute(query)
8787
return c
88+
89+
def execute_with_args(self, query, args):
90+
c = self.conn_control.cursor()
91+
c.execute(query, args)
92+
return c
8893

8994
def resetBinLog(self):
9095
self.execute("RESET MASTER")

pymysqlreplication/tests/test_basic.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,39 @@ def test_drop_table_tablemetadata_unavailable(self):
678678
self.resetBinLog()
679679
assert had_error
680680

681+
def test_ignore_decode_errors(self):
682+
problematic_unicode_string = b'[{"text":"\xed\xa0\xbd \xed\xb1\x8d Some string"}]'
683+
self.stream.close()
684+
self.execute("CREATE TABLE test (data VARCHAR(50) CHARACTER SET utf8mb4)")
685+
self.execute_with_args("INSERT INTO test (data) VALUES (%s)", (problematic_unicode_string))
686+
self.execute("COMMIT")
687+
688+
# Initialize with ignore_decode_errors=False
689+
self.stream = BinLogStreamReader(
690+
self.database,
691+
server_id=1024,
692+
only_events=(WriteRowsEvent,),
693+
ignore_decode_errors=False
694+
)
695+
event = self.stream.fetchone()
696+
event = self.stream.fetchone()
697+
with self.assertRaises(UnicodeError) as exception:
698+
event = self.stream.fetchone()
699+
data = event.rows[0]["values"]["data"]
700+
701+
# Initialize with ignore_decode_errors=True
702+
self.stream = BinLogStreamReader(
703+
self.database,
704+
server_id=1024,
705+
only_events=(WriteRowsEvent,),
706+
ignore_decode_errors=True
707+
)
708+
self.stream.fetchone()
709+
self.stream.fetchone()
710+
event = self.stream.fetchone()
711+
data = event.rows[0]["values"]["data"]
712+
self.assertEqual(data, '[{"text":" Some string"}]')
713+
681714
def test_drop_column(self):
682715
self.stream.close()
683716
self.execute("CREATE TABLE test_drop_column (id INTEGER(11), data VARCHAR(50))")

0 commit comments

Comments
 (0)