Skip to content

add binlog row minimal and noblob image support #103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Dec 29, 2014
37 changes: 37 additions & 0 deletions pymysqlreplication/bitmap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-

bitCountInByte = [
0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4,
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6,
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7,
4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8,
]

# Calculate totol bit counts in a bitmap
def BitCount(bitmap):
n = 0
for i in range(0, len(bitmap)):
bit = bitmap[i]
if type(bit) is str:
bit = ord(bit)
n += bitCountInByte[bit]
return n

# Get the bit set at offset position in bitmap
def BitGet(bitmap, position):
bit = bitmap[int(position / 8)]
if type(bit) is str:
bit = ord(bit)
return bit & (1 << (position & 7))
31 changes: 20 additions & 11 deletions pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .constants import BINLOG
from .column import Column
from .table import Table

from .bitmap import BitCount, BitGet

class RowsEvent(BinLogEvent):
def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
Expand Down Expand Up @@ -60,18 +60,28 @@ def __is_null(self, null_bitmap, position):
bit = ord(bit)
return bit & (1 << (position % 8))

def _read_column_data(self, null_bitmap):
def _read_column_data(self, cols_bitmap):
"""Use for WRITE, UPDATE and DELETE events.
Return an array of column data
"""
values = {}

# null bitmap length = (bits set in 'columns-present-bitmap'+7)/8
# See http://dev.mysql.com/doc/internals/en/rows-event.html
null_bitmap = self.packet.read((BitCount(cols_bitmap) + 7) / 8)

nullBitmapIndex = 0
nb_columns = len(self.columns)
for i in range(0, nb_columns):
column = self.columns[i]
name = self.table_map[self.table_id].columns[i].name
unsigned = self.table_map[self.table_id].columns[i].unsigned
if self.__is_null(null_bitmap, i):

if BitGet(cols_bitmap, i) == 0:
values[name] = None
continue

if self.__is_null(null_bitmap, nullBitmapIndex):
values[name] = None
elif column.type == FIELD_TYPE.TINY:
if unsigned:
Expand Down Expand Up @@ -153,6 +163,9 @@ def _read_column_data(self, null_bitmap):
else:
raise NotImplementedError("Unknown MySQL column type: %d" %
(column.type))

nullBitmapIndex += 1

return values

def __add_fsp_to_time(self, time, column):
Expand Down Expand Up @@ -394,8 +407,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
def _fetch_one_row(self):
row = {}

null_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
row["values"] = self._read_column_data(null_bitmap)
row["values"] = self._read_column_data(self.columns_present_bitmap)
return row

def _dump(self):
Expand Down Expand Up @@ -423,8 +435,7 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
def _fetch_one_row(self):
row = {}

null_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
row["values"] = self._read_column_data(null_bitmap)
row["values"] = self._read_column_data(self.columns_present_bitmap)
return row

def _dump(self):
Expand Down Expand Up @@ -459,12 +470,10 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)

def _fetch_one_row(self):
row = {}
null_bitmap = self.packet.read((self.number_of_columns + 7) / 8)

row["before_values"] = self._read_column_data(null_bitmap)
row["before_values"] = self._read_column_data(self.columns_present_bitmap)

null_bitmap = self.packet.read((self.number_of_columns + 7) / 8)
row["after_values"] = self._read_column_data(null_bitmap)
row["after_values"] = self._read_column_data(self.columns_present_bitmap2)
return row

def _dump(self):
Expand Down
92 changes: 92 additions & 0 deletions pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,98 @@ def test_update_row_event(self):
self.assertEqual(event.rows[0]["after_values"]["id"], 1)
self.assertEqual(event.rows[0]["after_values"]["data"], "World")

def test_minimal_image_write_row_event(self):
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
self.execute(query)
query = "SET SESSION binlog_row_image = 'minimal'"
self.execute(query)
query = "INSERT INTO test (data) VALUES('Hello World')"
self.execute(query)
self.execute("COMMIT")

self.assertIsInstance(self.stream.fetchone(), RotateEvent)
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
#QueryEvent for the Create Table
self.assertIsInstance(self.stream.fetchone(), QueryEvent)
#QueryEvent for the BEGIN
self.assertIsInstance(self.stream.fetchone(), QueryEvent)

self.assertIsInstance(self.stream.fetchone(), TableMapEvent)

event = self.stream.fetchone()
if self.isMySQL56AndMore():
self.assertEqual(event.event_type, WRITE_ROWS_EVENT_V2)
else:
self.assertEqual(event.event_type, WRITE_ROWS_EVENT_V1)
self.assertIsInstance(event, WriteRowsEvent)
self.assertEqual(event.rows[0]["values"]["id"], 1)
self.assertEqual(event.rows[0]["values"]["data"], "Hello World")
self.assertEqual(event.schema, "pymysqlreplication_test")
self.assertEqual(event.table, "test")
self.assertEqual(event.columns[1].name, 'data')

def test_minimal_image_delete_row_event(self):
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
self.execute(query)
query = "INSERT INTO test (data) VALUES('Hello World')"
self.execute(query)
query = "SET SESSION binlog_row_image = 'minimal'"
self.execute(query)
self.resetBinLog()

query = "DELETE FROM test WHERE id = 1"
self.execute(query)
self.execute("COMMIT")

self.assertIsInstance(self.stream.fetchone(), RotateEvent)
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)

#QueryEvent for the BEGIN
self.assertIsInstance(self.stream.fetchone(), QueryEvent)

self.assertIsInstance(self.stream.fetchone(), TableMapEvent)

event = self.stream.fetchone()
if self.isMySQL56AndMore():
self.assertEqual(event.event_type, DELETE_ROWS_EVENT_V2)
else:
self.assertEqual(event.event_type, DELETE_ROWS_EVENT_V1)
self.assertIsInstance(event, DeleteRowsEvent)
self.assertEqual(event.rows[0]["values"]["id"], 1)
self.assertEqual(event.rows[0]["values"]["data"], None)

def test_minimal_image_update_row_event(self):
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
self.execute(query)
query = "INSERT INTO test (data) VALUES('Hello')"
self.execute(query)
query = "SET SESSION binlog_row_image = 'minimal'"
self.execute(query)
self.resetBinLog()

query = "UPDATE test SET data = 'World' WHERE id = 1"
self.execute(query)
self.execute("COMMIT")

self.assertIsInstance(self.stream.fetchone(), RotateEvent)
self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)

#QueryEvent for the BEGIN
self.assertIsInstance(self.stream.fetchone(), QueryEvent)

self.assertIsInstance(self.stream.fetchone(), TableMapEvent)

event = self.stream.fetchone()
if self.isMySQL56AndMore():
self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V2)
else:
self.assertEqual(event.event_type, UPDATE_ROWS_EVENT_V1)
self.assertIsInstance(event, UpdateRowsEvent)
self.assertEqual(event.rows[0]["before_values"]["id"], 1)
self.assertEqual(event.rows[0]["before_values"]["data"], None)
self.assertEqual(event.rows[0]["after_values"]["id"], None)
self.assertEqual(event.rows[0]["after_values"]["data"], "World")

def test_log_pos(self):
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
self.execute(query)
Expand Down