diff --git a/.evergreen/resync-specs.sh b/.evergreen/resync-specs.sh index 1f70940aa0..d7dfafbba9 100755 --- a/.evergreen/resync-specs.sh +++ b/.evergreen/resync-specs.sh @@ -131,6 +131,9 @@ do gridfs) cpjson gridfs/tests gridfs ;; + handshake) + cpjson mongodb-handshake/tests handshake + ;; index|index-management) cpjson index-management/tests index_management ;; diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 72755263c9..6390bcdd67 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -70,6 +70,7 @@ from pymongo.asynchronous.settings import TopologySettings from pymongo.asynchronous.topology import Topology, _ErrorContext from pymongo.client_options import ClientOptions +from pymongo.driver_info import DriverInfo from pymongo.errors import ( AutoReconnect, BulkWriteError, @@ -1040,6 +1041,23 @@ async def target() -> bool: self._kill_cursors_executor = executor self._opened = False + def append_metadata(self, driver_info: DriverInfo) -> None: + """ + Appends the given metadata to existing driver metadata. + """ + metadata = self._options.pool_options.metadata + for k, v in driver_info._asdict().items(): + if v is None: + continue + if k in metadata: + metadata[k] = f"{metadata[k]}|{v}" + elif k in metadata["driver"]: + metadata["driver"][k] = "{}|{}".format( + metadata["driver"][k], + v, + ) + self._options.pool_options._set_metadata(metadata) + def _should_pin_cursor(self, session: Optional[AsyncClientSession]) -> Optional[bool]: return self._options.load_balanced and not (session and session.in_transaction) diff --git a/pymongo/pool_options.py b/pymongo/pool_options.py index a2e309cc56..33cd97978f 100644 --- a/pymongo/pool_options.py +++ b/pymongo/pool_options.py @@ -522,3 +522,6 @@ def server_api(self) -> Optional[ServerApi]: def load_balanced(self) -> Optional[bool]: """True if this Pool is configured in load balanced mode.""" return self.__load_balanced + + def _set_metadata(self, new_data: dict[str, Any]) -> None: + self.__metadata = new_data diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 99a517e5c1..f36ae491d6 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -62,6 +62,7 @@ from bson.timestamp import Timestamp from pymongo import _csot, common, helpers_shared, periodic_executor from pymongo.client_options import ClientOptions +from pymongo.driver_info import DriverInfo from pymongo.errors import ( AutoReconnect, BulkWriteError, @@ -1040,6 +1041,23 @@ def target() -> bool: self._kill_cursors_executor = executor self._opened = False + def append_metadata(self, driver_info: DriverInfo) -> None: + """ + Appends the given metadata to existing driver metadata. + """ + metadata = self._options.pool_options.metadata + for k, v in driver_info._asdict().items(): + if v is None: + continue + if k in metadata: + metadata[k] = f"{metadata[k]}|{v}" + elif k in metadata["driver"]: + metadata["driver"][k] = "{}|{}".format( + metadata["driver"][k], + v, + ) + self._options.pool_options._set_metadata(metadata) + def _should_pin_cursor(self, session: Optional[ClientSession]) -> Optional[bool]: return self._options.load_balanced and not (session and session.in_transaction) diff --git a/test/asynchronous/test_client_metadata.py b/test/asynchronous/test_client_metadata.py new file mode 100644 index 0000000000..03452aa37c --- /dev/null +++ b/test/asynchronous/test_client_metadata.py @@ -0,0 +1,225 @@ +# Copyright 2013-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import asyncio +import os +import pathlib +import time +import unittest +from test.asynchronous import AsyncIntegrationTest +from test.asynchronous.unified_format import generate_test_classes +from test.utils_shared import CMAPListener +from typing import Any, Optional + +import pytest + +from pymongo import AsyncMongoClient, MongoClient +from pymongo.driver_info import DriverInfo +from pymongo.monitoring import ConnectionClosedEvent + +try: + from mockupdb import MockupDB, OpMsgReply + + _HAVE_MOCKUPDB = True +except ImportError: + _HAVE_MOCKUPDB = False + +pytestmark = pytest.mark.mockupdb + +_IS_SYNC = False + +# Location of JSON test specifications. +if _IS_SYNC: + _TEST_PATH = os.path.join(pathlib.Path(__file__).resolve().parent, "handshake", "unified") +else: + _TEST_PATH = os.path.join( + pathlib.Path(__file__).resolve().parent.parent, "handshake", "unified" + ) + +# Generate unified tests. +globals().update(generate_test_classes(_TEST_PATH, module=__name__)) + + +def _get_handshake_driver_info(request): + assert "client" in request + return request["client"] + + +class TestClientMetadataProse(AsyncIntegrationTest): + async def asyncSetUp(self): + await super().asyncSetUp() + self.server = MockupDB() + self.handshake_req = None + + def respond(r): + if "ismaster" in r: + # then this is a handshake request + self.handshake_req = r + return r.reply(OpMsgReply(minWireVersion=0, maxWireVersion=13)) + + self.server.autoresponds(respond) + self.server.run() + self.addAsyncCleanup(self.server.stop) + + async def send_ping_and_get_metadata( + self, client: AsyncMongoClient, is_handshake: bool + ) -> tuple[str, Optional[str], Optional[str], dict[str, Any]]: + # reset if handshake request + if is_handshake: + self.handshake_req: Optional[dict] = None + + await client.admin.command("ping") + metadata = _get_handshake_driver_info(self.handshake_req) + driver_metadata = metadata["driver"] + name, version, platform = ( + driver_metadata["name"], + driver_metadata["version"], + metadata["platform"], + ) + return name, version, platform, metadata + + async def check_metadata_added( + self, + client: AsyncMongoClient, + add_name: str, + add_version: Optional[str], + add_platform: Optional[str], + ) -> None: + # send initial metadata + name, version, platform, metadata = await self.send_ping_and_get_metadata(client, True) + # wait for connection to become idle + await asyncio.sleep(0.005) + + # add new metadata + client.append_metadata(DriverInfo(add_name, add_version, add_platform)) + new_name, new_version, new_platform, new_metadata = await self.send_ping_and_get_metadata( + client, True + ) + self.assertEqual(new_name, f"{name}|{add_name}" if add_name is not None else name) + self.assertEqual( + new_version, + f"{version}|{add_version}" if add_version is not None else version, + ) + self.assertEqual( + new_platform, + f"{platform}|{add_platform}" if add_platform is not None else platform, + ) + + metadata.pop("driver") + metadata.pop("platform") + new_metadata.pop("driver") + new_metadata.pop("platform") + self.assertEqual(metadata, new_metadata) + + async def test_append_metadata(self): + client = AsyncMongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + driver=DriverInfo("library", "1.2", "Library Platform"), + ) + await self.check_metadata_added(client, "framework", "2.0", "Framework Platform") + await client.close() + + async def test_append_metadata_platform_none(self): + client = AsyncMongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + driver=DriverInfo("library", "1.2", "Library Platform"), + ) + await self.check_metadata_added(client, "framework", "2.0", None) + await client.close() + + async def test_append_metadata_version_none(self): + client = AsyncMongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + driver=DriverInfo("library", "1.2", "Library Platform"), + ) + await self.check_metadata_added(client, "framework", None, "Framework Platform") + await client.close() + + async def test_append_metadata_platform_version_none(self): + client = AsyncMongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + driver=DriverInfo("library", "1.2", "Library Platform"), + ) + await self.check_metadata_added(client, "framework", None, None) + await client.close() + + async def test_multiple_successive_metadata_updates(self): + client = AsyncMongoClient( + "mongodb://" + self.server.address_string, maxIdleTimeMS=1, connect=False + ) + client.append_metadata(DriverInfo("library", "1.2", "Library Platform")) + await self.check_metadata_added(client, "framework", "2.0", "Framework Platform") + await client.close() + + async def test_multiple_successive_metadata_updates_platform_none(self): + client = AsyncMongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + ) + client.append_metadata(DriverInfo("library", "1.2", "Library Platform")) + await self.check_metadata_added(client, "framework", "2.0", None) + await client.close() + + async def test_multiple_successive_metadata_updates_version_none(self): + client = AsyncMongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + ) + client.append_metadata(DriverInfo("library", "1.2", "Library Platform")) + await self.check_metadata_added(client, "framework", None, "Framework Platform") + await client.close() + + async def test_multiple_successive_metadata_updates_platform_version_none(self): + client = AsyncMongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + ) + client.append_metadata(DriverInfo("library", "1.2", "Library Platform")) + await self.check_metadata_added(client, "framework", None, None) + await client.close() + + async def test_doesnt_update_established_connections(self): + listener = CMAPListener() + client = AsyncMongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + driver=DriverInfo("library", "1.2", "Library Platform"), + event_listeners=[listener], + ) + + # send initial metadata + name, version, platform, metadata = await self.send_ping_and_get_metadata(client, True) + self.assertIsNotNone(name) + self.assertIsNotNone(version) + self.assertIsNotNone(platform) + + # add data + add_name, add_version, add_platform = "framework", "2.0", "Framework Platform" + client.append_metadata(DriverInfo(add_name, add_version, add_platform)) + # check new data isn't sent + self.handshake_req: Optional[dict] = None + await client.admin.command("ping") + self.assertIsNone(self.handshake_req) + self.assertEqual(listener.event_count(ConnectionClosedEvent), 0) + + await client.close() + + +if __name__ == "__main__": + unittest.main() diff --git a/test/asynchronous/unified_format.py b/test/asynchronous/unified_format.py index fbd1f87755..d793b414c8 100644 --- a/test/asynchronous/unified_format.py +++ b/test/asynchronous/unified_format.py @@ -75,6 +75,7 @@ from pymongo.asynchronous.database import AsyncDatabase from pymongo.asynchronous.encryption import AsyncClientEncryption from pymongo.asynchronous.helpers import anext +from pymongo.driver_info import DriverInfo from pymongo.encryption_options import _HAVE_PYMONGOCRYPT from pymongo.errors import ( AutoReconnect, @@ -840,6 +841,11 @@ async def _cursor_close(self, target, *args, **kwargs): self.__raise_if_unsupported("close", target, NonLazyCursor, AsyncCommandCursor) return await target.close() + async def _clientOperation_appendMetadata(self, target, *args, **kwargs): + info_opts = kwargs["driver_info_options"] + driver_info = DriverInfo(info_opts["name"], info_opts["version"], info_opts["platform"]) + target.append_metadata(driver_info) + async def _clientEncryptionOperation_createDataKey(self, target, *args, **kwargs): if "opts" in kwargs: kwargs.update(camel_to_snake_args(kwargs.pop("opts"))) @@ -925,7 +931,6 @@ async def run_entity_operation(self, spec): ) else: arguments = {} - if isinstance(target, AsyncMongoClient): method_name = f"_clientOperation_{opname}" elif isinstance(target, AsyncDatabase): diff --git a/test/handshake/unified/metadata-not-propagated.json b/test/handshake/unified/metadata-not-propagated.json new file mode 100644 index 0000000000..500b579b89 --- /dev/null +++ b/test/handshake/unified/metadata-not-propagated.json @@ -0,0 +1,100 @@ +{ + "description": "client metadata is not propagated to the server", + "schemaVersion": "1.9", + "runOnRequirements": [ + { + "minServerVersion": "6.0" + } + ], + "createEntities": [ + { + "client": { + "id": "client", + "observeEvents": [ + "commandSucceededEvent", + "commandFailedEvent", + "connectionClosedEvent", + "connectionCreatedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "test" + } + } + ], + "tests": [ + { + "description": "metadata append does not create new connections or close existing ones and no hello command is sent", + "operations": [ + { + "name": "runCommand", + "object": "database", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + }, + { + "name": "appendMetadata", + "object": "client", + "arguments": { + "driverInfoOptions": { + "name": "framework", + "version": "2.0", + "platform": "Framework Platform" + } + } + }, + { + "name": "runCommand", + "object": "database", + "arguments": { + "commandName": "ping", + "command": { + "ping": 1 + } + }, + "expectResult": { + "ok": 1 + } + } + ], + "expectEvents": [ + { + "client": "client", + "eventType": "cmap", + "events": [ + { + "connectionCreatedEvent": {} + } + ] + }, + { + "client": "client", + "eventType": "command", + "events": [ + { + "commandSucceededEvent": { + "commandName": "ping" + } + }, + { + "commandSucceededEvent": { + "commandName": "ping" + } + } + ] + } + ] + } + ] +} diff --git a/test/test_client_metadata.py b/test/test_client_metadata.py new file mode 100644 index 0000000000..255644d72c --- /dev/null +++ b/test/test_client_metadata.py @@ -0,0 +1,225 @@ +# Copyright 2013-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import asyncio +import os +import pathlib +import time +import unittest +from test import IntegrationTest +from test.unified_format import generate_test_classes +from test.utils_shared import CMAPListener +from typing import Any, Optional + +import pytest + +from pymongo import MongoClient +from pymongo.driver_info import DriverInfo +from pymongo.monitoring import ConnectionClosedEvent + +try: + from mockupdb import MockupDB, OpMsgReply + + _HAVE_MOCKUPDB = True +except ImportError: + _HAVE_MOCKUPDB = False + +pytestmark = pytest.mark.mockupdb + +_IS_SYNC = True + +# Location of JSON test specifications. +if _IS_SYNC: + _TEST_PATH = os.path.join(pathlib.Path(__file__).resolve().parent, "handshake", "unified") +else: + _TEST_PATH = os.path.join( + pathlib.Path(__file__).resolve().parent.parent, "handshake", "unified" + ) + +# Generate unified tests. +globals().update(generate_test_classes(_TEST_PATH, module=__name__)) + + +def _get_handshake_driver_info(request): + assert "client" in request + return request["client"] + + +class TestClientMetadataProse(IntegrationTest): + def setUp(self): + super().setUp() + self.server = MockupDB() + self.handshake_req = None + + def respond(r): + if "ismaster" in r: + # then this is a handshake request + self.handshake_req = r + return r.reply(OpMsgReply(minWireVersion=0, maxWireVersion=13)) + + self.server.autoresponds(respond) + self.server.run() + self.addCleanup(self.server.stop) + + def send_ping_and_get_metadata( + self, client: MongoClient, is_handshake: bool + ) -> tuple[str, Optional[str], Optional[str], dict[str, Any]]: + # reset if handshake request + if is_handshake: + self.handshake_req: Optional[dict] = None + + client.admin.command("ping") + metadata = _get_handshake_driver_info(self.handshake_req) + driver_metadata = metadata["driver"] + name, version, platform = ( + driver_metadata["name"], + driver_metadata["version"], + metadata["platform"], + ) + return name, version, platform, metadata + + def check_metadata_added( + self, + client: MongoClient, + add_name: str, + add_version: Optional[str], + add_platform: Optional[str], + ) -> None: + # send initial metadata + name, version, platform, metadata = self.send_ping_and_get_metadata(client, True) + # wait for connection to become idle + time.sleep(0.005) + + # add new metadata + client.append_metadata(DriverInfo(add_name, add_version, add_platform)) + new_name, new_version, new_platform, new_metadata = self.send_ping_and_get_metadata( + client, True + ) + self.assertEqual(new_name, f"{name}|{add_name}" if add_name is not None else name) + self.assertEqual( + new_version, + f"{version}|{add_version}" if add_version is not None else version, + ) + self.assertEqual( + new_platform, + f"{platform}|{add_platform}" if add_platform is not None else platform, + ) + + metadata.pop("driver") + metadata.pop("platform") + new_metadata.pop("driver") + new_metadata.pop("platform") + self.assertEqual(metadata, new_metadata) + + def test_append_metadata(self): + client = MongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + driver=DriverInfo("library", "1.2", "Library Platform"), + ) + self.check_metadata_added(client, "framework", "2.0", "Framework Platform") + client.close() + + def test_append_metadata_platform_none(self): + client = MongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + driver=DriverInfo("library", "1.2", "Library Platform"), + ) + self.check_metadata_added(client, "framework", "2.0", None) + client.close() + + def test_append_metadata_version_none(self): + client = MongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + driver=DriverInfo("library", "1.2", "Library Platform"), + ) + self.check_metadata_added(client, "framework", None, "Framework Platform") + client.close() + + def test_append_metadata_platform_version_none(self): + client = MongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + driver=DriverInfo("library", "1.2", "Library Platform"), + ) + self.check_metadata_added(client, "framework", None, None) + client.close() + + def test_multiple_successive_metadata_updates(self): + client = MongoClient( + "mongodb://" + self.server.address_string, maxIdleTimeMS=1, connect=False + ) + client.append_metadata(DriverInfo("library", "1.2", "Library Platform")) + self.check_metadata_added(client, "framework", "2.0", "Framework Platform") + client.close() + + def test_multiple_successive_metadata_updates_platform_none(self): + client = MongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + ) + client.append_metadata(DriverInfo("library", "1.2", "Library Platform")) + self.check_metadata_added(client, "framework", "2.0", None) + client.close() + + def test_multiple_successive_metadata_updates_version_none(self): + client = MongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + ) + client.append_metadata(DriverInfo("library", "1.2", "Library Platform")) + self.check_metadata_added(client, "framework", None, "Framework Platform") + client.close() + + def test_multiple_successive_metadata_updates_platform_version_none(self): + client = MongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + ) + client.append_metadata(DriverInfo("library", "1.2", "Library Platform")) + self.check_metadata_added(client, "framework", None, None) + client.close() + + def test_doesnt_update_established_connections(self): + listener = CMAPListener() + client = MongoClient( + "mongodb://" + self.server.address_string, + maxIdleTimeMS=1, + driver=DriverInfo("library", "1.2", "Library Platform"), + event_listeners=[listener], + ) + + # send initial metadata + name, version, platform, metadata = self.send_ping_and_get_metadata(client, True) + self.assertIsNotNone(name) + self.assertIsNotNone(version) + self.assertIsNotNone(platform) + + # add data + add_name, add_version, add_platform = "framework", "2.0", "Framework Platform" + client.append_metadata(DriverInfo(add_name, add_version, add_platform)) + # check new data isn't sent + self.handshake_req: Optional[dict] = None + client.admin.command("ping") + self.assertIsNone(self.handshake_req) + self.assertEqual(listener.event_count(ConnectionClosedEvent), 0) + + client.close() + + +if __name__ == "__main__": + unittest.main() diff --git a/test/unified_format.py b/test/unified_format.py index 0db037c654..e2504f6dd9 100644 --- a/test/unified_format.py +++ b/test/unified_format.py @@ -67,6 +67,7 @@ from bson.objectid import ObjectId from gridfs import GridFSBucket, GridOut, NoFile from pymongo import ASCENDING, CursorType, MongoClient, _csot +from pymongo.driver_info import DriverInfo from pymongo.encryption_options import _HAVE_PYMONGOCRYPT from pymongo.errors import ( AutoReconnect, @@ -837,6 +838,11 @@ def _cursor_close(self, target, *args, **kwargs): self.__raise_if_unsupported("close", target, NonLazyCursor, CommandCursor) return target.close() + def _clientOperation_appendMetadata(self, target, *args, **kwargs): + info_opts = kwargs["driver_info_options"] + driver_info = DriverInfo(info_opts["name"], info_opts["version"], info_opts["platform"]) + target.append_metadata(driver_info) + def _clientEncryptionOperation_createDataKey(self, target, *args, **kwargs): if "opts" in kwargs: kwargs.update(camel_to_snake_args(kwargs.pop("opts"))) @@ -916,7 +922,6 @@ def run_entity_operation(self, spec): ) else: arguments = {} - if isinstance(target, MongoClient): method_name = f"_clientOperation_{opname}" elif isinstance(target, Database): diff --git a/tools/synchro.py b/tools/synchro.py index 541231cf71..e502f96281 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -212,6 +212,7 @@ def async_only_test(f: str) -> bool: "test_client.py", "test_client_bulk_write.py", "test_client_context.py", + "test_client_metadata.py", "test_collation.py", "test_collection.py", "test_collection_management.py",