Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 6c2bac7

Browse files
committedMar 23, 2022
connection_pool: introduce connection pool
Introduce ConnectionPool class to work with cluster of Tarantool instances. ConnectionPool support master discovery and ro/rw-based requests, so it is most useful while working with a single replicaset of instances. ConnectionPool is supported only for Python 3.7 or newer. User used to connect must be able to call `box.info` on instances. ConnectionPool updates information about each server state (RO/RW) on initial connect and then asynchronously in separate threads. Application retries must be written considering the asynchronous nature of cluster state refresh. User does not need to use any synchronization mechanisms in requests, it's all handled with ConnectionPool methods. ConnectionPool API is the same as a plain Connection API. On each request, a connection is selected to execute this request. Connection is selected based on request mode: * Mode.RW selects an RW instance. * Mode.PREFER_RW selects an RW instance, if possible, RO instance otherwise. * Mode.PREFER_RO selects an RO instance, if possible, RW instance otherwise. All requests that are guaranteed to write (insert, replace, delete, upsert, update) use RW mode. For any other type of request (call, eval, execute, ping, select) mode could be set explicitly. pool.call('some_write_procedure', args, mode=tarantool.Mode.RW) Closes #196
1 parent 3f9983c commit 6c2bac7

File tree

7 files changed

+824
-9
lines changed

7 files changed

+824
-9
lines changed
 

