Skip to content

Commit e31df23

Browse files
author
Éric Lemoine
authored
Merge pull request #185 from peopledoc/ele_pool
Create aiopg pool lazily
2 parents 254e95b + bf97d56 commit e31df23

File tree

8 files changed

+176
-107
lines changed

8 files changed

+176
-107
lines changed

docs/reference.rst

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,8 @@ are accessible through :py:attr:`App.builtin_tasks`:
2020
Connectors
2121
----------
2222

23-
.. This does not indicate that create_with_pool* is an async classmethod because of
24-
https://github.com/sphinx-doc/sphinx/issues/7189
25-
2623
.. autoclass:: procrastinate.PostgresConnector
27-
:members: create_with_pool, create_with_pool_async, close, close_async
24+
:members: set_pool, close, close_async
2825

2926
.. autoclass:: procrastinate.testing.InMemoryConnector
3027
:members: reset

procrastinate/aiopg_connector.py

Lines changed: 82 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import psycopg2.sql
88
from psycopg2.extras import Json, RealDictCursor
99

10-
from procrastinate import connector, sql, utils
10+
from procrastinate import connector, exceptions, sql, utils
1111

1212
logger = logging.getLogger(__name__)
1313

@@ -18,70 +18,33 @@
1818
class PostgresConnector(connector.BaseConnector):
1919
def __init__(
2020
self,
21-
pool: aiopg.Pool,
21+
*,
2222
json_dumps: Optional[Callable] = None,
2323
json_loads: Optional[Callable] = None,
24+
**kwargs: Any,
2425
):
2526
"""
26-
The pool connections are expected to have jsonb adapters.
27+
Create a PostgreSQL connector. The connector uses an :py:func:`aiopg.Pool`,
28+
which is either created automatically upon first use, or set through the
29+
:py:func:`PostgresConnector.set_pool` method.
2730
28-
See parameter details in :py:func:`PostgresConnector.create_with_pool`.
31+
All arguments other than ``json_dumps`` and ``json_loads`` are passed to
32+
``aiopg.create_pool`` (see aiopg documentation__), with default values that
33+
may differ from those of ``aiopg`` (see the list of parameters below).
2934
30-
Parameters
31-
----------
32-
pool:
33-
An aiopg pool, either externally configured or passed by
34-
:py:func:`PostgresConnector.create_with_pool`.
35-
36-
"""
37-
self._pool = pool
38-
self.json_dumps = json_dumps
39-
self.json_loads = json_loads
40-
41-
async def close_async(self) -> None:
42-
"""
43-
Closes the pool and awaits all connections to be released.
44-
"""
45-
self._pool.close()
46-
await self._pool.wait_closed()
47-
48-
def _wrap_json(self, arguments: Dict[str, Any]):
49-
return {
50-
key: Json(value, dumps=self.json_dumps)
51-
if isinstance(value, dict)
52-
else value
53-
for key, value in arguments.items()
54-
}
55-
56-
@classmethod
57-
async def create_with_pool_async(
58-
cls,
59-
json_dumps: Optional[Callable] = None,
60-
json_loads: Optional[Callable] = None,
61-
**kwargs,
62-
) -> aiopg.Pool:
63-
"""
64-
Creates a connector, and its connection pool, using the provided parameters.
65-
All additional parameters will be used to create a
66-
:py:func:`aiopg.Pool` (see the documentation__), sometimes with a different
67-
default value.
68-
69-
When using this method, you explicitely take the responsibility for opening the
70-
pool. It's your responsibility to call
71-
:py:func:`procrastinate.PostgresConnector.close` or
72-
:py:func:`procrastinate.PostgresConnector.close_async` to close connections
73-
when your process ends.
74-
75-
.. __: https://aiopg.readthedocs.io/en/stable/core.html#aiopg.create_pool
7635
.. _psycopg2 doc: https://www.psycopg.org/docs/extras.html#json-adaptation
36+
.. __: https://aiopg.readthedocs.io/en/stable/core.html#aiopg.create_pool
7737
38+
Parameters
39+
----------
7840
json_dumps:
7941
The JSON dumps function to use for serializing job arguments. Defaults to
8042
the function used by psycopg2. See the `psycopg2 doc`_.
8143
json_loads:
8244
The JSON loads function to use for deserializing job arguments. Defaults
83-
to the function used by psycopg2. See the `psycopg2 doc`_. Unused if pool
84-
is passed.
45+
to the function used by psycopg2. See the `psycopg2 doc`_. Unused if the
46+
pool is externally created and set into the connector through the
47+
:py:func:`PostgresConnector.set_pool` method.
8548
dsn (Optional[str]):
8649
Passed to aiopg. Default is "" instead of None, which means if no argument
8750
is passed, it will connect to localhost:5432 instead of a Unix-domain
@@ -102,32 +65,82 @@ async def create_with_pool_async(
10265
maxsize (int):
10366
Passed to aiopg. Cannot be lower than 2, otherwise worker won't be
10467
functioning normally (one connection for listen/notify, one for executing
105-
tasks)
68+
tasks).
69+
minsize (int):
70+
Passed to aiopg. Initial connections are not opened when the connector
71+
is created, but at first use of the pool.
10672
"""
107-
base_on_connect = kwargs.pop("on_connect", None)
73+
self._pool: Optional[aiopg.Pool] = None
74+
self.json_dumps = json_dumps
75+
self.json_loads = json_loads
76+
self._pool_args = self._adapt_pool_args(kwargs, json_loads)
77+
self._lock = asyncio.Lock()
78+
79+
@staticmethod
80+
def _adapt_pool_args(
81+
pool_args: Dict[str, Any], json_loads: Optional[Callable]
82+
) -> Dict[str, Any]:
83+
"""
84+
Adapt the pool args for ``aiopg``, using sensible defaults for Procrastinate.
85+
"""
86+
base_on_connect = pool_args.pop("on_connect", None)
10887

