PYTHON-2554 Support $merge and $out executing on secondaries #774

Merged
merged 7 commits into from
Dec 8, 2021
6 changes: 6 additions & 0 deletions doc/changelog.rst
@@ -4,6 +4,8 @@ Changelog
 Changes in Version 4.1
 ----------------------

+PyMongo 4.1 brings a number of improvements including:
+
 - :meth:`pymongo.collection.Collection.update_one`,
   :meth:`pymongo.collection.Collection.update_many`,
   :meth:`pymongo.collection.Collection.delete_one`,
@@ -15,6 +17,10 @@ Changes in Version 4.1
   and :meth:`pymongo.collection.Collection.find` all support a new keyword
   argument ``let`` which is a map of parameter names and values. Parameters
   can then be accessed as variables in an aggregate expression context.
+- :meth:`~pymongo.collection.Collection.aggregate` now supports
+  $merge and $out executing on secondaries on MongoDB >=5.0.
+  aggregate() now always obeys the collection's :attr:`read_preference` on
+  MongoDB >= 5.0.


 Changes in Version 4.0
12 changes: 8 additions & 4 deletions pymongo/aggregation.py
@@ -19,7 +19,7 @@
 from pymongo import common
 from pymongo.collation import validate_collation_or_none
 from pymongo.errors import ConfigurationError
-from pymongo.read_preferences import ReadPreference
+from pymongo.read_preferences import _AggWritePref, ReadPreference


 class _AggregationCommand(object):
@@ -70,6 +70,7 @@ def __init__(self, target, cursor_class, pipeline, options,
             options.pop('collation', None))

         self._max_await_time_ms = options.pop('maxAwaitTimeMS', None)
+        self._write_preference = None

     @property
     def _aggregation_target(self):
@@ -97,9 +98,12 @@ def _process_result(self, result, session, server, sock_info, secondary_ok):
                 result, session, server, sock_info, secondary_ok)

     def get_read_preference(self, session):
-        if self._performs_write:
-            return ReadPreference.PRIMARY
-        return self._target._read_preference_for(session)
+        if self._write_preference:
+            return self._write_preference
+        pref = self._target._read_preference_for(session)
+        if self._performs_write and pref != ReadPreference.PRIMARY:
+            self._write_preference = pref = _AggWritePref(pref)
+        return pref

     def get_cursor(self, session, server, sock_info, secondary_ok):
         # Serialize command.
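The new `get_read_preference` wraps a non-primary preference once and reuses the wrapper on later calls. The sketch below is one way to read that control flow. It does not use PyMongo: `FakeAggWritePref`, `FakeAggregation`, and the string preferences are hypothetical stand-ins introduced only for illustration.

```python
PRIMARY = "primary"  # stand-in for ReadPreference.PRIMARY (assumption)


class FakeAggWritePref:
    """Hypothetical stand-in for the _AggWritePref wrapper."""

    def __init__(self, pref):
        self.pref = pref


class FakeAggregation:
    """Hypothetical stand-in for _AggregationCommand."""

    def __init__(self, target_pref, performs_write):
        self._target_pref = target_pref
        self._performs_write = performs_write
        self._write_preference = None

    def get_read_preference(self):
        # Reuse the wrapper created by an earlier call, if any.
        if self._write_preference:
            return self._write_preference
        pref = self._target_pref
        # Only a writing pipeline with a non-primary preference is wrapped.
        if self._performs_write and pref != PRIMARY:
            self._write_preference = pref = FakeAggWritePref(pref)
        return pref


read_only = FakeAggregation("secondary", performs_write=False)
write_primary = FakeAggregation(PRIMARY, performs_write=True)
write_secondary = FakeAggregation("secondary", performs_write=True)

first = write_secondary.get_read_preference()
second = write_secondary.get_read_preference()
```

In this sketch a read-only pipeline and a primary write both return the plain preference, while the secondary write returns the same wrapper object on every call.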
8 changes: 5 additions & 3 deletions pymongo/collection.py
@@ -1937,9 +1937,9 @@ def aggregate(self, pipeline, session=None, let=None, **kwargs):
         collection.

         The :meth:`aggregate` method obeys the :attr:`read_preference` of this
-        :class:`Collection`, except when ``$out`` or ``$merge`` are used, in
-        which case :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`
-        is used.
+        :class:`Collection`, except when ``$out`` or ``$merge`` are used on
+        MongoDB <5.0, in which case
+        :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` is used.

         .. note:: This method does not support the 'explain' option. Please
            use :meth:`~pymongo.database.Database.command` instead. An
@@ -1981,6 +1981,8 @@ def aggregate(self, pipeline, session=None, let=None, **kwargs):

         .. versionchanged:: 4.1
            Added ``let`` parameter.
+           Support $merge and $out executing on secondaries according to the
+           collection's :attr:`read_preference`.
         .. versionchanged:: 4.0
            Removed the ``useCursor`` option.
         .. versionchanged:: 3.9
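From the caller's side, the docstring change means a `$merge` or `$out` pipeline should follow the collection's read preference when every server is MongoDB 5.0 or newer. A minimal usage sketch follows. The URI, database, collection names, and the `status` filter are placeholders, and the helper names `build_merge_pipeline` and `merge_on_secondary` are hypothetical. It assumes PyMongo 4.1+ and a reachable replica set.

```python
def build_merge_pipeline(target_collection):
    """Return a pipeline that filters documents and merges them elsewhere."""
    return [
        {"$match": {"status": "active"}},          # placeholder filter
        {"$merge": {"into": target_collection}},   # write stage
    ]


def merge_on_secondary(uri, db_name, source, target):
    """Run the pipeline through a collection with a secondary preference."""
    # Imported lazily so build_merge_pipeline works without PyMongo installed.
    from pymongo import MongoClient
    from pymongo.read_preferences import ReadPreference

    client = MongoClient(uri)
    try:
        coll = client[db_name][source].with_options(
            read_preference=ReadPreference.SECONDARY)
        # Per the docstring above, on MongoDB < 5.0 the driver still uses
        # PRIMARY for this pipeline.
        coll.aggregate(build_merge_pipeline(target))
    finally:
        client.close()
```

Calling `merge_on_secondary("mongodb://localhost:27017", "test", "orders", "active_orders")` would be one way to exercise it against a local deployment. Those argument values are illustrative only.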
2 changes: 1 addition & 1 deletion pymongo/mongo_client.py
@@ -1161,7 +1161,7 @@ def _secondaryok_for_server(self, read_preference, server, session):

         with self._get_socket(server, session) as sock_info:
             secondary_ok = (single and not sock_info.is_mongos) or (
-                read_preference != ReadPreference.PRIMARY)
+                read_preference.mode != ReadPreference.PRIMARY.mode)
Review comment (Member, Author):

This change is required because read_preference might be a _AggWritePref.

             yield sock_info, secondary_ok

     @contextlib.contextmanager
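One plausible reading of the review comment: a wrapper object compared with `!=` never equals the primary preference, even while it is delegating to it, whereas `.mode` is forwarded through `__getattr__`. The sketch below illustrates that with hypothetical stand-ins (`FakeMode`, `FakeWrapper`, and the integer modes are assumptions, not PyMongo's classes).

```python
class FakeMode:
    """Hypothetical stand-in for a read preference with an integer mode."""

    def __init__(self, mode):
        self.mode = mode

    def __eq__(self, other):
        # Only comparable with other FakeMode instances.
        if isinstance(other, FakeMode):
            return self.mode == other.mode
        return NotImplemented


PRIMARY = FakeMode(0)    # stand-in values for illustration
SECONDARY = FakeMode(2)


class FakeWrapper:
    """Hypothetical stand-in for _AggWritePref."""

    def __init__(self, pref):
        self.pref = pref
        self.effective_pref = PRIMARY  # starts out delegating to primary

    def __getattr__(self, name):
        # Called only for attributes not found normally, such as ``mode``.
        return getattr(self.effective_pref, name)


wrapped = FakeWrapper(SECONDARY)

# Object comparison falls back to identity, so the wrapper always differs.
differs_by_object = wrapped != PRIMARY
# Comparing modes goes through the proxy and sees the effective preference.
differs_by_mode = wrapped.mode != PRIMARY.mode
```

Here `differs_by_object` is true even though the wrapper currently behaves as primary, while `differs_by_mode` is false, which matches the intent of comparing `.mode` in the diff.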
39 changes: 39 additions & 0 deletions pymongo/read_preferences.py
@@ -424,6 +424,45 @@ def __call__(self, selection):
                 self.max_staleness, selection))


+class _AggWritePref:
+    """Agg $out/$merge write preference.
+
+    * If there are readable servers and there is any pre-5.0 server, use
+      primary read preference.
+    * Otherwise use `pref` read preference.
+
+    :Parameters:
+      - `pref`: The read preference to use on MongoDB 5.0+.
+    """
+
+    __slots__ = ('pref', 'effective_pref')
+
+    def __init__(self, pref):
+        self.pref = pref
+        self.effective_pref = ReadPreference.PRIMARY
+
+    def selection_hook(self, topology_description):
+        common_wv = topology_description.common_wire_version
+        if (topology_description.has_readable_server(
+                ReadPreference.PRIMARY_PREFERRED) and
+                common_wv and common_wv < 13):
+            self.effective_pref = ReadPreference.PRIMARY
+        else:
+            self.effective_pref = self.pref
+
+    def __call__(self, selection):
+        """Apply this read preference to a Selection."""
+        return self.effective_pref(selection)
+
+    def __repr__(self):
+        return "_AggWritePref(pref=%r)" % (self.pref,)
+
+    # Proxy other calls to the effective_pref so that _AggWritePref can be
+    # used in place of an actual read preference.
+    def __getattr__(self, name):
+        return getattr(self.effective_pref, name)
+
+
 _ALL_READ_PREFERENCES = (Primary, PrimaryPreferred,
                          Secondary, SecondaryPreferred, Nearest)

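The branch in `selection_hook` can be exercised in isolation. The sketch below mirrors it with plain strings and a hypothetical `FakeTopology` stand-in. The threshold 13 is taken from the diff, which treats lower wire versions as pre-5.0. Everything else is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import Optional

WIRE_VERSION_5_0 = 13  # threshold used in the diff's comparison


@dataclass
class FakeTopology:
    """Hypothetical stand-in for TopologyDescription."""

    common_wire_version: Optional[int]
    readable: bool

    def has_readable_server(self, _pref):
        return self.readable


def effective_pref(topology, requested):
    """Mirror the selection_hook branch using string preferences."""
    common_wv = topology.common_wire_version
    if (topology.has_readable_server("primaryPreferred")
            and common_wv and common_wv < WIRE_VERSION_5_0):
        # A readable, pre-5.0 deployment: fall back to primary.
        return "primary"
    # Otherwise honor the preference the caller asked for.
    return requested


old_cluster = effective_pref(FakeTopology(9, True), "secondary")
new_cluster = effective_pref(FakeTopology(13, True), "secondary")
undiscovered = effective_pref(FakeTopology(None, False), "secondary")
```

Note that in this sketch an undiscovered topology (no wire version, nothing readable) keeps the requested preference, following the `else` branch of the original.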
17 changes: 10 additions & 7 deletions pymongo/topology_description.py
@@ -19,7 +19,7 @@

 from pymongo import common
 from pymongo.errors import ConfigurationError
-from pymongo.read_preferences import ReadPreference
+from pymongo.read_preferences import ReadPreference, _AggWritePref
 from pymongo.server_description import ServerDescription
 from pymongo.server_selectors import Selection
 from pymongo.server_type import SERVER_TYPE
@@ -263,21 +263,24 @@ def apply_selector(self, selector, address=None, custom_selector=None):
                                              selector.min_wire_version,
                                              common_wv))

+        if isinstance(selector, _AggWritePref):
+            selector.selection_hook(self)
Review comment (Member, Author):

Initially I was expecting to place the "selection_hook" logic inside _AggWritePref.__call__ but it turns out that we skip calling the selector in many cases. In fact the only time we actually do call the selector is for replica sets. I attempted to refactor this code to actually call the selector in all cases but that doesn't work either because read preference selectors are written to only work with replica sets.

So "selection_hook" is what I came up with instead.


         if self.topology_type == TOPOLOGY_TYPE.Unknown:
             return []
         elif self.topology_type in (TOPOLOGY_TYPE.Single,
                                     TOPOLOGY_TYPE.LoadBalanced):
             # Ignore selectors for standalone and load balancer mode.
             return self.known_servers
-        elif address:
+        if address:
             # Ignore selectors when explicit address is requested.
             description = self.server_descriptions().get(address)
             return [description] if description else []
-        elif self.topology_type == TOPOLOGY_TYPE.Sharded:
-            # Ignore read preference.
-            selection = Selection.from_topology_description(self)
-        else:
-            selection = selector(Selection.from_topology_description(self))
+
+        selection = Selection.from_topology_description(self)
+        # Ignore read preference for sharded clusters.
+        if self.topology_type != TOPOLOGY_TYPE.Sharded:
+            selection = selector(selection)
Review comment (Member, Author):

This bit is just a small cleanup with no functional change.


         # Apply custom selector followed by localThresholdMS.
         if custom_selector is not None and selection:
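The review comment on `selection_hook` explains that the selector itself is only called for replica sets, so the hook has to run before the early returns. A simplified sketch of that ordering follows. `RecordingSelector`, the string topology types, and the `hasattr` check (the diff uses `isinstance(selector, _AggWritePref)`) are assumptions made to keep the example self-contained.

```python
class RecordingSelector:
    """Hypothetical selector that records which of its methods ran."""

    def __init__(self):
        self.calls = []

    def selection_hook(self, topology_type):
        self.calls.append("hook")

    def __call__(self, servers):
        self.calls.append("call")
        return servers


def apply_selector(topology_type, servers, selector):
    """Simplified stand-in for TopologyDescription.apply_selector."""
    # Run the hook first so it fires for every topology type.
    if hasattr(selector, "selection_hook"):
        selector.selection_hook(topology_type)

    if topology_type == "unknown":
        return []
    if topology_type in ("single", "load_balanced"):
        return servers  # selector ignored
    if topology_type == "sharded":
        return servers  # read preference ignored
    return selector(servers)  # replica set: selector actually applied


single = RecordingSelector()
apply_selector("single", ["a"], single)

sharded = RecordingSelector()
apply_selector("sharded", ["a", "b"], sharded)

replica_set = RecordingSelector()
apply_selector("replica_set", ["a", "b"], replica_set)
```

In this sketch only the replica-set case records a `"call"`, while every case records the `"hook"`, which is the property the hook placement in the diff appears to rely on.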