‎tarantool/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,8 @@ def connectmesh(addrs=({'host': 'localhost', 'port': 3301},), user=None,
8080
__all__ = ['connect', 'Connection', 'connectmesh', 'MeshConnection', 'Schema',
8181
'Error', 'DatabaseError', 'NetworkError', 'NetworkWarning',
8282
'SchemaError', 'dbapi']
83+
84+
# ConnectionPool is supported only for Python 3.7 or newer.
85+
if sys.version_info.major >= 3 and sys.version_info.minor >= 7:
86+
from tarantool.connection_pool import ConnectionPool, Mode
87+
__all__.extend(['ConnectionPool', 'Mode'])

‎tarantool/connection_pool.py

Lines changed: 406 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,406 @@
1+
# -*- coding: utf-8 -*-
2+
3+
import abc
4+
import functools
5+
import itertools
6+
import json
7+
import queue
8+
import threading
9+
import time
10+
import typing
11+
from dataclasses import dataclass, field
12+
from enum import Enum
13+
14+
from tarantool.connection import Connection, ConnectionInterface
15+
from tarantool.const import (
16+
CONNECTION_TIMEOUT,
17+
POOL_INSTANCE_RECONNECT_DELAY,
18+
POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS,
19+
POOL_REFRESH_DELAY,
20+
SOCKET_TIMEOUT
21+
)
22+
from tarantool.error import (
23+
ClusterConnectWarning,
24+
ClusterTolopogyError,
25+
ClusterTolopogyWarning,
26+
ConfigurationError,
27+
DatabaseError,
28+
NetworkError,
29+
NetworkWarning,
30+
tnt_strerror,
31+
warn
32+
)
33+
from tarantool.utils import ENCODING_DEFAULT
34+
from tarantool.mesh_connection import validate_address
35+
36+
37+
class Mode(Enum):
38+
RW = 1
39+
PREFER_RW = 2
40+
PREFER_RO = 3
41+
42+
43+
class Status(Enum):
44+
HEALTHY = 1
45+
UNHEALTHY = 2
46+
47+
48+
@dataclass
49+
class InstanceState():
50+
status: Status = Status.UNHEALTHY
51+
ro: typing.Optional[bool] = None
52+
53+
54+
def QueueFactory():
55+
return queue.Queue(maxsize=1)
56+
57+
58+
@dataclass
59+
class PoolUnit():
60+
addr: dict
61+
conn: Connection
62+
input_queue: queue.Queue = field(default_factory=QueueFactory)
63+
output_queue: queue.Queue = field(default_factory=QueueFactory)
64+
thread: typing.Optional[threading.Thread] = None
65+
state: InstanceState = field(default_factory=InstanceState)
66+
request_process_enabled: bool = False
67+
68+
69+
# Based on https://realpython.com/python-interface/
70+
class StrategyInterface(metaclass=abc.ABCMeta):
71+
@classmethod
72+
def __subclasshook__(cls, subclass):
73+
return (hasattr(subclass, 'update') and
74+
callable(subclass.update) and
75+
hasattr(subclass, 'getnext') and
76+
callable(subclass.getnext) or
77+
NotImplemented)
78+
79+
@abc.abstractmethod
80+
def update(self):
81+
raise NotImplementedError
82+
83+
@abc.abstractmethod
84+
def getnext(self, mode):
85+
raise NotImplementedError
86+
87+
class RoundRobinStrategy(object):
88+
"""
89+
Simple round-robin connection rotation
90+
"""
91+
def __init__(self, pool):
92+
self.RW_iter = None
93+
self.RO_iter = None
94+
self.pool = pool
95+
self.rebuild_needed = True
96+
97+
def build(self):
98+
RW_pool = []
99+
RO_pool = []
100+
101+
for key in self.pool:
102+
state = self.pool[key].state
103+
104+
if state.status == Status.UNHEALTHY:
105+
continue
106+
107+
if state.ro == False:
108+
RW_pool.append(key)
109+
else:
110+
RO_pool.append(key)
111+
112+
if len(RW_pool) > 0:
113+
self.RW_iter = itertools.cycle(RW_pool)
114+
else:
115+
self.RW_iter = None
116+
117+
if len(RO_pool) > 0:
118+
self.RO_iter = itertools.cycle(RO_pool)
119+
else:
120+
self.RO_iter = None
121+
122+
self.rebuild_needed = False
123+
124+
def update(self):
125+
self.rebuild_needed = True
126+
127+
def getnext(self, mode):
128+
if self.rebuild_needed:
129+
self.build()
130+
131+
if mode == Mode.RW:
132+
if self.RW_iter is not None:
133+
return next(self.RW_iter)
134+
else:
135+
raise ClusterTolopogyError("Can't find healthy rw instance in pool")
136+
elif mode == Mode.PREFER_RO:
137+
if self.RO_iter is not None:
138+
return next(self.RO_iter)
139+
elif self.RW_iter is not None:
140+
return next(self.RW_iter)
141+
else:
142+
raise ClusterTolopogyError("Can't find healthy instance in pool")
143+
elif mode == Mode.PREFER_RW:
144+
if self.RW_iter is not None:
145+
return next(self.RW_iter)
146+
elif self.RO_iter is not None:
147+
return next(self.RO_iter)
148+
else:
149+
raise ClusterTolopogyError("Can't find healthy instance in pool")
150+
151+
152+
@dataclass
153+
class PoolTask():
154+
method_name: str
155+
args: tuple
156+
kwargs: dict
157+
158+
159+
class ConnectionPool(ConnectionInterface):
160+
'''
161+
Represents pool of connections to the cluster of Tarantool servers.
162+
163+
ConnectionPool API is the same as a plain Connection API.
164+
On each request, a connection is selected to execute this request.
165+
Connection is selected based on request mode:
166+
* Mode.RW selects an RW instance.
167+
* Mode.PREFER_RW selects an RW instance, if possible, RO instance
168+
otherwise.
169+
* Mode.PREFER_RO selects an RO instance, if possible, RW instance
170+
otherwise.
171+
All write requests (insert, replace, delete, upsert, update) use
172+
RW mode. For any other type of request mode could be set
173+
explicitly.
174+
175+
ConnectionPool updates information about each server state (RO/RW)
176+
on initial connect and then asynchronously in separate threads.
177+
Application retries must be written considering the asynchronous
178+
nature of cluster state refresh. User does not need to use any
179+
synchronization mechanisms in requests, it's all handled with
180+
ConnectionPool methods.
181+
'''
182+
def __init__(self,
183+
addrs,
184+
user=None,
185+
password=None,
186+
socket_timeout=SOCKET_TIMEOUT,
187+
reconnect_max_attempts=POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS,
188+
reconnect_delay=POOL_INSTANCE_RECONNECT_DELAY,
189+
connect_now=True,
190+
encoding=ENCODING_DEFAULT,
191+
call_16=False,
192+
connection_timeout=CONNECTION_TIMEOUT,
193+
strategy_class=RoundRobinStrategy,
194+
refresh_delay=POOL_REFRESH_DELAY):
195+
'''
196+
Initialize connections to the cluster of servers.
197+
198+
:param list addrs: List of {host: , port:} dictionaries,
199+
describing server addresses.
200+
:param int reconnect_max_attempts: Max attempts to reconnect
201+
for each connection in the pool. Be careful with reconnect
202+
parameters in ConnectionPool since every status refresh is
203+
also a request with reconnection. Default is 0.
204+
:param float reconnect_delay: Time between reconnect
205+
attempts for each connection in the pool. Be careful with
206+
reconnect parameters in ConnectionPool since every status
207+
refresh is also a request with reconnection. Default is 0.
208+
:param StrategyInterface strategy_class: Class for choosing
209+
instance based on request mode. By default, round-robin
210+
strategy is used.
211+
:param int refresh_delay: Minimal time between RW/RO status
212+
refreshes.
213+
'''
214+
215+
if not isinstance(addrs, list) or len(addrs) == 0:
216+
raise ConfigurationError("addrs must be non-empty list")
217+
218+
# Verify addresses.
219+
for addr in addrs:
220+
ok, msg = validate_address(addr)
221+
if not ok:
222+
raise ConfigurationError(msg)
223+
self.addrs = addrs
224+
225+
# Create connections
226+
self.pool = {}
227+
self.refresh_delay = refresh_delay
228+
self.strategy = strategy_class(self.pool)
229+
230+
for addr in self.addrs:
231+
key = self._make_key(addr)
232+
self.pool[key] = PoolUnit(
233+
addr=addr,
234+
conn=Connection(
235+
host=addr['host'],
236+
port=addr['port'],
237+
user=user,
238+
password=password,
239+
socket_timeout=socket_timeout,
240+
reconnect_max_attempts=reconnect_max_attempts,
241+
reconnect_delay=reconnect_delay,
242+
connect_now=False, # Connect in ConnectionPool.connect()
243+
encoding=encoding,
244+
call_16=call_16,
245+
connection_timeout=connection_timeout)
246+
)
247+
248+
if connect_now:
249+
self.connect()
250+
251+
def __del__(self):
252+
self.close()
253+
254+
def _make_key(self, addr):
255+
return json.dumps(addr)
256+
257+
def _get_new_state(self, key, unit):
258+
conn = unit.conn
259+
260+
if conn.is_closed():
261+
try:
262+
conn.connect()
263+
except NetworkError as e:
264+
msg = "Failed to connect to {0}:{1}".format(
265+
unit.addr['host'], unit.addr['port'])
266+
warn(msg, ClusterConnectWarning)
267+
return InstanceState(Status.UNHEALTHY)
268+
269+
try:
270+
resp = conn.call('box.info')
271+
except Exception as e:
272+
msg = "Failed to get box.info for {0}:{1}, reason: {2}".format(
273+
unit.addr['host'], unit.addr['port'], repr(e))
274+
warn(msg, ClusterTolopogyWarning)
275+
return InstanceState(Status.UNHEALTHY)
276+
277+
try:
278+
ro = resp.data[0]['ro']
279+
except Exception as e:
280+
msg = "Incorrect box.info response from {0}:{1}".format(
281+
unit.addr['host'], unit.addr['port'])
282+
warn(msg, ClusterTolopogyWarning)
283+
return InstanceState(Status.UNHEALTHY)
284+
285+
return InstanceState(Status.HEALTHY, ro)
286+
287+
def _refresh_state(self, key):
288+
unit = self.pool[key]
289+
conn = unit.conn
290+
291+
state = self._get_new_state(key, unit)
292+
if state != unit.state:
293+
unit.state = state
294+
self.strategy.update()
295+
296+
def close(self):
297+
for unit in self.pool.values():
298+
unit.request_process_enabled = False
299+
unit.thread.join()
300+
301+
if not unit.conn.is_closed():
302+
unit.conn.close()
303+
304+
def is_closed(self):
305+
return functools.reduce(
306+
lambda x, y:
307+
x.request_process_enabled or y.request_process_enabled,
308+
self.pool)
309+
310+
def _request_process_loop(self, key, unit, last_refresh):
311+
while unit.request_process_enabled:
312+
if not unit.input_queue.empty():
313+
task = unit.input_queue.get()
314+
method = getattr(Connection, task.method_name)
315+
try:
316+
resp = method(unit.conn, *task.args, **task.kwargs)
317+
except Exception as e:
318+
unit.output_queue.put(e)
319+
else:
320+
unit.output_queue.put(resp)
321+
322+
now = time.time()
323+
324+
if now - last_refresh > self.refresh_delay:
325+
self._refresh_state(key)
326+
last_refresh = time.time()
327+
328+
def connect(self):
329+
for key in self.pool:
330+
unit = self.pool[key]
331+
332+
self._refresh_state(key)
333+
last_refresh = time.time()
334+
335+
unit.thread = threading.Thread(
336+
target=self._request_process_loop,
337+
args=(key, unit, last_refresh),
338+
daemon=True,
339+
)
340+
unit.request_process_enabled = True
341+
unit.thread.start()
342+
343+
def _send(self, mode, method_name, *args, **kwargs):
344+
key = self.strategy.getnext(mode)
345+
unit = self.pool[key]
346+
347+
task = PoolTask(method_name=method_name, args=args, kwargs=kwargs)
348+
349+
unit.input_queue.put(task)
350+
resp = unit.output_queue.get()
351+
352+
if isinstance(resp, Exception):
353+
raise resp
354+
355+
return resp
356+
357+
def call(self, func_name, *args, mode=Mode.PREFER_RW):
358+
'''
359+
:param tarantool.Mode mode: Request mode (default is
360+
PREFER_RW).
361+
'''
362+
return self._send(mode, 'call', func_name, *args)
363+
364+
def eval(self, expr, *args, mode=Mode.PREFER_RW):
365+
'''
366+
:param tarantool.Mode mode: Request mode (default is
367+
PREFER_RW).
368+
'''
369+
return self._send(mode, 'eval', expr, *args)
370+
371+
def replace(self, space_name, values):
372+
return self._send(Mode.RW, 'replace', space_name, values)
373+
374+
def insert(self, space_name, values):
375+
return self._send(Mode.RW, 'insert', space_name, values)
376+
377+
def delete(self, space_name, key, **kwargs):
378+
return self._send(Mode.RW, 'delete', space_name, key, **kwargs)
379+
380+
def upsert(self, space_name, tuple_value, op_list, **kwargs):
381+
return self._send(Mode.RW, 'upsert', space_name, tuple_value,
382+
op_list, **kwargs)
383+
384+
def update(self, space_name, key, op_list, **kwargs):
385+
return self._send(Mode.RW, 'update', space_name, key,
386+
op_list, **kwargs)
387+
388+
def ping(self, mode=Mode.PREFER_RW, **kwargs):
389+
'''
390+
:param tarantool.Mode mode: Request mode (default is
391+
PREFER_RW).
392+
'''
393+
return self._send(mode, 'ping', **kwargs)
394+
395+
def select(self, space_name, key, mode=Mode.PREFER_RO, **kwargs):
396+
'''
397+
:param tarantool.Mode mode: Request mode (default is
398+
PREFER_RO).
399+
'''
400+
return self._send(mode, 'select', space_name, key, **kwargs)
401+
402+
def execute(self, query, params=None, mode=Mode.RW):
403+
'''
404+
:param tarantool.Mode mode: Request mode (default is RW).
405+
'''
406+
return self._send(mode, 'execute', query, params)

‎tarantool/const.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,9 @@
9696
RECONNECT_DELAY = 0.1
9797
# Default cluster nodes list refresh interval (seconds)
9898
CLUSTER_DISCOVERY_DELAY = 60
99+
# Default cluster nodes state refresh interval (seconds)
100+
POOL_REFRESH_DELAY = 1
101+
# Default maximum number of attempts to reconnect for pool instance
102+
POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS = 0
103+
# Default delay between attempts to reconnect (seconds)
104+
POOL_INSTANCE_RECONNECT_DELAY = 0

‎tarantool/error.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,21 @@ class ClusterDiscoveryWarning(UserWarning):
223223
pass
224224

225225

226+
class ClusterConnectWarning(UserWarning):
227+
'''Warning related to cluster pool connection'''
228+
pass
229+
230+
231+
class ClusterTolopogyWarning(UserWarning):
232+
'''Warning related to ro/rw cluster pool topology'''
233+
pass
234+
235+
236+
class ClusterTolopogyError(DatabaseError):
237+
'''Exception raised due to unsatisfying ro/rw cluster pool topology'''
238+
pass
239+
240+
226241
# always print this warnings
227242
warnings.filterwarnings("always", category=NetworkWarning)
228243

‎test/suites/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@
1010
from .test_protocol import TestSuite_Protocol
1111
from .test_reconnect import TestSuite_Reconnect
1212
from .test_mesh import TestSuite_Mesh
13+
from .test_pool import TestSuite_Pool
1314
from .test_execute import TestSuite_Execute
1415
from .test_dbapi import TestSuite_DBAPI
16+
from .test_schema import (TestSuite_Schema_BinaryConnection,
17+
TestSuite_Schema_UnicodeConnection)
1518

1619
test_cases = (TestSuite_Schema_UnicodeConnection,
1720
TestSuite_Schema_BinaryConnection,
1821
TestSuite_Request, TestSuite_Protocol, TestSuite_Reconnect,
19-
TestSuite_Mesh, TestSuite_Execute, TestSuite_DBAPI)
22+
TestSuite_Mesh, TestSuite_Execute, TestSuite_DBAPI, TestSuite_Pool)
2023

