Skip to content

Commit 4f54009

Browse files
Merge pull request julien-duponchelle#117 from vartec/master
Partial fix for dropped columns blowing up replication
2 parents 647ccee + 783e5a5 commit 4f54009

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

pymysqlreplication/row_event.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,20 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
547547
self.packet.read_length_coded_binary()
548548
for i in range(0, len(column_types)):
549549
column_type = column_types[i]
550-
column_schema = self.column_schemas[i]
550+
try:
551+
column_schema = self.column_schemas[i]
552+
except IndexError:
553+
# this a dirty hack to prevent row evens containing columns which have been dropped prior
554+
# to pymysqlreplication start, but replayed from binlog from blowing up the service.
555+
# TODO: this does not address the issue if the column other than the last one is dropped
556+
column_schema = {
557+
'COLUMN_NAME': '__dropped_col_{i}__'.format(i=i),
558+
'COLLATION_NAME': None,
559+
'CHARACTER_SET_NAME': None,
560+
'COLUMN_COMMENT': None,
561+
'COLUMN_TYPE': 'BLOB', # we don't know what it is, so let's not do anything with it.
562+
'COLUMN_KEY': '',
563+
}
551564
col = Column(byte2int(column_type), column_schema, from_packet)
552565
self.columns.append(col)
553566

pymysqlreplication/tests/test_basic.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,28 @@ def test_drop_table(self):
563563

564564
self.assertEqual([], event.rows)
565565

566+
def test_drop_column(self):
567+
self.stream.close()
568+
self.execute("CREATE TABLE test_drop_column (id INTEGER(11), data VARCHAR(50))")
569+
self.execute("INSERT INTO test_drop_column VALUES (1, 'A value')")
570+
self.execute("COMMIT")
571+
self.execute("ALTER TABLE test_drop_column DROP COLUMN data")
572+
self.execute("INSERT INTO test_drop_column VALUES (2)")
573+
self.execute("COMMIT")
574+
575+
self.stream = BinLogStreamReader(
576+
self.database,
577+
server_id=1024,
578+
only_events=(WriteRowsEvent,),
579+
)
580+
try:
581+
self.stream.fetchone() # insert with two values
582+
self.stream.fetchone() # insert with one value
583+
except Exception as e:
584+
self.fail("raised unexpected exception: {exception}".format(exception=e))
585+
finally:
586+
self.resetBinLog()
587+
566588

567589
class TestGtidBinLogStreamReader(base.PyMySQLReplicationTestCase):
568590
def setUp(self):

0 commit comments

Comments
 (0)