10988
async def on_connect(connection):
11089
if base_on_connect:
11190
await base_on_connect(connection)
11291
if json_loads:
11392
psycopg2.extras.register_default_jsonb(connection.raw, loads=json_loads)
11493

115-
defaults = {
94+
final_args = {
11695
"dsn": "",
11796
"enable_json": False,
11897
"enable_hstore": False,
11998
"enable_uuid": False,
12099
"on_connect": on_connect,
121100
"cursor_factory": RealDictCursor,
122101
}
123-
if "maxsize" in kwargs:
124-
kwargs["maxsize"] = max(2, kwargs["maxsize"])
102+
if "maxsize" in pool_args:
103+
pool_args["maxsize"] = max(2, pool_args["maxsize"])
125104

126-
defaults.update(kwargs)
105+
final_args.update(pool_args)
106+
return final_args
127107

128-
pool = await aiopg.create_pool(**defaults)
108+
async def close_async(self) -> None:
109+
"""
110+
Close the pool and await all connections to be released.
111+
"""
112+
if self._pool:
113+
self._pool.close()
114+
await self._pool.wait_closed()
115+
self._pool = None
129116

130-
return cls(pool=pool, json_dumps=json_dumps, json_loads=json_loads)
117+
def _wrap_json(self, arguments: Dict[str, Any]):
118+
return {
119+
key: Json(value, dumps=self.json_dumps)
120+
if isinstance(value, dict)
121+
else value
122+
for key, value in arguments.items()
123+
}
124+
125+
@staticmethod
126+
async def _create_pool(pool_args: Dict[str, Any]) -> aiopg.Pool:
127+
return await aiopg.create_pool(**pool_args)
128+
129+
def set_pool(self, pool: aiopg.Pool) -> None:
130+
"""
131+
Set the connection pool. Raises an exception if the pool is already set.
132+
"""
133+
if self._pool:
134+
raise exceptions.PoolAlreadySet
135+
self._pool = pool
136+
137+
async def _get_pool(self) -> aiopg.Pool:
138+
if self._pool:
139+
return self._pool
140+
async with self._lock:
141+
if not self._pool:
142+
self.set_pool(await self._create_pool(self._pool_args))
143+
return self._pool
131144

132145
# Pools and single connections do not exactly share their cursor API:
133146
# - connection.cursor() is an async context manager (async with)
@@ -136,7 +149,8 @@ async def on_connect(connection):
136149
# a pool or from a connection
137150

138151
async def execute_query(self, query: str, **arguments: Any) -> None:
139-
with await self._pool.cursor() as cursor:
152+
pool = await self._get_pool()
153+
with await pool.cursor() as cursor:
140154
await cursor.execute(query, self._wrap_json(arguments))
141155

142156
async def _execute_query_connection(
@@ -146,16 +160,17 @@ async def _execute_query_connection(
146160
await cursor.execute(query, self._wrap_json(arguments))
147161

148162
async def execute_query_one(self, query: str, **arguments: Any) -> Dict[str, Any]:
149-
with await self._pool.cursor() as cursor:
163+
pool = await self._get_pool()
164+
with await pool.cursor() as cursor:
150165
await cursor.execute(query, self._wrap_json(arguments))
151166

152167
return await cursor.fetchone()
153168

154169
async def execute_query_all(
155170
self, query: str, **arguments: Any
156171
) -> List[Dict[str, Any]]:
157-
158-
with await self._pool.cursor() as cursor:
172+
pool = await self._get_pool()
173+
with await pool.cursor() as cursor:
159174
await cursor.execute(query, self._wrap_json(arguments))
160175

161176
return await cursor.fetchall()
@@ -171,10 +186,11 @@ def make_dynamic_query(self, query: str, **identifiers: str) -> str:
171186
async def listen_notify(
172187
self, event: asyncio.Event, channels: Iterable[str]
173188
) -> NoReturn:
189+
pool = await self._get_pool()
174190
# We need to acquire a dedicated connection, and use the listen
175191
# query
176192
while True:
177-
async with self._pool.acquire() as connection:
193+
async with pool.acquire() as connection:
178194
for channel_name in channels:
179195
await self._execute_query_connection(
180196
connection=connection,
@@ -215,4 +231,4 @@ def PostgresJobStore(*args, **kwargs):
215231
)
216232
logger.warning(f"Deprecation Warning: {message}")
217233
warnings.warn(DeprecationWarning(message))
218-
return PostgresConnector.create_with_pool(*args, **kwargs)
234+
return PostgresConnector(**kwargs)

procrastinate/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,7 @@ class LoadFromPathError(ImportError, ProcrastinateException):
2020
class JobRetry(ProcrastinateException):
2121
def __init__(self, scheduled_at: datetime.datetime):
2222
self.scheduled_at = scheduled_at
23+
24+
25+
class PoolAlreadySet(ProcrastinateException):
26+
pass

procrastinate_demo/app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import procrastinate
22

33
app = procrastinate.App(
4-
connector=procrastinate.PostgresConnector.create_with_pool(), # type: ignore
4+
connector=procrastinate.PostgresConnector(),
55
import_paths=["procrastinate_demo.tasks"],
66
)

tests/acceptance/app.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def decode(dct):
2525
json_loads = functools.partial(json.loads, object_hook=decode)
2626

2727
app = procrastinate.App(
28-
connector=procrastinate.PostgresConnector.create_with_pool( # type: ignore
28+
connector=procrastinate.PostgresConnector(
2929
json_dumps=json_dumps, json_loads=json_loads
3030
)
3131
)
@@ -38,6 +38,10 @@ def sum_task(a, b):
3838

3939
@app.task(queue="default")
4040
def sum_task_param(p1: Param, p2: Param):
41+
if not isinstance(p1, Param):
42+
raise Exception("wrong type for p1")
43+
if not isinstance(p2, Param):
44+
raise Exception("wrong type for p2")
4145
print(p1 + p2)
4246

4347

tests/conftest.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@ def setup_db():
3737
)
3838
_execute(cursor, "CREATE DATABASE {}", "procrastinate_test_template")
3939

40-
connector = aiopg_connector.PostgresConnector.create_with_pool(
41-
dbname="procrastinate_test_template"
42-
)
40+
connector = aiopg_connector.PostgresConnector(dbname="procrastinate_test_template")
4341
schema_manager = schema.SchemaManager(connector=connector)
4442
schema_manager.apply_schema()
4543
# We need to close the psycopg2 underlying connection synchronously
@@ -84,9 +82,7 @@ async def connection(connection_params):
8482

8583
@pytest.fixture
8684
async def pg_connector(connection_params):
87-
connector = await aiopg_connector.PostgresConnector.create_with_pool_async(
88-
**connection_params
89-
)
85+
connector = aiopg_connector.PostgresConnector(**connection_params)
9086
yield connector
9187
await connector.close_async()
9288

0 commit comments

Comments
 (0)