Skip to content

Allow users to define ctl_connection_settings and the option to fail when table information is unavailable #176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 22, 2016
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ cache:
- $HOME/bins
before_script:
- env | grep DB
- bash -c "if [ '$DB' = 'mysql57' ]; then sudo ./scripts/install_mysql_sandbox.sh; ./scripts/install_mysql57_on_sandbox.sh; fi"
- bash -c "if [ '$DB' = 'mysql57' ]; then sudo ./scripts/install_mysql57.sh; fi"
- bash -c "if [ '$DB' = 'mysql56' ]; then sudo ./scripts/install_mysql56.sh; fi"
script:
- "nosetests"
8 changes: 7 additions & 1 deletion docs/developement.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,18 @@ To run tests:

python setup.py test

Running mysql in docker:
Running mysql in docker (main):

::

docker run --name python-mysql-replication-tests -e MYSQL_ALLOW_EMPTY_PASSWORD=true -p 3306:3306 --rm percona:latest --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates

Running mysql in docker (for ctl server):

::

docker run --name python-mysql-replication-tests-ctl --expose=3307 -e MYSQL_ALLOW_EMPTY_PASSWORD=true -p 3307:3307 --rm percona:latest --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave-updates -P 3307


Each pull request is tested on Travis CI:
https://travis-ci.org/noplay/python-mysql-replication
Expand Down
21 changes: 16 additions & 5 deletions pymysqlreplication/binlogstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,18 @@ class BinLogStreamReader(object):
"""
report_slave = None

def __init__(self, connection_settings, server_id, resume_stream=False,
def __init__(self, connection_settings, server_id, ctl_connection_settings=None, resume_stream=False,
blocking=False, only_events=None, log_file=None, log_pos=None,
filter_non_implemented_events=True,
ignored_events=None, auto_position=None,
only_tables=None, only_schemas=None,
freeze_schema=False, skip_to_timestamp=None,
report_slave=None, slave_uuid=None,
pymysql_wrapper=None):
pymysql_wrapper=None,
fail_on_table_metadata_unavailable=False):
"""
Attributes:
ctl_connection_settings: Connection settings for cluster holding schema information
resume_stream: Start for event from position or the latest event of
binlog or from older available event
blocking: Read on stream is blocking
Expand All @@ -150,20 +152,27 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
skip_to_timestamp: Ignore all events until reaching specified timestamp.
report_slave: Report slave in SHOW SLAVE HOSTS.
slave_uuid: Report slave_uuid in SHOW SLAVE HOSTS.
fail_on_table_metadata_unavailable: Should raise exception if we can't get
table information on row_events
"""

self.__connection_settings = connection_settings
self.__connection_settings["charset"] = "utf8"
self.__connection_settings.setdefault("charset", "utf8")

self.__connected_stream = False
self.__connected_ctl = False
self.__resume_stream = resume_stream
self.__blocking = blocking
self._ctl_connection_settings = ctl_connection_settings
if ctl_connection_settings:
self._ctl_connection_settings.setdefault("charset", "utf8")

self.__only_tables = only_tables
self.__only_schemas = only_schemas
self.__freeze_schema = freeze_schema
self.__allowed_events = self._allowed_event_list(
only_events, ignored_events, filter_non_implemented_events)
self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable

# We can't filter on packet level TABLE_MAP and rotate event because
# we need them for handling other operations
Expand Down Expand Up @@ -201,7 +210,8 @@ def close(self):
self.__connected_ctl = False

def __connect_to_ctl(self):
self._ctl_connection_settings = dict(self.__connection_settings)
if not self._ctl_connection_settings:
self._ctl_connection_settings = dict(self.__connection_settings)
self._ctl_connection_settings["db"] = "information_schema"
self._ctl_connection_settings["cursorclass"] = DictCursor
self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings)
Expand Down Expand Up @@ -388,7 +398,8 @@ def fetchone(self):
self.__allowed_events_in_packet,
self.__only_tables,
self.__only_schemas,
self.__freeze_schema)
self.__freeze_schema,
self.__fail_on_table_metadata_unavailable)

if self.skip_to_timestamp and binlog_event.timestamp < self.skip_to_timestamp:
continue
Expand Down
4 changes: 3 additions & 1 deletion pymysqlreplication/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ class BinLogEvent(object):
def __init__(self, from_packet, event_size, table_map, ctl_connection,
only_tables = None,
only_schemas = None,
freeze_schema = False):
freeze_schema = False,
fail_on_table_metadata_unavailable = False):
self.packet = from_packet
self.table_map = table_map
self.event_type = self.packet.event_type
self.timestamp = self.packet.timestamp
self.event_size = event_size
self._ctl_connection = ctl_connection
self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
# The event have been fully processed, if processed is false
# the event will be skipped
self._processed = True
Expand Down
3 changes: 3 additions & 0 deletions pymysqlreplication/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class TableMetadataUnavailableError(Exception):
def __init__(self, table):
Exception.__init__(self,"Unable to find metadata for table {0}".format(table))
6 changes: 4 additions & 2 deletions pymysqlreplication/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
allowed_events,
only_tables,
only_schemas,
freeze_schema):
freeze_schema,
fail_on_table_metadata_unavailable):
# -1 because we ignore the ok byte
self.read_bytes = 0
# Used when we want to override a value in the data buffer
Expand Down Expand Up @@ -95,7 +96,8 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
ctl_connection,
only_tables = only_tables,
only_schemas = only_schemas,
freeze_schema = freeze_schema)
freeze_schema = freeze_schema,
fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable)
if self.event._processed == False:
self.event = None

Expand Down
5 changes: 4 additions & 1 deletion pymysqlreplication/row_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pymysql.charset import charset_to_encoding

from .event import BinLogEvent
from .exceptions import TableMetadataUnavailableError
from .constants import FIELD_TYPE
from .constants import BINLOG
from .column import Column
Expand Down Expand Up @@ -57,6 +58,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)

if len(self.columns) == 0: # could not read the table metadata, probably already dropped
self.complete = False
if self._fail_on_table_metadata_unavailable:
raise TableMetadataUnavailableError(self.table)

def __is_null(self, null_bitmap, position):
bit = null_bitmap[int(position / 8)]
Expand Down Expand Up @@ -251,7 +254,7 @@ def __read_time2(self, column):

sign = 1 if self.__read_binary_slice(data, 0, 1, 24) else -1
if sign == -1:
# negative integers are stored as 2's compliment
# negative integers are stored as 2's compliment
# hence take 2's compliment again to get the right value.
data = ~data + 1

Expand Down
36 changes: 12 additions & 24 deletions pymysqlreplication/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,18 @@ def ignoredEvents(self):
def setUp(self):

db = os.environ.get('DB')
if db == 'mysql57':
# mysql 5.7 sandbox running on travis
self.database = {
"host": "localhost",
"user": "root",
"passwd": "msandbox",
"port": 5714,
"use_unicode": True,
"charset": "utf8",
"db": "pymysqlreplication_test"
}
else:
# default
self.database = {
"host": "localhost",
"user": "root",
"passwd": "",
"port": 3306,
"use_unicode": True,
"charset": "utf8",
"db": "pymysqlreplication_test"
}
if os.getenv("TRAVIS") is not None:
self.database["user"] = "travis"
# default
self.database = {
"host": "localhost",
"user": "root",
"passwd": "",
"port": 3306,
"use_unicode": True,
"charset": "utf8",
"db": "pymysqlreplication_test"
}
if os.getenv("TRAVIS") is not None and db == "mysql56":
self.database["user"] = "travis"

self.conn_control = None
db = copy.copy(self.database)
Expand Down
85 changes: 83 additions & 2 deletions pymysqlreplication/tests/test_basic.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# -*- coding: utf-8 -*-
import pymysql
import copy
import time
import sys
import io
Expand All @@ -11,10 +13,11 @@
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.gtid import GtidSet
from pymysqlreplication.event import *
from pymysqlreplication.exceptions import TableMetadataUnavailableError
from pymysqlreplication.constants.BINLOG import *
from pymysqlreplication.row_event import *

__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestGtidBinLogStreamReader"]
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader"]


class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
Expand Down Expand Up @@ -569,6 +572,29 @@ def test_drop_table(self):

self.assertEqual([], event.rows)

def test_drop_table_tablemetadata_unavailable(self):
self.stream.close()
self.execute("CREATE TABLE test (id INTEGER(11))")
self.execute("INSERT INTO test VALUES (1)")
self.execute("DROP TABLE test")
self.execute("COMMIT")

self.stream = BinLogStreamReader(
self.database,
server_id=1024,
only_events=(WriteRowsEvent,),
fail_on_table_metadata_unavailable=True
)
had_error = False
try:
event = self.stream.fetchone()
except TableMetadataUnavailableError as e:
had_error = True
assert "test" in e.args[0]
finally:
self.resetBinLog()
assert had_error

def test_drop_column(self):
self.stream.close()
self.execute("CREATE TABLE test_drop_column (id INTEGER(11), data VARCHAR(50))")
Expand All @@ -581,7 +607,7 @@ def test_drop_column(self):
self.stream = BinLogStreamReader(
self.database,
server_id=1024,
only_events=(WriteRowsEvent,),
only_events=(WriteRowsEvent,)
)
try:
self.stream.fetchone() # insert with two values
Expand Down Expand Up @@ -618,6 +644,61 @@ def test_alter_column(self):
self.assertEqual(event.rows[0]["values"]["data"], 'A value')
self.stream.fetchone() # insert with three values

class TestCTLConnectionSettings(base.PyMySQLReplicationTestCase):

def setUp(self):
super(TestCTLConnectionSettings, self).setUp()
self.stream.close()
ctl_db = copy.copy(self.database)
ctl_db["db"] = None
ctl_db["port"] = 3307
self.ctl_conn_control = pymysql.connect(**ctl_db)
self.ctl_conn_control.cursor().execute("DROP DATABASE IF EXISTS pymysqlreplication_test")
self.ctl_conn_control.cursor().execute("CREATE DATABASE pymysqlreplication_test")
self.ctl_conn_control.close()
ctl_db["db"] = "pymysqlreplication_test"
self.ctl_conn_control = pymysql.connect(**ctl_db)
self.stream = BinLogStreamReader(
self.database,
ctl_connection_settings=ctl_db,
server_id=1024,
only_events=(WriteRowsEvent,),
fail_on_table_metadata_unavailable=True
)

def tearDown(self):
super(TestCTLConnectionSettings, self).tearDown()
self.ctl_conn_control.close()

def test_seperate_ctl_settings_table_metadata_unavailable(self):
self.execute("CREATE TABLE test (id INTEGER(11))")
self.execute("INSERT INTO test VALUES (1)")
self.execute("COMMIT")

had_error = False
try:
event = self.stream.fetchone()
except TableMetadataUnavailableError as e:
had_error = True
assert "test" in e.args[0]
finally:
self.resetBinLog()
assert had_error

def test_seperate_ctl_settings_no_error(self):
self.execute("CREATE TABLE test (id INTEGER(11))")
self.execute("INSERT INTO test VALUES (1)")
self.execute("DROP TABLE test")
self.execute("COMMIT")
self.ctl_conn_control.cursor().execute("CREATE TABLE test (id INTEGER(11))")
self.ctl_conn_control.cursor().execute("INSERT INTO test VALUES (1)")
self.ctl_conn_control.cursor().execute("COMMIT")
try:
self.stream.fetchone()
except Exception as e:
self.fail("raised unexpected exception: {exception}".format(exception=e))
finally:
self.resetBinLog()

class TestGtidBinLogStreamReader(base.PyMySQLReplicationTestCase):
def setUp(self):
Expand Down
13 changes: 12 additions & 1 deletion scripts/install_mysql56.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ env DEBIAN_FRONTEND=noninteractive apt-get install -o Dpkg::Options::='--force-c

# Cleanup old mysql datas
rm -rf /var/ramfs/mysql/
rm -rf /var/ramfs/mysql-ctl/
mkdir /var/ramfs/mysql/
chown mysql: /var/ramfs/mysql/
mkdir /var/ramfs/mysql-ctl/
chown mysql /var/ramfs/mysql/
chown mysql /var/ramfs/mysql-ctl/

# Config
echo '[mysqld]' | tee /etc/mysql/conf.d/replication.cnf
Expand All @@ -31,6 +34,8 @@ echo 'binlog-format = row' | tee -a /etc/mysql/conf.d/replication.cnf
/etc/init.d/mysql stop || true

# Install new datas
mysql_install_db --defaults-file=/etc/mysql/my.cnf --basedir=/usr --datadir=/var/ramfs/mysql-ctl --verbose

mysql_install_db --defaults-file=/etc/mysql/my.cnf --basedir=/usr --datadir=/var/ramfs/mysql --verbose

# Enable GTID
Expand All @@ -43,9 +48,15 @@ echo 'log_slave_updates' | tee -a /etc/mysql/conf.d/gtid.cnf
# Start mysql (avoid errors to have logs)
/etc/init.d/mysql start || true
tail -1000 /var/log/syslog
nohup mysqld --log-bin=mysql-ctl-bin.log --server-id 2 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave-updates -P 3307 --socket=/var/run/mysqld/mysqld-ctl.sock --datadir=/var/ramfs/mysql-ctl --pid-file=/var/lib/mysql/mysql-ctl.pid &
#Give it time to start
sleep 5
tail -1000 nohup.out

mysql --version
mysql -e 'SELECT VERSION();'
mysql -u root -e "GRANT ALL PRIVILEGES ON *.* TO ''@'localhost';"
mysql -u root --port=3307 --socket=/var/run/mysqld/mysqld-ctl.sock -e "GRANT ALL PRIVILEGES ON *.* TO ''@'localhost';"

mysql -e 'CREATE DATABASE pymysqlreplication_test;'
mysql -u root --port=3307 --socket=/var/run/mysqld/mysqld-ctl.sock -e "CREATE DATABASE pymysqlreplication_test;"
Loading