diff --git a/example.py b/example/example.py
similarity index 100%
rename from example.py
rename to example/example.py
diff --git a/example_pool.py b/example/pool.py
similarity index 100%
rename from example_pool.py
rename to example/pool.py
diff --git a/example/pool2.py b/example/pool2.py
new file mode 100644
index 0000000..f41313f
--- /dev/null
+++ b/example/pool2.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+from __future__ import print_function
+
+import random
+from tornado import ioloop, gen
+from tornado_mysql import pools
+
+
+pools.DEBUG = True
+
+
+POOL = pools.Pool(
+    dict(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql'),
+    max_idle_connections=2,
+    max_recycle_sec=3,
+    max_open_connections=5,
+)
+
+
+@gen.coroutine
+def worker(n):
+    for i in range(20):
+        t = random.random() * 5
+        print(n, "sleeping", t, "seconds")
+        cur = yield POOL.execute("SELECT SLEEP(%s)", (t,))
+        print(n, cur.fetchall())
+        yield gen.sleep(t)
+
+
+@gen.coroutine
+def main():
+    workers = [worker(i) for i in range(10)]
+    yield workers
+
+
+ioloop.IOLoop.current().run_sync(main)
+print(POOL._opened_conns)
+
diff --git a/setup.py b/setup.py
index 6cd1632..c545f0d 100755
--- a/setup.py
+++ b/setup.py
@@ -9,7 +9,7 @@
 
 setup(
     name="Tornado-MySQL",
-    version="0.3",
+    version="0.4",
     url='https://github.com/PyMySQL/Tornado-MySQL',
     author='INADA Naoki',
     author_email='songofacandy@gmail.com',
diff --git a/tornado_mysql/pools.py b/tornado_mysql/pools.py
index 7d7756f..bc811ef 100644
--- a/tornado_mysql/pools.py
+++ b/tornado_mysql/pools.py
@@ -2,13 +2,14 @@
 from __future__ import absolute_import, division, print_function
 
 from collections import deque
-import sys
 import warnings
 
 from tornado.ioloop import IOLoop
 from tornado.gen import coroutine, Return
 from tornado.concurrent import Future
+
 from tornado_mysql import connect
+from tornado_mysql.connections import Connection
 
 DEBUG = False
 
@@ -32,52 +33,85 @@ def __init__(self, connect_kwargs,
                  max_idle_connections=1,
                  max_recycle_sec=3600,
+                 max_open_connections=0,
                  io_loop=None,
                  ):
         """
         :param dict connect_kwargs: kwargs for tornado_mysql.connect()
         :param int max_idle_connections: Max number of keeping connections.
         :param int max_recycle_sec: How long connections are recycled.
+        :param int max_open_connections:
+            Max number of opened connections. 0 means no limit.
""" connect_kwargs['autocommit'] = True self.io_loop = io_loop or IOLoop.current() self.connect_kwargs = connect_kwargs - self.max_idle_connections = max_idle_connections + self.max_idle = max_idle_connections + self.max_open = max_open_connections self.max_recycle_sec = max_recycle_sec self._opened_conns = 0 self._free_conn = deque() + self._waitings = deque() + + def stat(self): + """Returns (opened connections, free connections, waiters)""" + return (self._opened_conns, len(self._free_conn), len(self._waitings)) def _get_conn(self): now = self.io_loop.time() + + # Try to reuse in free pool while self._free_conn: conn = self._free_conn.popleft() if now - conn.connected_time > self.max_recycle_sec: self._close_async(conn) continue - _debug("Reusing connection from pool (opened=%d)" % (self._opened_conns,)) + _debug("Reusing connection from pool:", self.stat()) fut = Future() fut.set_result(conn) return fut - self._opened_conns += 1 - _debug("Creating new connection (opened=%d)" % (self._opened_conns,)) - return connect(**self.connect_kwargs) + # Open new connection + if self.max_open and self._opened_conns < self.max_open: + self._opened_conns += 1 + _debug("Creating new connection:", self.stat()) + return connect(**self.connect_kwargs) + + # Wait to other connection is released. + fut = Future() + self._waitings.append(fut) + return fut def _put_conn(self, conn): - if (len(self._free_conn) < self.max_idle_connections and + if (len(self._free_conn) < self.max_idle and self.io_loop.time() - conn.connected_time < self.max_recycle_sec): - self._free_conn.append(conn) + if self._waitings: + fut = self._waitings.popleft() + fut.set_result(conn) + _debug("Passing returned connection to waiter:", self.stat()) + else: + self._free_conn.append(conn) + _debug("Add conn to free pool:", self.stat()) else: self._close_async(conn) def _close_async(self, conn): - self.io_loop.add_future(conn.close_async(), callback=lambda f: None) - self._opened_conns -= 1 + self.io_loop.add_future(conn.close_async(), callback=self._after_close) def _close_conn(self, conn): conn.close() - self._opened_conns -= 1 + self._after_close() + + def _after_close(self, fut=None): + if self._waitings: + fut = self._waitings.popleft() + conn = Connection(**self.connect_kwargs) + cf = conn.connect() + self.io_loop.add_future(cf, callback=lambda f: fut.set_result(conn)) + else: + self._opened_conns -= 1 + _debug("Connection closed:", self.stat()) @coroutine def execute(self, query, params=None): @@ -94,11 +128,11 @@ def execute(self, query, params=None): cur = conn.cursor() yield cur.execute(query, params) yield cur.close() - self._put_conn(conn) except: - self._opened_conns -= 1 - conn.close() + self._close_conn(conn) raise + else: + self._put_conn(conn) raise Return(cur) @coroutine @@ -111,7 +145,11 @@ def begin(self): :rtype: Future """ conn = yield self._get_conn() - yield conn.begin() + try: + yield conn.begin() + except: + self._close_conn(conn) + raise trx = Transaction(self, conn) raise Return(trx)