Skip to content

PYTHON-3075 [v4.0] bulk_write does not apply CodecOptions to upserted_ids result #841

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions pymongo/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
_decode_selective,
_dict_to_bson,
_make_c_string)
from bson import codec_options
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unneeded.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

from bson.int64 import Int64
from bson.raw_bson import (_inflate_bson, DEFAULT_RAW_BSON_OPTIONS,
RawBSONDocument)
Expand Down Expand Up @@ -817,7 +818,7 @@ def write_command(self, cmd, request_id, msg, docs):
self._start(cmd, request_id, docs)
start = datetime.datetime.now()
try:
reply = self.sock_info.write_command(request_id, msg)
reply = self.sock_info.write_command(request_id, msg, self.codec)
if self.publish:
duration = (datetime.datetime.now() - start) + duration
self._succeed(request_id, reply, duration)
Expand Down Expand Up @@ -886,7 +887,7 @@ def execute(self, cmd, docs, client):
batched_cmd, to_send = self._batch_command(cmd, docs)
result = self.sock_info.command(
self.db_name, batched_cmd,
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
codec_options=self.codec,
session=self.session, client=client)
return result, to_send

Expand Down Expand Up @@ -1231,9 +1232,9 @@ def unpack_response(self, cursor_id=None,
return bson._decode_all_selective(
self.documents, codec_options, user_fields)

def command_response(self):
def command_response(self, codec_options):
"""Unpack a command response."""
docs = self.unpack_response()
docs = self.unpack_response(codec_options=codec_options)
assert self.number_returned == 1
return docs[0]

Expand Down Expand Up @@ -1299,9 +1300,9 @@ def unpack_response(self, cursor_id=None,
return bson._decode_all_selective(
self.payload_document, codec_options, user_fields)

def command_response(self):
def command_response(self, codec_options):
"""Unpack a command response."""
return self.unpack_response()[0]
return self.unpack_response(codec_options=codec_options)[0]

def raw_command_response(self):
"""Return the bytes of the command response."""
Expand Down
4 changes: 2 additions & 2 deletions pymongo/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ def unack_write(self, msg, max_doc_size):
self._raise_if_not_writable(True)
self.send_message(msg, max_doc_size)

def write_command(self, request_id, msg):
def write_command(self, request_id, msg, codec_options):
"""Send "insert" etc. command, returning response as a dict.

Can raise ConnectionFailure or OperationFailure.
Expand All @@ -793,7 +793,7 @@ def write_command(self, request_id, msg):
"""
self.send_message(msg, 0)
reply = self.receive_message(request_id)
result = reply.command_response()
result = reply.command_response(codec_options)

# Raises NotPrimaryError or OperationFailure.
helpers._check_command_response(result, self.max_wire_version)
Expand Down
76 changes: 76 additions & 0 deletions test/test_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
"""Test the bulk API."""

import sys
import uuid
from bson.binary import UuidRepresentation
from bson.codec_options import CodecOptions

sys.path[0:0] = [""]

from bson import Binary
from bson.objectid import ObjectId
from pymongo.common import partition_node
from pymongo.errors import (BulkWriteError,
Expand Down Expand Up @@ -376,6 +380,78 @@ def test_client_generated_upsert_id(self):
{'index': 2, '_id': 2}]},
result.bulk_api_result)

def test_upsert_uuid_standard(self):
options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD)
coll = self.coll.with_options(codec_options=options)
uuids = [uuid.uuid4() for _ in range(3)]
result = coll.bulk_write([
UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True),
ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True),
# This is just here to make the counts right in all cases.
ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True),
])
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 3,
'nInserted': 0,
'nRemoved': 0,
'upserted': [{'index': 0, '_id': uuids[0]},
{'index': 1, '_id': uuids[1]},
{'index': 2, '_id': uuids[2]}]},
result.bulk_api_result)

