Commit 7e5959a

Implement Heartbeat (julien-duponchelle#191)
* Implement Heartbeat

  I ended up debugging a slow startup with pymysqlreplication: the stream was stuck for a few seconds before the first events came in. While investigating, I implemented HeartbeatLogEvent and documented the observed MySQL behavior. I also implemented the heartbeat feature itself, which mysqld requires in order to filter events in GTID mode.

  Signed-off-by: Arthur Gautier <[email protected]>

* Fixup tests

  Signed-off-by: Arthur Gautier <[email protected]>
1 parent 32e4e1a commit 7e5959a

4 files changed: 67 additions, 8 deletions

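For reference, the new option is consumed through the existing BinLogStreamReader constructor. A minimal usage sketch, assuming illustrative connection settings and a 2-second interval (neither appears in the commit itself):

    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.event import HeartbeatLogEvent

    # Illustrative connection settings; adjust for your own server.
    mysql_settings = {"host": "127.0.0.1", "port": 3306, "user": "repl", "passwd": ""}

    stream = BinLogStreamReader(
        connection_settings=mysql_settings,
        server_id=100,
        blocking=True,
        slave_heartbeat=2.0,                 # seconds of silence before the master sends a heartbeat
        ignored_events=[HeartbeatLogEvent],  # drop the heartbeats themselves, as the updated tests do
    )

    for binlogevent in stream:
        binlogevent.dump()

    stream.close()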

pymysqlreplication/binlogstream.py

Lines changed: 28 additions & 3 deletions
@@ -14,7 +14,7 @@
     QueryEvent, RotateEvent, FormatDescriptionEvent,
     XidEvent, GtidEvent, StopEvent,
     BeginLoadQueryEvent, ExecuteLoadQueryEvent,
-    NotImplementedEvent)
+    HeartbeatLogEvent, NotImplementedEvent)
 from .row_event import (
     UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)

@@ -134,7 +134,8 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None,
                  freeze_schema=False, skip_to_timestamp=None,
                  report_slave=None, slave_uuid=None,
                  pymysql_wrapper=None,
-                 fail_on_table_metadata_unavailable=False):
+                 fail_on_table_metadata_unavailable=False,
+                 slave_heartbeat=None):
         """
         Attributes:
             ctl_connection_settings: Connection settings for cluster holding schema information

@@ -154,6 +155,11 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None,
             slave_uuid: Report slave_uuid in SHOW SLAVE HOSTS.
             fail_on_table_metadata_unavailable: Should raise exception if we can't get
                                                 table information on row_events
+            slave_heartbeat: (seconds) Should master actively send heartbeat on
+                             connection. This also reduces traffic in GTID replication
+                             on replication resumption (in case many event to skip in
+                             binlog). See MASTER_HEARTBEAT_PERIOD in mysql documentation
+                             for semantics
         """

         self.__connection_settings = connection_settings

@@ -192,6 +198,7 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None,
         if report_slave:
             self.report_slave = ReportSlave(report_slave)
         self.slave_uuid = slave_uuid
+        self.slave_heartbeat = slave_heartbeat

         if pymysql_wrapper:
             self.pymysql_wrapper = pymysql_wrapper

@@ -268,6 +275,22 @@ def __connect_to_stream(self):
             cur.execute("set @slave_uuid= '%s'" % self.slave_uuid)
             cur.close()

+        if self.slave_heartbeat:
+            # 4294967 is documented as the max value for heartbeats
+            net_timeout = float(self.__connection_settings.get('read_timeout',
+                                                               4294967))
+            # If heartbeat is too low, the connection will disconnect before,
+            # this is also the behavior in mysql
+            heartbeat = float(min(net_timeout/2., self.slave_heartbeat))
+            if heartbeat > 4294967:
+                heartbeat = 4294967
+
+            # master_heartbeat_period is nanoseconds
+            heartbeat = int(heartbeat * 1000000000)
+            cur = self._stream_connection.cursor()
+            cur.execute("set @master_heartbeat_period= %d" % heartbeat)
+            cur.close()
+
         self._register_slave()

         if not self.auto_position:

@@ -450,7 +473,9 @@ def _allowed_event_list(self, only_events, ignored_events,
                 WriteRowsEvent,
                 DeleteRowsEvent,
                 TableMapEvent,
-                NotImplementedEvent))
+                HeartbeatLogEvent,
+                NotImplementedEvent,
+                ))
         if ignored_events is not None:
             for e in ignored_events:
                 events.remove(e)
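The value handed to @master_heartbeat_period is derived from the requested interval and the connection's read_timeout. The sketch below replays the same arithmetic as a standalone helper; the helper name and the example values are illustrative, not part of the change:

    MAX_HEARTBEAT = 4294967  # documented upper bound, in seconds

    def master_heartbeat_period(slave_heartbeat, read_timeout=MAX_HEARTBEAT):
        """Illustrative re-statement of the clamping done in __connect_to_stream."""
        # Never ask for more than half the read timeout, otherwise the client
        # would drop the connection before a heartbeat could arrive.
        period = min(float(read_timeout) / 2., float(slave_heartbeat))
        period = min(period, MAX_HEARTBEAT)
        # @master_heartbeat_period is expressed in nanoseconds.
        return int(period * 1000000000)

    print(master_heartbeat_period(2.0, read_timeout=30))  # 2000000000  (2 s)
    print(master_heartbeat_period(60, read_timeout=30))   # 15000000000 (capped at read_timeout / 2)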

pymysqlreplication/event.py

Lines changed: 31 additions & 0 deletions
@@ -119,6 +119,37 @@ def _dump(self):
         print("Transaction ID: %d" % (self.xid))


+class HeartbeatLogEvent(BinLogEvent):
+    """A Heartbeat event
+    Heartbeats are sent by the master only if there are no unsent events in the
+    binary log file for a period longer than the interval defined by
+    MASTER_HEARTBEAT_PERIOD connection setting.
+
+    A mysql server will also play those to the slave for each skipped
+    event in the log. I (baloo) believe the intention is to make the slave
+    bump its position so that if a disconnection occurs, the slave only
+    reconnects from the last skipped position (see Binlog_sender::send_events
+    in sql/rpl_binlog_sender.cc). That makes 106 bytes of data for each skipped
+    event in the binlog. *This is also the case with GTID replication.* To
+    mitigate such behavior, you are expected to keep the binlog small (see
+    max_binlog_size, defaults to 1G).
+    In any case, the timestamp is 0 (as in 1970-01-01T00:00:00).
+
+    Attributes:
+        ident: Name of the current binlog
+    """
+
+    def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs):
+        super(HeartbeatLogEvent, self).__init__(from_packet, event_size,
+                                                table_map, ctl_connection,
+                                                **kwargs)
+        self.ident = self.packet.read(event_size).decode()
+
+    def _dump(self):
+        super(HeartbeatLogEvent, self)._dump()
+        print("Current binlog: %s" % (self.ident))
+
+
 class QueryEvent(BinLogEvent):
     '''This evenement is trigger when a query is run of the database.
     Only replicated queries are logged.'''
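Since a heartbeat carries nothing but the name of the current binlog, a consumer would typically just log or skip it. A hedged sketch of such a handler; the handle function is illustrative and not part of the library:

    from pymysqlreplication.event import HeartbeatLogEvent

    def handle(binlogevent):
        """Illustrative handler: heartbeats expose nothing beyond the binlog name."""
        if isinstance(binlogevent, HeartbeatLogEvent):
            # The timestamp of a heartbeat is always 0; only .ident is meaningful.
            print("still connected, master is at binlog %s" % binlogevent.ident)
            return
        binlogevent.dump()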

pymysqlreplication/packet.py

Lines changed: 1 addition & 0 deletions
@@ -70,6 +70,7 @@ class BinLogPacketWrapper(object):
         constants.STOP_EVENT: event.StopEvent,
         constants.BEGIN_LOAD_QUERY_EVENT: event.BeginLoadQueryEvent,
         constants.EXECUTE_LOAD_QUERY_EVENT: event.ExecuteLoadQueryEvent,
+        constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
         # row_event
         constants.UPDATE_ROWS_EVENT_V1: row_event.UpdateRowsEvent,
         constants.WRITE_ROWS_EVENT_V1: row_event.WriteRowsEvent,
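This mapping is how the packet wrapper resolves an event type byte to an event class. A simplified, abridged sketch of that lookup; the EVENT_MAP name and the fallback choice are illustrative only:

    from pymysqlreplication import constants, event

    # Abridged copy of the dispatch table, for illustration only.
    EVENT_MAP = {
        constants.STOP_EVENT: event.StopEvent,
        constants.HEARTBEAT_LOG_EVENT: event.HeartbeatLogEvent,
    }

    def event_class(event_type):
        # Fall back to NotImplementedEvent for types not listed here
        # (an illustrative choice for this sketch).
        return EVENT_MAP.get(event_type, event.NotImplementedEvent)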

pymysqlreplication/tests/test_basic.py

Lines changed: 7 additions & 5 deletions
@@ -25,9 +25,9 @@ def ignoredEvents(self):
         return [GtidEvent]

     def test_allowed_event_list(self):
-        self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 13)
-        self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 12)
-        self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 12)
+        self.assertEqual(len(self.stream._allowed_event_list(None, None, False)), 14)
+        self.assertEqual(len(self.stream._allowed_event_list(None, None, True)), 13)
+        self.assertEqual(len(self.stream._allowed_event_list(None, [RotateEvent], False)), 13)
         self.assertEqual(len(self.stream._allowed_event_list([RotateEvent], None, False)), 1)

     def test_read_query_event(self):

@@ -714,7 +714,8 @@ def test_read_query_event(self):

         self.stream.close()
         self.stream = BinLogStreamReader(
-            self.database, server_id=1024, blocking=True, auto_position=gtid)
+            self.database, server_id=1024, blocking=True, auto_position=gtid,
+            ignored_events=[HeartbeatLogEvent])

         self.assertIsInstance(self.stream.fetchone(), RotateEvent)
         self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)

@@ -770,7 +771,8 @@ def test_position_gtid(self):

         self.stream.close()
         self.stream = BinLogStreamReader(
-            self.database, server_id=1024, blocking=True, auto_position=gtid)
+            self.database, server_id=1024, blocking=True, auto_position=gtid,
+            ignored_events=[HeartbeatLogEvent])

         self.assertIsInstance(self.stream.fetchone(), RotateEvent)
         self.assertIsInstance(self.stream.fetchone(), FormatDescriptionEvent)
