-
Notifications
You must be signed in to change notification settings - Fork 683
/
Copy pathbase.py
112 lines (92 loc) · 3.25 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# -*- coding: utf-8 -*-
import pymysql
import copy
from pymysqlreplication import BinLogStreamReader
import os
import sys
# Python older than 2.7 gets its TestCase from the ``unittest2`` package
# instead of the standard-library ``unittest`` module.
if sys.version_info >= (2, 7):
    import unittest
else:
    import unittest2 as unittest

# Base class for the test cases defined in this module.
base = unittest.TestCase
class PyMySQLReplicationTestCase(base):
    """Base test case for tests that run against a live MySQL server.

    ``setUp`` connects to the server described by ``self.database``,
    recreates the ``pymysqlreplication_test`` database, issues
    ``RESET MASTER`` and opens a ``BinLogStreamReader`` in ``self.stream``.
    ``tearDown`` closes both the control connection and the stream.
    """

    def ignoredEvents(self):
        """Return the value passed as ``ignored_events`` to the stream reader.

        Empty by default; presumably meant to be overridden by subclasses.
        """
        return []

    def setUp(self):
        """Recreate the test database and open the control connection/stream."""
        # Default connection settings.
        self.database = {
            "host": "localhost",
            "user": "root",
            "passwd": "",
            "port": 3306,
            "use_unicode": True,
            "charset": "utf8",
            "db": "pymysqlreplication_test"
        }

        self.conn_control = None

        # First connect without selecting a database so the test database
        # can be dropped and recreated.
        db = copy.copy(self.database)
        db["db"] = None
        self.connect_conn_control(db)
        self.execute("DROP DATABASE IF EXISTS pymysqlreplication_test")
        self.execute("CREATE DATABASE pymysqlreplication_test")

        # Reconnect, this time with the freshly created database selected.
        self.connect_conn_control(copy.copy(self.database))

        self.stream = None
        self.resetBinLog()
        # NOTE(review): the result is discarded; the call is kept only so the
        # sequence of queries issued during setUp stays unchanged.
        self.isMySQL56AndMore()

    def getMySQLVersion(self):
        """Return the MySQL version of the server
        If version is 5.6.10-log the result is 5.6.10
        """
        return self.execute("SELECT VERSION()").fetchone()[0].split('-')[0]

    def _version_tuple(self):
        """Return the server version as an integer ``(major, minor)`` tuple.

        Integers are compared instead of a float so that a two-digit minor
        version is ordered correctly (``float("5.10")`` would be ``5.1``).
        """
        parts = self.getMySQLVersion().split('.')
        major = int(parts[0])
        minor = int(parts[1]) if len(parts) > 1 else 0
        return (major, minor)

    def isMySQL56AndMore(self):
        """Return True when the server version is 5.6 or newer."""
        return self._version_tuple() >= (5, 6)

    def isMySQL57(self):
        """Return True when the server version is exactly 5.7."""
        return self._version_tuple() == (5, 7)

    def isMySQL80AndMore(self):
        """Return True when the server version is 8.0 or newer."""
        return self._version_tuple() >= (8, 0)

    @property
    def supportsGTID(self):
        """True when the server is 5.6+ and ``@@global.gtid_mode`` is "ON"."""
        if not self.isMySQL56AndMore():
            return False
        return self.execute("SELECT @@global.gtid_mode ").fetchone()[0] == "ON"

    def connect_conn_control(self, db):
        """(Re)open ``self.conn_control`` using the keyword arguments in ``db``.

        Any previously open control connection is closed first.
        """
        if self.conn_control is not None:
            self.conn_control.close()
        self.conn_control = pymysql.connect(**db)

    def tearDown(self):
        """Close the control connection and the stream, then clear both.

        Either attribute may already be ``None``; the stream is closed even
        if closing the connection raises.
        """
        try:
            if self.conn_control is not None:
                self.conn_control.close()
        finally:
            self.conn_control = None
            try:
                if self.stream is not None:
                    self.stream.close()
            finally:
                self.stream = None

    def execute(self, query):
        """Run ``query`` on the control connection and return the cursor."""
        c = self.conn_control.cursor()
        c.execute(query)
        return c

    def resetBinLog(self):
        """Issue ``RESET MASTER`` and replace ``self.stream`` with a new reader."""
        self.execute("RESET MASTER")
        if self.stream is not None:
            self.stream.close()
        self.stream = BinLogStreamReader(self.database, server_id=1024,
                                         ignored_events=self.ignoredEvents())

    def set_sql_mode(self):
        """set sql_mode to test with same sql_mode (mysql 5.7 sql_mode default is changed)"""
        if self.isMySQL57():
            self.execute("set @@sql_mode='NO_ENGINE_SUBSTITUTION'")

    def bin_log_format(self):
        """Return the server's ``@@binlog_format`` value."""
        query = "select @@binlog_format"
        cursor = self.execute(query)
        result = cursor.fetchone()
        return result[0]

    def bin_log_basename(self):
        """Return the last "/"-separated component of ``@@log_bin_basename``."""
        cursor = self.execute('select @@log_bin_basename')
        bin_log_basename = cursor.fetchone()[0]
        bin_log_basename = bin_log_basename.split("/")[-1]
        return bin_log_basename