-
Notifications
You must be signed in to change notification settings - Fork 1.1k
PYTHON-4864 - Create async version of SpecRunnerThread #2094
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
69d657e
05bb77c
1687553
abb3880
e3f2f26
a287c61
3a678d9
9fddf51
25e31db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not loving the duplicated code between sync and async but i'm guessing its because the async version needs some more methods? If so, then i understand and can live with it >.< There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The async version doesn't implement There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did some refactoring, much less duplication now. Great call-out! |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,46 +47,68 @@ | |
from pymongo.asynchronous.command_cursor import AsyncCommandCursor | ||
from pymongo.asynchronous.cursor import AsyncCursor | ||
from pymongo.errors import AutoReconnect, BulkWriteError, OperationFailure, PyMongoError | ||
from pymongo.lock import _async_create_condition, _async_create_lock | ||
from pymongo.read_concern import ReadConcern | ||
from pymongo.read_preferences import ReadPreference | ||
from pymongo.results import BulkWriteResult, _WriteResult | ||
from pymongo.write_concern import WriteConcern | ||
|
||
_IS_SYNC = False | ||
|
||
if _IS_SYNC: | ||
PARENT = threading.Thread | ||
else: | ||
PARENT = object | ||
|
||
class SpecRunnerThread(threading.Thread): | ||
|
||
class SpecRunnerTask(PARENT): | ||
def __init__(self, name): | ||
super().__init__() | ||
self.name = name | ||
self.exc = None | ||
self.daemon = True | ||
self.cond = threading.Condition() | ||
self.cond = _async_create_condition(_async_create_lock()) | ||
self.ops = [] | ||
self.stopped = False | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stopped is never set to True by this class. |
||
self.task = None | ||
|
||
if not _IS_SYNC: | ||
|
||
async def start(self): | ||
self.task = asyncio.create_task(self.run(), name=self.name) | ||
|
||
def schedule(self, work): | ||
async def join(self, timeout: float | None = 0): # type: ignore[override] | ||
if self.task is not None: | ||
await asyncio.wait([self.task], timeout=timeout) | ||
|
||
def is_alive(self): | ||
return not self.stopped | ||
|
||
async def schedule(self, work): | ||
self.ops.append(work) | ||
with self.cond: | ||
async with self.cond: | ||
self.cond.notify() | ||
|
||
def stop(self): | ||
async def stop(self): | ||
self.stopped = True | ||
with self.cond: | ||
async with self.cond: | ||
self.cond.notify() | ||
|
||
def run(self): | ||
async def run(self): | ||
while not self.stopped or self.ops: | ||
if not self.ops: | ||
with self.cond: | ||
self.cond.wait(10) | ||
async with self.cond: | ||
if _IS_SYNC: | ||
await self.cond.wait(10) # type: ignore[call-arg] | ||
else: | ||
await asyncio.wait_for(self.cond.wait(), timeout=10) # type: ignore[arg-type] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use our |
||
if self.ops: | ||
try: | ||
work = self.ops.pop(0) | ||
work() | ||
await work() | ||
except Exception as exc: | ||
self.exc = exc | ||
self.stop() | ||
await self.stop() | ||
|
||
|
||
class AsyncSpecTestCreator: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we running any async unified tests that use runOnThread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SDAM unified tests are the only ones that use
runOnThread
. Those are currently slated to be converted to async, yes.