2124
def load_tests(loader, tests, pattern):
2225
suite = unittest.TestSuite()

‎test/suites/lib/skip.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,18 @@ def wrapper(self, *args, **kwargs):
1818
func(self, *args, **kwargs)
1919

2020
if not hasattr(self, 'tnt_version'):
21+
srv = None
22+
23+
if hasattr(self, 'servers'):
24+
srv = self.servers[0]
25+
26+
if hasattr(self, 'srv'):
27+
srv = self.srv
28+
29+
assert srv is not None
30+
2131
self.__class__.tnt_version = re.match(
22-
r'[\d.]+', self.srv.admin('box.info.version')[0]
32+
r'[\d.]+', srv.admin('box.info.version')[0]
2333
).group()
2434

2535
tnt_version = pkg_resources.parse_version(self.tnt_version)
@@ -34,9 +44,8 @@ def wrapper(self, *args, **kwargs):
3444
return wrapper
3545

3646

37-
def skip_or_run_test_python_major(func, REQUIRED_PYTHON_MAJOR, msg):
38-
"""Decorator to skip or run tests depending on the Python major
39-
version.
47+
def skip_or_run_test_python(func, REQUIRED_PYTHON_VERSION, msg):
48+
"""Decorator to skip or run tests depending on the Python version.
4049
4150
Also, it can be used with the 'setUp' method for skipping
4251
the whole test suite.
@@ -47,9 +56,12 @@ def wrapper(self, *args, **kwargs):
4756
if func.__name__ == 'setUp':
4857
func(self, *args, **kwargs)
4958

