Skip to content

Commit cd8feab

Browse files
committed
Add a workaround for bpo-37658
`asyncio.wait_for()` currently has a bug where it raises a `CancelledError` even when the wrapped awaitable has completed. The upstream fix is in python/cpython#37658. This adds a workaround until the aforementioned PR is merged, backported and released. Fixes: #467 Fixes: #547 Related: #468 Supersedes: #548
1 parent db4f1a6 commit cd8feab

File tree

3 files changed

+21
-18
lines changed

3 files changed

+21
-18
lines changed

asyncpg/compat.py

+16
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,19 @@ async def wait_closed(stream):
9090
# On Windows wait_closed() sometimes propagates
9191
# ConnectionResetError which is totally unnecessary.
9292
pass
93+
94+
95+
# Workaround for https://bugs.python.org/issue37658
96+
async def wait_for(fut, timeout):
97+
if timeout is None:
98+
return await fut
99+
100+
fut = asyncio.ensure_future(fut)
101+
102+
try:
103+
return await asyncio.wait_for(fut, timeout)
104+
except asyncio.CancelledError:
105+
if fut.done():
106+
return fut.result()
107+
else:
108+
raise

asyncpg/connect_utils.py

+2-16
Original file line numberDiff line numberDiff line change
@@ -636,18 +636,13 @@ async def _connect_addr(
636636

637637
connector = asyncio.ensure_future(connector)
638638
before = time.monotonic()
639-
try:
640-
tr, pr = await asyncio.wait_for(
641-
connector, timeout=timeout)
642-
except asyncio.CancelledError:
643-
connector.add_done_callback(_close_leaked_connection)
644-
raise
639+
tr, pr = await compat.wait_for(connector, timeout=timeout)
645640
timeout -= time.monotonic() - before
646641

647642
try:
648643
if timeout <= 0:
649644
raise asyncio.TimeoutError
650-
await asyncio.wait_for(connected, timeout=timeout)
645+
await compat.wait_for(connected, timeout=timeout)
651646
except (Exception, asyncio.CancelledError):
652647
tr.close()
653648
raise
@@ -745,12 +740,3 @@ def _create_future(loop):
745740
return asyncio.Future(loop=loop)
746741
else:
747742
return create_future()
748-
749-
750-
def _close_leaked_connection(fut):
751-
try:
752-
tr, pr = fut.result()
753-
if tr:
754-
tr.close()
755-
except asyncio.CancelledError:
756-
pass # hide the exception

asyncpg/pool.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import time
1313
import warnings
1414

15+
from . import compat
1516
from . import connection
1617
from . import connect_utils
1718
from . import exceptions
@@ -198,7 +199,7 @@ async def release(self, timeout):
198199
# If the connection is in cancellation state,
199200
# wait for the cancellation
200201
started = time.monotonic()
201-
await asyncio.wait_for(
202+
await compat.wait_for(
202203
self._con._protocol._wait_for_cancellation(),
203204
budget)
204205
if budget is not None:
@@ -623,7 +624,7 @@ async def _acquire_impl():
623624
if timeout is None:
624625
return await _acquire_impl()
625626
else:
626-
return await asyncio.wait_for(
627+
return await compat.wait_for(
627628
_acquire_impl(), timeout=timeout)
628629

629630
async def release(self, connection, *, timeout=None):

0 commit comments

Comments
 (0)