Skip to content

PYTHON-5078 Convert test.test_discovery_and_monitoring to async #2093

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2e62605
Convert test.test_discovery_and_monitoring to async
sleepyStick Jan 29, 2025
3b63840
modify path
sleepyStick Jan 29, 2025
a2b75d5
Merge branch 'master' into PYTHON-5078
sleepyStick Jan 29, 2025
004f8f0
Update test/asynchronous/test_discovery_and_monitoring.py
sleepyStick Jan 29, 2025
70d09d3
Update test/asynchronous/test_discovery_and_monitoring.py
sleepyStick Jan 29, 2025
568cc64
debugging..
sleepyStick Jan 30, 2025
d7a4a28
Merge branch 'master' into PYTHON-5078
sleepyStick Feb 5, 2025
47d9ebd
make test_heartbeat_start_ordering async
sleepyStick Feb 7, 2025
24d30af
address review
sleepyStick Feb 11, 2025
9b3cd72
Merge branch 'master' into PYTHON-5078
sleepyStick Feb 11, 2025
ffb07b3
fix typing and lint
sleepyStick Feb 11, 2025
1c1ebef
fix typing
sleepyStick Feb 11, 2025
d00eb45
test_ignore_stale_connection_errors should only run on python3.11 and…
sleepyStick Feb 11, 2025
f22e84b
Merge branch 'master' into PYTHON-5078
sleepyStick Feb 11, 2025
8d0921b
fix imports
sleepyStick Feb 11, 2025
d314742
Merge branch 'PYTHON-5078' of github.com:sleepyStick/mongo-python-dri…
sleepyStick Feb 11, 2025
a5c8930
undo changes to hello response
sleepyStick Feb 11, 2025
041e4e7
Merge branch 'master' into PYTHON-5078
sleepyStick Feb 11, 2025
17d0e4c
address review -ish
sleepyStick Feb 12, 2025
31d2d81
Merge branch 'master' into PYTHON-5078
sleepyStick Feb 13, 2025
c4c9e00
undo accidental changes
sleepyStick Feb 18, 2025
1e6016e
Merge branch 'master' into PYTHON-5078
sleepyStick Feb 18, 2025
c454ade
Merge branch 'master' into PYTHON-5078
sleepyStick Feb 19, 2025
a3465e9
retrying the tests because they're flakey
sleepyStick Feb 24, 2025
7aab775
Merge branch 'master' into PYTHON-5078
sleepyStick Feb 24, 2025
9484d7a
Merge branch 'master' into PYTHON-5078
sleepyStick Feb 27, 2025
c4beba1
update discovery and monitoring based on #1925
sleepyStick Feb 27, 2025
9b4fd45
skip flakey test for now
sleepyStick Feb 28, 2025
c0e37e5
remove import
sleepyStick Feb 28, 2025
81f7a1b
another flakey test to skip
sleepyStick Feb 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion test/asynchronous/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ async def run(self):
finally:
self.stopped = True


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whitespace change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i believe its not there anymore, sorry about that.

class ExceptionCatchingTask(ConcurrentRunner):
"""A Task that stores any exception encountered while running."""

Expand All @@ -422,3 +422,20 @@ async def run(self):
except BaseException as exc:
self.exc = exc
raise


def create_barrier(N_TASKS, timeout: float | None = None):
return threading.Barrier(N_TASKS, timeout)


def async_create_barrier(N_TASKS, timeout: float | None = None):
return asyncio.Barrier(N_TASKS)


def barrier_wait(barrier, timeout: float | None = None):
barrier.wait()


async def async_barrier_wait(barrier, timeout: float | None = None):
await asyncio.wait_for(barrier.wait(), timeout)

119 changes: 39 additions & 80 deletions test/asynchronous/test_discovery_and_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import threading
from asyncio import StreamReader, StreamWriter
from pathlib import Path
from test.asynchronous.helpers import ConcurrentRunner, async_barrier_wait, async_create_barrier

sys.path[0:0] = [""]

Expand Down Expand Up @@ -275,84 +276,44 @@ async def send_cluster_time(time, inc, should_update):


class TestIgnoreStaleErrors(AsyncIntegrationTest):
if _IS_SYNC:

async def test_ignore_stale_connection_errors(self):
N_THREADS = 5
barrier = threading.Barrier(N_THREADS, timeout=30)
client = await self.async_rs_or_single_client(minPoolSize=N_THREADS)

# Wait for initial discovery.
await client.admin.command("ping")
pool = await async_get_pool(client)
starting_generation = pool.gen.get_overall()
await async_wait_until(lambda: len(pool.conns) == N_THREADS, "created conns")

