diff --git a/pymysqlreplication/bitmap.py b/pymysqlreplication/bitmap.py new file mode 100644 index 00000000..740894ac --- /dev/null +++ b/pymysqlreplication/bitmap.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- + +bitCountInByte = [ + 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, + 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, + 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, + 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, + 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, + 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, + 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, + 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, + 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, + 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, + 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, + 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, + 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, + 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8, +] + +# Calculate totol bit counts in a bitmap +def BitCount(bitmap): + n = 0 + for i in range(0, len(bitmap)): + bit = bitmap[i] + if type(bit) is str: + bit = ord(bit) + n += bitCountInByte[bit] + return n + +# Get the bit set at offset position in bitmap +def BitGet(bitmap, position): + bit = bitmap[int(position / 8)] + if type(bit) is str: + bit = ord(bit) + return bit & (1 << (position & 7)) diff --git a/pymysqlreplication/row_event.py b/pymysqlreplication/row_event.py index 6e5518e7..e52afaf9 100644 --- a/pymysqlreplication/row_event.py +++ b/pymysqlreplication/row_event.py @@ -11,7 +11,7 @@ from .constants import BINLOG from .column import Column from .table import Table - +from .bitmap import BitCount, BitGet class RowsEvent(BinLogEvent): def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): @@ -60,18 +60,28 @@ def __is_null(self, null_bitmap, position): bit = ord(bit) return bit & (1 << (position % 8)) - def _read_column_data(self, null_bitmap): + def _read_column_data(self, cols_bitmap): """Use for WRITE, UPDATE and DELETE events. Return an array of column data """ values = {} + # null bitmap length = (bits set in 'columns-present-bitmap'+7)/8 + # See http://dev.mysql.com/doc/internals/en/rows-event.html + null_bitmap = self.packet.read((BitCount(cols_bitmap) + 7) / 8) + + nullBitmapIndex = 0 nb_columns = len(self.columns) for i in range(0, nb_columns): column = self.columns[i] name = self.table_map[self.table_id].columns[i].name unsigned = self.table_map[self.table_id].columns[i].unsigned - if self.__is_null(null_bitmap, i): + + if BitGet(cols_bitmap, i) == 0: + values[name] = None + continue + + if self.__is_null(null_bitmap, nullBitmapIndex): values[name] = None elif column.type == FIELD_TYPE.TINY: if unsigned: @@ -153,6 +163,9 @@ def _read_column_data(self, null_bitmap): else: raise NotImplementedError("Unknown MySQL column type: %d" % (column.type)) + + nullBitmapIndex += 1 + return values def __add_fsp_to_time(self, time, column): @@ -394,8 +407,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) def _fetch_one_row(self): row = {} - null_bitmap = self.packet.read((self.number_of_columns + 7) / 8) - row["values"] = self._read_column_data(null_bitmap) + row["values"] = self._read_column_data(self.columns_present_bitmap) return row def _dump(self): @@ -423,8 +435,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) def _fetch_one_row(self): row = {} - null_bitmap = self.packet.read((self.number_of_columns + 7) / 8) - row["values"] = self._read_column_data(null_bitmap) + row["values"] = self._read_column_data(self.columns_present_bitmap) return row def _dump(self): @@ -459,12 +470,10 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs) def _fetch_one_row(self): row = {} - null_bitmap = self.packet.read((self.number_of_columns + 7) / 8) - row["before_values"] = self._read_column_data(null_bitmap) + row["before_values"] = self._read_column_data(self.columns_present_bitmap) - null_bitmap = self.packet.read((self.number_of_columns + 7) / 8) - row["after_values"] = self._read_column_data(null_bitmap) + row["after_values"] = self._read_column_data(self.columns_present_bitmap2) return row def _dump(self): diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index 770a3a92..ac804aae 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -221,6 +221,98 @@ def test_update_row_event(self): self.assertEqual(event.rows[0]["after_values"]["id"], 1) self.assertEqual(event.rows[0]["after_values"]["data"], "World") + def test_minimal_image_write_row_event(self): + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + query = "SET SESSION binlog_row_image = 'minimal'" + self.execute(query) + query = "INSERT INTO test (data) VALUES('Hello World')" + self.execute(query) + self.execute("COMMIT") + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + #QueryEvent for the Create Table + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + #QueryEvent for the BEGIN + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + self.assertIsInstance(self.stream.fetchone(), TableMapEvent) + + event = self.stream.fetchone() + if self.isMySQL56AndMore(): + self.assertEqual(event.event_type, WRITE_ROWS_EVENT_V2) + else: + self.assertEqual(event.event_type, WRITE_ROWS_EVENT_V1) + self.assertIsInstance(event, WriteRowsEvent) + self.assertEqual(event.rows[0]["values"]["id"], 1) + self.assertEqual(event.rows[0]["values"]["data"], "Hello World") + self.assertEqual(event.schema, "pymysqlreplication_test") + self.assertEqual(event.table, "test") + self.assertEqual(event.columns[1].name, 'data') + + def test_minimal_image_delete_row_event(self): + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + query = "INSERT INTO test (data) VALUES('Hello World')" + self.execute(query) + query = "SET SESSION binlog_row_image = 'minimal'" + self.execute(query) + self.resetBinLog() + + query = "DELETE FROM test WHERE id = 1" + self.execute(query) + self.execute("COMMIT") + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + + #QueryEvent for the BEGIN + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + self.assertIsInstance(self.stream.fetchone(), TableMapEvent) + + event = self.stream.fetchone() + if self.isMySQL56AndMore(): + self.assertEqual(event.event_type, DELETE_ROWS_EVENT_V2) + else: + self.assertEqual(event.event_type, DELETE_ROWS_EVENT_V1) + self.assertIsInstance(event, DeleteRowsEvent) + self.assertEqual(event.rows[0]["values"]["id"], 1) + self.assertEqual(event.rows[0]["values"]["data"], None) + + def test_minimal_image_update_row_event(self): + query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" + self.execute(query) + query = "INSERT INTO test (data) VALUES('Hello')" + self.execute(query) + query = "SET SESSION binlog_row_image = 'minimal'" + self.execute(query) + self.resetBinLog() + + query = "UPDATE test SET data = 'World' WHERE id = 1" + self.execute(query) + self.execute("COMMIT") + + self.assertIsInstance(self.stream.fetchone(), RotateEvent) + self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) + + #QueryEvent for the BEGIN + self.assertIsInstance(self.stream.fetchone(), QueryEvent) + + self.assertIsInstance(self.stream.fetchone(), TableMapEvent) + + event = self.stream.fetchone() + if self.isMySQL56AndMore(): + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2) + else: + self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1) + self.assertIsInstance(event, UpdateRowsEvent) + self.assertEqual(event.rows[0]["before_values"]["id"], 1) + self.assertEqual(event.rows[0]["before_values"]["data"], None) + self.assertEqual(event.rows[0]["after_values"]["id"], None) + self.assertEqual(event.rows[0]["after_values"]["data"], "World") + def test_log_pos(self): query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))" self.execute(query)