Skip to content

Commit b615725

Browse files
committed
improve performance and documentation
* symplify BinLogStreamReader.fetchone * improve documentation of BinLogStreamReader * tests/test_data_type.py: fix syntax errors * tests/test_basic.py: dynamic get an end position instead a fixed one
1 parent d9a2ba2 commit b615725

File tree

3 files changed

+108
-43
lines changed

3 files changed

+108
-43
lines changed

Diff for: pymysqlreplication/binlogstream.py

+89-38
Original file line numberDiff line numberDiff line change
@@ -141,20 +141,48 @@ def __init__(self, connection_settings, server_id,
141141
fail_on_table_metadata_unavailable=False,
142142
slave_heartbeat=None):
143143
"""
144-
Attributes:
144+
Parameters:
145+
connection_settings: a dict of parameters passed to `pymysql.connect`
146+
or `pymysql_wrapper`, of which "db" parameter is not necessary
147+
pymysql_wrapper: custom replacement for `pymysql.connect`
145148
ctl_connection_settings: Connection settings for cluster holding
146-
schema information
147-
resume_stream: Start for event from position or the latest event of
148-
binlog or from older available event
149+
schema information, which could be None, in which case
150+
`connection_settings` will be used as ctl_connection_settings,
151+
except for that "db" will be replaced to "information_schema"
152+
resume_stream: True or False. control the start point of the returned
153+
events, only works when `auto_position` is None.
154+
`fetchone` will fetch data from:
155+
1.the begining of `log_file`: if `resume_stream` is False
156+
2.`log_pos` of `log_file`: if resume_stream is True, and it's
157+
the first time to fetch the data
158+
3.the event right next to the last fetched event: when resume_stream
159+
is True and it's not the first time to fetch data
160+
note: the log position will be set back to the begging of `log_file`
161+
each time the client is disconnected and then reconnected
162+
to the mysql server (OperationalError 2006/2013) if resume_stream
163+
is False. so it's suggested to set resume_stream to True.
164+
149165
blocking: When master has finished reading/sending binlog it will
150166
send EOF instead of blocking connection.
151167
only_events: Array of allowed events
152168
ignored_events: Array of ignored events
153-
log_file: Set replication start log file
169+
log_file: Set replication start log file. if ether `log_file` or
170+
`log_pos` is None, and auto_position is None, then log_pos
171+
and log_file will be set as the values returned by the query
172+
"SHOW MASTER STATUS"
154173
log_pos: Set replication start log pos (resume_stream should be
155-
true)
174+
true). if ether `log_file` or `log_pos` is None, and auto_position
175+
is None, then log_pos and log_file will be set as the values
176+
returned by the query "SHOW MASTER STATUS", and log_pos will
177+
be set as 4 (the start position of any log file) if resume_stream
178+
is a false value
156179
end_log_pos: Set replication end log pos
157-
auto_position: Use master_auto_position gtid to set position
180+
auto_position: a string of replicated GTIDs. all the events except
181+
for thoses included in `auto_position` and those purged by
182+
the source server will be sent to the client. a valid `auto_position`
183+
looks like:
184+
19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10,
185+
1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140
158186
only_tables: An array with the tables you want to watch (only works
159187
in binlog_format ROW)
160188
ignored_tables: An array with the tables you want to skip
@@ -174,13 +202,22 @@ def __init__(self, connection_settings, server_id,
174202
many event to skip in binlog). See
175203
MASTER_HEARTBEAT_PERIOD in mysql documentation
176204
for semantics
205+
206+
Notes:
207+
the log position will be set back to the begging of `log_file`
208+
each time the client is disconnected and then auto-reconnected
209+
to the mysql server (OperationalError 2006/2013) if resume_stream
210+
is False. so it's suggested to set resume_stream to True.
211+
212+
an additional RotateEvent and FormatDescriptionEvent will be
213+
fetched each time the client is disconnected and then auto-
214+
reconnected to the server. (no matter resume_stream is True
215+
or False)
177216
"""
178217

179218
self.__connection_settings = connection_settings
180219
self.__connection_settings.setdefault("charset", "utf8")
181220

