Skip to content

PYTHON-1864 PYTHON-2931 Spec complaint $readPreference #809

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions pymongo/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ def _database(self):
"""The database against which the aggregation command is run."""
raise NotImplementedError

def _process_result(self, result, session, server, sock_info, secondary_ok):
if self._result_processor:
self._result_processor(
result, session, server, sock_info, secondary_ok)

def get_read_preference(self, session):
if self._write_preference:
return self._write_preference
Expand All @@ -105,7 +100,7 @@ def get_read_preference(self, session):
self._write_preference = pref = _AggWritePref(pref)
return pref

def get_cursor(self, session, server, sock_info, secondary_ok):
def get_cursor(self, session, server, sock_info, read_preference):
# Serialize command.
cmd = SON([("aggregate", self._aggregation_target),
("pipeline", self._pipeline)])
Expand Down Expand Up @@ -134,8 +129,7 @@ def get_cursor(self, session, server, sock_info, secondary_ok):
result = sock_info.command(
self._database.name,
cmd,
secondary_ok,
self.get_read_preference(session),
read_preference,
self._target.codec_options,
parse_write_concern_error=True,
read_concern=read_concern,
Expand All @@ -145,7 +139,8 @@ def get_cursor(self, session, server, sock_info, secondary_ok):
client=self._database.client,
user_fields=self._user_fields)

self._process_result(result, session, server, sock_info, secondary_ok)
if self._result_processor:
self._result_processor(result, sock_info)

# Extract cursor from result or mock/fake one if necessary.
if 'cursor' in result:
Expand Down
2 changes: 1 addition & 1 deletion pymongo/change_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def _aggregation_pipeline(self):
full_pipeline.extend(self._pipeline)
return full_pipeline

