Skip to content

Commit e973c42

Browse files
ShaneHarveyjuliusgeo
authored andcommitted
PYTHON-1864 PYTHON-2931 Spec complaint $readPreference (mongodb#809)
Stop sending $readPreference to standalone servers. Stop sending $readPreference primary because it's the server default. Remove outdated secondary_ok flag.
1 parent 82fc772 commit e973c42

16 files changed

+123
-121
lines changed

pymongo/aggregation.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,6 @@ def _database(self):
9292
"""The database against which the aggregation command is run."""
9393
raise NotImplementedError
9494

95-
def _process_result(self, result, session, server, sock_info, secondary_ok):
96-
if self._result_processor:
97-
self._result_processor(
98-
result, session, server, sock_info, secondary_ok)
99-
10095
def get_read_preference(self, session):
10196
if self._write_preference:
10297
return self._write_preference
@@ -105,7 +100,7 @@ def get_read_preference(self, session):
105100
self._write_preference = pref = _AggWritePref(pref)
106101
return pref
107102

108-
def get_cursor(self, session, server, sock_info, secondary_ok):
103+
def get_cursor(self, session, server, sock_info, read_preference):
109104
# Serialize command.
110105
cmd = SON([("aggregate", self._aggregation_target),
111106
("pipeline", self._pipeline)])
@@ -134,8 +129,7 @@ def get_cursor(self, session, server, sock_info, secondary_ok):
134129
result = sock_info.command(
135130
self._database.name,
136131
cmd,
137-
secondary_ok,
138-
self.get_read_preference(session),
132+
read_preference,
139133
self._target.codec_options,
140134
parse_write_concern_error=True,
141135
read_concern=read_concern,
@@ -145,7 +139,8 @@ def get_cursor(self, session, server, sock_info, secondary_ok):
145139
client=self._database.client,
146140
user_fields=self._user_fields)
147141

148-
self._process_result(result, session, server, sock_info, secondary_ok)
142+
if self._result_processor:
143+
self._result_processor(result, sock_info)
149144

150145
# Extract cursor from result or mock/fake one if necessary.
151146
if 'cursor' in result:

pymongo/change_stream.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def _aggregation_pipeline(self):
148148
full_pipeline.extend(self._pipeline)
149149
return full_pipeline
150150

151-
def _process_result(self, result, session, server, sock_info, secondary_ok):
151+
def _process_result(self, result, sock_info):
152152
"""Callback that caches the postBatchResumeToken or
153153
startAtOperationTime from a changeStream aggregate command response
154154
containing an empty batch of change documents.

pymongo/collection.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ def _socket_for_reads(self, session):
186186
def _socket_for_writes(self, session):
187187
return self.__database.client._socket_for_writes(session)
188188

189-
def _command(self, sock_info, command, secondary_ok=False,
189+
def _command(self, sock_info, command,
190190
read_preference=None,
191191
codec_options=None, check=True, allowable_errors=None,
192192
read_concern=None,
@@ -200,7 +200,6 @@ def _command(self, sock_info, command, secondary_ok=False,
200200
:Parameters:
201201
- `sock_info` - A SocketInfo instance.
202202
- `command` - The command itself, as a SON instance.
203-
- `secondary_ok`: whether to set the secondaryOkay wire protocol bit.
204203
- `codec_options` (optional) - An instance of
205204
:class:`~bson.codec_options.CodecOptions`.
206205
- `check`: raise OperationFailure if there are errors
@@ -226,7 +225,6 @@ def _command(self, sock_info, command, secondary_ok=False,
226225
return sock_info.command(
227226
self.__database.name,
228227
command,
229-
secondary_ok,
230228
read_preference or self._read_preference_for(session),
231229
codec_options or self.codec_options,
232230
check,
@@ -1356,14 +1354,14 @@ def find_raw_batches(self, *args, **kwargs):
13561354

13571355
return RawBatchCursor(self, *args, **kwargs)
13581356

1359-
def _count_cmd(self, session, sock_info, secondary_ok, cmd, collation):
1357+
def _count_cmd(self, session, sock_info, read_preference, cmd, collation):
13601358
"""Internal count command helper."""
13611359
# XXX: "ns missing" checks can be removed when we drop support for
13621360
# MongoDB 3.0, see SERVER-17051.
13631361
res = self._command(
13641362
sock_info,
13651363
cmd,
1366-
secondary_ok,
1364+
read_preference=read_preference,
13671365
allowable_errors=["ns missing"],
13681366
codec_options=self.__write_response_codec_options,
13691367
read_concern=self.read_concern,
@@ -1374,12 +1372,12 @@ def _count_cmd(self, session, sock_info, secondary_ok, cmd, collation):
13741372
return int(res["n"])
13751373

13761374
def _aggregate_one_result(
1377-
self, sock_info, secondary_ok, cmd, collation, session):
1375+
self, sock_info, read_preference, cmd, collation, session):
13781376
"""Internal helper to run an aggregate that returns a single result."""
13791377
result = self._command(
13801378
sock_info,
13811379
cmd,
1382-
secondary_ok,
1380+
read_preference,
13831381
allowable_errors=[26], # Ignore NamespaceNotFound.
13841382
codec_options=self.__write_response_codec_options,
13851383
read_concern=self.read_concern,
@@ -1413,7 +1411,7 @@ def estimated_document_count(self, **kwargs):
14131411
raise ConfigurationError(
14141412
'estimated_document_count does not support sessions')
14151413

1416-
def _cmd(session, server, sock_info, secondary_ok):
1414+
def _cmd(session, server, sock_info, read_preference):
14171415
if sock_info.max_wire_version >= 12:
14181416
# MongoDB 4.9+
14191417
pipeline = [
@@ -1425,15 +1423,17 @@ def _cmd(session, server, sock_info, secondary_ok):
14251423
('cursor', {})])
14261424
cmd.update(kwargs)
14271425
result = self._aggregate_one_result(
1428-
sock_info, secondary_ok, cmd, collation=None, session=session)
1426+
sock_info, read_preference, cmd, collation=None,
1427+
session=session)
14291428
if not result:
14301429
return 0
14311430
return int(result['n'])
14321431
else:
14331432
# MongoDB < 4.9
14341433
cmd = SON([('count', self.__name)])
14351434
cmd.update(kwargs)
1436-
return self._count_cmd(None, sock_info, secondary_ok, cmd, None)
1435+
return self._count_cmd(
1436+
None, sock_info, read_preference, cmd, collation=None)
14371437

14381438
return self.__database.client._retryable_read(
14391439
_cmd, self.read_preference, None)
@@ -1506,9 +1506,9 @@ def count_documents(self, filter, session=None, **kwargs):
15061506
collation = validate_collation_or_none(kwargs.pop('collation', None))
15071507
cmd.update(kwargs)
15081508

1509-
def _cmd(session, server, sock_info, secondary_ok):
1509+
def _cmd(session, server, sock_info, read_preference):
15101510
result = self._aggregate_one_result(
1511-
sock_info, secondary_ok, cmd, collation, session)
1511+
sock_info, read_preference, cmd, collation, session)
15121512
if not result:
15131513
return 0
15141514
return result['n']
@@ -1799,12 +1799,12 @@ def list_indexes(self, session=None):
17991799
read_pref = ((session and session._txn_read_preference())
18001800
or ReadPreference.PRIMARY)
18011801

1802-
def _cmd(session, server, sock_info, secondary_ok):
1802+
def _cmd(session, server, sock_info, read_preference):
18031803
cmd = SON([("listIndexes", self.__name), ("cursor", {})])
18041804
with self.__database.client._tmp_session(session, False) as s:
18051805
try:
1806-
cursor = self._command(sock_info, cmd, secondary_ok,
1807-
read_pref,
1806+
cursor = self._command(sock_info, cmd,
1807+
read_preference,
18081808
codec_options,
18091809
session=s)["cursor"]
18101810
except OperationFailure as exc:
@@ -2220,9 +2220,10 @@ def distinct(self, key, filter=None, session=None, **kwargs):
22202220
kwargs["query"] = filter
22212221
collation = validate_collation_or_none(kwargs.pop('collation', None))
22222222
cmd.update(kwargs)
2223-
def _cmd(session, server, sock_info, secondary_ok):
2223+
def _cmd(session, server, sock_info, read_preference):
22242224
return self._command(
2225-
sock_info, cmd, secondary_ok, read_concern=self.read_concern,
2225+
sock_info, cmd, read_preference=read_preference,
2226+
read_concern=self.read_concern,
22262227
collation=collation, session=session,
22272228
user_fields={"values": 1})["values"]
22282229

pymongo/database.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ def watch(self, pipeline=None, full_document=None, resume_after=None,
492492
batch_size, collation, start_at_operation_time, session,
493493
start_after)
494494

495-
def _command(self, sock_info, command, secondary_ok=False, value=1, check=True,
495+
def _command(self, sock_info, command, value=1, check=True,
496496
allowable_errors=None, read_preference=ReadPreference.PRIMARY,
497497
codec_options=DEFAULT_CODEC_OPTIONS,
498498
write_concern=None,
@@ -506,7 +506,6 @@ def _command(self, sock_info, command, secondary_ok=False, value=1, check=True,
506506
return sock_info.command(
507507
self.__name,
508508
command,
509-
secondary_ok,
510509
read_preference,
511510
codec_options,
512511
check,
@@ -605,8 +604,8 @@ def command(self, command, value=1, check=True,
605604
read_preference = ((session and session._txn_read_preference())
606605
or ReadPreference.PRIMARY)
607606
with self.__client._socket_for_reads(
608-
read_preference, session) as (sock_info, secondary_ok):
609-
return self._command(sock_info, command, secondary_ok, value,
607+
read_preference, session) as (sock_info, read_preference):
608+
return self._command(sock_info, command, value,
610609
check, allowable_errors, read_preference,
611610
codec_options, session=session, **kwargs)
612611

@@ -618,16 +617,15 @@ def _retryable_read_command(self, command, value=1, check=True,
618617
read_preference = ((session and session._txn_read_preference())
619618
or ReadPreference.PRIMARY)
620619

621-
def _cmd(session, server, sock_info, secondary_ok):
622-
return self._command(sock_info, command, secondary_ok, value,
620+
def _cmd(session, server, sock_info, read_preference):
621+
return self._command(sock_info, command, value,
623622
check, allowable_errors, read_preference,
624623
codec_options, session=session, **kwargs)
625624

626625
return self.__client._retryable_read(
627626
_cmd, read_preference, session)
628627

629-
def _list_collections(self, sock_info, secondary_okay, session,
630-
read_preference, **kwargs):
628+
def _list_collections(self, sock_info, session, read_preference, **kwargs):
631629
"""Internal listCollections helper."""
632630

633631
coll = self.get_collection(
@@ -638,7 +636,7 @@ def _list_collections(self, sock_info, secondary_okay, session,
638636
with self.__client._tmp_session(
639637
session, close=False) as tmp_session:
640638
cursor = self._command(
641-
sock_info, cmd, secondary_okay,
639+
sock_info, cmd,
642640
read_preference=read_preference,
643641
session=tmp_session)["cursor"]
644642
cmd_cursor = CommandCursor(
@@ -674,9 +672,9 @@ def list_collections(self, session=None, filter=None, **kwargs):
674672
read_pref = ((session and session._txn_read_preference())
675673
or ReadPreference.PRIMARY)
676674

677-
def _cmd(session, server, sock_info, secondary_okay):
675+
def _cmd(session, server, sock_info, read_preference):
678676
return self._list_collections(
679-
sock_info, secondary_okay, session, read_preference=read_pref,
677+
sock_info, session, read_preference=read_preference,
680678
**kwargs)
681679

682680
return self.__client._retryable_read(

pymongo/message.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,9 @@ def as_command(self, sock_info):
316316
self._as_command = cmd, self.db
317317
return self._as_command
318318

319-
def get_message(self, set_secondary_ok, sock_info, use_cmd=False):
319+
def get_message(self, read_preference, sock_info, use_cmd=False):
320320
"""Get a query message, possibly setting the secondaryOk bit."""
321-
if set_secondary_ok:
321+
if read_preference.mode:
322322
# Set the secondaryOk bit.
323323
flags = self.flags | 4
324324
else:
@@ -330,8 +330,7 @@ def get_message(self, set_secondary_ok, sock_info, use_cmd=False):
330330
if use_cmd:
331331
spec = self.as_command(sock_info)[0]
332332
request_id, msg, size, _ = _op_msg(
333-
0, spec, self.db, self.read_preference,
334-
set_secondary_ok, self.codec_options,
333+
0, spec, self.db, read_preference, self.codec_options,
335334
ctx=sock_info.compression_context)
336335
return request_id, msg, size
337336

@@ -346,8 +345,7 @@ def get_message(self, set_secondary_ok, sock_info, use_cmd=False):
346345
ntoreturn = self.limit
347346

348347
if sock_info.is_mongos:
349-
spec = _maybe_add_read_preference(spec,
350-
self.read_preference)
348+
spec = _maybe_add_read_preference(spec, read_preference)
351349

352350
return _query(flags, ns, self.ntoskip, ntoreturn,
353351
spec, None if use_cmd else self.fields,
@@ -429,8 +427,7 @@ def get_message(self, dummy0, sock_info, use_cmd=False):
429427
else:
430428
flags = 0
431429
request_id, msg, size, _ = _op_msg(
432-
flags, spec, self.db, None,
433-
False, self.codec_options,
430+
flags, spec, self.db, None, self.codec_options,
434431
ctx=sock_info.compression_context)
435432
return request_id, msg, size
436433

@@ -572,16 +569,13 @@ def _op_msg_uncompressed(flags, command, identifier, docs, opts):
572569
_op_msg_uncompressed = _cmessage._op_msg
573570

574571

575-
def _op_msg(flags, command, dbname, read_preference, secondary_ok,
576-
opts, ctx=None):
572+
def _op_msg(flags, command, dbname, read_preference, opts, ctx=None):
577573
"""Get a OP_MSG message."""
578574
command['$db'] = dbname
579575
# getMore commands do not send $readPreference.
580576
if read_preference is not None and "$readPreference" not in command:
581-
if secondary_ok and not read_preference.mode:
582-
command["$readPreference"] = (
583-
ReadPreference.PRIMARY_PREFERRED.document)
584-
else:
577+
# Only send $readPreference if it's not primary (the default).
578+
if read_preference.mode:
585579
command["$readPreference"] = read_preference.document
586580
name = next(iter(command))
587581
try:

0 commit comments

Comments
 (0)