forked from julien-duponchelle/python-mysql-replication
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbase.py
147 lines (122 loc) · 4.43 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# -*- coding: utf-8 -*-
import pymysql
import copy
from pymysqlreplication import BinLogStreamReader
import os
import sys
# The file falls back to the ``unittest2`` package on interpreters older
# than 2.7 and uses the standard library ``unittest`` everywhere else.
if sys.version_info >= (2, 7):
    import unittest
else:
    import unittest2 as unittest

# Common parent class for every test case defined in this module.
base = unittest.TestCase
class PyMySQLReplicationTestCase(base):
    """Base test case that prepares a server and a binlog stream for each test.

    ``setUp`` opens a control connection (``self.conn_control``), recreates
    the ``pymysqlreplication_test`` database, and opens a
    ``BinLogStreamReader`` as ``self.stream``. ``tearDown`` closes both.
    """

    # Cached answer of isMariaDB(). It is declared at class level (and is
    # therefore name-mangled for this class) so that subclasses which
    # override setUp() without calling this class's setUp() still find the
    # attribute instead of raising AttributeError.
    __is_mariaDB = None

    def ignoredEvents(self):
        """Return the value passed as ``ignored_events`` to the stream reader.

        The default is an empty list; subclasses may override it.
        """
        return []

    def setUp(self):
        """Recreate the test database and open a fresh binlog stream."""
        # default
        self.database = {
            "host": os.environ.get("MYSQL_5_7") or "localhost",
            "user": "root",
            "passwd": "",
            "port": 3306,
            "use_unicode": True,
            "charset": "utf8",
            "db": "pymysqlreplication_test"
        }

        self.conn_control = None
        # First connect without selecting a database, because the test
        # database is dropped and recreated below.
        db = copy.copy(self.database)
        db["db"] = None
        self.connect_conn_control(db)
        self.execute("DROP DATABASE IF EXISTS pymysqlreplication_test")
        self.execute("CREATE DATABASE pymysqlreplication_test")
        # Reconnect, this time with the freshly created database selected.
        db = copy.copy(self.database)
        self.connect_conn_control(db)
        self.stream = None
        self.resetBinLog()
        # Forget any cached server flavour from a previous run.
        self.__is_mariaDB = None

    def _fetch_scalar(self, query):
        """Run ``query`` and return the first column of its first row.

        The cursor is closed before returning so helper queries do not
        leave open cursors on the control connection.
        """
        cursor = self.execute(query)
        try:
            return cursor.fetchone()[0]
        finally:
            cursor.close()

    def _version_tuple(self):
        """Return the server version as a ``(major, minor)`` tuple of ints.

        Integer tuples compare correctly where the float form does not
        (as a float, "5.10" would become 5.1 and sort below 5.6).
        """
        # Pad so that a version with a single component still yields a pair.
        parts = self.getMySQLVersion().split(".") + ["0"]
        return (int(parts[0]), int(parts[1]))

    def getMySQLVersion(self):
        """Return the MySQL version of the server

        If version is 5.6.10-log the result is 5.6.10
        """
        return self._fetch_scalar("SELECT VERSION()").split('-')[0]

    def isMySQL56AndMore(self):
        """Return True when the server version is 5.6 or newer."""
        return self._version_tuple() >= (5, 6)

    def isMySQL57(self):
        """Return True when the server version is exactly 5.7.x."""
        return self._version_tuple() == (5, 7)

    def isMySQL80AndMore(self):
        """Return True when the server version is 8.0 or newer."""
        return self._version_tuple() >= (8, 0)

    def isMariaDB(self):
        """Return True when ``SELECT VERSION()`` mentions "MariaDB".

        The answer is cached on the instance after the first query.
        """
        if self.__is_mariaDB is None:
            self.__is_mariaDB = "MariaDB" in self._fetch_scalar("SELECT VERSION()")
        return self.__is_mariaDB

    @property
    def supportsGTID(self):
        """True when the server is 5.6+ and ``@@global.gtid_mode`` is "ON"."""
        if not self.isMySQL56AndMore():
            return False
        return self._fetch_scalar("SELECT @@global.gtid_mode ") == "ON"

    def connect_conn_control(self, db):
        """(Re)open the control connection using the settings in ``db``.

        Any existing control connection is closed first. ``db`` is passed
        to ``pymysql.connect`` as keyword arguments.
        """
        if self.conn_control is not None:
            self.conn_control.close()
        self.conn_control = pymysql.connect(**db)

    def tearDown(self):
        """Close the control connection and the binlog stream.

        Either may already be ``None``. The stream is closed even when
        closing the connection raises.
        """
        # Detach both resources first so the attributes end up as None no
        # matter which close() call fails.
        conn, self.conn_control = self.conn_control, None
        stream, self.stream = self.stream, None
        try:
            if conn is not None:
                conn.close()
        finally:
            if stream is not None:
                stream.close()

    def execute(self, query):
        """Execute ``query`` on the control connection and return the cursor."""
        c = self.conn_control.cursor()
        c.execute(query)
        return c

    def execute_with_args(self, query, args):
        """Execute ``query`` with bound ``args`` and return the cursor."""
        c = self.conn_control.cursor()
        c.execute(query, args)
        return c

    def resetBinLog(self):
        """Issue ``RESET MASTER`` and replace ``self.stream`` with a new reader.

        An already open stream is closed before the new one is created.
        """
        self.execute("RESET MASTER")
        if self.stream is not None:
            self.stream.close()
        self.stream = BinLogStreamReader(self.database, server_id=1024,
                                         ignored_events=self.ignoredEvents())

    def set_sql_mode(self):
        """set sql_mode to test with same sql_mode (mysql 5.7 sql_mode default is changed)"""
        if self.isMySQL57():
            self.execute("SET @@sql_mode='NO_ENGINE_SUBSTITUTION'")

    def bin_log_format(self):
        """Return the server's ``@@binlog_format`` value."""
        return self._fetch_scalar("SELECT @@binlog_format")

    def bin_log_basename(self):
        """Return the last path component of ``@@log_bin_basename``."""
        bin_log_basename = self._fetch_scalar('SELECT @@log_bin_basename')
        # Only the part after the final "/" is kept.
        return bin_log_basename.split("/")[-1]
class PyMySQLReplicationMariaDbTestCase(PyMySQLReplicationTestCase):
    """Test case variant whose connection settings target a MariaDB server."""

    def setUp(self):
        """Recreate the test database on MariaDB and open a binlog stream.

        The host comes from the ``MARIADB_10_6`` environment variable
        (falling back to "localhost") and the port from
        ``MARIADB_10_6_PORT`` (falling back to 3308).
        """
        mariadb_host = os.environ.get("MARIADB_10_6") or "localhost"
        mariadb_port = int(os.environ.get("MARIADB_10_6_PORT") or 3308)

        # default
        self.database = {
            "host": mariadb_host,
            "user": "root",
            "passwd": "",
            "port": mariadb_port,
            "use_unicode": True,
            "charset": "utf8",
            "db": "pymysqlreplication_test"
        }
        self.conn_control = None

        # Connect with no database selected while the test database is
        # dropped and created again.
        self.connect_conn_control(dict(self.database, db=None))
        for statement in (
            "DROP DATABASE IF EXISTS pymysqlreplication_test",
            "CREATE DATABASE pymysqlreplication_test",
        ):
            self.execute(statement)

        # Reconnect with the recreated database selected.
        self.connect_conn_control(dict(self.database))

        self.stream = None
        self.resetBinLog()