Skip to content

Commit f6c5166

Browse files
kdparkerbaloo
authored andcommitted
Allow users to define ctl_connection_settings and the option to fail when table information is unavailable (#176)
* Add optional ctl_connection_settings to give separate cluster to check changes in a given cluster rather than default, force failure on failure to get TableMetadata * Made failing table metadata unavailable error an optional parameter of BinLogStreamReader, added charset for ctl_connection_settings * Fill out TableMetadataUnavailableError to receive a table instead of it being a generic exception * Add test for metadata unavailable failure, fix metadata unavailable exception builder * Added tests for ctl_connection_settings, fixed travis for new mysql servers necessary for ctl_connection_settings test (including removing usage of mysql sandbox in mysql57 tests to bring it in line with what's going on in mysql56 tests)
1 parent 1b0fc42 commit f6c5166

13 files changed

+208
-87
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ cache:
1919
- $HOME/bins
2020
before_script:
2121
- env | grep DB
22-
- bash -c "if [ '$DB' = 'mysql57' ]; then sudo ./scripts/install_mysql_sandbox.sh; ./scripts/install_mysql57_on_sandbox.sh; fi"
22+
- bash -c "if [ '$DB' = 'mysql57' ]; then sudo ./scripts/install_mysql57.sh; fi"
2323
- bash -c "if [ '$DB' = 'mysql56' ]; then sudo ./scripts/install_mysql56.sh; fi"
2424
script:
2525
- "nosetests"

docs/developement.rst

+7-1
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,18 @@ To run tests:
4343

4444
python setup.py test
4545

46-
Running mysql in docker:
46+
Running mysql in docker (main):
4747

4848
::
4949

5050
docker run --name python-mysql-replication-tests -e MYSQL_ALLOW_EMPTY_PASSWORD=true -p 3306:3306 --rm percona:latest --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates
5151

52+
Running mysql in docker (for ctl server):
53+
54+
::
55+
56+
docker run --name python-mysql-replication-tests-ctl --expose=3307 -e MYSQL_ALLOW_EMPTY_PASSWORD=true -p 3307:3307 --rm percona:latest --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave-updates -P 3307
57+
5258

5359
Each pull request is tested on Travis CI:
5460
https://travis-ci.org/noplay/python-mysql-replication

pymysqlreplication/binlogstream.py

+16-5
Original file line numberDiff line numberDiff line change
@@ -126,16 +126,18 @@ class BinLogStreamReader(object):
126126
"""
127127
report_slave = None
128128

129-
def __init__(self, connection_settings, server_id, resume_stream=False,
129+
def __init__(self, connection_settings, server_id, ctl_connection_settings=None, resume_stream=False,
130130
blocking=False, only_events=None, log_file=None, log_pos=None,
131131
filter_non_implemented_events=True,
132132
ignored_events=None, auto_position=None,
133133
only_tables=None, only_schemas=None,
134134
freeze_schema=False, skip_to_timestamp=None,
135135
report_slave=None, slave_uuid=None,
136-
pymysql_wrapper=None):
136+
pymysql_wrapper=None,
137+
fail_on_table_metadata_unavailable=False):
137138
"""
138139
Attributes:
140+
ctl_connection_settings: Connection settings for cluster holding schema information
139141
resume_stream: Start for event from position or the latest event of
140142
binlog or from older available event
141143
blocking: Read on stream is blocking
@@ -150,20 +152,27 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
150152
skip_to_timestamp: Ignore all events until reaching specified timestamp.
151153
report_slave: Report slave in SHOW SLAVE HOSTS.
152154
slave_uuid: Report slave_uuid in SHOW SLAVE HOSTS.
155+
fail_on_table_metadata_unavailable: Should raise exception if we can't get
156+
table information on row_events
153157
"""
158+
154159
self.__connection_settings = connection_settings
155-
self.__connection_settings["charset"] = "utf8"
160+
self.__connection_settings.setdefault("charset", "utf8")
156161

157162
self.__connected_stream = False
158163
self.__connected_ctl = False
159164
self.__resume_stream = resume_stream
160165
self.__blocking = blocking
166+
self._ctl_connection_settings = ctl_connection_settings
167+
if ctl_connection_settings:
168+
self._ctl_connection_settings.setdefault("charset", "utf8")
161169

162170
self.__only_tables = only_tables
163171
self.__only_schemas = only_schemas
164172
self.__freeze_schema = freeze_schema
165173
self.__allowed_events = self._allowed_event_list(
166174
only_events, ignored_events, filter_non_implemented_events)
175+
self.__fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
167176

168177
# We can't filter on packet level TABLE_MAP and rotate event because
169178
# we need them for handling other operations
@@ -201,7 +210,8 @@ def close(self):
201210
self.__connected_ctl = False
202211

203212
def __connect_to_ctl(self):
204-
self._ctl_connection_settings = dict(self.__connection_settings)
213+
if not self._ctl_connection_settings:
214+
self._ctl_connection_settings = dict(self.__connection_settings)
205215
self._ctl_connection_settings["db"] = "information_schema"
206216
self._ctl_connection_settings["cursorclass"] = DictCursor
207217
self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings)
@@ -388,7 +398,8 @@ def fetchone(self):
388398
self.__allowed_events_in_packet,
389399
self.__only_tables,
390400
self.__only_schemas,
391-
self.__freeze_schema)
401+
self.__freeze_schema,
402+
self.__fail_on_table_metadata_unavailable)
392403

393404
if self.skip_to_timestamp and binlog_event.timestamp < self.skip_to_timestamp:
394405
continue

pymysqlreplication/event.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@ class BinLogEvent(object):
1010
def __init__(self, from_packet, event_size, table_map, ctl_connection,
1111
only_tables = None,
1212
only_schemas = None,
13-
freeze_schema = False):
13+
freeze_schema = False,
14+
fail_on_table_metadata_unavailable = False):
1415
self.packet = from_packet
1516
self.table_map = table_map
1617
self.event_type = self.packet.event_type
1718
self.timestamp = self.packet.timestamp
1819
self.event_size = event_size
1920
self._ctl_connection = ctl_connection
21+
self._fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable
2022
# The event have been fully processed, if processed is false
2123
# the event will be skipped
2224
self._processed = True

pymysqlreplication/exceptions.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
class TableMetadataUnavailableError(Exception):
2+
def __init__(self, table):
3+
Exception.__init__(self,"Unable to find metadata for table {0}".format(table))

pymysqlreplication/packet.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
5454
allowed_events,
5555
only_tables,
5656
only_schemas,
57-
freeze_schema):
57+
freeze_schema,
58+
fail_on_table_metadata_unavailable):
5859
# -1 because we ignore the ok byte
5960
self.read_bytes = 0
6061
# Used when we want to override a value in the data buffer
@@ -95,7 +96,8 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
9596
ctl_connection,
9697
only_tables = only_tables,
9798
only_schemas = only_schemas,
98-
freeze_schema = freeze_schema)
99+
freeze_schema = freeze_schema,
100+
fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable)
99101
if self.event._processed == False:
100102
self.event = None
101103

pymysqlreplication/row_event.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from pymysql.charset import charset_to_encoding
99

1010
from .event import BinLogEvent
11+
from .exceptions import TableMetadataUnavailableError
1112
from .constants import FIELD_TYPE
1213
from .constants import BINLOG
1314
from .column import Column
@@ -57,6 +58,8 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
5758

5859
if len(self.columns) == 0: # could not read the table metadata, probably already dropped
5960
self.complete = False
61+
if self._fail_on_table_metadata_unavailable:
62+
raise TableMetadataUnavailableError(self.table)
6063

6164
def __is_null(self, null_bitmap, position):
6265
bit = null_bitmap[int(position / 8)]
@@ -251,7 +254,7 @@ def __read_time2(self, column):
251254

252255
sign = 1 if self.__read_binary_slice(data, 0, 1, 24) else -1
253256
if sign == -1:
254-
# negative integers are stored as 2's compliment
257+
# negative integers are stored as 2's compliment
255258
# hence take 2's compliment again to get the right value.
256259
data = ~data + 1
257260

pymysqlreplication/tests/base.py

+12-24
Original file line numberDiff line numberDiff line change
@@ -21,30 +21,18 @@ def ignoredEvents(self):
2121
def setUp(self):
2222

2323
db = os.environ.get('DB')
24-
if db == 'mysql57':
25-
# mysql 5.7 sandbox running on travis
26-
self.database = {
27-
"host": "localhost",
28-
"user": "root",
29-
"passwd": "msandbox",
30-
"port": 5714,
31-
"use_unicode": True,
32-
"charset": "utf8",
33-
"db": "pymysqlreplication_test"
34-
}
35-
else:
36-
# default
37-
self.database = {
38-
"host": "localhost",
39-
"user": "root",
40-
"passwd": "",
41-
"port": 3306,
42-
"use_unicode": True,
43-
"charset": "utf8",
44-
"db": "pymysqlreplication_test"
45-
}
46-
if os.getenv("TRAVIS") is not None:
47-
self.database["user"] = "travis"
24+
# default
25+
self.database = {
26+
"host": "localhost",
27+
"user": "root",
28+
"passwd": "",
29+
"port": 3306,
30+
"use_unicode": True,
31+
"charset": "utf8",
32+
"db": "pymysqlreplication_test"
33+
}
34+
if os.getenv("TRAVIS") is not None and db == "mysql56":
35+
self.database["user"] = "travis"
4836

4937
self.conn_control = None
5038
db = copy.copy(self.database)

pymysqlreplication/tests/test_basic.py

+83-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
# -*- coding: utf-8 -*-
2+
import pymysql
3+
import copy
24
import time
35
import sys
46
import io
@@ -11,10 +13,11 @@
1113
from pymysqlreplication import BinLogStreamReader
1214
from pymysqlreplication.gtid import GtidSet
1315
from pymysqlreplication.event import *
16+
from pymysqlreplication.exceptions import TableMetadataUnavailableError
1417
from pymysqlreplication.constants.BINLOG import *
1518
from pymysqlreplication.row_event import *
1619

17-
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestGtidBinLogStreamReader"]
20+
__all__ = ["TestBasicBinLogStreamReader", "TestMultipleRowBinLogStreamReader", "TestCTLConnectionSettings", "TestGtidBinLogStreamReader"]
1821

1922

2023
class TestBasicBinLogStreamReader(base.PyMySQLReplicationTestCase):
@@ -569,6 +572,29 @@ def test_drop_table(self):
569572

570573
self.assertEqual([], event.rows)
571574

575+
def test_drop_table_tablemetadata_unavailable(self):
576+
self.stream.close()
577+
self.execute("CREATE TABLE test (id INTEGER(11))")
578+
self.execute("INSERT INTO test VALUES (1)")
579+
self.execute("DROP TABLE test")
580+
self.execute("COMMIT")
581+
582+
self.stream = BinLogStreamReader(
583+
self.database,
584+
server_id=1024,
585+
only_events=(WriteRowsEvent,),
586+
fail_on_table_metadata_unavailable=True
587+
)
588+
had_error = False
589+
try:
590+
event = self.stream.fetchone()
591+
except TableMetadataUnavailableError as e:
592+
had_error = True
593+
assert "test" in e.args[0]
594+
finally:
595+
self.resetBinLog()
596+
assert had_error
597+
572598
def test_drop_column(self):
573599
self.stream.close()
574600
self.execute("CREATE TABLE test_drop_column (id INTEGER(11), data VARCHAR(50))")
@@ -581,7 +607,7 @@ def test_drop_column(self):
581607
self.stream = BinLogStreamReader(
582608
self.database,
583609
server_id=1024,
584-
only_events=(WriteRowsEvent,),
610+
only_events=(WriteRowsEvent,)
585611
)
586612
try:
587613
self.stream.fetchone() # insert with two values
@@ -618,6 +644,61 @@ def test_alter_column(self):
618644
self.assertEqual(event.rows[0]["values"]["data"], 'A value')
619645
self.stream.fetchone() # insert with three values
620646

647+
class TestCTLConnectionSettings(base.PyMySQLReplicationTestCase):
648+
649+
def setUp(self):
650+
super(TestCTLConnectionSettings, self).setUp()
651+
self.stream.close()
652+
ctl_db = copy.copy(self.database)
653+
ctl_db["db"] = None
654+
ctl_db["port"] = 3307
655+
self.ctl_conn_control = pymysql.connect(**ctl_db)
656+
self.ctl_conn_control.cursor().execute("DROP DATABASE IF EXISTS pymysqlreplication_test")
657+
self.ctl_conn_control.cursor().execute("CREATE DATABASE pymysqlreplication_test")
658+
self.ctl_conn_control.close()
659+
ctl_db["db"] = "pymysqlreplication_test"
660+
self.ctl_conn_control = pymysql.connect(**ctl_db)
661+
self.stream = BinLogStreamReader(
662+
self.database,
663+
ctl_connection_settings=ctl_db,
664+
server_id=1024,
665+
only_events=(WriteRowsEvent,),
666+
fail_on_table_metadata_unavailable=True
667+
)
668+
669+
def tearDown(self):
670+
super(TestCTLConnectionSettings, self).tearDown()
671+
self.ctl_conn_control.close()
672+
673+
def test_seperate_ctl_settings_table_metadata_unavailable(self):
674+
self.execute("CREATE TABLE test (id INTEGER(11))")
675+
self.execute("INSERT INTO test VALUES (1)")
676+
self.execute("COMMIT")
677+
678+
had_error = False
679+
try:
680+
event = self.stream.fetchone()
681+
except TableMetadataUnavailableError as e:
682+
had_error = True
683+
assert "test" in e.args[0]
684+
finally:
685+
self.resetBinLog()
686+
assert had_error
687+
688+
def test_seperate_ctl_settings_no_error(self):
689+
self.execute("CREATE TABLE test (id INTEGER(11))")
690+
self.execute("INSERT INTO test VALUES (1)")
691+
self.execute("DROP TABLE test")
692+
self.execute("COMMIT")
693+
self.ctl_conn_control.cursor().execute("CREATE TABLE test (id INTEGER(11))")
694+
self.ctl_conn_control.cursor().execute("INSERT INTO test VALUES (1)")
695+
self.ctl_conn_control.cursor().execute("COMMIT")
696+
try:
697+
self.stream.fetchone()
698+
except Exception as e:
699+
self.fail("raised unexpected exception: {exception}".format(exception=e))
700+
finally:
701+
self.resetBinLog()
621702

622703
class TestGtidBinLogStreamReader(base.PyMySQLReplicationTestCase):
623704
def setUp(self):

scripts/install_mysql56.sh

+12-1
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@ env DEBIAN_FRONTEND=noninteractive apt-get install -o Dpkg::Options::='--force-c
1919

2020
# Cleanup old mysql datas
2121
rm -rf /var/ramfs/mysql/
22+
rm -rf /var/ramfs/mysql-ctl/
2223
mkdir /var/ramfs/mysql/
23-
chown mysql: /var/ramfs/mysql/
24+
mkdir /var/ramfs/mysql-ctl/
25+
chown mysql /var/ramfs/mysql/
26+
chown mysql /var/ramfs/mysql-ctl/
2427

2528
# Config
2629
echo '[mysqld]' | tee /etc/mysql/conf.d/replication.cnf
@@ -31,6 +34,8 @@ echo 'binlog-format = row' | tee -a /etc/mysql/conf.d/replication.cnf
3134
/etc/init.d/mysql stop || true
3235

3336
# Install new datas
37+
mysql_install_db --defaults-file=/etc/mysql/my.cnf --basedir=/usr --datadir=/var/ramfs/mysql-ctl --verbose
38+
3439
mysql_install_db --defaults-file=/etc/mysql/my.cnf --basedir=/usr --datadir=/var/ramfs/mysql --verbose
3540

3641
# Enable GTID
@@ -43,9 +48,15 @@ echo 'log_slave_updates' | tee -a /etc/mysql/conf.d/gtid.cnf
4348
# Start mysql (avoid errors to have logs)
4449
/etc/init.d/mysql start || true
4550
tail -1000 /var/log/syslog
51+
nohup mysqld --log-bin=mysql-ctl-bin.log --server-id 2 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave-updates -P 3307 --socket=/var/run/mysqld/mysqld-ctl.sock --datadir=/var/ramfs/mysql-ctl --pid-file=/var/lib/mysql/mysql-ctl.pid &
52+
#Give it time to start
53+
sleep 5
54+
tail -1000 nohup.out
4655

4756
mysql --version
4857
mysql -e 'SELECT VERSION();'
4958
mysql -u root -e "GRANT ALL PRIVILEGES ON *.* TO ''@'localhost';"
59+
mysql -u root --port=3307 --socket=/var/run/mysqld/mysqld-ctl.sock -e "GRANT ALL PRIVILEGES ON *.* TO ''@'localhost';"
5060

5161
mysql -e 'CREATE DATABASE pymysqlreplication_test;'
62+
mysql -u root --port=3307 --socket=/var/run/mysqld/mysqld-ctl.sock -e "CREATE DATABASE pymysqlreplication_test;"

0 commit comments

Comments
 (0)