Skip to content

Commit 66ca7ab

Browse files
Support test
1 parent fdaa06d commit 66ca7ab

File tree

3 files changed

+32
-10
lines changed

3 files changed

+32
-10
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,15 @@ def __init__(self, connection, resume_stream = False, blocking = False):
1313
blocking: Read on stream is blocking
1414
'''
1515
self.__connection = connection
16+
self.__connected = False
17+
self.__resume_stream = resume_stream
18+
self.__blocking = blocking
19+
20+
#Store table meta informations
21+
self.table_map = {}
22+
1623

24+
def __connect_to_stream(self):
1725
cur = self.__connection.cursor()
1826
cur.execute("SHOW MASTER STATUS")
1927
(log_file, log_pos) = cur.fetchone()[:2]
@@ -27,22 +35,22 @@ def __init__(self, connection, resume_stream = False, blocking = False):
2735
command = COM_BINLOG_DUMP
2836
prelude = struct.pack('<i', len(log_file) + 11) \
2937
+ int2byte(command)
30-
if resume_stream:
38+
if self.__resume_stream:
3139
prelude += struct.pack('<I', log_pos)
3240
else:
3341
prelude += struct.pack('<I', 4)
34-
if blocking:
42+
if self.__blocking:
3543
prelude += struct.pack('<h', 0)
3644
else:
3745
prelude += struct.pack('<h', 1)
3846
prelude += struct.pack('<I', 3)
3947
self.__connection.wfile.write(prelude + log_file)
4048
self.__connection.wfile.flush()
41-
42-
#Store table meta informations
43-
self.table_map = {}
49+
self.__connected = True
4450

4551
def fetchone(self):
52+
if self.__connected == False:
53+
self.__connect_to_stream()
4654
pkt = self.__connection.read_packet()
4755
if not pkt.is_ok_packet():
4856
return None

pymysqlreplication/tests/base.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
import pymysql
22
import unittest
3+
import copy
4+
from pymysqlreplication import BinLogStreamReader
35

46
class PyMySQLReplicationTestCase(unittest.TestCase):
57
'''Test the module. Be carefull it will reset your MySQL server'''
68
database = {"host":"localhost","user":"root", "passwd":"","use_unicode": True}
79

810
def setUp(self):
911
self.conn_control = pymysql.connect(**self.database)
10-
self.conn_test = pymysql.connect(**self.database)
1112
self.execute("CREATE DATABASE pymysqlreplication_test")
1213
self.resetBinLog()
14+
db = copy.copy(self.database)
15+
db["db"] = "pymysqlreplication_test"
16+
self.conn_control = pymysql.connect(**db)
17+
self.conn_test = pymysql.connect(**self.database)
18+
self.stream = BinLogStreamReader(self.conn_test)
1319

1420
def tearDown(self):
1521
self.execute("DROP DATABASE pymysqlreplication_test")

pymysqlreplication/tests/test_basic.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
11
import base
2-
from pymysqlreplication import BinLogStreamReader
2+
from pymysqlreplication.event import *
33

44
class TestBinLogStreamReader(base.PyMySQLReplicationTestCase):
5-
def test_open_stream(self):
6-
""" test opening stream"""
7-
stream = BinLogStreamReader(self.conn_test)
5+
def test_read_query_event(self):
6+
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
7+
self.execute(query)
88

9+
#RotateEvent
10+
self.stream.fetchone()
11+
#FormatDescription
12+
self.stream.fetchone()
13+
14+
event = self.stream.fetchone()
15+
self.assertIsInstance(event.event, BinLogQueryEvent)
16+
self.assertEqual(event.event.query, query)
917

1018
__all__ = ["TestBinLogStreamReader"]
1119

0 commit comments

Comments
 (0)