Decoupling table mapping to allow replaying all binlog events #163

Closed

6 changes: 6 additions & 0 deletions .gitignore
@@ -42,3 +42,9 @@ _build

# Pyenv
.python-version

# Mac System files
.DS_Store

# Virtualenv folder
env/
52 changes: 48 additions & 4 deletions pymysqlreplication/binlogstream.py
@@ -8,7 +8,7 @@
from pymysql.util import int2byte

from .packet import BinLogPacketWrapper
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT, QUERY_EVENT
from .gtid import GtidSet
from .event import (
QueryEvent, RotateEvent, FormatDescriptionEvent,
@@ -133,7 +133,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
only_tables=None, only_schemas=None,
freeze_schema=False, skip_to_timestamp=None,
report_slave=None, slave_uuid=None,
pymysql_wrapper=None):
pymysql_wrapper=None, schema_db_connection_settings=None):
"""
Attributes:
resume_stream: Start for event from position or the latest event of
@@ -150,6 +150,10 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
skip_to_timestamp: Ignore all events until reaching specified timestamp.
report_slave: Report slave in SHOW SLAVE HOSTS.
slave_uuid: Report slave_uuid in SHOW SLAVE HOSTS.
            schema_db_connection_settings: dict. Connection settings of a
                second MySQL instance on which DDL statements will be
                executed. This enables the replay of events that occurred
                before the streaming process was initialised.
"""
self.__connection_settings = connection_settings
self.__connection_settings["charset"] = "utf8"
@@ -159,6 +163,17 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
self.__resume_stream = resume_stream
self.__blocking = blocking

        if schema_db_connection_settings is not None:
            # Perform schema lookups and DDL replay on the second instance.
            self._ctl_connection_settings = dict(schema_db_connection_settings)
            self.is_table_mapping_decoupled = True
            # QueryEvent must pass the packet-level filter so that DDL
            # statements can be captured and replayed.
            mandatory_events = [QueryEvent, TableMapEvent, RotateEvent]
        else:
            self._ctl_connection_settings = dict(self.__connection_settings)
            self.is_table_mapping_decoupled = False
            mandatory_events = [TableMapEvent, RotateEvent]

self.__only_tables = only_tables
self.__only_schemas = only_schemas
self.__freeze_schema = freeze_schema
@@ -168,7 +183,7 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
        # We can't filter the mandatory events (TABLE_MAP, ROTATE, and QUERY
        # when table mapping is decoupled) at the packet level because we
        # need them for handling other operations
self.__allowed_events_in_packet = frozenset(
[TableMapEvent, RotateEvent]).union(self.__allowed_events)
mandatory_events).union(self.__allowed_events)

self.__server_id = server_id
self.__use_checksum = False
@@ -201,7 +216,6 @@ def close(self):
self.__connected_ctl = False

def __connect_to_ctl(self):
self._ctl_connection_settings = dict(self.__connection_settings)
self._ctl_connection_settings["db"] = "information_schema"
self._ctl_connection_settings["cursorclass"] = DictCursor
self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings)
@@ -393,6 +407,19 @@ def fetchone(self):
if self.skip_to_timestamp and binlog_event.timestamp < self.skip_to_timestamp:
continue

            if self.is_table_mapping_decoupled and \
                    binlog_event.event_type == QUERY_EVENT:
binlog_event.event.parse_query()
if binlog_event.event.ddl_sql is not None:
binlog_event.event.execute_ddl_statement()
# If our DDL statement has altered a table,
# remove that table from the table_map.
if binlog_event.event.ddl_table is not None:
self.remove_altered_table_from_map(
binlog_event.event.ddl_schema,
binlog_event.event.ddl_table
)

if binlog_event.event_type == TABLE_MAP_EVENT and \
binlog_event.event is not None:
self.table_map[binlog_event.event.table_id] = \
@@ -476,5 +503,22 @@ def __get_table_information(self, schema, table):
else:
raise error

    def remove_altered_table_from_map(self, schema, table):
        """
        After a schema-changing QueryEvent, remove the corresponding
        entries from the table_map dict. This ensures that a fresh
        table mapping is performed for the table before its next row
        event is processed.
        """
        ids_to_remove = []
        for table_id, entry in self.table_map.items():
            if entry.schema == schema and entry.table == table:
                ids_to_remove.append(table_id)

        for table_id in ids_to_remove:
            del self.table_map[table_id]

def __iter__(self):
return iter(self.fetchone, None)
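
To make the new option concrete, here is a minimal usage sketch of the decoupled reader. It is not part of the diff: the hosts, credentials, server_id and binlog coordinates below are placeholder assumptions; only connection_settings, server_id, resume_stream, log_file, log_pos and the new schema_db_connection_settings parameter come from the library and this PR.

    # Sketch only: placeholder hosts/credentials, not part of this PR.
    from pymysqlreplication import BinLogStreamReader

    source = {"host": "primary.example", "port": 3306,
              "user": "repl", "passwd": "secret"}
    # Second instance on which captured DDL statements are replayed, so
    # that table metadata can be resolved even for tables whose
    # TABLE_MAP_EVENTs precede the resume position.
    schema_db = {"host": "schema-replica.example", "port": 3306,
                 "user": "repl", "passwd": "secret"}

    stream = BinLogStreamReader(
        connection_settings=source,
        server_id=101,
        resume_stream=True,
        log_file="mysql-bin.000042",
        log_pos=4,
        schema_db_connection_settings=schema_db)

    for event in stream:
        # DDL-carrying QueryEvents are parsed and replayed on schema_db
        # automatically inside fetchone(); row events then trigger a
        # fresh table mapping via the control connection.
        event.dump()

    stream.close()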
119 changes: 112 additions & 7 deletions pymysqlreplication/event.py
@@ -2,15 +2,16 @@

import struct
import datetime
import sqlparse

from pymysql.util import byte2int, int2byte


class BinLogEvent(object):
def __init__(self, from_packet, event_size, table_map, ctl_connection,
only_tables = None,
only_schemas = None,
freeze_schema = False):
only_tables=None,
only_schemas=None,
freeze_schema=False):
self.packet = from_packet
self.table_map = table_map
self.event_type = self.packet.event_type
@@ -46,9 +47,10 @@ def _dump(self):
class GtidEvent(BinLogEvent):
"""GTID change in binlog event
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(GtidEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
ctl_connection, **kwargs)

self.commit_flag = byte2int(self.packet.read(1)) == 1
self.sid = self.packet.read(16)
@@ -79,6 +81,7 @@ class RotateEvent(BinLogEvent):
position: Position inside next binlog
next_binlog: Name of next binlog file
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(RotateEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
Expand Down Expand Up @@ -120,6 +123,7 @@ def _dump(self):
class QueryEvent(BinLogEvent):
'''This event is triggered when a query is run on the database.
    Only replicated queries are logged.'''

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(QueryEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
@@ -138,7 +142,105 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)

self.query = self.packet.read(event_size - 13 - self.status_vars_length
- self.schema_length - 1).decode("utf-8")
#string[EOF] query
# string[EOF] query

self.ddl_sql = None
self.ddl_schema = None
self.ddl_table = None

self.__only_tables = kwargs["only_tables"]
self.__only_schemas = kwargs["only_schemas"]
self.__sys_schemas = ['mysql', 'information_schema',
'innodb', 'sys', 'performance_schema']

    def parse_query(self):
        """Capture and parse schema-changing queries, populating
        ddl_sql, ddl_schema and ddl_table.
        """

sql = self.query
parsed = sqlparse.parse(sql)[0]
sql_token_types = []

# A main statement is the starting DDL or DML command in the query.
main_statement_counter = 0

# Keywords can be many things (FROM, WHERE, TABLE, SCHEMA etc.),
# but we only care about the first one in a query,
# which for DDLs should be either 'database', 'schema' or 'table'
keyword_counter = 0
first_keyword = ''

        # In the MySQL DDL syntax, the first entity identifier is the one
        # that will be modified.
        # sqlparse returns <schema>.<table> combinations in a single
        # Identifier instance, so we have to do some further parsing
        # down the line.
entity_counter = 0
entity_identifier = ''
chars_to_remove = str.maketrans({"'": None, '"': None, "`": None})

# Validate that we're dealing with a DDL statement.
for i, t in enumerate(parsed.tokens):
sql_token_types.append(t.ttype)
if main_statement_counter == 0 and \
t.ttype in [sqlparse.tokens.Keyword.DDL,
sqlparse.tokens.Keyword.DML]:
if t.ttype != sqlparse.tokens.Keyword.DDL:
return
main_statement_counter += 1
continue
if keyword_counter == 0 and \
t.ttype == sqlparse.tokens.Keyword:
if t.value.lower() not in ['database', 'schema', 'table']:
return
first_keyword = t.value.lower()
keyword_counter += 1
continue
if entity_counter == 0 and \
t.__class__.__name__ == 'Identifier':
                # Remove all quotes from the entity identifier.
entity_identifier = str(t.value).translate(chars_to_remove)
entity_counter += 1
break

        # Query events are stored in the binlogs together with a 'schema'
        # field, which holds the name of the schema that was the execution
        # context of the query.
context_schema = self.schema.decode('utf-8').translate(
chars_to_remove)

if first_keyword in ['database', 'schema']:
sql_use = ''
self.ddl_schema = entity_identifier
elif first_keyword == 'table':
sql_use = "USE %s;" % context_schema
if '.' in entity_identifier:
self.ddl_schema, self.ddl_table = entity_identifier.split('.')
else:
self.ddl_table = entity_identifier
self.ddl_schema = context_schema

# Do we have a rule defined which tells us to ignore changes
# to this entity?
if self.ddl_schema in self.__sys_schemas:
return
if self.__only_tables is not None \
and self.ddl_table not in self.__only_tables:
return
if self.__only_schemas is not None \
and self.ddl_schema not in self.__only_schemas:
return

        self.ddl_sql = (sql_use + ' ' + sql).strip()

    def execute_ddl_statement(self):
        """Replay the captured DDL statement on the control connection."""
        with self._ctl_connection.cursor() as cur:
            cur.execute(self.ddl_sql)
            self._ctl_connection.commit()

def _dump(self):
super(QueryEvent, self)._dump()
@@ -154,9 +256,10 @@ class BeginLoadQueryEvent(BinLogEvent):
file_id
block-data
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(BeginLoadQueryEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
ctl_connection, **kwargs)

# Payload
self.file_id = self.packet.read_uint32()
@@ -183,9 +286,10 @@ class ExecuteLoadQueryEvent(BinLogEvent):
end_pos
dup_handling_flags
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(ExecuteLoadQueryEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
ctl_connection, **kwargs)

# Post-header
self.slave_proxy_id = self.packet.read_uint32()
@@ -220,6 +324,7 @@ class IntvarEvent(BinLogEvent):
type
value
"""

def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
super(IntvarEvent, self).__init__(from_packet, event_size, table_map,
ctl_connection, **kwargs)
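
For reference, a standalone sketch of how sqlparse tokenises the statements parse_query() walks. The ALTER TABLE statement is an arbitrary example, and the expected values in the comments are assumptions based on sqlparse's documented token types, not output taken from this PR.

    import sqlparse
    from sqlparse.sql import Identifier

    # Arbitrary example statement, not taken from this PR.
    parsed = sqlparse.parse(
        "ALTER TABLE `shop`.`products` ADD COLUMN sku INT")[0]

    for t in parsed.tokens:
        if t.ttype is sqlparse.tokens.Keyword.DDL:
            print("main statement: %s" % t.value)         # ALTER
        elif t.ttype is sqlparse.tokens.Keyword:
            print("first keyword: %s" % t.value.lower())  # table
        elif isinstance(t, Identifier):
            # <schema>.<table> arrives as a single Identifier, as noted
            # in the comments above; quotes still need stripping.
            print("entity: %s" % t.value.replace('`', ''))  # shop.products
            break

    # With a context schema of 'shop', parse_query() would set ddl_schema
    # to 'shop', ddl_table to 'products', and ddl_sql to
    # "USE shop; ALTER TABLE `shop`.`products` ADD COLUMN sku INT".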
2 changes: 1 addition & 1 deletion setup.py
@@ -50,5 +50,5 @@ def run(self):
"pymysqlreplication.tests"],
cmdclass={"test": TestCommand},
extras_require={'test': tests_require},
install_requires=['pymysql'],
install_requires=['pymysql', 'sqlparse'],
)