Add max_open_connections to pool #5

Merged
merged 5 commits on Apr 14, 2015
Changes from all commits
File renamed without changes.
File renamed without changes.
38 changes: 38 additions & 0 deletions example/pool2.py
@@ -0,0 +1,38 @@
#!/usr/bin/env python
from __future__ import print_function

import random
from tornado import ioloop, gen
from tornado_mysql import pools


pools.DEBUG = True


POOL = pools.Pool(
dict(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql'),
max_idle_connections=2,
max_recycle_sec=3,
max_open_connections=5,
)


@gen.coroutine
def worker(n):
for i in range(20):
t = random.random() * 5
print(n, "sleeping", t, "seconds")
cur = yield POOL.execute("SELECT SLEEP(%s)", (t,))
print(n, cur.fetchall())
yield gen.sleep(t)


@gen.coroutine
def main():
workers = [worker(i) for i in range(10)]
yield workers


ioloop.IOLoop.current().run_sync(main)
print(POOL._opened_conns)
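
The example reports pool usage by reading the private _opened_conns attribute. A minimal alternative sketch (not part of this diff) uses the new public Pool.stat() helper added in pools.py below, which returns (opened connections, free connections, waiters):

opened, free, waiting = POOL.stat()
print("opened=%d free=%d waiting=%d" % (opened, free, waiting))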

2 changes: 1 addition & 1 deletion setup.py
@@ -9,7 +9,7 @@

setup(
name="Tornado-MySQL",
version="0.3",
version="0.4",
url='https://github.com/PyMySQL/Tornado-MySQL',
author='INADA Naoki',
author_email='[email protected]',
68 changes: 53 additions & 15 deletions tornado_mysql/pools.py
@@ -2,13 +2,14 @@
from __future__ import absolute_import, division, print_function

from collections import deque
import sys
import warnings

from tornado.ioloop import IOLoop
from tornado.gen import coroutine, Return
from tornado.concurrent import Future

from tornado_mysql import connect
from tornado_mysql.connections import Connection


DEBUG = False
@@ -32,52 +33,85 @@ def __init__(self,
connect_kwargs,
max_idle_connections=1,
max_recycle_sec=3600,
max_open_connections=0,
io_loop=None,
):
"""
:param dict connect_kwargs: kwargs for tornado_mysql.connect()
:param int max_idle_connections: Max number of idle connections to keep.
:param int max_recycle_sec: How long (in seconds) a connection may be reused before it is recycled.
:param int max_open_connections:
Max number of opened connections. 0 means no limit.
"""
connect_kwargs['autocommit'] = True
self.io_loop = io_loop or IOLoop.current()
self.connect_kwargs = connect_kwargs
self.max_idle_connections = max_idle_connections
self.max_idle = max_idle_connections
self.max_open = max_open_connections
self.max_recycle_sec = max_recycle_sec

self._opened_conns = 0
self._free_conn = deque()
self._waitings = deque()

def stat(self):
"""Returns (opened connections, free connections, waiters)"""
return (self._opened_conns, len(self._free_conn), len(self._waitings))

def _get_conn(self):
now = self.io_loop.time()

# Try to reuse in free pool
while self._free_conn:
conn = self._free_conn.popleft()
if now - conn.connected_time > self.max_recycle_sec:
self._close_async(conn)
continue
_debug("Reusing connection from pool (opened=%d)" % (self._opened_conns,))
_debug("Reusing connection from pool:", self.stat())
fut = Future()
fut.set_result(conn)
return fut

self._opened_conns += 1
_debug("Creating new connection (opened=%d)" % (self._opened_conns,))
return connect(**self.connect_kwargs)
# Open new connection
if self.max_open == 0 or self._opened_conns < self.max_open:
self._opened_conns += 1
_debug("Creating new connection:", self.stat())
return connect(**self.connect_kwargs)

# Wait until another connection is released.
fut = Future()
self._waitings.append(fut)
return fut

def _put_conn(self, conn):
if (len(self._free_conn) < self.max_idle_connections and
if (len(self._free_conn) < self.max_idle and
self.io_loop.time() - conn.connected_time < self.max_recycle_sec):
self._free_conn.append(conn)
if self._waitings:
fut = self._waitings.popleft()
fut.set_result(conn)
_debug("Passing returned connection to waiter:", self.stat())
else:
self._free_conn.append(conn)
_debug("Add conn to free pool:", self.stat())
else:
self._close_async(conn)

def _close_async(self, conn):
self.io_loop.add_future(conn.close_async(), callback=lambda f: None)
self._opened_conns -= 1
self.io_loop.add_future(conn.close_async(), callback=self._after_close)

def _close_conn(self, conn):
conn.close()
self._opened_conns -= 1
self._after_close()

def _after_close(self, fut=None):
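# A connection finished closing: if a caller is waiting, open a fresh connection and hand it over; otherwise just decrement the open-connection count.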
if self._waitings:
fut = self._waitings.popleft()
conn = Connection(**self.connect_kwargs)
cf = conn.connect()
self.io_loop.add_future(cf, callback=lambda f: fut.set_result(conn))
else:
self._opened_conns -= 1
_debug("Connection closed:", self.stat())

@coroutine
def execute(self, query, params=None):
@@ -94,11 +128,11 @@ def execute(self, query, params=None):
cur = conn.cursor()
yield cur.execute(query, params)
yield cur.close()
self._put_conn(conn)
except:
self._opened_conns -= 1
conn.close()
self._close_conn(conn)
raise
else:
self._put_conn(conn)
raise Return(cur)

@coroutine
@@ -111,7 +145,11 @@ def begin(self):
:rtype: Future
"""
conn = yield self._get_conn()
yield conn.begin()
try:
yield conn.begin()
except:
self._close_conn(conn)
raise
trx = Transaction(self, conn)
raise Return(trx)
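
For a concrete picture of the new behaviour, here is a minimal usage sketch. It is not part of this pull request and assumes only the API visible in this diff: with max_open_connections=2 and five concurrent queries, at most two connections are opened, and the remaining callers are parked in _waitings until _put_conn() or _after_close() hands them a connection.

#!/usr/bin/env python
from tornado import gen, ioloop
from tornado_mysql import pools

# Hypothetical settings for illustration; they mirror example/pool2.py above.
pool = pools.Pool(
    dict(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql'),
    max_open_connections=2,   # never open more than two connections
)


@gen.coroutine
def query(n):
    # Each call borrows a connection; callers beyond the limit wait in pool._waitings.
    cur = yield pool.execute("SELECT SLEEP(1), %s", (n,))
    raise gen.Return(cur.fetchall())


@gen.coroutine
def main():
    results = yield [query(i) for i in range(5)]
    print(results)
    print("(opened, free, waiting):", pool.stat())


ioloop.IOLoop.current().run_sync(main)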
