Skip to content

Commit dd44a8e

Browse files
committed
pool: Add max_open_connections option
1 parent fe21619 commit dd44a8e

File tree

1 file changed

+48
-15
lines changed

1 file changed

+48
-15
lines changed

tornado_mysql/pools.py

+48-15
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
from __future__ import absolute_import, division, print_function
33

44
from collections import deque
5-
import sys
65
import warnings
76

87
from tornado.ioloop import IOLoop
98
from tornado.gen import coroutine, Return
10-
from tornado.concurrent import Future
9+
from tornado.concurrent import Future, chain_future
10+
1111
from tornado_mysql import connect
12+
from tornado_mysql.connections import Connection
1213

1314

1415
DEBUG = False
@@ -32,52 +33,84 @@ def __init__(self,
3233
connect_kwargs,
3334
max_idle_connections=1,
3435
max_recycle_sec=3600,
36+
max_open_connections=0,
3537
io_loop=None,
3638
):
3739
"""
3840
:param dict connect_kwargs: kwargs for tornado_mysql.connect()
3941
:param int max_idle_connections: Max number of keeping connections.
4042
:param int max_recycle_sec: How long connections are recycled.
43+
:param int max_open_connections:
44+
Max number of opened connections. 0 means no limit.
4145
"""
4246
connect_kwargs['autocommit'] = True
4347
self.io_loop = io_loop or IOLoop.current()
4448
self.connect_kwargs = connect_kwargs
45-
self.max_idle_connections = max_idle_connections
49+
self.max_idle = max_idle_connections
50+
self.max_open = max_open_connections
4651
self.max_recycle_sec = max_recycle_sec
4752

4853
self._opened_conns = 0
4954
self._free_conn = deque()
55+
self._waitings = deque()
56+
57+
def stat(self):
58+
return (self._opened_conns, len(self._free_conn), len(self._waitings))
5059

5160
def _get_conn(self):
5261
now = self.io_loop.time()
62+
63+
# Try to reuse in free pool
5364
while self._free_conn:
5465
conn = self._free_conn.popleft()
5566
if now - conn.connected_time > self.max_recycle_sec:
5667
self._close_async(conn)
5768
continue
58-
_debug("Reusing connection from pool (opened=%d)" % (self._opened_conns,))
69+
_debug("Reusing connection from pool:", self.stat())
5970
fut = Future()
6071
fut.set_result(conn)
6172
return fut
6273

63-
self._opened_conns += 1
64-
_debug("Creating new connection (opened=%d)" % (self._opened_conns,))
65-
return connect(**self.connect_kwargs)
74+
# Open new connection
75+
if self.max_open and self._opened_conns < self.max_open:
76+
self._opened_conns += 1
77+
_debug("Creating new connection:", self.stat())
78+
return connect(**self.connect_kwargs)
79+
80+
# Wait to other connection is released.
81+
fut = Future()
82+
self._waitings.append(fut)
83+
return fut
6684

6785
def _put_conn(self, conn):
68-
if (len(self._free_conn) < self.max_idle_connections and
86+
if (len(self._free_conn) < self.max_idle and
6987
self.io_loop.time() - conn.connected_time < self.max_recycle_sec):
70-
self._free_conn.append(conn)
88+
if self._waitings:
89+
fut = self._waitings.popleft()
90+
fut.set_result(conn)
91+
_debug("Passing returned connection to waiter:", self.stat())
92+
else:
93+
self._free_conn.append(conn)
94+
_debug("Add conn to free pool:", self.stat())
7195
else:
7296
self._close_async(conn)
7397

7498
def _close_async(self, conn):
75-
self.io_loop.add_future(conn.close_async(), callback=lambda f: None)
76-
self._opened_conns -= 1
99+
self.io_loop.add_future(conn.close_async(), callback=self._after_close)
77100

78101
def _close_conn(self, conn):
79102
conn.close()
80-
self._opened_conns -= 1
103+
self._after_close()
104+
105+
def _after_close(self, fut=None):
106+
if self._waitings:
107+
fut = self._waitings.popleft()
108+
conn = Connection(**self.connect_kwargs)
109+
cf = conn.connect()
110+
self.io_loop.add_future(cf, callback=lambda f: fut.set_result(conn))
111+
else:
112+
self._opened_conns -= 1
113+
_debug("Connection closed:", self.stat())
81114

82115
@coroutine
83116
def execute(self, query, params=None):
@@ -94,11 +127,11 @@ def execute(self, query, params=None):
94127
cur = conn.cursor()
95128
yield cur.execute(query, params)
96129
yield cur.close()
97-
self._put_conn(conn)
98130
except:
99-
self._opened_conns -= 1
100-
conn.close()
131+
self._close_conn(conn)
101132
raise
133+
else:
134+
self._put_conn(conn)
102135
raise Return(cur)
103136

104137
@coroutine

0 commit comments

Comments
 (0)