182-
self.__connected_stream = False
183-
self.__connected_ctl = False
184221
self.__resume_stream = resume_stream
185222
self.__blocking = blocking
186223
self._ctl_connection_settings = ctl_connection_settings
@@ -226,24 +263,26 @@ def __init__(self, connection_settings, server_id,
226263
self.pymysql_wrapper = pymysql.connect
227264

228265
def close(self):
229-
if self.__connected_stream:
266+
if getattr(self, '_stream_connection', None) and self._stream_connection.open:
230267
self._stream_connection.close()
231-
self.__connected_stream = False
232-
if self.__connected_ctl:
268+
if getattr(self, '_ctl_connection', None):
233269
# break reference cycle between stream reader and underlying
234270
# mysql connection object
235271
self._ctl_connection._get_table_information = None
236-
self._ctl_connection.close()
237-
self.__connected_ctl = False
272+
if self._ctl_connection.open:
273+
self._ctl_connection.close()
238274

239-
def __connect_to_ctl(self):
275+
def __connect_to_ctl(self, force_reconnect=False):
276+
if self.__connected_ctl:
277+
if not force_reconnect:
278+
return
279+
self._ctl_connection.close()
240280
if not self._ctl_connection_settings:
241281
self._ctl_connection_settings = dict(self.__connection_settings)
242282
self._ctl_connection_settings["db"] = "information_schema"
243283
self._ctl_connection_settings["cursorclass"] = DictCursor
244284
self._ctl_connection = self.pymysql_wrapper(**self._ctl_connection_settings)
245285
self._ctl_connection._get_table_information = self.__get_table_information
246-
self.__connected_ctl = True
247286

248287
def __checksum_enabled(self):
249288
"""Return True if binlog-checksum = CRC32. Only for MySQL > 5.6"""
@@ -274,12 +313,29 @@ def _register_slave(self):
274313
self._stream_connection._next_seq_id = 1
275314
self._stream_connection._read_packet()
276315

277-
def __connect_to_stream(self):
316+
@property
317+
def __connected_stream(self):
318+
return bool(getattr(self, '_stream_connection', None) and \
319+
self._stream_connection.open)
320+
321+
@property
322+
def __connected_ctl(self):
323+
return bool(getattr(self, '_ctl_connection', None) and \
324+
self._ctl_connection.open)
325+
326+
def __connect_to_stream(self, force_reconnect=False):
327+
if self.__connected_stream:
328+
if not force_reconnect:
329+
return
330+
self._stream_connection.close()
331+
278332
# log_pos (4) -- position in the binlog-file to start the stream with
279333
# flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1)
280334
# server_id (4) -- server id of this slave
281335
# log_file (string.EOF) -- filename of the binlog on the master
282336
self._stream_connection = self.pymysql_wrapper(**self.__connection_settings)
337+
if pymysql.__version__ < LooseVersion("0.6"):
338+
self._stream_connection._read_packet = self._stream_connection.read_packet
283339

284340
self.__use_checksum = self.__checksum_enabled()
285341

@@ -301,9 +357,7 @@ def __connect_to_stream(self):
301357
4294967))
302358
# If heartbeat is too low, the connection will disconnect before,
303359
# this is also the behavior in mysql
304-
heartbeat = float(min(net_timeout/2., self.slave_heartbeat))
305-
if heartbeat > 4294967:
306-
heartbeat = 4294967
360+
heartbeat = float(min(net_timeout/2., self.slave_heartbeat, 4294967))
307361

308362
# master_heartbeat_period is nanoseconds
309363
heartbeat = int(heartbeat * 1000000000)
@@ -419,29 +473,27 @@ def __connect_to_stream(self):
419473
else:
420474
self._stream_connection._write_bytes(prelude)
421475
self._stream_connection._next_seq_id = 1
422-
self.__connected_stream = True
423476

424-
def fetchone(self):
477+
def fetchone(self, force_reconnect=False):
478+
self.__prefetch(force_reconnect=force_reconnect)
479+
return self.__fetchone()
480+
481+
def __prefetch(self, force_reconnect=False):
482+
self.__connect_to_ctl(force_reconnect=force_reconnect)
483+
self.__connect_to_stream(force_reconnect=force_reconnect)
484+
485+
def __fetchone(self):
486+
# let `__fetchone` be as light weight as possible.
425487
while True:
426488
if self.end_log_pos and self.is_past_end_log_pos:
427489
return None
428490

