From 54b95398eb4c8733a0d918b65e5ca43459d079c5 Mon Sep 17 00:00:00 2001 From: Georgy Moiseev Date: Tue, 25 Jan 2022 12:02:03 +0300 Subject: [PATCH 1/3] python: drop Python 2 support Python 2.7 reached the end of its life on January 1st, 2020 [1]. Since it would be a waste to ignore several Python 3.x features in the master discovery implementation, we decided to drop Python 2 support here. Cleanup of the remaining Python 2 workarounds is expected to be done as part of #212. 1. https://www.python.org/doc/sunset-python-2/ Part of #196 --- .github/workflows/testing.yml | 2 -- CHANGELOG.md | 1 + setup.py | 3 ++- test/suites/lib/skip.py | 9 --------- test/suites/test_encoding.py | 5 +---- 5 files changed, 4 insertions(+), 16 deletions(-) diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index c6d6cf08..5d77881f 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -24,7 +24,6 @@ jobs: - '2.8' - '2.x-latest' python: - - '2.7' - '3.5' - '3.6' - '3.7' @@ -116,7 +115,6 @@ jobs: - '1.10' - '2.8' python: - - '2.7' - '3.10' steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index b1f6e082..b541c274 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 (PR #192). ### Changed +- **Breaking**: drop Python 2 support (PR #207). - **Breaking**: change binary types encode/decode for Python 3 to support working with varbinary (PR #211, #105). With Python 2 the behavior of the connector remains the same. 
diff --git a/setup.py b/setup.py index 2f4746f0..c57573d5 100755 --- a/setup.py +++ b/setup.py @@ -84,5 +84,6 @@ def find_version(*file_paths): command_options=command_options, install_requires=[ 'msgpack>=0.4.0', - ] + ], + python_requires='>=3', ) diff --git a/test/suites/lib/skip.py b/test/suites/lib/skip.py index f8f5a475..5fcc9355 100644 --- a/test/suites/lib/skip.py +++ b/test/suites/lib/skip.py @@ -80,12 +80,3 @@ def skip_or_run_varbinary_test(func): return skip_or_run_test_tarantool(func, '2.2.1', 'does not support VARBINARY type') - -def skip_or_run_mp_bin_test(func): - """Decorator to skip or run mp_bin-related tests depending on - the Python version. - - Python 2 connector do not support mp_bin. - """ - - return skip_or_run_test_python_major(func, 3, 'does not support mp_bin') \ No newline at end of file diff --git a/test/suites/test_encoding.py b/test/suites/test_encoding.py index 1ee0f4aa..f5f5f8c0 100644 --- a/test/suites/test_encoding.py +++ b/test/suites/test_encoding.py @@ -6,7 +6,7 @@ import unittest import tarantool -from .lib.skip import skip_or_run_mp_bin_test, skip_or_run_varbinary_test +from .lib.skip import skip_or_run_varbinary_test from .lib.tarantool_server import TarantoolServer class TestSuite_Encoding(unittest.TestCase): @@ -99,7 +99,6 @@ def test_01_02_string_decode_for_encoding_utf8_behavior(self): resp = self.con_encoding_utf8.eval("return box.space['%s']:get('%s')" % (space, data)) self.assertSequenceEqual(resp, [[data]]) - @skip_or_run_mp_bin_test @skip_or_run_varbinary_test def test_01_03_bytes_encode_for_encoding_utf8_behavior(self): data_id = 103 @@ -111,7 +110,6 @@ def test_01_03_bytes_encode_for_encoding_utf8_behavior(self): resp = self.con_encoding_utf8.select(space, [ data ], index='varbin') self.assertSequenceEqual(resp, [[data_id, data]]) - @skip_or_run_mp_bin_test @skip_or_run_varbinary_test def test_01_04_varbinary_decode_for_encoding_utf8_behavior(self): data_id = 104 @@ -162,7 +160,6 @@ def 
test_02_03_bytes_encode_for_encoding_none_behavior(self): resp = self.con_encoding_none.select(space, [data]) self.assertSequenceEqual(resp, [[data]]) - @skip_or_run_mp_bin_test @skip_or_run_varbinary_test def test_02_04_varbinary_decode_for_encoding_none_behavior(self): data_id = 204 From e1c0c53cc31ac44e1fa4c4be4c2978122f3db131 Mon Sep 17 00:00:00 2001 From: Georgy Moiseev Date: Thu, 17 Mar 2022 16:31:50 +0300 Subject: [PATCH 2/3] connection: introduce common interface Introduce connection interface to be used in connection pool implementation. Only CRUD and base connect/close API is required by the interface. Part of #196 --- tarantool/connection.py | 87 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 86 insertions(+), 1 deletion(-) diff --git a/tarantool/connection.py b/tarantool/connection.py index 0ff39b58..73574e46 100644 --- a/tarantool/connection.py +++ b/tarantool/connection.py @@ -8,6 +8,7 @@ import time import errno import socket +import abc import ctypes import ctypes.util @@ -76,8 +77,92 @@ ENCODING_DEFAULT, ) +# Based on https://realpython.com/python-interface/ +class ConnectionInterface(metaclass=abc.ABCMeta): + @classmethod + def __subclasshook__(cls, subclass): + return (hasattr(subclass, 'close') and + callable(subclass.close) and + hasattr(subclass, 'is_closed') and + callable(subclass.is_closed) and + hasattr(subclass, 'connect') and + callable(subclass.connect) and + hasattr(subclass, 'call') and + callable(subclass.call) and + hasattr(subclass, 'eval') and + callable(subclass.eval) and + hasattr(subclass, 'replace') and + callable(subclass.replace) and + hasattr(subclass, 'insert') and + callable(subclass.insert) and + hasattr(subclass, 'delete') and + callable(subclass.delete) and + hasattr(subclass, 'upsert') and + callable(subclass.upsert) and + hasattr(subclass, 'update') and + callable(subclass.update) and + hasattr(subclass, 'ping') and + callable(subclass.ping) and + hasattr(subclass, 'select') and + callable(subclass.select) 
and + hasattr(subclass, 'execute') and + callable(subclass.execute) or + NotImplemented) + + @abc.abstractmethod + def close(self): + raise NotImplementedError + + @abc.abstractmethod + def is_closed(self): + raise NotImplementedError + + @abc.abstractmethod + def connect(self): + raise NotImplementedError + + @abc.abstractmethod + def call(self, func_name, *args, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def eval(self, expr, *args, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def replace(self, space_name, values): + raise NotImplementedError + + @abc.abstractmethod + def insert(self, space_name, values): + raise NotImplementedError + + @abc.abstractmethod + def delete(self, space_name, key, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def upsert(self, space_name, tuple_value, op_list, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def update(self, space_name, key, op_list, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def ping(self, notime): + raise NotImplementedError + + @abc.abstractmethod + def select(self, space_name, key, **kwargs): + raise NotImplementedError + + @abc.abstractmethod + def execute(self, query, params, **kwargs): + raise NotImplementedError + -class Connection(object): +class Connection(ConnectionInterface): ''' Represents connection to the Tarantool server. From 25464908a5f5ee86fba9c4e03fac42a4651395ac Mon Sep 17 00:00:00 2001 From: Georgy Moiseev Date: Thu, 17 Mar 2022 16:32:08 +0300 Subject: [PATCH 3/3] connection_pool: introduce connection pool Introduce ConnectionPool class to work with cluster of Tarantool instances. ConnectionPool support master discovery and ro/rw-based requests, so it is most useful while working with a single replicaset of instances. ConnectionPool is supported only for Python 3.7 or newer. Authenticated user must be able to call `box.info` on instances. 
ConnectionPool updates information about each server state (RO/RW) on initial connect and then asynchronously in separate threads. Application retries must be written considering the asynchronous nature of cluster state refresh. Users do not need to use any synchronization mechanisms in requests; it is all handled by ConnectionPool methods. ConnectionPool API is the same as a plain Connection API. On each request, a connection is chosen to execute this request. A connection is chosen based on a request mode: * Mode.ANY chooses any instance. * Mode.RW chooses an RW instance. * Mode.RO chooses an RO instance. * Mode.PREFER_RW chooses an RW instance, if possible, RO instance otherwise. * Mode.PREFER_RO chooses an RO instance, if possible, RW instance otherwise. All requests that are guaranteed to write (insert, replace, delete, upsert, update) use RW mode by default. select uses ANY by default. You can set the mode explicitly. call, eval, execute and ping requests require the mode to be set explicitly. Example: pool = tarantool.ConnectionPool( addrs=[ {'host': '108.177.16.0', 'port': 3301}, {'host': '108.177.16.0', 'port': 3302}, ], user='test', password='test',) pool.call('some_write_procedure', arg, mode=tarantool.Mode.RW) Closes #196 --- CHANGELOG.md | 38 +++ tarantool/__init__.py | 7 + tarantool/connection_pool.py | 474 ++++++++++++++++++++++++++++++ tarantool/const.py | 6 + tarantool/error.py | 15 + test/suites/__init__.py | 3 +- test/suites/lib/skip.py | 35 ++- test/suites/test_pool.py | 539 +++++++++++++++++++++++++++++++++++ 8 files changed, 1109 insertions(+), 8 deletions(-) create mode 100644 tarantool/connection_pool.py create mode 100644 test/suites/test_pool.py diff --git a/CHANGELOG.md b/CHANGELOG.md index b541c274..7a0eae78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,44 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Reusable testing workflow for integration with tarantool artifacts (PR #192). 
+- Connection pool with master discovery (PR #211, #196). + + ConnectionPool is supported only for Python 3.7 or newer. + The authenticated user must be able to call `box.info` on instances. + + ConnectionPool updates information about each server state (RO/RW) + on initial connect and then asynchronously in separate threads. + Application retries must be written considering the asynchronous nature + of cluster state refresh. Users do not need to use any synchronization + mechanisms in requests; it is all handled by ConnectionPool methods. + + ConnectionPool API is the same as a plain Connection API. + On each request, a connection is chosen to execute this request. + A connection is chosen based on a request mode: + * Mode.ANY chooses any instance. + * Mode.RW chooses an RW instance. + * Mode.RO chooses an RO instance. + * Mode.PREFER_RW chooses an RW instance, if possible, RO instance + otherwise. + * Mode.PREFER_RO chooses an RO instance, if possible, RW instance + otherwise. + All requests that are guaranteed to write (insert, replace, delete, + upsert, update) use RW mode by default. select uses ANY by default. You + can set the mode explicitly. call, eval, execute and ping requests + require the mode to be set explicitly. + + Example: + ```python + pool = tarantool.ConnectionPool( + addrs=[ + {'host': '108.177.16.0', 'port': 3301}, + {'host': '108.177.16.0', 'port': 3302}, + ], + user='test', + password='test',) + + pool.call('some_write_procedure', arg, mode=tarantool.Mode.RW) + ``` ### Changed - **Breaking**: drop Python 2 support (PR #207). 
diff --git a/tarantool/__init__.py b/tarantool/__init__.py index 8ae0c44c..8f7b6d07 100644 --- a/tarantool/__init__.py +++ b/tarantool/__init__.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- # pylint: disable=C0301,W0105,W0401,W0614 +import sys + from tarantool.connection import Connection from tarantool.mesh_connection import MeshConnection from tarantool.const import ( @@ -76,3 +78,8 @@ def connectmesh(addrs=({'host': 'localhost', 'port': 3301},), user=None, __all__ = ['connect', 'Connection', 'connectmesh', 'MeshConnection', 'Schema', 'Error', 'DatabaseError', 'NetworkError', 'NetworkWarning', 'SchemaError', 'dbapi'] + +# ConnectionPool is supported only for Python 3.7 or newer. +if sys.version_info.major >= 3 and sys.version_info.minor >= 7: + from tarantool.connection_pool import ConnectionPool, Mode + __all__.extend(['ConnectionPool', 'Mode']) diff --git a/tarantool/connection_pool.py b/tarantool/connection_pool.py new file mode 100644 index 00000000..176c29b8 --- /dev/null +++ b/tarantool/connection_pool.py @@ -0,0 +1,474 @@ +# -*- coding: utf-8 -*- + +import abc +import itertools +import queue +import threading +import time +import typing +from dataclasses import dataclass, field +from enum import Enum + +from tarantool.connection import Connection, ConnectionInterface +from tarantool.const import ( + CONNECTION_TIMEOUT, + POOL_INSTANCE_RECONNECT_DELAY, + POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS, + POOL_REFRESH_DELAY, + SOCKET_TIMEOUT +) +from tarantool.error import ( + ClusterConnectWarning, + PoolTolopogyError, + PoolTolopogyWarning, + ConfigurationError, + DatabaseError, + NetworkError, + NetworkWarning, + tnt_strerror, + warn +) +from tarantool.utils import ENCODING_DEFAULT +from tarantool.mesh_connection import validate_address + + +class Mode(Enum): + ANY = 1 + RW = 2 + RO = 3 + PREFER_RW = 4 + PREFER_RO = 5 + + +class Status(Enum): + HEALTHY = 1 + UNHEALTHY = 2 + + +@dataclass +class InstanceState(): + status: Status = Status.UNHEALTHY + ro: 
typing.Optional[bool] = None + + +def QueueFactory(): + return queue.Queue(maxsize=1) + + +@dataclass +class PoolUnit(): + addr: dict + conn: Connection + input_queue: queue.Queue = field(default_factory=QueueFactory) + output_queue: queue.Queue = field(default_factory=QueueFactory) + thread: typing.Optional[threading.Thread] = None + state: InstanceState = field(default_factory=InstanceState) + # request_processing_enabled is used to stop requests processing + # in background thread on close or destruction. + request_processing_enabled: bool = False + + +# Based on https://realpython.com/python-interface/ +class StrategyInterface(metaclass=abc.ABCMeta): + @classmethod + def __subclasshook__(cls, subclass): + return (hasattr(subclass, '__init__') and + callable(subclass.__init__) and + hasattr(subclass, 'update') and + callable(subclass.update) and + hasattr(subclass, 'getnext') and + callable(subclass.getnext) or + NotImplemented) + + @abc.abstractmethod + def __init__(self, pool): + raise NotImplementedError + + @abc.abstractmethod + def update(self): + raise NotImplementedError + + @abc.abstractmethod + def getnext(self, mode): + raise NotImplementedError + +class RoundRobinStrategy(StrategyInterface): + """ + Simple round-robin connection rotation + """ + def __init__(self, pool): + self.ANY_iter = None + self.RW_iter = None + self.RO_iter = None + self.pool = pool + self.rebuild_needed = True + + def build(self): + ANY_pool = [] + RW_pool = [] + RO_pool = [] + + for key in self.pool: + state = self.pool[key].state + + if state.status == Status.UNHEALTHY: + continue + + ANY_pool.append(key) + + if state.ro == False: + RW_pool.append(key) + else: + RO_pool.append(key) + + if len(ANY_pool) > 0: + self.ANY_iter = itertools.cycle(ANY_pool) + else: + self.ANY_iter = None + + if len(RW_pool) > 0: + self.RW_iter = itertools.cycle(RW_pool) + else: + self.RW_iter = None + + if len(RO_pool) > 0: + self.RO_iter = itertools.cycle(RO_pool) + else: + self.RO_iter = None + + 
self.rebuild_needed = False + + def update(self): + self.rebuild_needed = True + + def getnext(self, mode): + if self.rebuild_needed: + self.build() + + if mode == Mode.ANY: + if self.ANY_iter is not None: + return next(self.ANY_iter) + else: + raise PoolTolopogyError("Can't find healthy instance in pool") + elif mode == Mode.RW: + if self.RW_iter is not None: + return next(self.RW_iter) + else: + raise PoolTolopogyError("Can't find healthy rw instance in pool") + elif mode == Mode.RO: + if self.RO_iter is not None: + return next(self.RO_iter) + else: + raise PoolTolopogyError("Can't find healthy ro instance in pool") + elif mode == Mode.PREFER_RO: + if self.RO_iter is not None: + return next(self.RO_iter) + elif self.RW_iter is not None: + return next(self.RW_iter) + else: + raise PoolTolopogyError("Can't find healthy instance in pool") + elif mode == Mode.PREFER_RW: + if self.RW_iter is not None: + return next(self.RW_iter) + elif self.RO_iter is not None: + return next(self.RO_iter) + else: + raise PoolTolopogyError("Can't find healthy instance in pool") + + +@dataclass +class PoolTask(): + method_name: str + args: tuple + kwargs: dict + + +class ConnectionPool(ConnectionInterface): + ''' + Represents pool of connections to the cluster of Tarantool servers. + + ConnectionPool API is the same as a plain Connection API. + On each request, a connection is chosen to execute this request. + Connection is selected based on request mode: + * Mode.ANY chooses any instance. + * Mode.RW chooses an RW instance. + * Mode.RO chooses an RO instance. + * Mode.PREFER_RW chooses an RW instance, if possible, RO instance + otherwise. + * Mode.PREFER_RO chooses an RO instance, if possible, RW instance + otherwise. + All requests that are guaranteed to write (insert, replace, delete, + upsert, update) use RW mode by default. select uses ANY by default. You + can set the mode explicitly. call, eval, execute and ping requests + require to set the mode explicitly. 
+ ''' + def __init__(self, + addrs, + user=None, + password=None, + socket_timeout=SOCKET_TIMEOUT, + reconnect_max_attempts=POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS, + reconnect_delay=POOL_INSTANCE_RECONNECT_DELAY, + connect_now=True, + encoding=ENCODING_DEFAULT, + call_16=False, + connection_timeout=CONNECTION_TIMEOUT, + strategy_class=RoundRobinStrategy, + refresh_delay=POOL_REFRESH_DELAY): + ''' + Initialize connections to the cluster of servers. + + :param list addrs: List of {host: , port:} dictionaries, + describing server addresses. + :param int reconnect_max_attempts: Max attempts to reconnect + for each connection in the pool. Be careful with reconnect + parameters in ConnectionPool since every status refresh is + also a request with reconnection. Default is 0 (fail after + first attempt). + :param float reconnect_delay: Time between reconnect + attempts for each connection in the pool. Be careful with + reconnect parameters in ConnectionPool since every status + refresh is also a request with reconnection. Default is 0. + :param StrategyInterface strategy_class: Class for choosing + instance based on request mode. By default, round-robin + strategy is used. + :param int refresh_delay: Minimal time between RW/RO status + refreshes. + ''' + + if not isinstance(addrs, list) or len(addrs) == 0: + raise ConfigurationError("addrs must be non-empty list") + + # Verify addresses. 
+ for addr in addrs: + ok, msg = validate_address(addr) + if not ok: + raise ConfigurationError(msg) + self.addrs = addrs + + # Create connections + self.pool = {} + self.refresh_delay = refresh_delay + self.strategy = strategy_class(self.pool) + + for addr in self.addrs: + key = self._make_key(addr) + self.pool[key] = PoolUnit( + addr=addr, + conn=Connection( + host=addr['host'], + port=addr['port'], + user=user, + password=password, + socket_timeout=socket_timeout, + reconnect_max_attempts=reconnect_max_attempts, + reconnect_delay=reconnect_delay, + connect_now=False, # Connect in ConnectionPool.connect() + encoding=encoding, + call_16=call_16, + connection_timeout=connection_timeout) + ) + + if connect_now: + self.connect() + + def __del__(self): + self.close() + + def _make_key(self, addr): + return '{0}:{1}'.format(addr['host'], addr['port']) + + def _get_new_state(self, unit): + conn = unit.conn + + if conn.is_closed(): + try: + conn.connect() + except NetworkError as e: + msg = "Failed to connect to {0}:{1}".format( + unit.addr['host'], unit.addr['port']) + warn(msg, ClusterConnectWarning) + return InstanceState(Status.UNHEALTHY) + + try: + resp = conn.call('box.info') + except NetworkError as e: + msg = "Failed to get box.info for {0}:{1}, reason: {2}".format( + unit.addr['host'], unit.addr['port'], repr(e)) + warn(msg, PoolTolopogyWarning) + return InstanceState(Status.UNHEALTHY) + + try: + ro = resp.data[0]['ro'] + except (IndexError, KeyError) as e: + msg = "Incorrect box.info response from {0}:{1}".format( + unit.addr['host'], unit.addr['port']) + warn(msg, PoolTolopogyWarning) + return InstanceState(Status.UNHEALTHY) + + try: + status = resp.data[0]['status'] + + if status != 'running': + msg = "{0}:{1} instance status is not 'running'".format( + unit.addr['host'], unit.addr['port']) + warn(msg, PoolTolopogyWarning) + return InstanceState(Status.UNHEALTHY) + except (IndexError, KeyError) as e: + msg = "Incorrect box.info response from {0}:{1}".format( 
+ unit.addr['host'], unit.addr['port']) + warn(msg, PoolTolopogyWarning) + return InstanceState(Status.UNHEALTHY) + + return InstanceState(Status.HEALTHY, ro) + + def _refresh_state(self, key): + unit = self.pool[key] + + state = self._get_new_state(unit) + if state != unit.state: + unit.state = state + self.strategy.update() + + def close(self): + for unit in self.pool.values(): + unit.request_processing_enabled = False + unit.thread.join() + + if not unit.conn.is_closed(): + unit.conn.close() + + def is_closed(self): + return all(unit.request_processing_enabled == False for unit in self.pool.values()) + + def _request_process_loop(self, key, unit, last_refresh): + while unit.request_processing_enabled: + if not unit.input_queue.empty(): + task = unit.input_queue.get() + method = getattr(Connection, task.method_name) + try: + resp = method(unit.conn, *task.args, **task.kwargs) + except Exception as e: + unit.output_queue.put(e) + else: + unit.output_queue.put(resp) + + now = time.time() + + if now - last_refresh > self.refresh_delay: + self._refresh_state(key) + last_refresh = time.time() + + def connect(self): + for key in self.pool: + unit = self.pool[key] + + self._refresh_state(key) + last_refresh = time.time() + + unit.thread = threading.Thread( + target=self._request_process_loop, + args=(key, unit, last_refresh), + daemon=True, + ) + unit.request_processing_enabled = True + unit.thread.start() + + def _send(self, mode, method_name, *args, **kwargs): + key = self.strategy.getnext(mode) + unit = self.pool[key] + + task = PoolTask(method_name=method_name, args=args, kwargs=kwargs) + + unit.input_queue.put(task) + resp = unit.output_queue.get() + + if isinstance(resp, Exception): + raise resp + + return resp + + def call(self, func_name, *args, mode=None): + ''' + :param tarantool.Mode mode: Request mode. 
+ ''' + + if mode is None: + raise ValueError("Please, specify 'mode' keyword argument") + + return self._send(mode, 'call', func_name, *args) + + def eval(self, expr, *args, mode=None): + ''' + :param tarantool.Mode mode: Request mode. + ''' + + if mode is None: + raise ValueError("Please, specify 'mode' keyword argument") + + return self._send(mode, 'eval', expr, *args) + + def replace(self, space_name, values, *, mode=Mode.RW): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'replace', space_name, values) + + def insert(self, space_name, values, *, mode=Mode.RW): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'insert', space_name, values) + + def delete(self, space_name, key, *, mode=Mode.RW, **kwargs): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'delete', space_name, key, **kwargs) + + def upsert(self, space_name, tuple_value, op_list, *, mode=Mode.RW, **kwargs): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'upsert', space_name, tuple_value, + op_list, **kwargs) + + def update(self, space_name, key, op_list, *, mode=Mode.RW, **kwargs): + ''' + :param tarantool.Mode mode: Request mode (default is RW). + ''' + + return self._send(mode, 'update', space_name, key, + op_list, **kwargs) + + def ping(self, *, mode=None, **kwargs): + ''' + :param tarantool.Mode mode: Request mode. + ''' + + if mode is None: + raise ValueError("Please, specify 'mode' keyword argument") + + return self._send(mode, 'ping', **kwargs) + + def select(self, space_name, key, *, mode=Mode.ANY, **kwargs): + ''' + :param tarantool.Mode mode: Request mode (default is + ANY). + ''' + + return self._send(mode, 'select', space_name, key, **kwargs) + + def execute(self, query, params=None, *, mode=None): + ''' + :param tarantool.Mode mode: Request mode (default is RW). 
+ ''' + + if mode is None: + raise ValueError("Please, specify 'mode' keyword argument") + + return self._send(mode, 'execute', query, params) diff --git a/tarantool/const.py b/tarantool/const.py index 0db35978..9eb834a8 100644 --- a/tarantool/const.py +++ b/tarantool/const.py @@ -96,3 +96,9 @@ RECONNECT_DELAY = 0.1 # Default cluster nodes list refresh interval (seconds) CLUSTER_DISCOVERY_DELAY = 60 +# Default cluster nodes state refresh interval (seconds) +POOL_REFRESH_DELAY = 1 +# Default maximum number of attempts to reconnect for pool instance +POOL_INSTANCE_RECONNECT_MAX_ATTEMPTS = 0 +# Default delay between attempts to reconnect (seconds) +POOL_INSTANCE_RECONNECT_DELAY = 0 diff --git a/tarantool/error.py b/tarantool/error.py index 78519b68..c7165a9b 100644 --- a/tarantool/error.py +++ b/tarantool/error.py @@ -223,6 +223,21 @@ class ClusterDiscoveryWarning(UserWarning): pass +class ClusterConnectWarning(UserWarning): + '''Warning related to cluster pool connection''' + pass + + +class PoolTolopogyWarning(UserWarning): + '''Warning related to ro/rw cluster pool topology''' + pass + + +class PoolTolopogyError(DatabaseError): + '''Exception raised due to unsatisfying ro/rw cluster pool topology''' + pass + + # always print this warnings warnings.filterwarnings("always", category=NetworkWarning) diff --git a/test/suites/__init__.py b/test/suites/__init__.py index 8e2eafc1..1868ad42 100644 --- a/test/suites/__init__.py +++ b/test/suites/__init__.py @@ -10,6 +10,7 @@ from .test_protocol import TestSuite_Protocol from .test_reconnect import TestSuite_Reconnect from .test_mesh import TestSuite_Mesh +from .test_pool import TestSuite_Pool from .test_execute import TestSuite_Execute from .test_dbapi import TestSuite_DBAPI from .test_encoding import TestSuite_Encoding @@ -18,7 +19,7 @@ TestSuite_Schema_BinaryConnection, TestSuite_Request, TestSuite_Protocol, TestSuite_Reconnect, TestSuite_Mesh, TestSuite_Execute, TestSuite_DBAPI, - TestSuite_Encoding) + 
TestSuite_Encoding, TestSuite_Pool) def load_tests(loader, tests, pattern): suite = unittest.TestSuite() diff --git a/test/suites/lib/skip.py b/test/suites/lib/skip.py index 5fcc9355..284b70b6 100644 --- a/test/suites/lib/skip.py +++ b/test/suites/lib/skip.py @@ -18,8 +18,18 @@ def wrapper(self, *args, **kwargs): func(self, *args, **kwargs) if not hasattr(self, 'tnt_version'): + srv = None + + if hasattr(self, 'servers'): + srv = self.servers[0] + + if hasattr(self, 'srv'): + srv = self.srv + + assert srv is not None + self.__class__.tnt_version = re.match( - r'[\d.]+', self.srv.admin('box.info.version')[0] + r'[\d.]+', srv.admin('box.info.version')[0] ).group() tnt_version = pkg_resources.parse_version(self.tnt_version) @@ -34,9 +44,8 @@ def wrapper(self, *args, **kwargs): return wrapper -def skip_or_run_test_python_major(func, REQUIRED_PYTHON_MAJOR, msg): - """Decorator to skip or run tests depending on the Python major - version. +def skip_or_run_test_python(func, REQUIRED_PYTHON_VERSION, msg): + """Decorator to skip or run tests depending on the Python version. Also, it can be used with the 'setUp' method for skipping the whole test suite. 
@@ -47,9 +56,12 @@ def wrapper(self, *args, **kwargs): if func.__name__ == 'setUp': func(self, *args, **kwargs) - major = sys.version_info.major - if major != REQUIRED_PYTHON_MAJOR: - self.skipTest('Python %s connector %s' % (major, msg)) + ver = sys.version_info + python_version_str = '%d.%d' % (ver.major, ver.minor) + python_version = pkg_resources.parse_version(python_version_str) + support_version = pkg_resources.parse_version(REQUIRED_PYTHON_VERSION) + if python_version < support_version: + self.skipTest('Python %s connector %s' % (python_version, msg)) if func.__name__ != 'setUp': func(self, *args, **kwargs) @@ -80,3 +92,12 @@ def skip_or_run_varbinary_test(func): return skip_or_run_test_tarantool(func, '2.2.1', 'does not support VARBINARY type') + +def skip_or_run_conn_pool_test(func): + """Decorator to skip ConnectionPool tests on Python older than + 3.7 and run them otherwise. + """ + + return skip_or_run_test_python(func, '3.7', + 'does not support ConnectionPool') + diff --git a/test/suites/test_pool.py b/test/suites/test_pool.py new file mode 100644 index 00000000..12263d79 --- /dev/null +++ b/test/suites/test_pool.py @@ -0,0 +1,539 @@ +# -*- coding: utf-8 -*- + +from __future__ import print_function + +import sys +import time +import unittest +import warnings + +import tarantool +from tarantool.error import PoolTolopogyError, DatabaseError, NetworkError + +from .lib.skip import skip_or_run_sql_test, skip_or_run_conn_pool_test +from .lib.tarantool_server import TarantoolServer + + +def create_server(_id): + srv = TarantoolServer() + srv.script = 'test/suites/box.lua' + srv.start() + srv.admin("box.schema.user.create('test', {password = 'test', " + + "if_not_exists = true})") + srv.admin("box.schema.user.grant('test', 'execute', 'universe')") + srv.admin("box.schema.space.create('test')") + srv.admin(r"box.space.test:format({" + +r" { name = 'pk', type = 'string' }," + + r" { name = 'id', type = 'number', is_nullable = true }" + + r"})") + 
srv.admin(r"box.space.test:create_index('pk'," +
+              r"{ unique = true," +
+              r" parts = {{field = 1, type = 'string'}}})")
+    srv.admin(r"box.space.test:create_index('id'," +
+              r"{ unique = true," +
+              r" parts = {{field = 2, type = 'number', is_nullable=true}}})")
+    srv.admin("box.schema.user.grant('test', 'read,write', 'space', 'test')")
+    srv.admin("json = require('json')")
+
+    # Create srv_id function (for testing purposes).
+    srv.admin("function srv_id() return %s end" % _id)
+    return srv
+
+
+@unittest.skipIf(sys.platform.startswith("win"),
+                 'Pool tests on windows platform are not supported')
+class TestSuite_Pool(unittest.TestCase):
+    def set_ro(self, srv, read_only):
+        """Switch one instance between read-only and read-write.
+
+        :param srv: server object exposing ``admin()``.
+        :param read_only: truthy to set ``read_only = true`` in box.cfg,
+            falsy to set ``read_only = false``.
+        """
+        if read_only:
+            req = r'box.cfg{read_only = true}'
+        else:
+            req = r'box.cfg{read_only = false}'
+
+        srv.admin(req)
+
+    def set_cluster_ro(self, read_only_list):
+        """Apply one read-only flag per server, in ``self.servers`` order.
+
+        :param read_only_list: flags, one for each entry of ``self.servers``.
+        """
+        # zip() would silently truncate, so keep the explicit length check.
+        assert len(self.servers) == len(read_only_list)
+
+        for srv, read_only in zip(self.servers, read_only_list):
+            self.set_ro(srv, read_only)
+
+    def retry(self, func, count=5, timeout=0.5):
+        """Call ``func`` until it succeeds, making at most ``count`` attempts.
+
+        :param func: callable taking no arguments.
+        :param count: maximum number of attempts.
+        :param timeout: pause between attempts, in seconds.
+        :return: the result of the first successful ``func()`` call.
+        :raise Exception: whatever the last attempt raised.
+        """
+        for attempt in range(count):
+            try:
+                # Stop at the first success instead of re-running (and
+                # sleeping after) every remaining attempt.
+                return func()
+            except Exception:
+                if attempt + 1 == count:
+                    # Bare raise keeps the original traceback.
+                    raise
+
+            time.sleep(timeout)
+
+    @classmethod
+    def setUpClass(cls):
+        """Print the suite banner to stderr."""
+        print(' POOL '.center(70, '='), file=sys.stderr)
+        print('-' * 70, file=sys.stderr)
+
+    @skip_or_run_conn_pool_test
+    def setUp(self):
+        # Create five servers and extract helpful fields for tests.
+        self.servers = []
+        self.addrs = []
+        self.servers_count = 5
+        for server_id in range(self.servers_count):
+            server = create_server(server_id)
+            self.servers.append(server)
+            self.addrs.append({
+                'host': server.host,
+                'port': server.args['primary'],
+            })
+
+    def test_00_basic(self):
+        # Instances 2 and 4 are switched to read-only, the rest stay writable.
+        self.set_cluster_ro([False, False, True, False, True])
+
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs, user='test', password='test')
+
+        # (request mode, expected box.info().ro reported by the instance)
+        expectations = [
+            (tarantool.Mode.RW, [False]),
+            (tarantool.Mode.RO, [True]),
+            (tarantool.Mode.PREFER_RW, [False]),
+            (tarantool.Mode.PREFER_RO, [True]),
+        ]
+        for mode, expected in expectations:
+            resp = self.pool.eval('return box.info().ro', mode=mode)
+            self.assertSequenceEqual(resp, expected)
+
+    def test_01_roundrobin(self):
+        self.set_cluster_ro([False, False, True, False, True])
+        RW_ports = {str(self.addrs[i]['port']) for i in (0, 1, 3)}
+        RO_ports = {str(self.addrs[i]['port']) for i in (2, 4)}
+        all_ports = {str(addr['port']) for addr in self.addrs}
+
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs,
+            user='test',
+            password='test',
+            refresh_delay=0.2)
+
+        def get_port(self, mode):
+            # Ask the instance that served the request for its listen port.
+            listen = self.pool.eval('return box.cfg.listen', mode=mode).data[0]
+            self.assertIsInstance(listen, str)
+            return listen
+
+        # Expect ANY iterate through all instances.
+        ANY_ports_result = {
+            get_port(self, tarantool.Mode.ANY) for _ in self.servers
+        }
+        self.assertSetEqual(ANY_ports_result, all_ports)
+
+        # Expect RW iterate through all RW instances.
+        RW_ports_result = {
+            get_port(self, tarantool.Mode.RW) for _ in self.servers
+        }
+        self.assertSetEqual(RW_ports_result, RW_ports)
+
+        # Expect RO iterate through all RO instances.
+        RO_ports_result = {
+            get_port(self, tarantool.Mode.RO) for _ in self.servers
+        }
+        self.assertSetEqual(RO_ports_result, RO_ports)
+
+        # Expect PREFER_RW iterate through all RW instances if there is at least one.
+        PREFER_RW_ports_result = {
+            get_port(self, tarantool.Mode.PREFER_RW) for _ in self.servers
+        }
+        self.assertSetEqual(PREFER_RW_ports_result, RW_ports)
+
+        # Expect PREFER_RO iterate through all RO instances if there is at least one.
+        PREFER_RO_ports_result = {
+            get_port(self, tarantool.Mode.PREFER_RO) for _ in self.servers
+        }
+        self.assertSetEqual(PREFER_RO_ports_result, RO_ports)
+
+        # Setup cluster with no RW.
+        self.set_cluster_ro([True] * 5)
+
+        # Expect RW to fail if there are no RW.
+        def expect_RW_to_fail_if_there_are_no_RW():
+            with self.assertRaises(PoolTolopogyError):
+                self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.RW)
+
+        self.retry(func=expect_RW_to_fail_if_there_are_no_RW)
+
+        # Expect PREFER_RW iterate through all instances if there are no RW.
+        def expect_PREFER_RW_iterate_through_all_instances_if_there_are_no_RW():
+            ports = {
+                get_port(self, tarantool.Mode.PREFER_RW) for _ in self.servers
+            }
+            self.assertSetEqual(ports, all_ports)
+
+        self.retry(func=expect_PREFER_RW_iterate_through_all_instances_if_there_are_no_RW)
+
+        # Setup cluster with no RO.
+        self.set_cluster_ro([False] * 5)
+
+        # Expect RO to fail if there are no RO.
+        def expect_RO_to_fail_if_there_are_no_RO():
+            with self.assertRaises(PoolTolopogyError):
+                self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.RO)
+
+        self.retry(func=expect_RO_to_fail_if_there_are_no_RO)
+
+        # Expect PREFER_RO iterate through all instances if there are no RO.
+        def expect_PREFER_RO_iterate_through_all_instances_if_there_are_no_RO():
+            ports = {
+                get_port(self, tarantool.Mode.PREFER_RO) for _ in self.servers
+            }
+            self.assertSetEqual(ports, all_ports)
+
+        self.retry(func=expect_PREFER_RO_iterate_through_all_instances_if_there_are_no_RO)
+
+    def test_02_exception_raise(self):
+        self.set_cluster_ro([False, False, True, False, True])
+
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs, user='test', password='test')
+
+        # Calling a procedure that does not exist is expected to raise
+        # DatabaseError through the pool.
+        with self.assertRaises(DatabaseError):
+            self.pool.call('non_existing_procedure', mode=tarantool.Mode.ANY)
+
+    def test_03_insert(self):
+        # Instance 2 is the only one left writable.
+        self.set_cluster_ro([True, True, False, True, True])
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs, user='test', password='test')
+
+        # Insert without an explicit mode, then with Mode.RW.
+        resp = self.pool.insert('test', ['test_03_insert_1', 1])
+        self.assertSequenceEqual(resp, [['test_03_insert_1', 1]])
+        resp = self.pool.insert('test', ['test_03_insert_2', 2],
+                                mode=tarantool.Mode.RW)
+        self.assertSequenceEqual(resp, [['test_03_insert_2', 2]])
+
+        rw_addr = self.addrs[2]
+        conn_2 = tarantool.connect(
+            host=rw_addr['host'],
+            port=rw_addr['port'],
+            user='test',
+            password='test')
+
+        # The tuple must be visible through a direct connection to instance 2.
+        stored = conn_2.select('test', 'test_03_insert_1')
+        self.assertSequenceEqual(stored, [['test_03_insert_1', 1]])
+
+    def test_04_delete(self):
+        # Instance 3 is the only one left writable.
+        self.set_cluster_ro([True, True, True, False, True])
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs, user='test', password='test')
+
+        rw_addr = self.addrs[3]
+        conn_3 = tarantool.connect(
+            host=rw_addr['host'],
+            port=rw_addr['port'],
+            user='test',
+            password='test')
+
+        # Seed the tuples directly on the writable instance.
+        conn_3.insert('test', ['test_04_delete_1', 1])
+        conn_3.insert('test', ['test_04_delete_2', 2])
+
+        # Delete by key without an explicit mode or index.
+        deleted = self.pool.delete('test', 'test_04_delete_1')
+        self.assertSequenceEqual(deleted, [['test_04_delete_1', 1]])
+        remaining = conn_3.select('test', 'test_04_delete_1')
+        self.assertSequenceEqual(remaining, [])
+
+        self.assertSequenceEqual(
+            self.pool.delete('test', 2,
index='id', mode=tarantool.Mode.RW),
+            [['test_04_delete_2', 2]])
+        remaining = conn_3.select('test', 'test_04_delete_2')
+        self.assertSequenceEqual(remaining, [])
+
+    def test_05_upsert(self):
+        # Instance 1 is the only one left writable.
+        self.set_cluster_ro([True, False, True, True, True])
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs, user='test', password='test')
+
+        rw_addr = self.addrs[1]
+        conn_1 = tarantool.connect(
+            host=rw_addr['host'],
+            port=rw_addr['port'],
+            user='test',
+            password='test')
+
+        # Upsert without an explicit mode: the tuple is expected to be
+        # stored as given.
+        resp = self.pool.upsert('test', ['test_05_upsert', 3], [('+', 1, 1)])
+        self.assertSequenceEqual(resp, [])
+        stored = conn_1.select('test', 'test_05_upsert')
+        self.assertSequenceEqual(stored, [['test_05_upsert', 3]])
+
+        # Same upsert with Mode.RW: the '+' operation is expected to turn
+        # the stored 3 into 4.
+        resp = self.pool.upsert('test', ['test_05_upsert', 3],
+                                [('+', 1, 1)], mode=tarantool.Mode.RW)
+        self.assertSequenceEqual(resp, [])
+        stored = conn_1.select('test', 'test_05_upsert')
+        self.assertSequenceEqual(stored, [['test_05_upsert', 4]])
+
+    def test_06_update(self):
+        # Instance 4 is the only one left writable.
+        self.set_cluster_ro([True, True, True, True, False])
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs, user='test', password='test')
+
+        rw_addr = self.addrs[4]
+        conn_4 = tarantool.connect(
+            host=rw_addr['host'],
+            port=rw_addr['port'],
+            user='test',
+            password='test')
+        conn_4.insert('test', ['test_06_update_1', 3])
+        conn_4.insert('test', ['test_06_update_2', 14])
+
+        # Update without an explicit mode.
+        resp = self.pool.update('test', ('test_06_update_1',), [('+', 1, 1)])
+        self.assertSequenceEqual(resp, [['test_06_update_1', 4]])
+        stored = conn_4.select('test', 'test_06_update_1')
+        self.assertSequenceEqual(stored, [['test_06_update_1', 4]])
+
+        # Update with Mode.RW.
+        resp = self.pool.update('test', ('test_06_update_2',),
+                                [('=', 1, 10)], mode=tarantool.Mode.RW)
+        self.assertSequenceEqual(resp, [['test_06_update_2', 10]])
+        stored = conn_4.select('test', 'test_06_update_2')
+        self.assertSequenceEqual(stored, [['test_06_update_2', 10]])
+
+    def test_07_replace(self):
+        # Instance 4 is the only one left writable.
+        self.set_cluster_ro([True, True, True, True, False])
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs, user='test', password='test')
+
+        conn_4 = tarantool.connect(
+
host=self.addrs[4]['host'],
+            port=self.addrs[4]['port'],
+            user='test',
+            password='test')
+        conn_4.insert('test', ['test_07_replace', 3])
+
+        # Replace with Mode.RW.
+        resp = self.pool.replace('test', ['test_07_replace', 4],
+                                 mode=tarantool.Mode.RW)
+        self.assertSequenceEqual(resp, [['test_07_replace', 4]])
+        stored = conn_4.select('test', 'test_07_replace')
+        self.assertSequenceEqual(stored, [['test_07_replace', 4]])
+
+        # Replace without an explicit mode.
+        resp = self.pool.replace('test', ['test_07_replace', 5])
+        self.assertSequenceEqual(resp, [['test_07_replace', 5]])
+        stored = conn_4.select('test', 'test_07_replace')
+        self.assertSequenceEqual(stored, [['test_07_replace', 5]])
+
+    def test_08_select(self):
+        # Make every instance writable so that the same tuple can be
+        # inserted on each of them directly.
+        self.set_cluster_ro([False, False, False, False, False])
+
+        for addr in self.addrs:
+            conn = tarantool.connect(
+                host=addr['host'],
+                port=addr['port'],
+                user='test',
+                password='test')
+            conn.insert('test', ['test_08_select', 3])
+
+        self.set_cluster_ro([False, True, False, True, True])
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs, user='test', password='test')
+
+        # Select by key without a mode, then with ANY, RO and PREFER_RW.
+        resp = self.pool.select('test', 'test_08_select')
+        self.assertSequenceEqual(resp, [['test_08_select', 3]])
+        resp = self.pool.select('test', ['test_08_select'],
+                                mode=tarantool.Mode.ANY)
+        self.assertSequenceEqual(resp, [['test_08_select', 3]])
+        resp = self.pool.select('test', 3, index='id',
+                                mode=tarantool.Mode.RO)
+        self.assertSequenceEqual(resp, [['test_08_select', 3]])
+        resp = self.pool.select('test', [3], index='id',
+                                mode=tarantool.Mode.PREFER_RW)
+        self.assertSequenceEqual(resp, [['test_08_select', 3]])
+
+    def test_09_ping(self):
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs, user='test', password='test')
+
+        # ping() without a mode is expected to be rejected.
+        with self.assertRaisesRegex(ValueError, "Please, specify 'mode' keyword argument"):
+            self.pool.ping()
+
+        elapsed = self.pool.ping(mode=tarantool.Mode.ANY)
+        self.assertTrue(elapsed > 0)
+        status = self.pool.ping(mode=tarantool.Mode.RW, notime=True)
+        self.assertEqual(status, "Success")
+
+    def test_10_call(self):
+        self.set_cluster_ro([False, True, False, True, True])
+        self.pool = tarantool.ConnectionPool(
+
addrs=self.addrs,
+            user='test',
+            password='test')
+
+        # call() without a mode is expected to be rejected.
+        with self.assertRaisesRegex(ValueError, "Please, specify 'mode' keyword argument"):
+            self.pool.call('box.info')
+
+        info = self.pool.call('box.info', mode=tarantool.Mode.RW)
+        self.assertEqual(info[0]['ro'], False)
+
+        encoded = self.pool.call('json.encode', {'test_10_call': 1},
+                                 mode=tarantool.Mode.ANY)
+        self.assertSequenceEqual(encoded, ['{"test_10_call":1}'])
+
+    def test_11_eval(self):
+        self.set_cluster_ro([False, True, False, True, True])
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs, user='test', password='test')
+
+        # eval() without a mode is expected to be rejected.
+        with self.assertRaisesRegex(ValueError, "Please, specify 'mode' keyword argument"):
+            self.pool.eval('return box.info()')
+
+        info = self.pool.eval('return box.info()', mode=tarantool.Mode.RW)
+        self.assertEqual(info[0]['ro'], False)
+
+        encoded = self.pool.eval('return json.encode(...)', {'test_11_eval': 1},
+                                 mode=tarantool.Mode.ANY)
+        self.assertSequenceEqual(encoded, ['{"test_11_eval":1}'])
+
+    @skip_or_run_sql_test
+    def test_12_execute(self):
+        # Instance 0 is the only one left writable.
+        self.set_cluster_ro([False, True, True, True, True])
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs, user='test', password='test')
+
+        # execute() without a mode is expected to be rejected.
+        with self.assertRaisesRegex(ValueError, "Please, specify 'mode' keyword argument"):
+            self.pool.execute("insert into \"test\" values ('test_12_execute_1', 1)")
+
+        # Plain statement.
+        resp = self.pool.execute(
+            "insert into \"test\" values ('test_12_execute_1', 1)",
+            mode=tarantool.Mode.RW)
+
+        self.assertEqual(resp.affected_row_count, 1)
+        self.assertEqual(resp.data, None)
+
+        # Statement with named parameters.
+        named_params = {'pk': 'test_12_execute_2', 'id': 2}
+        resp = self.pool.execute(
+            'insert into "test" values (:pk, :id)',
+            named_params,
+            mode=tarantool.Mode.RW)
+        self.assertEqual(resp.affected_row_count, 1)
+        self.assertEqual(resp.data, None)
+
+        rw_addr = self.addrs[0]
+        conn_0 = tarantool.connect(
+            host=rw_addr['host'],
+            port=rw_addr['port'],
+            user='test',
+            password='test')
+
+        self.assertSequenceEqual(
+            conn_0.select('test', 'test_12_execute_1'),
+            [['test_12_execute_1', 1]])
+
self.assertSequenceEqual(
+            conn_0.select('test', 'test_12_execute_2'),
+            [['test_12_execute_2', 2]])
+
+    def test_13_failover(self):
+        self.set_cluster_ro([False, True, True, True, True])
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs,
+            user='test',
+            password='test',
+            refresh_delay=0.2)
+
+        # Simulate failover
+        self.servers[0].stop()
+        self.set_ro(self.servers[1], False)
+
+        def expect_RW_request_execute_on_new_master():
+            self.assertSequenceEqual(
+                self.pool.eval('return box.cfg.listen', mode=tarantool.Mode.RW),
+                [str(self.addrs[1]['port'])])
+
+        self.retry(func=expect_RW_request_execute_on_new_master)
+
+    def test_14_cluster_with_instances_dead_in_runtime_is_ok(self):
+        self.set_cluster_ro([False, True, False, True, True])
+        self.servers[0].stop()
+
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs,
+            user='test',
+            password='test',
+            refresh_delay=0.2)
+
+        # Instance 2 is still writable, so an RW ping is expected to work.
+        self.pool.ping(mode=tarantool.Mode.RW)
+
+    def test_15_cluster_with_dead_instances_on_start_is_ok(self):
+        self.set_cluster_ro([False, True, True, True, True])
+        self.servers[0].stop()
+
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs,
+            user='test',
+            password='test',
+            refresh_delay=0.2)
+
+        # Bring back the instance that was down when the pool was created.
+        self.servers[0].start()
+
+        def ping_RW():
+            self.pool.ping(mode=tarantool.Mode.RW)
+
+        self.retry(func=ping_RW)
+
+    def test_16_is_closed(self):
+        self.set_cluster_ro([False, False, True, False, True])
+
+        self.pool = tarantool.ConnectionPool(
+            addrs=self.addrs,
+            user='test',
+            password='test',)
+
+        # assertEqual, not the assertEquals alias: the alias is deprecated
+        # and no longer exists in Python 3.12.
+        self.assertEqual(self.pool.is_closed(), False)
+
+        self.pool.close()
+
+        self.assertEqual(self.pool.is_closed(), True)
+
+    def tearDown(self):
+        try:
+            if hasattr(self, 'pool'):
+                self.pool.close()
+        finally:
+            # Stop and clean the servers even if closing the pool failed.
+            for srv in self.servers:
+                srv.stop()
+                srv.clean()