Skip to content

Commit 45063c9

Browse files
Arthur Gautierbaloo
Arthur Gautier
authored andcommitted
binlogstream: cleanup blocking code
Signed-off-by: Arthur Gautier <[email protected]>
1 parent a3f7902 commit 45063c9

File tree

1 file changed

+40
-23
lines changed

1 file changed

+40
-23
lines changed

Diff for: pymysqlreplication/binlogstream.py

+40-23
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,10 @@ class BinLogStreamReader(object):
127127
"""
128128
report_slave = None
129129

130-
def __init__(self, connection_settings, server_id, ctl_connection_settings=None, resume_stream=False,
131-
blocking=False, only_events=None, log_file=None, log_pos=None,
132-
filter_non_implemented_events=True,
130+
def __init__(self, connection_settings, server_id,
131+
ctl_connection_settings=None, resume_stream=False,
132+
blocking=False, only_events=None, log_file=None,
133+
log_pos=None, filter_non_implemented_events=True,
133134
ignored_events=None, auto_position=None,
134135
only_tables=None, ignored_tables=None,
135136
only_schemas=None, ignored_schemas=None,
@@ -140,30 +141,36 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None,
140141
slave_heartbeat=None):
141142
"""
142143
Attributes:
143-
ctl_connection_settings: Connection settings for cluster holding schema information
144+
ctl_connection_settings: Connection settings for cluster holding
145+
schema information
144146
resume_stream: Start for event from position or the latest event of
145147
binlog or from older available event
146-
blocking: Read on stream is blocking
148+
blocking: When master has finished reading/sending binlog it will
149+
send EOF instead of blocking connection.
147150
only_events: Array of allowed events
148151
ignored_events: Array of ignored events
149152
log_file: Set replication start log file
150-
log_pos: Set replication start log pos (resume_stream should be true)
153+
log_pos: Set replication start log pos (resume_stream should be
154+
true)
151155
auto_position: Use master_auto_position gtid to set position
152156
only_tables: An array with the tables you want to watch (only works
153157
in binlog_format ROW)
154158
ignored_tables: An array with the tables you want to skip
155159
only_schemas: An array with the schemas you want to watch
156160
ignored_schemas: An array with the schemas you want to skip
157161
freeze_schema: If true do not support ALTER TABLE. It's faster.
158-
skip_to_timestamp: Ignore all events until reaching specified timestamp.
162+
skip_to_timestamp: Ignore all events until reaching specified
163+
timestamp.
159164
report_slave: Report slave in SHOW SLAVE HOSTS.
160165
slave_uuid: Report slave_uuid in SHOW SLAVE HOSTS.
161-
fail_on_table_metadata_unavailable: Should raise exception if we can't get
162-
table information on row_events
166+
fail_on_table_metadata_unavailable: Should raise exception if we
167+
can't get table information on
168+
row_events
163169
slave_heartbeat: (seconds) Should master actively send heartbeat on
164-
connection. This also reduces traffic in GTID replication
165-
on replication resumption (in case many event to skip in
166-
binlog). See MASTER_HEARTBEAT_PERIOD in mysql documentation
170+
connection. This also reduces traffic in GTID
171+
replication on replication resumption (in case
172+
many event to skip in binlog). See
173+
MASTER_HEARTBEAT_PERIOD in mysql documentation
167174
for semantics
168175
"""
169176

@@ -320,10 +327,10 @@ def __connect_to_stream(self):
320327
else:
321328
prelude += struct.pack('<I', 4)
322329

323-
if self.__blocking:
324-
prelude += struct.pack('<h', 0)
325-
else:
326-
prelude += struct.pack('<h', 1)
330+
flags = 0
331+
if not self.__blocking:
332+
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
333+
prelude += struct.pack('<H', flags)
327334

328335
prelude += struct.pack('<I', self.__server_id)
329336
prelude += self.log_file.encode()
@@ -347,19 +354,21 @@ def __connect_to_stream(self):
347354
# is sent to the master
348355
# n_sid ulong 8bytes == which size is the gtid_set
349356
# | sid uuid 16bytes UUID as a binary
350-
# | n_intervals ulong 8bytes == how many intervals are sent for this gtid
357+
# | n_intervals ulong 8bytes == how many intervals are sent
358+
# | for this gtid
351359
# | | start ulong 8bytes Start position of this interval
352360
# | | stop ulong 8bytes Stop position of this interval
353361

354362
# A gtid set looks like:
355363
# 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10,
356364
# 1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140
357365
#
358-
# In this particular gtid set, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10
366+
# In this particular gtid set,
367+
# 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10
359368
# is the first member of the set, it is called a gtid.
360369
# In this gtid, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457 is the sid
361-
# and have two intervals, 1-3 and 8-10, 1 is the start position of the first interval
362-
# 3 is the stop position of the first interval.
370+
# and have two intervals, 1-3 and 8-10, 1 is the start position of
371+
# the first interval 3 is the stop position of the first interval.
363372

364373
gtid_set = GtidSet(self.auto_position)
365374
encoded_data_size = gtid_set.encoded_length
@@ -371,11 +380,19 @@ def __connect_to_stream(self):
371380
8 + # binlog_pos_info_size
372381
4) # encoded_data_size
373382

374-
prelude = b'' + struct.pack('<i', header_size + encoded_data_size) \
383+
prelude = b'' + struct.pack('<i', header_size + encoded_data_size)\
375384
+ int2byte(COM_BINLOG_DUMP_GTID)
376385

377-
# binlog_flags = 0 (2 bytes)
378-
prelude += struct.pack('<H', 0)
386+
flags = 0
387+
if not self.__blocking:
388+
flags |= 0x01 # BINLOG_DUMP_NON_BLOCK
389+
flags |= 0x04 # BINLOG_THROUGH_GTID
390+
391+
# binlog_flags (2 bytes)
392+
# see:
393+
# https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
394+
prelude += struct.pack('<H', flags)
395+
379396
# server_id (4 bytes)
380397
prelude += struct.pack('<I', self.__server_id)
381398
# binlog_name_info_size (4 bytes)

0 commit comments

Comments
 (0)