50-
major = sys.version_info.major
51-
if major != REQUIRED_PYTHON_MAJOR:
52-
self.skipTest('Python %s connector %s' % (major, msg))
59+
ver = sys.version_info
60+
python_version_str = '%d.%d' % (ver.major, ver.minor)
61+
python_version = pkg_resources.parse_version(python_version_str)
62+
support_version = pkg_resources.parse_version(REQUIRED_PYTHON_VERSION)
63+
if python_version < support_version:
64+
self.skipTest('Python %s connector %s' % (python_version, msg))
5365

5466
if func.__name__ != 'setUp':
5567
func(self, *args, **kwargs)
@@ -88,4 +100,15 @@ def skip_or_run_mp_bin_test(func):
88100
Python 2 connector do not support mp_bin.
89101
"""
90102

91-
return skip_or_run_test_python_major(func, 3, 'does not support mp_bin')
103+
return skip_or_run_test_python(func, '3.0', 'does not support mp_bin')
104+
105+
106+
def skip_or_run_conn_pool_test(func):
107+
"""Decorator to skip or run mp_bin-related tests depending on
108+
the Python version.
109+
110+
Python 2 connector do not support mp_bin.
111+
"""
112+
113+
return skip_or_run_test_python(func, '3.7',
114+
'does not support ConnectionPool')

‎test/suites/test_pool.py

Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
# -*- coding: utf-8 -*-
2+
3+
from __future__ import print_function
4+
5+
import sys
6+
import time
7+
import unittest
8+
import warnings
9+
from time import sleep
10+
11+
import tarantool
12+
from tarantool.error import ClusterTolopogyError, DatabaseError, NetworkError
13+
14+
from .lib.skip import skip_or_run_sql_test, skip_or_run_conn_pool_test
15+
from .lib.tarantool_server import TarantoolServer
16+
17+
18+
def create_server(_id):
19+
srv = TarantoolServer()
20+
srv.script = 'test/suites/box.lua'
21+
srv.start()
22+
srv.admin("box.schema.user.create('test', {password = 'test', " +
23+
"if_not_exists = true})")
24+
srv.admin("box.schema.user.grant('test', 'execute', 'universe')")
25+
srv.admin("box.schema.space.create('test')")
26+
srv.admin(r"box.space.test:format({"
27+
+r" { name = 'pk', type = 'string' }," +
28+
r" { name = 'id', type = 'number', is_nullable = true }" +
29+
r"})")
30+
srv.admin(r"box.space.test:create_index('pk'," +
31+
r"{ unique = true," +
32+
r" parts = {{field = 1, type = 'string'}}})")
33+
srv.admin(r"box.space.test:create_index('id'," +
34+
r"{ unique = true," +
35+
r" parts = {{field = 2, type = 'number', is_nullable=true}}})")
36+
srv.admin("box.schema.user.grant('test', 'read,write', 'space', 'test')")
37+
38+
# Create srv_id function (for testing purposes).
39+
srv.admin("function srv_id() return %s end" % _id)
40+
return srv
41+
42+
43+
@unittest.skipIf(sys.platform.startswith("win"),
44+
'Pool tests on windows platform are not supported')
45+
class TestSuite_Pool(unittest.TestCase):
46+
def set_ro(self, srv, read_only):
47+
if read_only:
48+
req = r'box.cfg{read_only = true}'
49+
else:
50+
req = r'box.cfg{read_only = false}'
51+
52+
srv.admin(req)
53+
54+
def set_cluster_ro(self, read_only_list):
55+
assert len(self.servers) == len(read_only_list)
56+
57+
for i in range(len(self.servers)):
58+
self.set_ro(self.servers[i], read_only_list[i])
59+
60+
def retry(self, func, count=5, timeout=0.5):
61+
for i in range(count):
62+
try:
63+
func()
64+
except Exception as e:
65+
if i + 1 == count:
66+
raise e
67+
68+
time.sleep(timeout)
69+
70+
@classmethod
71+
def setUpClass(self):
72+
print(' POOL '.center(70, '='), file=sys.stderr)
73+
print('-' * 70, file=sys.stderr)
74+
75+
@skip_or_run_conn_pool_test
76+
def setUp(self):
77+
# Create five servers and extract helpful fields for tests.
78+
self.servers = []
79+
self.addrs = []
80+
for i in range(5):
81+
srv = create_server(i)
82+
self.servers.append(srv)
83+
self.addrs.append({'host': srv.host, 'port': srv.args['primary']})
84+
85+
def test_00_basic(self):
86+
self.set_cluster_ro([False, False, True, False, True])
87+
88+
self.pool = tarantool.ConnectionPool(addrs=self.addrs, user='test', password='test')
89+
90+
self.assertSequenceEqual(self.pool.eval('return box.info().ro', mode=tarantool.Mode.RW), [False])
91+
self.assertSequenceEqual(self.pool.eval('return box.info().ro', mode=tarantool.Mode.PREFER_RW), [False])
92+
self.assertSequenceEqual(self.pool.eval('return box.info().ro', mode=tarantool.Mode.PREFER_RO), [True])
93+
94+
def test_01_roundrobin(self):
95+
self.set_cluster_ro([False, False, True, False, True])
96+
RW_ports = set([str(self.addrs[0]['port']), str(self.addrs[1]['port']), str(self.addrs[3]['port'])])
97+
RO_ports = set([str(self.addrs[2]['port']), str(self.addrs[4]['port'])])
98+
all_ports = set()
99+
for addr in self.addrs:
100+
all_ports.add(str(addr['port']))
101+
102+
self.pool = tarantool.ConnectionPool(
103+
addrs=self.addrs,
104+
user='test',
105+
password='test',
106+
refresh_delay=0.2)
107+
108+
# Expect RW iterate through all RW instances.
109+
RW_ports_result = set()
110+
for i in range(len(self.servers)):
111+
resp = self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.RW)
112+
RW_ports_result.add(resp.data[0])
113+
114+
self.assertSetEqual(RW_ports_result, RW_ports)
115+
116+
# Expect PREFER_RW iterate through all RW instances if there is at least one.
117+
PREFER_RW_ports_result = set()
118+
for i in range(len(self.servers)):
119+
resp = self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.PREFER_RW)
120+
PREFER_RW_ports_result.add(resp.data[0])
121+
122+
self.assertSetEqual(PREFER_RW_ports_result, RW_ports)
123+
124+
# Expect PREFER_RO iterate through all RO instances if there is at least one.
125+
PREFER_RO_ports_result = set()
126+
for i in range(len(self.servers)):
127+
resp = self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.PREFER_RO)
128+
PREFER_RO_ports_result.add(resp.data[0])
129+
130+
self.assertSetEqual(PREFER_RO_ports_result, RO_ports)
131+
132+
# Expect PREFER_RW iterate through all instances if there are no RW.
133+
self.set_cluster_ro([True, True, True, True, True])
134+
135+
def expect_PREFER_RW_iterate_through_all_instances_if_there_are_no_RW():
136+
PREFER_RW_ports_result_all_ro = set()
137+
for i in range(len(self.servers)):
138+
resp = self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.PREFER_RW)
139+
PREFER_RW_ports_result_all_ro.add(resp.data[0])
140+
141+
self.assertSetEqual(PREFER_RW_ports_result_all_ro, all_ports)
142+
143+
self.retry(func=expect_PREFER_RW_iterate_through_all_instances_if_there_are_no_RW)
144+
145+
# Expect PREFER_RO iterate through all instances if there are no RO.
146+
self.set_cluster_ro([False, False, False, False, False])
147+
148+
def expect_PREFER_RO_iterate_through_all_instances_if_there_are_no_RO():
149+
PREFER_RO_ports_result_all_rw = set()
150+
for i in range(len(self.servers)):
151+
resp = self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.PREFER_RO)
152+
PREFER_RO_ports_result_all_rw.add(resp.data[0])
153+
154+
self.assertSetEqual(PREFER_RO_ports_result_all_rw, all_ports)
155+
156+
self.retry(func=expect_PREFER_RO_iterate_through_all_instances_if_there_are_no_RO)
157+
158+
def test_02_exception_raise(self):
159+
self.set_cluster_ro([False, False, True, False, True])
160+
161+
self.pool = tarantool.ConnectionPool(addrs=self.addrs, user='test', password='test')
162+
with self.assertRaises(DatabaseError):
163+
self.pool.call('non_existing_procedure')
164+
165+
def test_03_insert(self):
166+
self.set_cluster_ro([False, True, False, True, True])
167+
self.pool = tarantool.ConnectionPool(addrs=self.addrs, user='test', password='test')
168+
169+
self.assertSequenceEqual(self.pool.insert('test', ['test_05_insert_1', 1]), [['test_05_insert_1', 1]])
170+
self.assertSequenceEqual(self.pool.insert('test', ['test_05_insert_2', 2]), [['test_05_insert_2', 2]])
171+
self.assertSequenceEqual(self.pool.insert('test', ['test_05_insert_3', 3]), [['test_05_insert_3', 3]])
172+
173+
conn_0 = tarantool.connect(
174+
host=self.addrs[0]['host'],
175+
port=self.addrs[0]['port'],
176+
user='test',
177+
password='test')
178+
conn_2 = tarantool.connect(
179+
host=self.addrs[2]['host'],
180+
port=self.addrs[2]['port'],
181+
user='test',
182+
password='test')
183+
184+
for key in ['test_05_insert_1', 'test_05_insert_2', 'test_05_insert_3']:
185+
resp_0 = conn_0.select('test', key)
186+
resp_2 = conn_2.select('test', key)
187+
self.assertEqual(len(resp_0) + len(resp_2), 1)
188+
189+
def test_04_delete(self):
190+
self.set_cluster_ro([True, True, True, False, True])
191+
self.pool = tarantool.ConnectionPool(addrs=self.addrs, user='test', password='test')
192+
193+
conn_3 = tarantool.connect(
194+
host=self.addrs[3]['host'],
195+
port=self.addrs[3]['port'],
196+
user='test',
197+
password='test')
198+
199+
conn_3.insert('test', ['test_06_delete_1', 1])
200+
conn_3.insert('test', ['test_06_delete_2', 2])
201+
202+
self.assertSequenceEqual(self.pool.delete('test', 'test_06_delete_1'), [['test_06_delete_1', 1]])
203+
self.assertSequenceEqual(conn_3.select('test', 'test_06_delete_1'), [])
204+
205+
self.assertSequenceEqual(self.pool.delete('test', 2, index='id'), [['test_06_delete_2', 2]])
206+
self.assertSequenceEqual(conn_3.select('test', 'test_06_delete_2'), [])
207+
208+
def test_05_upsert(self):
209+
self.set_cluster_ro([True, False, True, True, True])
210+
self.pool = tarantool.ConnectionPool(addrs=self.addrs, user='test', password='test')
211+
212+
conn_1 = tarantool.connect(
213+
host=self.addrs[1]['host'],
214+
port=self.addrs[1]['port'],
215+
user='test',
216+
password='test')
217+
218+
self.assertSequenceEqual(self.pool.upsert('test', ['test_07_upsert', 3], [('+', 1, 1)]), [])
219+
self.assertSequenceEqual(conn_1.select('test', 'test_07_upsert'), [['test_07_upsert', 3]])
220+
self.assertSequenceEqual(self.pool.upsert('test', ['test_07_upsert', 3], [('+', 1, 1)]), [])
221+
self.assertSequenceEqual(conn_1.select('test', 'test_07_upsert'), [['test_07_upsert', 4]])
222+
223+
def test_06_update(self):
224+
self.set_cluster_ro([True, True, True, True, False])
225+
self.pool = tarantool.ConnectionPool(addrs=self.addrs, user='test', password='test')
226+
227+
conn_4 = tarantool.connect(
228+
host=self.addrs[4]['host'],
229+
port=self.addrs[4]['port'],
230+
user='test',
231+
password='test')
232+
conn_4.insert('test', ['test_08_update', 3])
233+
234+
self.assertSequenceEqual(self.pool.update('test', ('test_08_update',), [('+', 1, 1)]), [['test_08_update', 4]])
235+
self.assertSequenceEqual(conn_4.select('test', 'test_08_update'), [['test_08_update', 4]])
236+
237+
def test_07_replace(self):
238+
self.set_cluster_ro([True, True, True, True, False])
239+
self.pool = tarantool.ConnectionPool(addrs=self.addrs, user='test', password='test')
240+
241+
conn_4 = tarantool.connect(
242+
host=self.addrs[4]['host'],
243+
port=self.addrs[4]['port'],
244+
user='test',
245+
password='test')
246+
conn_4.insert('test', ['test_09_replace', 3])
247+
248+
self.assertSequenceEqual(self.pool.update('test', ('test_09_replace',) , [('+', 1, 1)]), [['test_09_replace', 4]])
249+
self.assertSequenceEqual(conn_4.select('test', 'test_09_replace'), [['test_09_replace', 4]])
250+
251+
def test_08_select(self):
252+
self.set_cluster_ro([False, False, False, False, False])
253+
254+
for addr in self.addrs:
255+
conn = tarantool.connect(
256+
host=addr['host'],
257+
port=addr['port'],
258+
user='test',
259+
password='test')
260+
conn.insert('test', ['test_10_select', 3])
261+
262+
self.set_cluster_ro([False, True, False, True, True])
263+
self.pool = tarantool.ConnectionPool(addrs=self.addrs, user='test', password='test')
264+
265+
self.assertSequenceEqual(self.pool.select('test', 'test_10_select'), [['test_10_select', 3]])
266+
self.assertSequenceEqual(self.pool.select('test', ['test_10_select']), [['test_10_select', 3]])
267+
self.assertSequenceEqual(self.pool.select('test', 3, index='id'), [['test_10_select', 3]])
268+
self.assertSequenceEqual(self.pool.select('test', [3], index='id'), [['test_10_select', 3]])
269+
270+
def test_09_ping(self):
271+
self.pool = tarantool.ConnectionPool(addrs=self.addrs, user='test', password='test')
272+
273+
self.assertTrue(self.pool.ping() > 0)
274+
self.assertEqual(self.pool.ping(notime=True), "Success")
275+
276+
def test_10_call(self):
277+
self.set_cluster_ro([False, True, False, True, True])
278+
self.pool = tarantool.ConnectionPool(addrs=self.addrs, user='test', password='test')
279+
280+
self.assertEqual(self.pool.call('box.info')[0]['ro'], False)
281+
self.assertEqual(self.pool.call('box.info', mode=tarantool.Mode.RW)[0]['ro'], False)
282+
self.assertEqual(self.pool.call('box.info', mode=tarantool.Mode.PREFER_RO)[0]['ro'], True)
283+
284+
@skip_or_run_sql_test
285+
def test_11_execute(self):
286+
self.set_cluster_ro([False, True, True, True, True])
287+
self.pool = tarantool.ConnectionPool(addrs=self.addrs, user='test', password='test')
288+
289+
resp = self.pool.execute(
290+
'insert into "test" values (:pk, :id)',
291+
{ 'pk': 'test_11_execute', 'id': 1})
292+
self.assertEqual(resp.affected_row_count, 1)
293+
self.assertEqual(resp.data, None)
294+
295+
conn_0 = tarantool.connect(
296+
host=self.addrs[0]['host'],
297+
port=self.addrs[0]['port'],
298+
user='test',
299+
password='test')
300+
301+
self.assertSequenceEqual(conn_0.select('test', 'test_11_execute'), [['test_11_execute', 1]])
302+
303+
def test_12_failover(self):
304+
self.set_cluster_ro([False, True, True, True, True])
305+
self.pool = tarantool.ConnectionPool(
306+
addrs=self.addrs,
307+
user='test',
308+
password='test',
309+
refresh_delay=0.2)
310+
311+
# Simulate failover
312+
self.servers[0].stop()
313+
self.set_ro(self.servers[1], False)
314+
315+
def expect_RW_request_execute_on_new_master():
316+
self.assertSequenceEqual(
317+
self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.RW),
318+
[ str(self.addrs[1]['port']) ])
319+
320+
self.retry(func=expect_RW_request_execute_on_new_master)
321+
322+
def test_13_cluster_with_instances_dead_in_runtime_is_ok(self):
323+
self.set_cluster_ro([False, True, False, True, True])
324+
self.servers[0].stop()
325+
326+
self.pool = tarantool.ConnectionPool(
327+
addrs=self.addrs,
328+
user='test',
329+
password='test',
330+
refresh_delay=0.2)
331+
332+
self.pool.ping(mode=tarantool.Mode.RW)
333+
334+
def test_14_cluster_with_dead_instances_on_start_is_ok(self):
335+
self.set_cluster_ro([False, True, True, True, True])
336+
self.servers[0].stop()
337+
338+
self.pool = tarantool.ConnectionPool(
339+
addrs=self.addrs,
340+
user='test',
341+
password='test',
342+
refresh_delay=0.2)
343+
344+
self.servers[0].start()
345+
346+
def ping_RW():
347+
self.pool.ping(mode=tarantool.Mode.RW)
348+
349+
self.retry(func=ping_RW)
350+
351+
def tearDown(self):
352+
if hasattr(self, 'pool'):
353+
self.pool.close()
354+
355+
for srv in self.servers:
356+
srv.stop()
357+
srv.clean()

0 commit comments

Comments
 (0)
Please sign in to comment.