def test_upsert_uuid_unspecified(self):
options = CodecOptions(uuid_representation=UuidRepresentation.UNSPECIFIED)
coll = self.coll.with_options(codec_options=options)
uuids = [Binary.from_uuid(uuid.uuid4()) for _ in range(3)]
result = coll.bulk_write([
UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True),
ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True),
# This is just here to make the counts right in all cases.
ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True),
])
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 3,
'nInserted': 0,
'nRemoved': 0,
'upserted': [{'index': 0, '_id': uuids[0]},
{'index': 1, '_id': uuids[1]},
{'index': 2, '_id': uuids[2]}]},
result.bulk_api_result)

def test_upsert_uuid_standard_subdocuments(self):
options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD)
coll = self.coll.with_options(codec_options=options)
ids = [
{'f': Binary(bytes(i)), 'f2': uuid.uuid4()}
for i in range(3)
]

result = coll.bulk_write([
UpdateOne({'_id': ids[0]}, {'$set': {'a': 0}}, upsert=True),
ReplaceOne({'a': 1}, {'_id': ids[1]}, upsert=True),
# This is just here to make the counts right in all cases.
ReplaceOne({'_id': ids[2]}, {'_id': ids[2]}, upsert=True),
])

# The `Binary` values are returned as `bytes` objects.
for _id in ids:
_id['f'] = bytes(_id['f'])

self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 3,
'nInserted': 0,
'nRemoved': 0,
'upserted': [{'index': 0, '_id': ids[0]},
{'index': 1, '_id': ids[1]},
{'index': 2, '_id': ids[2]}]},
result.bulk_api_result)

def test_single_ordered_batch(self):
result = self.coll.bulk_write([
InsertOne({'a': 1}),
Expand Down
33 changes: 32 additions & 1 deletion test/test_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

from bson import encode, json_util
from bson.binary import (Binary,
UuidRepresentation,
JAVA_LEGACY,
STANDARD,
UUID_SUBTYPE)
Expand All @@ -50,13 +51,14 @@
ServerSelectionTimeoutError,
WriteError)
from pymongo.mongo_client import MongoClient
from pymongo.operations import InsertOne
from pymongo.operations import InsertOne, ReplaceOne, UpdateOne
from pymongo.write_concern import WriteConcern

from test import (unittest, CA_PEM, CLIENT_PEM,
client_context,
IntegrationTest,
PyMongoTestCase)
from test.test_bulk import BulkTestBase
from test.utils import (TestCreator,
camel_to_snake_args,
OvertCommandListener,
Expand Down Expand Up @@ -313,6 +315,35 @@ def test_use_after_close(self):
client.admin.command('ping')


class TestEncryptedBulkWrite(BulkTestBase, EncryptionIntegrationTest):

def test_upsert_uuid_standard_encrypte(self):
opts = AutoEncryptionOpts(KMS_PROVIDERS, 'keyvault.datakeys')
client = rs_or_single_client(auto_encryption_opts=opts)
self.addCleanup(client.close)

options = CodecOptions(uuid_representation=UuidRepresentation.STANDARD)
encrypted_coll = client.pymongo_test.test
coll = encrypted_coll.with_options(codec_options=options)
uuids = [uuid.uuid4() for _ in range(3)]
result = coll.bulk_write([
UpdateOne({'_id': uuids[0]}, {'$set': {'a': 0}}, upsert=True),
ReplaceOne({'a': 1}, {'_id': uuids[1]}, upsert=True),
# This is just here to make the counts right in all cases.
ReplaceOne({'_id': uuids[2]}, {'_id': uuids[2]}, upsert=True),
])
self.assertEqualResponse(
{'nMatched': 0,
'nModified': 0,
'nUpserted': 3,
'nInserted': 0,
'nRemoved': 0,
'upserted': [{'index': 0, '_id': uuids[0]},
{'index': 1, '_id': uuids[1]},
{'index': 2, '_id': uuids[2]}]},
result.bulk_api_result)


class TestClientMaxWireVersion(IntegrationTest):

@classmethod
Expand Down