Skip to content

PYTHON-5078 Convert test.test_discovery_and_monitoring to async #2093

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Mar 3, 2025

Conversation

sleepyStick
Copy link
Contributor

No description provided.

@sleepyStick sleepyStick requested a review from NoahStapp January 29, 2025 20:40
@sleepyStick sleepyStick marked this pull request as ready for review January 29, 2025 21:04
"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION,
"$clusterTime": new,
},
{"ok": 1, "minWireVersion": 0, "maxWireVersion": 6, "$clusterTime": new},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope! i'll undo it, sorry!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay not intentional, but it keeps getting reverted by pre-commit hooks so I think that means I should stop fighting it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd need to change it in the async file, otherwise the sync file will be overwritten by the synchro script.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am changing it in the async file...?

"test_data_lake.py",
"test_discovery_and_monitoring.py",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate addition here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oop sorry about that!



class TestHeartbeatStartOrdering(AsyncPyMongoTestCase):
@async_client_context.require_sync
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also re-write this one for async:

class TestHeartbeatStartOrdering(AsyncPyMongoTestCase):
    def test_heartbeat_start_ordering(self):
        events = []

        def handle_client(reader, writer):
            ...

        listener = HeartbeatEventsListListener(events)

        with asyncio.start_server(handle_client, "localhost", 9999):
           ...

We'll still keep the current version for the sync API.

setattr(TestAllScenarios, new_test.__name__, new_test)


create_tests()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's assume that #2094 will be merged before this and also make create_tests here async-compatible.



class TestIgnoreStaleErrors(AsyncIntegrationTest):
@async_client_context.require_sync
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one can be re-written for async:

N_TASKS = 5
barrier = asyncio.Barrier(N_TASKS)
...
for i in range(N_TASKS):
    tasks.append(asyncio.create_task(insert_command(i)))

await asyncio.gather(*tasks)

We can keep the sync version instead of trying to generate it.

@sleepyStick sleepyStick changed the title PYTHOH-5078 Convert test.test_discovery_and_monitoring to async PYTHON-5078 Convert test.test_discovery_and_monitoring to async Feb 6, 2025
@sleepyStick sleepyStick requested a review from NoahStapp February 7, 2025 00:25


class TestIgnoreStaleErrors(AsyncIntegrationTest):
if _IS_SYNC:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this branch by creating a helper method for Barrier similar to create_async_event and create_event.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find a create_async_event and create_event? but i think i did what you wanted? Let me know if I imagined it incorrectly!

pass

threads = []
for i in range(N_THREADS):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, we should use ConcurrentRunner here as well.

class TestIgnoreStaleErrors(AsyncIntegrationTest):
if _IS_SYNC:

async def test_ignore_stale_connection_errors(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, asyncio.Barrier was added in Python 3.11, so we'll need to only run this async test on that version and newer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do i get it to only run in python 3.11 and newer?

Copy link
Contributor

@NoahStapp NoahStapp Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a block like this to the very start of the test:

if not _IS_SYNC and sys.version_info < (3, 11):
    self.skipTest("Test requires asyncio.Barrier (added in Python 3.11)")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks!

server = await asyncio.start_server(handle_client, "localhost", 9999)
server.events = events
await server.start_serving()
print(server.is_serving())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leftover print statement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HAHA yup, sorry

serverSelectionTimeoutMS=500,
event_listeners=(listener,),
)
if _c._options.connect:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always want to connect here, so this if can be removed.

@@ -408,7 +408,7 @@ async def run(self):
finally:
self.stopped = True


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whitespace change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i believe its not there anymore, sorry about that.

"maxWireVersion": common.MIN_SUPPORTED_WIRE_VERSION,
"$clusterTime": new,
},
{"ok": 1, "minWireVersion": 0, "maxWireVersion": 6, "$clusterTime": new},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'd need to change it in the async file, otherwise the sync file will be overwritten by the synchro script.

setattr(TestAllScenarios, new_test.__name__, new_test)


class TestClusterTimeComparison(unittest.IsolatedAsyncioTestCase):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class TestClusterTimeComparison(unittest.IsolatedAsyncioTestCase):
class TestClusterTimeComparison(AsyncPyMongoTestCase):

await async_wait_until(lambda: len(pool.conns) == N_TASKS, "created conns")

async def mock_command(*args, **kwargs):
# Synchronize all threads to ensure they use the same generation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Synchronize all threads to ensure they use the same generation.
# Synchronize all tasks to ensure they use the same generation.

if _IS_SYNC:
server = TCPServer(("localhost", 9999), MockTCPHandler)
server.events = events
server_thread = threading.Thread(target=server.handle_request_and_shutdown)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use ConcurrentRunner here and collapse this branch entirely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would using ConcurrentRunner allow the branch to be squashed entirely? I mainly kept them separate because of the TCPServer vs asyncio.start_server

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay i don't see how i'd join TCPServer and asyncio.start_server but i changed threading.Thread to be ConcurrentRunner

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we don't use asyncio.start_server at all, and only use TCPServer and ConcurrentRunner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TCPServer.handle_request_and_shutdown() is blocking the loop so nothing else runs once that's started >.<

@NoahStapp
Copy link
Contributor

Still seeing some failures on Windows.

@@ -237,10 +251,7 @@ def create_tests():
setattr(TestAllScenarios, new_test.__name__, new_test)


create_tests()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unintentional removal?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah,, should be back now! sorry about that!

