Skip to content

Commit 6d0e50b

Browse files
ShaneHarveyjuliusgeo
authored andcommitted
PYTHON-2554 Support aggregate $merge and $out executing on secondaries (mongodb#774)
1 parent b8734f1 commit 6d0e50b

9 files changed

+978
-16
lines changed

doc/changelog.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ Changelog
44
Changes in Version 4.1
55
----------------------
66

7+
PyMongo 4.0 brings a number of improvements including:
8+
79
- :meth:`pymongo.collection.Collection.update_one`,
810
:meth:`pymongo.collection.Collection.update_many`,
911
:meth:`pymongo.collection.Collection.delete_one`,
@@ -15,6 +17,10 @@ Changes in Version 4.1
1517
and :meth:`pymongo.collection.Collection.find` all support a new keyword
1618
argument ``let`` which is a map of parameter names and values. Parameters
1719
can then be accessed as variables in an aggregate expression context.
20+
- :meth:`~pymongo.collection.Collection.aggregate` now supports
21+
$merge and $out executing on secondaries on MongoDB >=5.0.
22+
aggregate() now always obeys the collection's :attr:`read_preference` on
23+
MongoDB >= 5.0.
1824

1925

2026
Changes in Version 4.0

pymongo/aggregation.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from pymongo import common
2020
from pymongo.collation import validate_collation_or_none
2121
from pymongo.errors import ConfigurationError
22-
from pymongo.read_preferences import ReadPreference
22+
from pymongo.read_preferences import _AggWritePref, ReadPreference
2323

2424

2525
class _AggregationCommand(object):
@@ -70,6 +70,7 @@ def __init__(self, target, cursor_class, pipeline, options,
7070
options.pop('collation', None))
7171

7272
self._max_await_time_ms = options.pop('maxAwaitTimeMS', None)
73+
self._write_preference = None
7374

7475
@property
7576
def _aggregation_target(self):
@@ -97,9 +98,12 @@ def _process_result(self, result, session, server, sock_info, secondary_ok):
9798
result, session, server, sock_info, secondary_ok)
9899

99100
def get_read_preference(self, session):
100-
if self._performs_write:
101-
return ReadPreference.PRIMARY
102-
return self._target._read_preference_for(session)
101+
if self._write_preference:
102+
return self._write_preference
103+
pref = self._target._read_preference_for(session)
104+
if self._performs_write and pref != ReadPreference.PRIMARY:
105+
self._write_preference = pref = _AggWritePref(pref)
106+
return pref
103107

104108
def get_cursor(self, session, server, sock_info, secondary_ok):
105109
# Serialize command.

pymongo/collection.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1915,9 +1915,9 @@ def aggregate(self, pipeline, session=None, let=None, **kwargs):
19151915
collection.
19161916
19171917
The :meth:`aggregate` method obeys the :attr:`read_preference` of this
1918-
:class:`Collection`, except when ``$out`` or ``$merge`` are used, in
1919-
which case :attr:`~pymongo.read_preferences.ReadPreference.PRIMARY`
1920-
is used.
1918+
:class:`Collection`, except when ``$out`` or ``$merge`` are used on
1919+
MongoDB <5.0, in which case
1920+
:attr:`~pymongo.read_preferences.ReadPreference.PRIMARY` is used.
19211921
19221922
.. note:: This method does not support the 'explain' option. Please
19231923
use :meth:`~pymongo.database.Database.command` instead. An
@@ -1958,6 +1958,8 @@ def aggregate(self, pipeline, session=None, let=None, **kwargs):
19581958
19591959
.. versionchanged:: 4.1
19601960
Added ``let`` parameter.
1961+
Support $merge and $out executing on secondaries according to the
1962+
collection's :attr:`read_preference`.
19611963
.. versionchanged:: 4.0
19621964
Removed the ``useCursor`` option.
19631965
.. versionchanged:: 3.9

