From fe216190ff8de68c3617e27eae41cea2362d0f11 Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Wed, 11 Feb 2015 01:30:45 +0900 Subject: [PATCH 1/5] 0.4 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 6cd1632..c545f0d 100755 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ setup( name="Tornado-MySQL", - version="0.3", + version="0.4", url='https://github.com/PyMySQL/Tornado-MySQL', author='INADA Naoki', author_email='songofacandy@gmail.com', From dd44a8e55eb33cad043f24a8c0d5d6efdd0ff24c Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Tue, 14 Apr 2015 17:22:09 +0900 Subject: [PATCH 2/5] pool: Add max_open_connections option --- tornado_mysql/pools.py | 63 ++++++++++++++++++++++++++++++++---------- 1 file changed, 48 insertions(+), 15 deletions(-) diff --git a/tornado_mysql/pools.py b/tornado_mysql/pools.py index 7d7756f..58b3ce5 100644 --- a/tornado_mysql/pools.py +++ b/tornado_mysql/pools.py @@ -2,13 +2,14 @@ from __future__ import absolute_import, division, print_function from collections import deque -import sys import warnings from tornado.ioloop import IOLoop from tornado.gen import coroutine, Return -from tornado.concurrent import Future +from tornado.concurrent import Future, chain_future + from tornado_mysql import connect +from tornado_mysql.connections import Connection DEBUG = False @@ -32,52 +33,84 @@ def __init__(self, connect_kwargs, max_idle_connections=1, max_recycle_sec=3600, + max_open_connections=0, io_loop=None, ): """ :param dict connect_kwargs: kwargs for tornado_mysql.connect() :param int max_idle_connections: Max number of keeping connections. :param int max_recycle_sec: How long connections are recycled. + :param int max_open_connections: + Max number of opened connections. 0 means no limit. """ connect_kwargs['autocommit'] = True self.io_loop = io_loop or IOLoop.current() self.connect_kwargs = connect_kwargs - self.max_idle_connections = max_idle_connections + self.max_idle = max_idle_connections + self.max_open = max_open_connections self.max_recycle_sec = max_recycle_sec self._opened_conns = 0 self._free_conn = deque() + self._waitings = deque() + + def stat(self): + return (self._opened_conns, len(self._free_conn), len(self._waitings)) def _get_conn(self): now = self.io_loop.time() + + # Try to reuse in free pool while self._free_conn: conn = self._free_conn.popleft() if now - conn.connected_time > self.max_recycle_sec: self._close_async(conn) continue - _debug("Reusing connection from pool (opened=%d)" % (self._opened_conns,)) + _debug("Reusing connection from pool:", self.stat()) fut = Future() fut.set_result(conn) return fut - self._opened_conns += 1 - _debug("Creating new connection (opened=%d)" % (self._opened_conns,)) - return connect(**self.connect_kwargs) + # Open new connection + if self.max_open and self._opened_conns < self.max_open: + self._opened_conns += 1 + _debug("Creating new connection:", self.stat()) + return connect(**self.connect_kwargs) + + # Wait to other connection is released. + fut = Future() + self._waitings.append(fut) + return fut def _put_conn(self, conn): - if (len(self._free_conn) < self.max_idle_connections and + if (len(self._free_conn) < self.max_idle and self.io_loop.time() - conn.connected_time < self.max_recycle_sec): - self._free_conn.append(conn) + if self._waitings: + fut = self._waitings.popleft() + fut.set_result(conn) + _debug("Passing returned connection to waiter:", self.stat()) + else: + self._free_conn.append(conn) + _debug("Add conn to free pool:", self.stat()) else: self._close_async(conn) def _close_async(self, conn): - self.io_loop.add_future(conn.close_async(), callback=lambda f: None) - self._opened_conns -= 1 + self.io_loop.add_future(conn.close_async(), callback=self._after_close) def _close_conn(self, conn): conn.close() - self._opened_conns -= 1 + self._after_close() + + def _after_close(self, fut=None): + if self._waitings: + fut = self._waitings.popleft() + conn = Connection(**self.connect_kwargs) + cf = conn.connect() + self.io_loop.add_future(cf, callback=lambda f: fut.set_result(conn)) + else: + self._opened_conns -= 1 + _debug("Connection closed:", self.stat()) @coroutine def execute(self, query, params=None): @@ -94,11 +127,11 @@ def execute(self, query, params=None): cur = conn.cursor() yield cur.execute(query, params) yield cur.close() - self._put_conn(conn) except: - self._opened_conns -= 1 - conn.close() + self._close_conn(conn) raise + else: + self._put_conn(conn) raise Return(cur) @coroutine From 45c2ed0be0a031072821a96f0baa89e8186d18e5 Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Tue, 14 Apr 2015 17:22:51 +0900 Subject: [PATCH 3/5] Update examples --- example.py => example/example.py | 0 example_pool.py => example/pool.py | 0 example/pool2.py | 38 ++++++++++++++++++++++++++++++ 3 files changed, 38 insertions(+) rename example.py => example/example.py (100%) rename example_pool.py => example/pool.py (100%) create mode 100644 example/pool2.py diff --git a/example.py b/example/example.py similarity index 100% rename from example.py rename to example/example.py diff --git a/example_pool.py b/example/pool.py similarity index 100% rename from example_pool.py rename to example/pool.py diff --git a/example/pool2.py b/example/pool2.py new file mode 100644 index 0000000..f41313f --- /dev/null +++ b/example/pool2.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +from __future__ import print_function + +import random +from tornado import ioloop, gen +from tornado_mysql import pools + + +pools.DEBUG = True + + +POOL = pools.Pool( + dict(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql'), + max_idle_connections=2, + max_recycle_sec=3, + max_open_connections=5, +) + + +@gen.coroutine +def worker(n): + for i in range(20): + t = random.random() * 5 + print(n, "sleeping", t, "seconds") + cur = yield POOL.execute("SELECT SLEEP(%s)", (t,)) + print(n, cur.fetchall()) + yield gen.sleep(t) + + +@gen.coroutine +def main(): + workers = [worker(i) for i in range(10)] + yield workers + + +ioloop.IOLoop.current().run_sync(main) +print(POOL._opened_conns) + From 7b10471ec4499687ca2b7586f5d74bc134fdb5a4 Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Tue, 14 Apr 2015 23:07:14 +0900 Subject: [PATCH 4/5] Close connection when conn.begin() fails --- tornado_mysql/pools.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tornado_mysql/pools.py b/tornado_mysql/pools.py index 58b3ce5..639ae46 100644 --- a/tornado_mysql/pools.py +++ b/tornado_mysql/pools.py @@ -144,7 +144,11 @@ def begin(self): :rtype: Future """ conn = yield self._get_conn() - yield conn.begin() + try: + yield conn.begin() + except: + self._close_conn(conn) + raise trx = Transaction(self, conn) raise Return(trx) From 125c663339fb2af054f42873bef221bbff73e22e Mon Sep 17 00:00:00 2001 From: INADA Naoki Date: Tue, 14 Apr 2015 23:09:34 +0900 Subject: [PATCH 5/5] nit fix --- tornado_mysql/pools.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tornado_mysql/pools.py b/tornado_mysql/pools.py index 639ae46..bc811ef 100644 --- a/tornado_mysql/pools.py +++ b/tornado_mysql/pools.py @@ -6,7 +6,7 @@ from tornado.ioloop import IOLoop from tornado.gen import coroutine, Return -from tornado.concurrent import Future, chain_future +from tornado.concurrent import Future from tornado_mysql import connect from tornado_mysql.connections import Connection @@ -55,6 +55,7 @@ def __init__(self, self._waitings = deque() def stat(self): + """Returns (opened connections, free connections, waiters)""" return (self._opened_conns, len(self._free_conn), len(self._waitings)) def _get_conn(self):