429-
if not self.__connected_stream:
430-
self.__connect_to_stream()
431-
432-
if not self.__connected_ctl:
433-
self.__connect_to_ctl()
434-
435491
try:
436-
if pymysql.__version__ < LooseVersion("0.6"):
437-
pkt = self._stream_connection.read_packet()
438-
else:
439-
pkt = self._stream_connection._read_packet()
492+
pkt = self._stream_connection._read_packet()
440493
except pymysql.OperationalError as error:
441494
code, message = error.args
442495
if code in MYSQL_EXPECTED_ERROR_CODES:
443-
self._stream_connection.close()
444-
self.__connected_stream = False
496+
self.__connect_to_stream(force_reconnect=True)
445497
continue
446498
raise
447499

@@ -555,9 +607,8 @@ def _allowed_event_list(self, only_events, ignored_events,
555607

556608
def __get_table_information(self, schema, table):
557609
for i in range(1, 3):
610+
self.__connect_to_ctl()
558611
try:
559-
if not self.__connected_ctl:
560-
self.__connect_to_ctl()
561612

562613
cur = self._ctl_connection.cursor()
563614
cur.execute("""
@@ -575,10 +626,10 @@ def __get_table_information(self, schema, table):
575626
except pymysql.OperationalError as error:
576627
code, message = error.args
577628
if code in MYSQL_EXPECTED_ERROR_CODES:
578-
self.__connected_ctl = False
579629
continue
580630
else:
581631
raise error
582632

583633
def __iter__(self):
584-
return iter(self.fetchone, None)
634+
self.__prefetch(force_reconnect=False)
635+
return iter(self.__fetchone, None)

Diff for: pymysqlreplication/tests/test_basic.py

+16-2
Original file line numberDiff line numberDiff line change
@@ -503,21 +503,35 @@ def test_end_log_pos(self):
503503

504504
binlog = self.execute("SHOW BINARY LOGS").fetchone()[0]
505505

506+
self.stream.close()
507+
self.stream = BinLogStreamReader(
508+
self.database,
509+
server_id=1024,
510+
log_pos=0,
511+
log_file=binlog)
512+
# fetch several events then get the end position of
513+
# the last event
514+
# do not use a fixed int as end position, cause that
515+
# may be an invalid position
516+
for i in range(13):
517+
_ = self.stream.fetchone()
518+
binlog = self.stream.log_file
519+
end_position = self.stream.log_pos
506520
self.stream.close()
507521
self.stream = BinLogStreamReader(
508522
self.database,
509523
server_id=1024,
510524
log_pos=0,
511525
log_file=binlog,
512-
end_log_pos=888)
526+
end_log_pos=end_position)
513527

514528
last_log_pos = 0
515529
last_event_type = 0
516530
for event in self.stream:
517531
last_log_pos = self.stream.log_pos
518532
last_event_type = event.event_type
519533

520-
self.assertEqual(last_log_pos, 888)
534+
self.assertEqual(last_log_pos, end_position)
521535
self.assertEqual(last_event_type, TABLE_MAP_EVENT)
522536

523537
class TestMultipleRowBinLogStreamReader(base.PyMySQLReplicationTestCase):

Diff for: pymysqlreplication/tests/test_data_type.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ def test_status_vars(self):
701701
self.assertEqual(event.catalog_nz_code, b'std')
702702
self.assertEqual(event.mts_accessed_db_names, [b'pymysqlreplication_test'])
703703

704-
def test_null_bitmask(self)
704+
def test_null_bitmask(self):
705705
"""Test parse of null-bitmask in table map events
706706
707707
Create table with 16 columns with nullability specified by 'bit_mask' variable
@@ -730,7 +730,7 @@ def test_null_bitmask(self)
730730
column_type = "INT"
731731
column_definition.append(column_type)
732732

733-
nullability = "NOT NULL" if not RowsEvent.__is_null(bit_mask, i) else ""
733+
nullability = "NOT NULL" if not RowsEvent._RowsEvent__is_null(None, bit_mask, i) else ""
734734
column_definition.append(nullability)
735735

736736
columns.append(" ".join(column_definition))
@@ -744,7 +744,7 @@ def test_null_bitmask(self)
744744
for i in range(16):
745745
values.append('0')
746746

747-
insert_query += f' ({",".join(values)})')
747+
insert_query += f' ({",".join(values)})'
748748

749749
self.execute(create_query)
750750
self.execute(insert_query)

0 commit comments

Comments
 (0)