def _process_result(self, result, session, server, sock_info, secondary_ok):
def _process_result(self, result, sock_info):
"""Callback that caches the postBatchResumeToken or
startAtOperationTime from a changeStream aggregate command response
containing an empty batch of change documents.
Expand Down
35 changes: 18 additions & 17 deletions pymongo/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def _socket_for_reads(self, session):
def _socket_for_writes(self, session):
return self.__database.client._socket_for_writes(session)

def _command(self, sock_info, command, secondary_ok=False,
def _command(self, sock_info, command,
read_preference=None,
codec_options=None, check=True, allowable_errors=None,
read_concern=None,
Expand All @@ -200,7 +200,6 @@ def _command(self, sock_info, command, secondary_ok=False,
:Parameters:
- `sock_info` - A SocketInfo instance.
- `command` - The command itself, as a SON instance.
- `secondary_ok`: whether to set the secondaryOkay wire protocol bit.
- `codec_options` (optional) - An instance of
:class:`~bson.codec_options.CodecOptions`.
- `check`: raise OperationFailure if there are errors
Expand All @@ -226,7 +225,6 @@ def _command(self, sock_info, command, secondary_ok=False,
return sock_info.command(
self.__database.name,
command,
secondary_ok,
read_preference or self._read_preference_for(session),
codec_options or self.codec_options,
check,
Expand Down Expand Up @@ -1356,14 +1354,14 @@ def find_raw_batches(self, *args, **kwargs):

return RawBatchCursor(self, *args, **kwargs)

def _count_cmd(self, session, sock_info, secondary_ok, cmd, collation):
def _count_cmd(self, session, sock_info, read_preference, cmd, collation):
"""Internal count command helper."""
# XXX: "ns missing" checks can be removed when we drop support for
# MongoDB 3.0, see SERVER-17051.
res = self._command(
sock_info,
cmd,
secondary_ok,
read_preference=read_preference,
allowable_errors=["ns missing"],
codec_options=self.__write_response_codec_options,
read_concern=self.read_concern,
Expand All @@ -1374,12 +1372,12 @@ def _count_cmd(self, session, sock_info, secondary_ok, cmd, collation):
return int(res["n"])

def _aggregate_one_result(
self, sock_info, secondary_ok, cmd, collation, session):
self, sock_info, read_preference, cmd, collation, session):
"""Internal helper to run an aggregate that returns a single result."""
result = self._command(
sock_info,
cmd,
secondary_ok,
read_preference,
allowable_errors=[26], # Ignore NamespaceNotFound.
codec_options=self.__write_response_codec_options,
read_concern=self.read_concern,
Expand Down Expand Up @@ -1413,7 +1411,7 @@ def estimated_document_count(self, **kwargs):
raise ConfigurationError(
'estimated_document_count does not support sessions')

def _cmd(session, server, sock_info, secondary_ok):
def _cmd(session, server, sock_info, read_preference):
if sock_info.max_wire_version >= 12:
# MongoDB 4.9+
pipeline = [
Expand All @@ -1425,15 +1423,17 @@ def _cmd(session, server, sock_info, secondary_ok):
('cursor', {})])
cmd.update(kwargs)
result = self._aggregate_one_result(
sock_info, secondary_ok, cmd, collation=None, session=session)
sock_info, read_preference, cmd, collation=None,
session=session)
if not result:
return 0
return int(result['n'])
else:
# MongoDB < 4.9
cmd = SON([('count', self.__name)])
cmd.update(kwargs)
return self._count_cmd(None, sock_info, secondary_ok, cmd, None)
return self._count_cmd(
None, sock_info, read_preference, cmd, collation=None)

return self.__database.client._retryable_read(
_cmd, self.read_preference, None)
Expand Down Expand Up @@ -1506,9 +1506,9 @@ def count_documents(self, filter, session=None, **kwargs):
collation = validate_collation_or_none(kwargs.pop('collation', None))
cmd.update(kwargs)

def _cmd(session, server, sock_info, secondary_ok):
def _cmd(session, server, sock_info, read_preference):
result = self._aggregate_one_result(
sock_info, secondary_ok, cmd, collation, session)
sock_info, read_preference, cmd, collation, session)
if not result:
return 0
return result['n']
Expand Down Expand Up @@ -1799,12 +1799,12 @@ def list_indexes(self, session=None):
read_pref = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)

def _cmd(session, server, sock_info, secondary_ok):
def _cmd(session, server, sock_info, read_preference):
cmd = SON([("listIndexes", self.__name), ("cursor", {})])
with self.__database.client._tmp_session(session, False) as s:
try:
cursor = self._command(sock_info, cmd, secondary_ok,
read_pref,
cursor = self._command(sock_info, cmd,
read_preference,
codec_options,
session=s)["cursor"]
except OperationFailure as exc:
Expand Down Expand Up @@ -2220,9 +2220,10 @@ def distinct(self, key, filter=None, session=None, **kwargs):
kwargs["query"] = filter
collation = validate_collation_or_none(kwargs.pop('collation', None))
cmd.update(kwargs)
def _cmd(session, server, sock_info, secondary_ok):
def _cmd(session, server, sock_info, read_preference):
return self._command(
sock_info, cmd, secondary_ok, read_concern=self.read_concern,
sock_info, cmd, read_preference=read_preference,
read_concern=self.read_concern,
collation=collation, session=session,
user_fields={"values": 1})["values"]

Expand Down
20 changes: 9 additions & 11 deletions pymongo/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ def watch(self, pipeline=None, full_document=None, resume_after=None,
batch_size, collation, start_at_operation_time, session,
start_after)

def _command(self, sock_info, command, secondary_ok=False, value=1, check=True,
def _command(self, sock_info, command, value=1, check=True,
allowable_errors=None, read_preference=ReadPreference.PRIMARY,
codec_options=DEFAULT_CODEC_OPTIONS,
write_concern=None,
Expand All @@ -506,7 +506,6 @@ def _command(self, sock_info, command, secondary_ok=False, value=1, check=True,
return sock_info.command(
self.__name,
command,
secondary_ok,
read_preference,
codec_options,
check,
Expand Down Expand Up @@ -605,8 +604,8 @@ def command(self, command, value=1, check=True,
read_preference = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
with self.__client._socket_for_reads(
read_preference, session) as (sock_info, secondary_ok):
return self._command(sock_info, command, secondary_ok, value,
read_preference, session) as (sock_info, read_preference):
return self._command(sock_info, command, value,
check, allowable_errors, read_preference,
codec_options, session=session, **kwargs)

Expand All @@ -618,16 +617,15 @@ def _retryable_read_command(self, command, value=1, check=True,
read_preference = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)

def _cmd(session, server, sock_info, secondary_ok):
return self._command(sock_info, command, secondary_ok, value,
def _cmd(session, server, sock_info, read_preference):
return self._command(sock_info, command, value,
check, allowable_errors, read_preference,
codec_options, session=session, **kwargs)

return self.__client._retryable_read(
_cmd, read_preference, session)

def _list_collections(self, sock_info, secondary_okay, session,
read_preference, **kwargs):
def _list_collections(self, sock_info, session, read_preference, **kwargs):
"""Internal listCollections helper."""

coll = self.get_collection(
Expand All @@ -638,7 +636,7 @@ def _list_collections(self, sock_info, secondary_okay, session,
with self.__client._tmp_session(
session, close=False) as tmp_session:
cursor = self._command(
sock_info, cmd, secondary_okay,
sock_info, cmd,
read_preference=read_preference,
session=tmp_session)["cursor"]
cmd_cursor = CommandCursor(
Expand Down Expand Up @@ -674,9 +672,9 @@ def list_collections(self, session=None, filter=None, **kwargs):
read_pref = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)

def _cmd(session, server, sock_info, secondary_okay):
def _cmd(session, server, sock_info, read_preference):
return self._list_collections(
sock_info, secondary_okay, session, read_preference=read_pref,
sock_info, session, read_preference=read_preference,
**kwargs)

return self.__client._retryable_read(
Expand Down
22 changes: 8 additions & 14 deletions pymongo/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,9 @@ def as_command(self, sock_info):
self._as_command = cmd, self.db
return self._as_command

def get_message(self, set_secondary_ok, sock_info, use_cmd=False):
def get_message(self, read_preference, sock_info, use_cmd=False):
"""Get a query message, possibly setting the secondaryOk bit."""
if set_secondary_ok:
if read_preference.mode:
# Set the secondaryOk bit.
flags = self.flags | 4
else:
Expand All @@ -330,8 +330,7 @@ def get_message(self, set_secondary_ok, sock_info, use_cmd=False):
if use_cmd:
spec = self.as_command(sock_info)[0]
request_id, msg, size, _ = _op_msg(
0, spec, self.db, self.read_preference,
set_secondary_ok, self.codec_options,
0, spec, self.db, read_preference, self.codec_options,
ctx=sock_info.compression_context)
return request_id, msg, size

Expand All @@ -346,8 +345,7 @@ def get_message(self, set_secondary_ok, sock_info, use_cmd=False):
ntoreturn = self.limit

if sock_info.is_mongos:
spec = _maybe_add_read_preference(spec,
self.read_preference)
spec = _maybe_add_read_preference(spec, read_preference)

return _query(flags, ns, self.ntoskip, ntoreturn,
spec, None if use_cmd else self.fields,
Expand Down Expand Up @@ -429,8 +427,7 @@ def get_message(self, dummy0, sock_info, use_cmd=False):
else:
flags = 0
request_id, msg, size, _ = _op_msg(
flags, spec, self.db, None,
False, self.codec_options,
flags, spec, self.db, None, self.codec_options,
ctx=sock_info.compression_context)
return request_id, msg, size

Expand Down Expand Up @@ -572,16 +569,13 @@ def _op_msg_uncompressed(flags, command, identifier, docs, opts):
_op_msg_uncompressed = _cmessage._op_msg


def _op_msg(flags, command, dbname, read_preference, secondary_ok,
opts, ctx=None):
def _op_msg(flags, command, dbname, read_preference, opts, ctx=None):
"""Get a OP_MSG message."""
command['$db'] = dbname
# getMore commands do not send $readPreference.
if read_preference is not None and "$readPreference" not in command:
if secondary_ok and not read_preference.mode:
command["$readPreference"] = (
ReadPreference.PRIMARY_PREFERRED.document)
else:
# Only send $readPreference if it's not primary (the default).
if read_preference.mode:
command["$readPreference"] = read_preference.document
name = next(iter(command))
try:
Expand Down
Loading