Skip to content

Commit 9144e8c

Browse files
Better connection and log rotate management
1 parent bfb022f commit 9144e8c

File tree

2 files changed

+32
-19
lines changed

2 files changed

+32
-19
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from pymysql.constants.COMMAND import *
66
from pymysql.util import byte2int, int2byte
77
from .packet import BinLogPacketWrapper
8-
from .constants.BINLOG import TABLE_MAP_EVENT
8+
from .constants.BINLOG import TABLE_MAP_EVENT, ROTATE_EVENT
99

1010

1111
class BinLogStreamReader(object):
@@ -19,25 +19,32 @@ def __init__(self, connection_settings = {}, resume_stream = False, blocking = F
1919
'''
2020
self.__connection_settings = connection_settings
2121
self.__connection_settings['charset'] = 'utf8'
22-
ctl_connection_settings = copy.copy(self.__connection_settings)
23-
ctl_connection_settings['db'] = 'information_schema'
24-
ctl_connection_settings['cursorclass'] = pymysql.cursors.DictCursor
25-
self.__ctl_connection = pymysql.connect(**ctl_connection_settings)
26-
self.__connected = False
22+
23+
self.__connected_stream = False
24+
self.__connected_ctl = False
2725
self.__resume_stream = resume_stream
2826
self.__blocking = blocking
2927
self.__only_events = only_events
3028
self.__server_id = server_id
3129
self.__log_pos = None
30+
self.__log_file = None
3231

3332
#Store table meta informations
3433
self.table_map = {}
3534

3635
def close(self):
37-
if self.__connected:
36+
if self.__connected_stream:
3837
self._stream_connection.close()
39-
self.__connected = False
40-
self.__ctl_connection.close()
38+
self.__connected_stream = False
39+
if self.__connected_ctl:
40+
self._ctl_connection.close()
41+
self.__connected_ctl = False
42+
43+
def __connect_to_ctl(self):
44+
self._ctl_connection_settings = copy.copy(self.__connection_settings)
45+
self._ctl_connection_settings['db'] = 'information_schema'
46+
self._ctl_connection_settings['cursorclass'] = pymysql.cursors.DictCursor
47+
self._ctl_connection = pymysql.connect(**self._ctl_connection_settings)
4148

4249
def __connect_to_stream(self):
4350
self._stream_connection = pymysql.connect(**self.__connection_settings)
@@ -46,13 +53,14 @@ def __connect_to_stream(self):
4653
(log_file, log_pos) = cur.fetchone()[:2]
4754
cur.close()
4855

49-
56+
if self.__log_file is None:
57+
self.__log_file = log_file
5058
# binlog_pos (4) -- position in the binlog-file to start the stream with
5159
# flags (2) BINLOG_DUMP_NON_BLOCK (0 or 1)
5260
# server_id (4) -- server id of this slave
5361
# binlog-filename (string.EOF) -- filename of the binlog on the master
5462
command = COM_BINLOG_DUMP
55-
prelude = struct.pack('<i', len(log_file) + 11) \
63+
prelude = struct.pack('<i', len(self.__log_file) + 11) \
5664
+ int2byte(command)
5765
if self.__log_pos is None:
5866
if self.__resume_stream:
@@ -66,29 +74,35 @@ def __connect_to_stream(self):
6674
else:
6775
prelude += struct.pack('<h', 1)
6876
prelude += struct.pack('<I', self.__server_id)
69-
self._stream_connection.wfile.write(prelude + log_file.encode())
77+
self._stream_connection.wfile.write(prelude + self.__log_file.encode())
7078
self._stream_connection.wfile.flush()
71-
self.__connected = True
79+
self.__connected_stream = True
7280

7381
def fetchone(self):
7482
while True:
75-
if self.__connected == False:
83+
if self.__connected_stream == False:
7684
self.__connect_to_stream()
85+
if self.__connected_ctl == False:
86+
self.__connect_to_ctl()
7787
pkt = None
7888
try:
7989
pkt = self._stream_connection.read_packet()
8090
except pymysql.OperationalError as (code, message):
8191
if code == 2013: #2013: Connection Lost
82-
self.__connected = False
92+
self.__connected_stream = False
8393
continue
8494
if not pkt.is_ok_packet():
8595
return None
86-
binlog_event = BinLogPacketWrapper(pkt, self.table_map, self.__ctl_connection)
96+
binlog_event = BinLogPacketWrapper(pkt, self.table_map, self._ctl_connection)
8797
if binlog_event.event_type == TABLE_MAP_EVENT:
8898
self.table_map[binlog_event.event.table_id] = binlog_event.event
8999
if self.__filter_event(binlog_event.event):
90100
continue
91-
self.__log_pos = binlog_event.log_pos
101+
if binlog_event.event_type == ROTATE_EVENT:
102+
self.__log_pos = binlog_event.event.position
103+
self.__log_file = binlog_event.event.next_binlog
104+
else:
105+
self.__log_pos = binlog_event.log_pos
92106
return binlog_event.event
93107

94108
def __filter_event(self, event):

pymysqlreplication/tests/test_basic.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def test_reading_rotate_event(self):
3434

3535
rotate_event = self.stream.fetchone()
3636

37-
def test_connection_lost_event(self):
37+
def test_connection_stream_lost_event(self):
3838
self.stream.close()
3939
self.stream = BinLogStreamReader(connection_settings = self.database, blocking = True)
4040

@@ -48,7 +48,6 @@ def test_connection_lost_event(self):
4848
#RotateEvent
4949
self.stream.fetchone()
5050

51-
5251
#FormatDescription
5352
self.stream.fetchone()
5453

0 commit comments

Comments
 (0)