@@ -447,6 +497,5 @@ def test_heartbeat_start_ordering(self):
# Generate unified tests.
globals().update(generate_test_classes(os.path.join(SDAM_PATH, "unified"), module=__name__))


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whitespace.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huh i think my pycharm was reformatting and removed that extra line, fixed pycharm settings and it should be back now :)

Copy link
Member

@ShaneHarvey ShaneHarvey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just merged #1925 which adds some small changes to test_discovery_and_monitoring that will need to be applied to the async version in this PR.

@ShaneHarvey
Copy link
Member

Looks like one more newly added flaky test on the latest patch:

 [2025/02/28 14:01:50.193] _____ TestUnifiedHelloTimeout.test_Driver_extends_timeout_while_streaming _____
 [2025/02/28 14:01:50.193] self = <test.asynchronous.test_discovery_and_monitoring.TestUnifiedHelloTimeout testMethod=test_Driver_extends_timeout_while_streaming>
 [2025/02/28 14:01:50.193]     async def test_case(self):
 [2025/02/28 14:01:50.193] >       await self.run_scenario(spec)
 [2025/02/28 14:01:50.193] test\asynchronous\unified_format.py:1474:
 [2025/02/28 14:01:50.193] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
 [2025/02/28 14:01:50.193] test\asynchronous\unified_format.py:1416: in run_scenario
 [2025/02/28 14:01:50.193]     await self._run_scenario(spec, uri)
 [2025/02/28 14:01:50.193] test\asynchronous\unified_format.py:1451: in _run_scenario
 [2025/02/28 14:01:50.193]     await self.run_operations(spec["operations"])
 [2025/02/28 14:01:50.193] test\asynchronous\unified_format.py:1251: in run_operations
 [2025/02/28 14:01:50.193]     await self.run_special_operation(op)
 [2025/02/28 14:01:50.193] test\asynchronous\unified_format.py:1246: in run_special_operation
 [2025/02/28 14:01:50.193]     method(spec["arguments"])
 [2025/02/28 14:01:50.193] test\asynchronous\unified_format.py:1134: in _testOperation_assertEventCount
 [2025/02/28 14:01:50.193]     self.assertEqual(self._event_count(client, event), count, f"expected {count} not {event!r}")
 [2025/02/28 14:01:50.193] E   AssertionError: 1 != 0 : expected 0 not {'serverDescriptionChangedEvent': {'newDescription': {'type': 'Unknown'}}}

https://spruce.mongodb.com/task/mongo_python_driver_test_win64_python3.9_test_latest_sharded_cluster_auth_ssl_async_patch_c9a85ad321f98caf314fc9da7a367e94d661abac_67c229ce475953000739759c_25_02_28_21_25_35/logs?execution=0&sortBy=STATUS&sortDir=ASC

How should we deal with this one?

@sleepyStick
Copy link
Contributor Author

Looks like one more newly added flaky test on the latest patch:

 [2025/02/28 14:01:50.193] _____ TestUnifiedHelloTimeout.test_Driver_extends_timeout_while_streaming _____
 [2025/02/28 14:01:50.193] self = <test.asynchronous.test_discovery_and_monitoring.TestUnifiedHelloTimeout testMethod=test_Driver_extends_timeout_while_streaming>
 [2025/02/28 14:01:50.193]     async def test_case(self):
 [2025/02/28 14:01:50.193] >       await self.run_scenario(spec)
 [2025/02/28 14:01:50.193] test\asynchronous\unified_format.py:1474:
 [2025/02/28 14:01:50.193] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
 [2025/02/28 14:01:50.193] test\asynchronous\unified_format.py:1416: in run_scenario
 [2025/02/28 14:01:50.193]     await self._run_scenario(spec, uri)
 [2025/02/28 14:01:50.193] test\asynchronous\unified_format.py:1451: in _run_scenario
 [2025/02/28 14:01:50.193]     await self.run_operations(spec["operations"])
 [2025/02/28 14:01:50.193] test\asynchronous\unified_format.py:1251: in run_operations
 [2025/02/28 14:01:50.193]     await self.run_special_operation(op)
 [2025/02/28 14:01:50.193] test\asynchronous\unified_format.py:1246: in run_special_operation
 [2025/02/28 14:01:50.193]     method(spec["arguments"])
 [2025/02/28 14:01:50.193] test\asynchronous\unified_format.py:1134: in _testOperation_assertEventCount
 [2025/02/28 14:01:50.193]     self.assertEqual(self._event_count(client, event), count, f"expected {count} not {event!r}")
 [2025/02/28 14:01:50.193] E   AssertionError: 1 != 0 : expected 0 not {'serverDescriptionChangedEvent': {'newDescription': {'type': 'Unknown'}}}

https://spruce.mongodb.com/task/mongo_python_driver_test_win64_python3.9_test_latest_sharded_cluster_auth_ssl_async_patch_c9a85ad321f98caf314fc9da7a367e94d661abac_67c229ce475953000739759c_25_02_28_21_25_35/logs?execution=0&sortBy=STATUS&sortDir=ASC

How should we deal with this one?

I'm down to try de-bugging. But there's a part of me that feels like it'll be a similar bug to the other flakey tests? So we can make a ticket for this tests and link it to the other one? Thoughts?

@ShaneHarvey
Copy link
Member

Sounds good to me.

@sleepyStick sleepyStick merged commit 150a3ba into mongodb:master Mar 3, 2025
46 of 51 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants