-
Notifications
You must be signed in to change notification settings - Fork 1.1k
PYTHON-5078 Convert test.test_discovery_and_monitoring to async #2093
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 2 commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
2e62605
Convert test.test_discovery_and_monitoring to async
sleepyStick 3b63840
modify path
sleepyStick a2b75d5
Merge branch 'master' into PYTHON-5078
sleepyStick 004f8f0
Update test/asynchronous/test_discovery_and_monitoring.py
sleepyStick 70d09d3
Update test/asynchronous/test_discovery_and_monitoring.py
sleepyStick 568cc64
debugging..
sleepyStick d7a4a28
Merge branch 'master' into PYTHON-5078
sleepyStick 47d9ebd
make test_heartbeat_start_ordering async
sleepyStick 24d30af
address review
sleepyStick 9b3cd72
Merge branch 'master' into PYTHON-5078
sleepyStick ffb07b3
fix typing and lint
sleepyStick 1c1ebef
fix typing
sleepyStick d00eb45
test_ignore_stale_connection_errors should only run on python3.11 and…
sleepyStick f22e84b
Merge branch 'master' into PYTHON-5078
sleepyStick 8d0921b
fix imports
sleepyStick d314742
Merge branch 'PYTHON-5078' of github.com:sleepyStick/mongo-python-dri…
sleepyStick a5c8930
undo changes to hello response
sleepyStick 041e4e7
Merge branch 'master' into PYTHON-5078
sleepyStick 17d0e4c
address review -ish
sleepyStick 31d2d81
Merge branch 'master' into PYTHON-5078
sleepyStick c4c9e00
undo accidental changes
sleepyStick 1e6016e
Merge branch 'master' into PYTHON-5078
sleepyStick c454ade
Merge branch 'master' into PYTHON-5078
sleepyStick a3465e9
retrying the tests because they're flakey
sleepyStick 7aab775
Merge branch 'master' into PYTHON-5078
sleepyStick 9484d7a
Merge branch 'master' into PYTHON-5078
sleepyStick c4beba1
update discovery and monitoring based on #1925
sleepyStick 9b4fd45
skip flakey test for now
sleepyStick c0e37e5
remove import
sleepyStick 81f7a1b
another flakey test to skip
sleepyStick File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -22,6 +22,7 @@ | |||||
import threading | ||||||
from asyncio import StreamReader, StreamWriter | ||||||
from pathlib import Path | ||||||
from test.asynchronous.helpers import ConcurrentRunner, async_barrier_wait, async_create_barrier | ||||||
|
||||||
sys.path[0:0] = [""] | ||||||
|
||||||
|
@@ -275,84 +276,44 @@ async def send_cluster_time(time, inc, should_update): | |||||
|
||||||
|
||||||
class TestIgnoreStaleErrors(AsyncIntegrationTest): | ||||||
if _IS_SYNC: | ||||||
|
||||||
async def test_ignore_stale_connection_errors(self): | ||||||
N_THREADS = 5 | ||||||
barrier = threading.Barrier(N_THREADS, timeout=30) | ||||||
client = await self.async_rs_or_single_client(minPoolSize=N_THREADS) | ||||||
|
||||||
# Wait for initial discovery. | ||||||
await client.admin.command("ping") | ||||||
pool = await async_get_pool(client) | ||||||
starting_generation = pool.gen.get_overall() | ||||||
await async_wait_until(lambda: len(pool.conns) == N_THREADS, "created conns") | ||||||
|
||||||
def mock_command(*args, **kwargs): | ||||||
# Synchronize all threads to ensure they use the same generation. | ||||||
barrier.wait() | ||||||
raise AutoReconnect("mock AsyncConnection.command error") | ||||||
|
||||||
for conn in pool.conns: | ||||||
conn.command = mock_command | ||||||
|
||||||
async def insert_command(i): | ||||||
try: | ||||||
await client.test.command("insert", "test", documents=[{"i": i}]) | ||||||
except AutoReconnect: | ||||||
pass | ||||||
|
||||||
threads = [] | ||||||
for i in range(N_THREADS): | ||||||
threads.append(threading.Thread(target=insert_command, args=(i,))) | ||||||
for t in threads: | ||||||
t.start() | ||||||
for t in threads: | ||||||
t.join() | ||||||
|
||||||
# Expect a single pool reset for the network error | ||||||
self.assertEqual(starting_generation + 1, pool.gen.get_overall()) | ||||||
|
||||||
# Server should be selectable. | ||||||
await client.admin.command("ping") | ||||||
else: | ||||||
|
||||||
async def test_ignore_stale_connection_errors(self): | ||||||
N_TASKS = 5 | ||||||
barrier = asyncio.Barrier(N_TASKS) | ||||||
client = await self.async_rs_or_single_client(minPoolSize=N_TASKS) | ||||||
|
||||||
# Wait for initial discovery. | ||||||
await client.admin.command("ping") | ||||||
pool = await async_get_pool(client) | ||||||
starting_generation = pool.gen.get_overall() | ||||||
await async_wait_until(lambda: len(pool.conns) == N_TASKS, "created conns") | ||||||
|
||||||
async def mock_command(*args, **kwargs): | ||||||
# Synchronize all threads to ensure they use the same generation. | ||||||
await asyncio.wait_for(barrier.wait(), timeout=30) | ||||||
raise AutoReconnect("mock AsyncConnection.command error") | ||||||
|
||||||
for conn in pool.conns: | ||||||
conn.command = mock_command | ||||||
async def test_ignore_stale_connection_errors(self): | ||||||
N_TASKS = 5 | ||||||
barrier = async_create_barrier(N_TASKS, timeout=30) | ||||||
client = await self.async_rs_or_single_client(minPoolSize=N_TASKS) | ||||||
|
||||||
async def insert_command(i): | ||||||
try: | ||||||
await client.test.command("insert", "test", documents=[{"i": i}]) | ||||||
except AutoReconnect: | ||||||
pass | ||||||
|
||||||
tasks = [] | ||||||
for i in range(N_TASKS): | ||||||
tasks.append(asyncio.create_task(insert_command(i))) | ||||||
for t in tasks: | ||||||
await t | ||||||
|
||||||
# Expect a single pool reset for the network error | ||||||
self.assertEqual(starting_generation + 1, pool.gen.get_overall()) | ||||||
|
||||||
# Server should be selectable. | ||||||
await client.admin.command("ping") | ||||||
# Wait for initial discovery. | ||||||
await client.admin.command("ping") | ||||||
pool = await async_get_pool(client) | ||||||
starting_generation = pool.gen.get_overall() | ||||||
await async_wait_until(lambda: len(pool.conns) == N_TASKS, "created conns") | ||||||
|
||||||
async def mock_command(*args, **kwargs): | ||||||
# Synchronize all threads to ensure they use the same generation. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
await async_barrier_wait(barrier, timeout=30) | ||||||
raise AutoReconnect("mock AsyncConnection.command error") | ||||||
|
||||||
for conn in pool.conns: | ||||||
conn.command = mock_command | ||||||
|
||||||
async def insert_command(i): | ||||||
try: | ||||||
await client.test.command("insert", "test", documents=[{"i": i}]) | ||||||
except AutoReconnect: | ||||||
pass | ||||||
|
||||||
tasks = [] | ||||||
for i in range(N_TASKS): | ||||||
tasks.append(ConcurrentRunner(target=insert_command, args=(i,))) | ||||||
for t in tasks: | ||||||
await t.start() | ||||||
for t in tasks: | ||||||
await t.join() | ||||||
|
||||||
# Expect a single pool reset for the network error | ||||||
self.assertEqual(starting_generation + 1, pool.gen.get_overall()) | ||||||
|
||||||
# Server should be selectable. | ||||||
await client.admin.command("ping") | ||||||
|
||||||
|
||||||
class CMAPHeartbeatListener(HeartbeatEventListener, CMAPListener): | ||||||
|
@@ -499,14 +460,12 @@ async def handle_client(reader: StreamReader, writer: StreamWriter): | |||||
server = await asyncio.start_server(handle_client, "localhost", 9999) | ||||||
server.events = events | ||||||
await server.start_serving() | ||||||
print(server.is_serving()) | ||||||
_c = self.simple_client( | ||||||
"mongodb://localhost:9999", | ||||||
serverSelectionTimeoutMS=500, | ||||||
event_listeners=(listener,), | ||||||
) | ||||||
if _c._options.connect: | ||||||
await _c.aconnect() | ||||||
await _c.aconnect() | ||||||
|
||||||
await listener.async_wait_for_event(ServerHeartbeatStartedEvent, 1) | ||||||
await listener.async_wait_for_event(ServerHeartbeatFailedEvent, 1) | ||||||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whitespace change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i believe its not there anymore, sorry about that.