Skip to content

Commit 87a597a

Browse files
committed
Merge pull request #5 from PyMySQL/pool-max-open
Add max_open_connections to pool
2 parents fe21619 + 125c663 commit 87a597a

File tree

4 files changed

+91
-15
lines changed

4 files changed

+91
-15
lines changed
File renamed without changes.
File renamed without changes.

example/pool2.py

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#!/usr/bin/env python
2+
from __future__ import print_function
3+
4+
import random
5+
from tornado import ioloop, gen
6+
from tornado_mysql import pools
7+
8+
9+
pools.DEBUG = True
10+
11+
12+
POOL = pools.Pool(
13+
dict(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql'),
14+
max_idle_connections=2,
15+
max_recycle_sec=3,
16+
max_open_connections=5,
17+
)
18+
19+
20+
@gen.coroutine
21+
def worker(n):
22+
for i in range(20):
23+
t = random.random() * 5
24+
print(n, "sleeping", t, "seconds")
25+
cur = yield POOL.execute("SELECT SLEEP(%s)", (t,))
26+
print(n, cur.fetchall())
27+
yield gen.sleep(t)
28+
29+
30+
@gen.coroutine
31+
def main():
32+
workers = [worker(i) for i in range(10)]
33+
yield workers
34+
35+
36+
ioloop.IOLoop.current().run_sync(main)
37+
print(POOL._opened_conns)
38+

tornado_mysql/pools.py

+53-15
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
from __future__ import absolute_import, division, print_function
33

44
from collections import deque
5-
import sys
65
import warnings
76

87
from tornado.ioloop import IOLoop
98
from tornado.gen import coroutine, Return
109
from tornado.concurrent import Future
10+
1111
from tornado_mysql import connect
12+
from tornado_mysql.connections import Connection
1213

1314

1415
DEBUG = False
@@ -32,52 +33,85 @@ def __init__(self,
3233
connect_kwargs,
3334
max_idle_connections=1,
3435
max_recycle_sec=3600,
36+
max_open_connections=0,
3537
io_loop=None,
3638
):
3739
"""
3840
:param dict connect_kwargs: kwargs for tornado_mysql.connect()
3941
:param int max_idle_connections: Max number of keeping connections.
4042
:param int max_recycle_sec: How long connections are recycled.
43+
:param int max_open_connections:
44+
Max number of opened connections. 0 means no limit.
4145
"""
4246
connect_kwargs['autocommit'] = True
4347
self.io_loop = io_loop or IOLoop.current()
4448
self.connect_kwargs = connect_kwargs
45-
self.max_idle_connections = max_idle_connections
49+
self.max_idle = max_idle_connections
50+
self.max_open = max_open_connections
4651
self.max_recycle_sec = max_recycle_sec
4752

4853
self._opened_conns = 0
4954
self._free_conn = deque()
55+
self._waitings = deque()
56+
57+
def stat(self):
58+
"""Returns (opened connections, free connections, waiters)"""
59+
return (self._opened_conns, len(self._free_conn), len(self._waitings))
5060

5161
def _get_conn(self):
5262
now = self.io_loop.time()
63+
64+
# Try to reuse in free pool
5365
while self._free_conn:
5466
conn = self._free_conn.popleft()
5567
if now - conn.connected_time > self.max_recycle_sec:
5668
self._close_async(conn)
5769
continue
58-
_debug("Reusing connection from pool (opened=%d)" % (self._opened_conns,))
70+
_debug("Reusing connection from pool:", self.stat())
5971
fut = Future()
6072
fut.set_result(conn)
6173
return fut
6274

63-
self._opened_conns += 1
64-
_debug("Creating new connection (opened=%d)" % (self._opened_conns,))
65-
return connect(**self.connect_kwargs)
75+
# Open new connection
76+
if self.max_open and self._opened_conns < self.max_open:
77+
self._opened_conns += 1
78+
_debug("Creating new connection:", self.stat())
79+
return connect(**self.connect_kwargs)
80+
81+
# Wait to other connection is released.
82+
fut = Future()
83+
self._waitings.append(fut)
84+
return fut
6685

6786
def _put_conn(self, conn):
68-
if (len(self._free_conn) < self.max_idle_connections and
87+
if (len(self._free_conn) < self.max_idle and
6988
self.io_loop.time() - conn.connected_time < self.max_recycle_sec):
70-
self._free_conn.append(conn)
89+
if self._waitings:
90+
fut = self._waitings.popleft()
91+
fut.set_result(conn)
92+
_debug("Passing returned connection to waiter:", self.stat())
93+
else:
94+
self._free_conn.append(conn)
95+
_debug("Add conn to free pool:", self.stat())
7196
else:
7297
self._close_async(conn)
7398

7499
def _close_async(self, conn):
75-
self.io_loop.add_future(conn.close_async(), callback=lambda f: None)
76-
self._opened_conns -= 1
100+
self.io_loop.add_future(conn.close_async(), callback=self._after_close)
77101

78102
def _close_conn(self, conn):
79103
conn.close()
80-
self._opened_conns -= 1
104+
self._after_close()
105+
106+
def _after_close(self, fut=None):
107+
if self._waitings:
108+
fut = self._waitings.popleft()
109+
conn = Connection(**self.connect_kwargs)
110+
cf = conn.connect()
111+
self.io_loop.add_future(cf, callback=lambda f: fut.set_result(conn))
112+
else:
113+
self._opened_conns -= 1
114+
_debug("Connection closed:", self.stat())
81115

82116
@coroutine
83117
def execute(self, query, params=None):
@@ -94,11 +128,11 @@ def execute(self, query, params=None):
94128
cur = conn.cursor()
95129
yield cur.execute(query, params)
96130
yield cur.close()
97-
self._put_conn(conn)
98131
except:
99-
self._opened_conns -= 1
100-
conn.close()
132+
self._close_conn(conn)
101133
raise
134+
else:
135+
self._put_conn(conn)
102136
raise Return(cur)
103137

104138
@coroutine
@@ -111,7 +145,11 @@ def begin(self):
111145
:rtype: Future
112146
"""
113147
conn = yield self._get_conn()
114-
yield conn.begin()
148+
try:
149+
yield conn.begin()
150+
except:
151+
self._close_conn(conn)
152+
raise
115153
trx = Transaction(self, conn)
116154
raise Return(trx)
117155

0 commit comments

Comments
 (0)