def mock_command(*args, **kwargs):
# Synchronize all threads to ensure they use the same generation.
barrier.wait()
raise AutoReconnect("mock AsyncConnection.command error")

for conn in pool.conns:
conn.command = mock_command

async def insert_command(i):
try:
await client.test.command("insert", "test", documents=[{"i": i}])
except AutoReconnect:
pass

threads = []
for i in range(N_THREADS):
threads.append(threading.Thread(target=insert_command, args=(i,)))
for t in threads:
t.start()
for t in threads:
t.join()

# Expect a single pool reset for the network error
self.assertEqual(starting_generation + 1, pool.gen.get_overall())

# Server should be selectable.
await client.admin.command("ping")
else:

async def test_ignore_stale_connection_errors(self):
N_TASKS = 5
barrier = asyncio.Barrier(N_TASKS)
client = await self.async_rs_or_single_client(minPoolSize=N_TASKS)

# Wait for initial discovery.
await client.admin.command("ping")
pool = await async_get_pool(client)
starting_generation = pool.gen.get_overall()
await async_wait_until(lambda: len(pool.conns) == N_TASKS, "created conns")

async def mock_command(*args, **kwargs):
# Synchronize all threads to ensure they use the same generation.
await asyncio.wait_for(barrier.wait(), timeout=30)
raise AutoReconnect("mock AsyncConnection.command error")

for conn in pool.conns:
conn.command = mock_command
async def test_ignore_stale_connection_errors(self):
N_TASKS = 5
barrier = async_create_barrier(N_TASKS, timeout=30)
client = await self.async_rs_or_single_client(minPoolSize=N_TASKS)

async def insert_command(i):
try:
await client.test.command("insert", "test", documents=[{"i": i}])
except AutoReconnect:
pass

tasks = []
for i in range(N_TASKS):
tasks.append(asyncio.create_task(insert_command(i)))
for t in tasks:
await t

# Expect a single pool reset for the network error
self.assertEqual(starting_generation + 1, pool.gen.get_overall())

# Server should be selectable.
await client.admin.command("ping")
# Wait for initial discovery.
await client.admin.command("ping")
pool = await async_get_pool(client)
starting_generation = pool.gen.get_overall()
await async_wait_until(lambda: len(pool.conns) == N_TASKS, "created conns")

async def mock_command(*args, **kwargs):
# Synchronize all threads to ensure they use the same generation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Synchronize all threads to ensure they use the same generation.
# Synchronize all tasks to ensure they use the same generation.

await async_barrier_wait(barrier, timeout=30)
raise AutoReconnect("mock AsyncConnection.command error")

for conn in pool.conns:
conn.command = mock_command

async def insert_command(i):
try:
await client.test.command("insert", "test", documents=[{"i": i}])
except AutoReconnect:
pass

tasks = []
for i in range(N_TASKS):
tasks.append(ConcurrentRunner(target=insert_command, args=(i,)))
for t in tasks:
await t.start()
for t in tasks:
await t.join()

# Expect a single pool reset for the network error
self.assertEqual(starting_generation + 1, pool.gen.get_overall())

# Server should be selectable.
await client.admin.command("ping")


class CMAPHeartbeatListener(HeartbeatEventListener, CMAPListener):
Expand Down Expand Up @@ -499,14 +460,12 @@ async def handle_client(reader: StreamReader, writer: StreamWriter):
server = await asyncio.start_server(handle_client, "localhost", 9999)
server.events = events
await server.start_serving()
print(server.is_serving())
_c = self.simple_client(
"mongodb://localhost:9999",
serverSelectionTimeoutMS=500,
event_listeners=(listener,),
)
if _c._options.connect:
await _c.aconnect()
await _c.aconnect()

await listener.async_wait_for_event(ServerHeartbeatStartedEvent, 1)
await listener.async_wait_for_event(ServerHeartbeatFailedEvent, 1)
Expand Down
17 changes: 17 additions & 0 deletions test/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,20 @@ def run(self):
except BaseException as exc:
self.exc = exc
raise


def create_barrier(N_TASKS, timeout: float | None = None):
return threading.Barrier(N_TASKS, timeout)


def create_barrier(N_TASKS, timeout: float | None = None):
return asyncio.Barrier(N_TASKS)


def barrier_wait(barrier, timeout: float | None = None):
barrier.wait()


def barrier_wait(barrier, timeout: float | None = None):
asyncio.wait_for(barrier.wait(), timeout)

119 changes: 39 additions & 80 deletions test/test_discovery_and_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import threading
from asyncio import StreamReader, StreamWriter
from pathlib import Path
from test.helpers import ConcurrentRunner, barrier_wait, create_barrier