pymongo/mongo_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1155,7 +1155,7 @@ def _secondaryok_for_server(self, read_preference, server, session):
11551155

11561156
with self._get_socket(server, session) as sock_info:
11571157
secondary_ok = (single and not sock_info.is_mongos) or (
1158-
read_preference != ReadPreference.PRIMARY)
1158+
read_preference.mode != ReadPreference.PRIMARY.mode)
11591159
yield sock_info, secondary_ok
11601160

11611161
@contextlib.contextmanager

pymongo/read_preferences.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,45 @@ def __call__(self, selection):
424424
self.max_staleness, selection))
425425

426426

427+
class _AggWritePref:
428+
"""Agg $out/$merge write preference.
429+
430+
* If there are readable servers and there is any pre-5.0 server, use
431+
primary read preference.
432+
* Otherwise use `pref` read preference.
433+
434+
:Parameters:
435+
- `pref`: The read preference to use on MongoDB 5.0+.
436+
"""
437+
438+
__slots__ = ('pref', 'effective_pref')
439+
440+
def __init__(self, pref):
441+
self.pref = pref
442+
self.effective_pref = ReadPreference.PRIMARY
443+
444+
def selection_hook(self, topology_description):
445+
common_wv = topology_description.common_wire_version
446+
if (topology_description.has_readable_server(
447+
ReadPreference.PRIMARY_PREFERRED) and
448+
common_wv and common_wv < 13):
449+
self.effective_pref = ReadPreference.PRIMARY
450+
else:
451+
self.effective_pref = self.pref
452+
453+
def __call__(self, selection):
454+
"""Apply this read preference to a Selection."""
455+
return self.effective_pref(selection)
456+
457+
def __repr__(self):
458+
return "_AggWritePref(pref=%r)" % (self.pref,)
459+
460+
# Proxy other calls to the effective_pref so that _AggWritePref can be
461+
# used in place of an actual read preference.
462+
def __getattr__(self, name):
463+
return getattr(self.effective_pref, name)
464+
465+
427466
_ALL_READ_PREFERENCES = (Primary, PrimaryPreferred,
428467
Secondary, SecondaryPreferred, Nearest)
429468

pymongo/topology_description.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
from pymongo import common
2121
from pymongo.errors import ConfigurationError
22-
from pymongo.read_preferences import ReadPreference
22+
from pymongo.read_preferences import ReadPreference, _AggWritePref
2323
from pymongo.server_description import ServerDescription
2424
from pymongo.server_selectors import Selection
2525
from pymongo.server_type import SERVER_TYPE
@@ -263,21 +263,24 @@ def apply_selector(self, selector, address=None, custom_selector=None):
263263
selector.min_wire_version,
264264
common_wv))
265265

266+
if isinstance(selector, _AggWritePref):
267+
selector.selection_hook(self)
268+
266269
if self.topology_type == TOPOLOGY_TYPE.Unknown:
267270
return []
268271
elif self.topology_type in (TOPOLOGY_TYPE.Single,
269272
TOPOLOGY_TYPE.LoadBalanced):
270273
# Ignore selectors for standalone and load balancer mode.
271274
return self.known_servers
272-
elif address:
275+
if address:
273276
# Ignore selectors when explicit address is requested.
274277
description = self.server_descriptions().get(address)
275278
return [description] if description else []
276-
elif self.topology_type == TOPOLOGY_TYPE.Sharded:
277-
# Ignore read preference.
278-
selection = Selection.from_topology_description(self)
279-
else:
280-
selection = selector(Selection.from_topology_description(self))
279+
280+
selection = Selection.from_topology_description(self)
281+
# Ignore read preference for sharded clusters.
282+
if self.topology_type != TOPOLOGY_TYPE.Sharded:
283+
selection = selector(selection)
281284

282285
# Apply custom selector followed by localThresholdMS.
283286
if custom_selector is not None and selection:

0 commit comments

Comments
 (0)