-
Notifications
You must be signed in to change notification settings - Fork 1.1k
PYTHON-5078 Convert test.test_discovery_and_monitoring to async #2093
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION, | ||
"$clusterTime": new, | ||
}, | ||
{"ok": 1, "minWireVersion": 0, "maxWireVersion": 6, "$clusterTime": new}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Intentional change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope! i'll undo it, sorry!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay not intentional, but it keeps getting reverted by pre-commit hooks so I think that means I should stop fighting it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'd need to change it in the async file, otherwise the sync file will be overwritten by the synchro script.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am changing it in the async file...?
"test_data_lake.py", | ||
"test_discovery_and_monitoring.py", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate addition here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oop sorry about that!
|
||
|
||
class TestHeartbeatStartOrdering(AsyncPyMongoTestCase): | ||
@async_client_context.require_sync |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can also re-write this one for async:
class TestHeartbeatStartOrdering(AsyncPyMongoTestCase):
def test_heartbeat_start_ordering(self):
events = []
def handle_client(reader, writer):
...
listener = HeartbeatEventsListListener(events)
with asyncio.start_server(handle_client, "localhost", 9999):
...
We'll still keep the current version for the sync API.
setattr(TestAllScenarios, new_test.__name__, new_test) | ||
|
||
|
||
create_tests() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's assume that #2094 will be merged before this and also make create_tests
here async-compatible.
|
||
|
||
class TestIgnoreStaleErrors(AsyncIntegrationTest): | ||
@async_client_context.require_sync |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one can be re-written for async:
N_TASKS = 5
barrier = asyncio.Barrier(N_TASKS)
...
for i in range(N_TASKS):
tasks.append(asyncio.create_task(insert_command(i)))
await asyncio.gather(*tasks)
We can keep the sync version instead of trying to generate it.
Co-authored-by: Noah Stapp <[email protected]>
Co-authored-by: Noah Stapp <[email protected]>
|
||
|
||
class TestIgnoreStaleErrors(AsyncIntegrationTest): | ||
if _IS_SYNC: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove this branch by creating a helper method for Barrier
similar to create_async_event
and create_event
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't find a create_async_event
and create_event
? but i think i did what you wanted? Let me know if I imagined it incorrectly!
pass | ||
|
||
threads = [] | ||
for i in range(N_THREADS): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, we should use ConcurrentRunner
here as well.
class TestIgnoreStaleErrors(AsyncIntegrationTest): | ||
if _IS_SYNC: | ||
|
||
async def test_ignore_stale_connection_errors(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, asyncio.Barrier
was added in Python 3.11, so we'll need to only run this async test on that version and newer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do i get it to only run in python 3.11 and newer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a block like this to the very start of the test:
if not _IS_SYNC and sys.version_info < (3, 11):
self.skipTest("Test requires asyncio.Barrier (added in Python 3.11)")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, thanks!
server = await asyncio.start_server(handle_client, "localhost", 9999) | ||
server.events = events | ||
await server.start_serving() | ||
print(server.is_serving()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leftover print statement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HAHA yup, sorry
serverSelectionTimeoutMS=500, | ||
event_listeners=(listener,), | ||
) | ||
if _c._options.connect: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We always want to connect here, so this if
can be removed.
test/asynchronous/helpers.py
Outdated
@@ -408,7 +408,7 @@ async def run(self): | |||
finally: | |||
self.stopped = True | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whitespace change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i believe its not there anymore, sorry about that.
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION, | ||
"$clusterTime": new, | ||
}, | ||
{"ok": 1, "minWireVersion": 0, "maxWireVersion": 6, "$clusterTime": new}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'd need to change it in the async file, otherwise the sync file will be overwritten by the synchro script.
setattr(TestAllScenarios, new_test.__name__, new_test) | ||
|
||
|
||
class TestClusterTimeComparison(unittest.IsolatedAsyncioTestCase): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class TestClusterTimeComparison(unittest.IsolatedAsyncioTestCase): | |
class TestClusterTimeComparison(AsyncPyMongoTestCase): |
await async_wait_until(lambda: len(pool.conns) == N_TASKS, "created conns") | ||
|
||
async def mock_command(*args, **kwargs): | ||
# Synchronize all threads to ensure they use the same generation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# Synchronize all threads to ensure they use the same generation. | |
# Synchronize all tasks to ensure they use the same generation. |
if _IS_SYNC: | ||
server = TCPServer(("localhost", 9999), MockTCPHandler) | ||
server.events = events | ||
server_thread = threading.Thread(target=server.handle_request_and_shutdown) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use ConcurrentRunner
here and collapse this branch entirely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would using ConcurrentRunner
allow the branch to be squashed entirely? I mainly kept them separate because of the TCPServer
vs asyncio.start_server
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay i don't see how i'd join TCPServer
and asyncio.start_server
but i changed threading.Thread
to be ConcurrentRunner
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we don't use asyncio.start_server
at all, and only use TCPServer
and ConcurrentRunner
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TCPServer.handle_request_and_shutdown()
is blocking the loop so nothing else runs once that's started >.<
Still seeing some failures on Windows. |
@@ -237,10 +251,7 @@ def create_tests(): | |||
setattr(TestAllScenarios, new_test.__name__, new_test) | |||
|
|||
|
|||
create_tests() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unintentional removal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah,, should be back now! sorry about that!
@@ -447,6 +497,5 @@ def test_heartbeat_start_ordering(self): | |||
# Generate unified tests. | |||
globals().update(generate_test_classes(os.path.join(SDAM_PATH, "unified"), module=__name__)) | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whitespace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
huh i think my pycharm was reformatting and removed that extra line, fixed pycharm settings and it should be back now :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just merged #1925 which adds some small changes to test_discovery_and_monitoring that will need to be applied to the async version in this PR.
Looks like one more newly added flaky test on the latest patch:
How should we deal with this one? |
I'm down to try de-bugging. But there's a part of me that feels like it'll be a similar bug to the other flakey tests? So we can make a ticket for this tests and link it to the other one? Thoughts? |
Sounds good to me. |
No description provided.