sys.path[0:0] = [""]

Expand Down Expand Up @@ -275,84 +276,44 @@ def send_cluster_time(time, inc, should_update):


class TestIgnoreStaleErrors(IntegrationTest):
if _IS_SYNC:

def test_ignore_stale_connection_errors(self):
N_THREADS = 5
barrier = threading.Barrier(N_THREADS, timeout=30)
client = self.rs_or_single_client(minPoolSize=N_THREADS)

# Wait for initial discovery.
client.admin.command("ping")
pool = get_pool(client)
starting_generation = pool.gen.get_overall()
wait_until(lambda: len(pool.conns) == N_THREADS, "created conns")

def mock_command(*args, **kwargs):
# Synchronize all threads to ensure they use the same generation.
barrier.wait()
raise AutoReconnect("mock Connection.command error")

for conn in pool.conns:
conn.command = mock_command

def insert_command(i):
try:
client.test.command("insert", "test", documents=[{"i": i}])
except AutoReconnect:
pass

threads = []
for i in range(N_THREADS):
threads.append(threading.Thread(target=insert_command, args=(i,)))
for t in threads:
t.start()
for t in threads:
t.join()

# Expect a single pool reset for the network error
self.assertEqual(starting_generation + 1, pool.gen.get_overall())

# Server should be selectable.
client.admin.command("ping")
else:

def test_ignore_stale_connection_errors(self):
N_TASKS = 5
barrier = asyncio.Barrier(N_TASKS)
client = self.rs_or_single_client(minPoolSize=N_TASKS)

# Wait for initial discovery.
client.admin.command("ping")
pool = get_pool(client)
starting_generation = pool.gen.get_overall()
wait_until(lambda: len(pool.conns) == N_TASKS, "created conns")

def mock_command(*args, **kwargs):
# Synchronize all threads to ensure they use the same generation.
asyncio.wait_for(barrier.wait(), timeout=30)
raise AutoReconnect("mock Connection.command error")

for conn in pool.conns:
conn.command = mock_command
def test_ignore_stale_connection_errors(self):
N_TASKS = 5
barrier = create_barrier(N_TASKS, timeout=30)
client = self.rs_or_single_client(minPoolSize=N_TASKS)

def insert_command(i):
try:
client.test.command("insert", "test", documents=[{"i": i}])
except AutoReconnect:
pass

tasks = []
for i in range(N_TASKS):
tasks.append(asyncio.create_task(insert_command(i)))
for t in tasks:
t

# Expect a single pool reset for the network error
self.assertEqual(starting_generation + 1, pool.gen.get_overall())

# Server should be selectable.
client.admin.command("ping")
# Wait for initial discovery.
client.admin.command("ping")
pool = get_pool(client)
starting_generation = pool.gen.get_overall()
wait_until(lambda: len(pool.conns) == N_TASKS, "created conns")

def mock_command(*args, **kwargs):
# Synchronize all threads to ensure they use the same generation.
barrier_wait(barrier, timeout=30)
raise AutoReconnect("mock Connection.command error")

for conn in pool.conns:
conn.command = mock_command

def insert_command(i):
try:
client.test.command("insert", "test", documents=[{"i": i}])
except AutoReconnect:
pass

tasks = []
for i in range(N_TASKS):
tasks.append(ConcurrentRunner(target=insert_command, args=(i,)))
for t in tasks:
t.start()
for t in tasks:
t.join()

# Expect a single pool reset for the network error
self.assertEqual(starting_generation + 1, pool.gen.get_overall())

# Server should be selectable.
client.admin.command("ping")


class CMAPHeartbeatListener(HeartbeatEventListener, CMAPListener):
Expand Down Expand Up @@ -499,14 +460,12 @@ def handle_client(reader: StreamReader, writer: StreamWriter):
server = asyncio.start_server(handle_client, "localhost", 9999)
server.events = events
server.start_serving()
print(server.is_serving())
_c = self.simple_client(
"mongodb://localhost:9999",
serverSelectionTimeoutMS=500,
event_listeners=(listener,),
)
if _c._options.connect:
_c._connect()
_c._connect()

listener.wait_for_event(ServerHeartbeatStartedEvent, 1)
listener.wait_for_event(ServerHeartbeatFailedEvent, 1)
Expand Down
2 changes: 2 additions & 0 deletions tools/synchro.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
"AsyncMockConnection": "MockConnection",
"AsyncMockPool": "MockPool",
"create_async_event": "create_event",
"async_create_barrier": "create_barrier",
"async_barrier_wait": "barrier_wait",
}

docstring_replacements: dict[tuple[str, str], str] = {
Expand Down
Loading