From 9c9020c0acf29c829baa57027be4fc7dd423b656 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 2 Feb 2022 13:53:58 -0600 Subject: [PATCH 1/2] PYTHON-3075 bulk_write does not apply CodecOptions to upserted_ids result (#840) --- pymongo/message.py | 13 +++---- pymongo/pool.py | 4 +-- test/test_bulk.py | 76 +++++++++++++++++++++++++++++++++++++++++ test/test_encryption.py | 33 +++++++++++++++++- 4 files changed, 117 insertions(+), 9 deletions(-) diff --git a/pymongo/message.py b/pymongo/message.py index 86a83f152e..5bce778fca 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -32,6 +32,7 @@ _decode_selective, _dict_to_bson, _make_c_string) +from bson import codec_options from bson.int64 import Int64 from bson.raw_bson import (_inflate_bson, DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument) @@ -817,7 +818,7 @@ def write_command(self, cmd, request_id, msg, docs): self._start(cmd, request_id, docs) start = datetime.datetime.now() try: - reply = self.sock_info.write_command(request_id, msg) + reply = self.sock_info.write_command(request_id, msg, self.codec) if self.publish: duration = (datetime.datetime.now() - start) + duration self._succeed(request_id, reply, duration) @@ -886,7 +887,7 @@ def execute(self, cmd, docs, client): batched_cmd, to_send = self._batch_command(cmd, docs) result = self.sock_info.command( self.db_name, batched_cmd, - codec_options=_UNICODE_REPLACE_CODEC_OPTIONS, + codec_options=self.codec, session=self.session, client=client) return result, to_send @@ -1231,9 +1232,9 @@ def unpack_response(self, cursor_id=None, return bson._decode_all_selective( self.documents, codec_options, user_fields) - def command_response(self): + def command_response(self, codec_options): """Unpack a command response.""" - docs = self.unpack_response() + docs = self.unpack_response(codec_options=codec_options) assert self.number_returned == 1 return docs[0] @@ -1299,9 +1300,9 @@ def unpack_response(self, cursor_id=None, return bson._decode_all_selective( self.payload_document, codec_options, user_fields) - def command_response(self): + def command_response(self, codec_options): """Unpack a command response.""" - return self.unpack_response()[0] + return self.unpack_response(codec_options=codec_options)[0] def raw_command_response(self): """Return the bytes of the command response.""" diff --git a/pymongo/pool.py b/pymongo/pool.py index f9b370c66e..2ae2576250 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -782,7 +782,7 @@ def unack_write(self, msg, max_doc_size): self._raise_if_not_writable(True) self.send_message(msg, max_doc_size) - def write_command(self, request_id, msg): + def write_command(self, request_id, msg, codec_options): """Send "insert" etc. command, returning response as a dict. Can raise ConnectionFailure or OperationFailure. @@ -793,7 +793,7 @@ def write_command(self, request_id, msg): """ self.send_message(msg, 0) reply = self.receive_message(request_id) - result = reply.command_response() + result = reply.command_response(codec_options) # Raises NotPrimaryError or OperationFailure. helpers._check_command_response(result, self.max_wire_version) diff --git a/test/test_bulk.py b/test/test_bulk.py index f93cd6c766..08740a437e 100644 --- a/test/test_bulk.py +++ b/test/test_bulk.py @@ -15,9 +15,13 @@ """Test the bulk API.""" import sys +import uuid +from bson.binary import UuidRepresentation +from bson.codec_options import CodecOptions sys.path[0:0] = [""] +from bson import Binary from bson.objectid import ObjectId from pymongo.common import partition_node from pymongo.errors import (BulkWriteError, @@ -376,6 +380,78 @@ def test_client_generated_upsert_id(self): {'index': 2, '_id': 2}]}, result.bulk_api_result) + def test_upsert_uuid_standard(self): + options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD) + coll = self.coll.with_options(codec_options=options) + uuids = [uuid.uuid4() for _ in range(3)] + result = coll.bulk_write([ + UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True), + ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True), + ]) + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 3, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': uuids[0]}, + {'index': 1, '_id': uuids[1]}, + {'index': 2, '_id': uuids[2]}]}, + result.bulk_api_result) + + def test_upsert_uuid_unspecified(self): + options = CodecOptions(uuid_representation=UuidRepresentation.UNSPECIFIED) + coll = self.coll.with_options(codec_options=options) + uuids = [Binary.from_uuid(uuid.uuid4()) for _ in range(3)] + result = coll.bulk_write([ + UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True), + ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True), + ]) + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 3, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': uuids[0]}, + {'index': 1, '_id': uuids[1]}, + {'index': 2, '_id': uuids[2]}]}, + result.bulk_api_result) + + def test_upsert_uuid_standard_subdocuments(self): + options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD) + coll = self.coll.with_options(codec_options=options) + ids = [ + {'f': Binary(bytes(i)), 'f2': uuid.uuid4()} + for i in range(3) + ] + + result = coll.bulk_write([ + UpdateOne({'_id': ids[0]}, {'$set': {'a': 0}}, upsert=True), + ReplaceOne({'a': 1}, {'_id': ids[1]}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({'_id': ids[2]}, {'_id': ids[2]}, upsert=True), + ]) + + # The `Binary` values are returned as `bytes` objects. + for _id in ids: + _id['f'] = bytes(_id['f']) + + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 3, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': ids[0]}, + {'index': 1, '_id': ids[1]}, + {'index': 2, '_id': ids[2]}]}, + result.bulk_api_result) + def test_single_ordered_batch(self): result = self.coll.bulk_write([ InsertOne({'a': 1}), diff --git a/test/test_encryption.py b/test/test_encryption.py index f77d3fffc7..b53f2cc92a 100644 --- a/test/test_encryption.py +++ b/test/test_encryption.py @@ -29,6 +29,7 @@ from bson import encode, json_util from bson.binary import (Binary, + UuidRepresentation, JAVA_LEGACY, STANDARD, UUID_SUBTYPE) @@ -50,13 +51,14 @@ ServerSelectionTimeoutError, WriteError) from pymongo.mongo_client import MongoClient -from pymongo.operations import InsertOne +from pymongo.operations import InsertOne, ReplaceOne, UpdateOne from pymongo.write_concern import WriteConcern from test import (unittest, CA_PEM, CLIENT_PEM, client_context, IntegrationTest, PyMongoTestCase) +from test.test_bulk import BulkTestBase from test.utils import (TestCreator, camel_to_snake_args, OvertCommandListener, @@ -313,6 +315,35 @@ def test_use_after_close(self): client.admin.command('ping') +class TestEncryptedBulkWrite(BulkTestBase, EncryptionIntegrationTest): + + def test_upsert_uuid_standard_encrypte(self): + opts = AutoEncryptionOpts(KMS_PROVIDERS, 'keyvault.datakeys') + client = rs_or_single_client(auto_encryption_opts=opts) + self.addCleanup(client.close) + + options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD) + encrypted_coll = client.pymongo_test.test + coll = encrypted_coll.with_options(codec_options=options) + uuids = [uuid.uuid4() for _ in range(3)] + result = coll.bulk_write([ + UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True), + ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True), + # This is just here to make the counts right in all cases. + ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True), + ]) + self.assertEqualResponse( + {'nMatched': 0, + 'nModified': 0, + 'nUpserted': 3, + 'nInserted': 0, + 'nRemoved': 0, + 'upserted': [{'index': 0, '_id': uuids[0]}, + {'index': 1, '_id': uuids[1]}, + {'index': 2, '_id': uuids[2]}]}, + result.bulk_api_result) + + class TestClientMaxWireVersion(IntegrationTest): @classmethod From b61296669b66e6faa5b0a04c68859fb36ff9d307 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 2 Feb 2022 17:27:10 -0600 Subject: [PATCH 2/2] remove unused import --- pymongo/message.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pymongo/message.py b/pymongo/message.py index 5bce778fca..fe203c8431 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -32,7 +32,6 @@ _decode_selective, _dict_to_bson, _make_c_string) -from bson import codec_options from bson.int64 import Int64 from bson.raw_bson import (_inflate_bson, DEFAULT_RAW_BSON_OPTIONS, RawBSONDocument)