From f312fcb0a82aa87d4469235136f540c73af5348d Mon Sep 17 00:00:00 2001 From: Arthur Gautier Date: Sat, 28 Jan 2017 00:07:23 +0000 Subject: [PATCH 1/2] Implement Heartbeat I ended up debugging a slow startup with pymysqlreplication: the stream was stuck for a few seconds before the first events came in. For that purpose I ended up implementing HeartbeatLogEvent and documenting observed mysql behavior. I also implemented the heartbeat feature, which is required for mysqld to filter events in GTID mode. Signed-off-by: Arthur Gautier --- pymysqlreplication/binlogstream.py | 31 +++++++++++++++++++++++++++--- pymysqlreplication/event.py | 31 ++++++++++++++++++++++++++++++ pymysqlreplication/packet.py | 1 + 3 files changed, 60 insertions(+), 3 deletions(-) diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py index 5ae78dd4..2b5e0f0a 100644 --- a/pymysqlreplication/binlogstream.py +++ b/pymysqlreplication/binlogstream.py @@ -14,7 +14,7 @@ QueryEvent, RotateEvent, FormatDescriptionEvent, XidEvent, GtidEvent, StopEvent, BeginLoadQueryEvent, ExecuteLoadQueryEvent, - NotImplementedEvent) + HeartbeatLogEvent, NotImplementedEvent) from .row_event import ( UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) @@ -134,7 +134,8 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None, freeze_schema=False, skip_to_timestamp=None, report_slave=None, slave_uuid=None, pymysql_wrapper=None, - fail_on_table_metadata_unavailable=False): + fail_on_table_metadata_unavailable=False, + slave_heartbeat=None): """ Attributes: ctl_connection_settings: Connection settings for cluster holding schema information @@ -154,6 +155,11 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None, slave_uuid: Report slave_uuid in SHOW SLAVE HOSTS. 
fail_on_table_metadata_unavailable: Should raise exception if we can't get table information on row_events + slave_heartbeat: (seconds) Should master actively send heartbeat on + connection. This also reduces traffic in GTID replication + on replication resumption (in case there are many events to skip in + the binlog). See MASTER_HEARTBEAT_PERIOD in mysql documentation + for semantics """ self.__connection_settings = connection_settings @@ -192,6 +198,7 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None, if report_slave: self.report_slave = ReportSlave(report_slave) self.slave_uuid = slave_uuid + self.slave_heartbeat = slave_heartbeat if pymysql_wrapper: self.pymysql_wrapper = pymysql_wrapper @@ -268,6 +275,22 @@ def __connect_to_stream(self): cur.execute("set @slave_uuid= '%s'" % self.slave_uuid) cur.close() + if self.slave_heartbeat: + # 4294967 is documented as the max value for heartbeats + net_timeout = float(self.__connection_settings.get('read_timeout', + 4294967)) + # If heartbeat is too low, the connection will disconnect before, + # this is also the behavior in mysql + heartbeat = float(min(net_timeout/2., self.slave_heartbeat)) + if heartbeat > 4294967: + heartbeat = 4294967 + + # master_heartbeat_period is in nanoseconds + heartbeat = int(heartbeat * 1000000000) + cur = self._stream_connection.cursor() + cur.execute("set @master_heartbeat_period= %d" % heartbeat) + cur.close() + self._register_slave() if not self.auto_position: @@ -450,7 +473,9 @@ def _allowed_event_list(self, only_events, ignored_events, WriteRowsEvent, DeleteRowsEvent, TableMapEvent, - NotImplementedEvent)) + HeartbeatLogEvent, + NotImplementedEvent, + )) if ignored_events is not None: for e in ignored_events: events.remove(e) diff --git a/pymysqlreplication/event.py b/pymysqlreplication/event.py index 2c708776..d39f312b 100644 --- a/pymysqlreplication/event.py +++ b/pymysqlreplication/event.py @@ -119,6 +119,37 @@ def _dump(self): print("Transaction ID: %d" % 
(self.xid)) +class HeartbeatLogEvent(BinLogEvent): + """A Heartbeat event + Heartbeats are sent by the master only if there are no unsent events in the + binary log file for a period longer than the interval defined by + MASTER_HEARTBEAT_PERIOD connection setting. + + A mysql server will also play those to the slave for each skipped + event in the log. I (baloo) believe the intention is to make the slave + bump its position so that if a disconnection occurs, the slave only + reconnects from the last skipped position (see Binlog_sender::send_events + in sql/rpl_binlog_sender.cc). That makes 106 bytes of data for each skipped + event in the binlog. *this is also the case with GTID replication*. To + mitigate such behavior, you are expected to keep the binlog small (see + max_binlog_size, defaults to 1G). + In any case, the timestamp is 0 (as in 1970-01-01T00:00:00). + + Attributes: + ident: Name of the current binlog + """ + + def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs): + super(HeartbeatLogEvent, self).__init__(from_packet, event_size, + table_map, ctl_connection, + **kwargs) + self.ident = self.packet.read(event_size).decode() + + def _dump(self): + super(HeartbeatLogEvent, self)._dump() + print("Current binlog: %s" % (self.ident)) + + class QueryEvent(BinLogEvent): '''This evenement is trigger when a query is run of the database. 
Only replicated queries are logged.''' diff --git a/pymysqlreplication/packet.py b/pymysqlreplication/packet.py index ebb03d81..6e34713a 100644 --- a/pymysqlreplication/packet.py +++ b/pymysqlreplication/packet.py @@ -70,6 +70,7 @@ class BinLogPacketWrapper(object): constants.STOP_EVENT: event.StopEvent, constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent, constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent, + constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent, # row_event constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent, constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent, From f12a190f5b53784c1256ae7101078e7f5ccaf870 Mon Sep 17 00:00:00 2001 From: Arthur Gautier Date: Tue, 31 Jan 2017 20:23:15 +0000 Subject: [PATCH 2/2] Fixup tests Signed-off-by: Arthur Gautier --- pymysqlreplication/tests/test_basic.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pymysqlreplication/tests/test_basic.py b/pymysqlreplication/tests/test_basic.py index bf520f74..3f6d43cf 100644 --- a/pymysqlreplication/tests/test_basic.py +++ b/pymysqlreplication/tests/test_basic.py @@ -25,9 +25,9 @@ def ignoredEvents(self): return [GtidEvent] def test_allowed_event_list(self): - self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 13) - self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 12) - self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 12) + self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 14) + self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 13) + self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 13) self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1) def test_read_query_event(self): @@ -714,7 +714,8 @@ def test_read_query_event(self): self.stream.close() self.stream = BinLogStreamReader( - self.database, server_id=1024, 
blocking=True, auto_position=gtid) + self.database, server_id=1024, blocking=True, auto_position=gtid, + ignored_events=[HeartbeatLogEvent]) self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent) @@ -770,7 +771,8 @@ def test_position_gtid(self): self.stream.close() self.stream = BinLogStreamReader( - self.database, server_id=1024, blocking=True, auto_position=gtid) + self.database, server_id=1024, blocking=True, auto_position=gtid, + ignored_events=[HeartbeatLogEvent]) self.assertIsInstance(self.stream.fetchone(), RotateEvent) self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)