diff --git a/pymongo/aggregation.py b/pymongo/aggregation.py index a5a7abaed7..8fb0225eb3 100644 --- a/pymongo/aggregation.py +++ b/pymongo/aggregation.py @@ -92,11 +92,6 @@ def _database(self): """The database against which the aggregation command is run.""" raise NotImplementedError - def _process_result(self, result, session, server, sock_info, secondary_ok): - if self._result_processor: - self._result_processor( - result, session, server, sock_info, secondary_ok) - def get_read_preference(self, session): if self._write_preference: return self._write_preference @@ -105,7 +100,7 @@ def get_read_preference(self, session): self._write_preference = pref = _AggWritePref(pref) return pref - def get_cursor(self, session, server, sock_info, secondary_ok): + def get_cursor(self, session, server, sock_info, read_preference): # Serialize command. cmd = SON([("aggregate", self._aggregation_target), ("pipeline", self._pipeline)]) @@ -134,8 +129,7 @@ def get_cursor(self, session, server, sock_info, secondary_ok): result = sock_info.command( self._database.name, cmd, - secondary_ok, - self.get_read_preference(session), + read_preference, self._target.codec_options, parse_write_concern_error=True, read_concern=read_concern, @@ -145,7 +139,8 @@ def get_cursor(self, session, server, sock_info, secondary_ok): client=self._database.client, user_fields=self._user_fields) - self._process_result(result, session, server, sock_info, secondary_ok) + if self._result_processor: + self._result_processor(result, sock_info) # Extract cursor from result or mock/fake one if necessary. if 'cursor' in result: diff --git a/pymongo/change_stream.py b/pymongo/change_stream.py index 00d049a838..54bf98d83e 100644 --- a/pymongo/change_stream.py +++ b/pymongo/change_stream.py @@ -148,7 +148,7 @@ def _aggregation_pipeline(self): full_pipeline.extend(self._pipeline) return full_pipeline - def _process_result(self, result, session, server, sock_info, secondary_ok): + def _process_result(self, result, sock_info): """Callback that caches the postBatchResumeToken or startAtOperationTime from a changeStream aggregate command response containing an empty batch of change documents. diff --git a/pymongo/collection.py b/pymongo/collection.py index 70c13c34f4..82e29f4061 100644 --- a/pymongo/collection.py +++ b/pymongo/collection.py @@ -186,7 +186,7 @@ def _socket_for_reads(self, session): def _socket_for_writes(self, session): return self.__database.client._socket_for_writes(session) - def _command(self, sock_info, command, secondary_ok=False, + def _command(self, sock_info, command, read_preference=None, codec_options=None, check=True, allowable_errors=None, read_concern=None, @@ -200,7 +200,6 @@ def _command(self, sock_info, command, secondary_ok=False, :Parameters: - `sock_info` - A SocketInfo instance. - `command` - The command itself, as a SON instance. - - `secondary_ok`: whether to set the secondaryOkay wire protocol bit. - `codec_options` (optional) - An instance of :class:`~bson.codec_options.CodecOptions`. - `check`: raise OperationFailure if there are errors @@ -226,7 +225,6 @@ def _command(self, sock_info, command, secondary_ok=False, return sock_info.command( self.__database.name, command, - secondary_ok, read_preference or self._read_preference_for(session), codec_options or self.codec_options, check, @@ -1356,14 +1354,14 @@ def find_raw_batches(self, *args, **kwargs): return RawBatchCursor(self, *args, **kwargs) - def _count_cmd(self, session, sock_info, secondary_ok, cmd, collation): + def _count_cmd(self, session, sock_info, read_preference, cmd, collation): """Internal count command helper.""" # XXX: "ns missing" checks can be removed when we drop support for # MongoDB 3.0, see SERVER-17051. res = self._command( sock_info, cmd, - secondary_ok, + read_preference=read_preference, allowable_errors=["ns missing"], codec_options=self.__write_response_codec_options, read_concern=self.read_concern, @@ -1374,12 +1372,12 @@ def _count_cmd(self, session, sock_info, secondary_ok, cmd, collation): return int(res["n"]) def _aggregate_one_result( - self, sock_info, secondary_ok, cmd, collation, session): + self, sock_info, read_preference, cmd, collation, session): """Internal helper to run an aggregate that returns a single result.""" result = self._command( sock_info, cmd, - secondary_ok, + read_preference, allowable_errors=[26], # Ignore NamespaceNotFound. codec_options=self.__write_response_codec_options, read_concern=self.read_concern, @@ -1413,7 +1411,7 @@ def estimated_document_count(self, **kwargs): raise ConfigurationError( 'estimated_document_count does not support sessions') - def _cmd(session, server, sock_info, secondary_ok): + def _cmd(session, server, sock_info, read_preference): if sock_info.max_wire_version >= 12: # MongoDB 4.9+ pipeline = [ @@ -1425,7 +1423,8 @@ def _cmd(session, server, sock_info, secondary_ok): ('cursor', {})]) cmd.update(kwargs) result = self._aggregate_one_result( - sock_info, secondary_ok, cmd, collation=None, session=session) + sock_info, read_preference, cmd, collation=None, + session=session) if not result: return 0 return int(result['n']) @@ -1433,7 +1432,8 @@ def _cmd(session, server, sock_info, secondary_ok): # MongoDB < 4.9 cmd = SON([('count', self.__name)]) cmd.update(kwargs) - return self._count_cmd(None, sock_info, secondary_ok, cmd, None) + return self._count_cmd( + None, sock_info, read_preference, cmd, collation=None) return self.__database.client._retryable_read( _cmd, self.read_preference, None) @@ -1506,9 +1506,9 @@ def count_documents(self, filter, session=None, **kwargs): collation = validate_collation_or_none(kwargs.pop('collation', None)) cmd.update(kwargs) - def _cmd(session, server, sock_info, secondary_ok): + def _cmd(session, server, sock_info, read_preference): result = self._aggregate_one_result( - sock_info, secondary_ok, cmd, collation, session) + sock_info, read_preference, cmd, collation, session) if not result: return 0 return result['n'] @@ -1799,12 +1799,12 @@ def list_indexes(self, session=None): read_pref = ((session and session._txn_read_preference()) or ReadPreference.PRIMARY) - def _cmd(session, server, sock_info, secondary_ok): + def _cmd(session, server, sock_info, read_preference): cmd = SON([("listIndexes", self.__name), ("cursor", {})]) with self.__database.client._tmp_session(session, False) as s: try: - cursor = self._command(sock_info, cmd, secondary_ok, - read_pref, + cursor = self._command(sock_info, cmd, + read_preference, codec_options, session=s)["cursor"] except OperationFailure as exc: @@ -2220,9 +2220,10 @@ def distinct(self, key, filter=None, session=None, **kwargs): kwargs["query"] = filter collation = validate_collation_or_none(kwargs.pop('collation', None)) cmd.update(kwargs) - def _cmd(session, server, sock_info, secondary_ok): + def _cmd(session, server, sock_info, read_preference): return self._command( - sock_info, cmd, secondary_ok, read_concern=self.read_concern, + sock_info, cmd, read_preference=read_preference, + read_concern=self.read_concern, collation=collation, session=session, user_fields={"values": 1})["values"] diff --git a/pymongo/database.py b/pymongo/database.py index 33ae4038c8..a6c1275126 100644 --- a/pymongo/database.py +++ b/pymongo/database.py @@ -492,7 +492,7 @@ def watch(self, pipeline=None, full_document=None, resume_after=None, batch_size, collation, start_at_operation_time, session, start_after) - def _command(self, sock_info, command, secondary_ok=False, value=1, check=True, + def _command(self, sock_info, command, value=1, check=True, allowable_errors=None, read_preference=ReadPreference.PRIMARY, codec_options=DEFAULT_CODEC_OPTIONS, write_concern=None, @@ -506,7 +506,6 @@ def _command(self, sock_info, command, secondary_ok=False, value=1, check=True, return sock_info.command( self.__name, command, - secondary_ok, read_preference, codec_options, check, @@ -605,8 +604,8 @@ def command(self, command, value=1, check=True, read_preference = ((session and session._txn_read_preference()) or ReadPreference.PRIMARY) with self.__client._socket_for_reads( - read_preference, session) as (sock_info, secondary_ok): - return self._command(sock_info, command, secondary_ok, value, + read_preference, session) as (sock_info, read_preference): + return self._command(sock_info, command, value, check, allowable_errors, read_preference, codec_options, session=session, **kwargs) @@ -618,16 +617,15 @@ def _retryable_read_command(self, command, value=1, check=True, read_preference = ((session and session._txn_read_preference()) or ReadPreference.PRIMARY) - def _cmd(session, server, sock_info, secondary_ok): - return self._command(sock_info, command, secondary_ok, value, + def _cmd(session, server, sock_info, read_preference): + return self._command(sock_info, command, value, check, allowable_errors, read_preference, codec_options, session=session, **kwargs) return self.__client._retryable_read( _cmd, read_preference, session) - def _list_collections(self, sock_info, secondary_okay, session, - read_preference, **kwargs): + def _list_collections(self, sock_info, session, read_preference, **kwargs): """Internal listCollections helper.""" coll = self.get_collection( @@ -638,7 +636,7 @@ def _list_collections(self, sock_info, secondary_okay, session, with self.__client._tmp_session( session, close=False) as tmp_session: cursor = self._command( - sock_info, cmd, secondary_okay, + sock_info, cmd, read_preference=read_preference, session=tmp_session)["cursor"] cmd_cursor = CommandCursor( @@ -674,9 +672,9 @@ def list_collections(self, session=None, filter=None, **kwargs): read_pref = ((session and session._txn_read_preference()) or ReadPreference.PRIMARY) - def _cmd(session, server, sock_info, secondary_okay): + def _cmd(session, server, sock_info, read_preference): return self._list_collections( - sock_info, secondary_okay, session, read_preference=read_pref, + sock_info, session, read_preference=read_preference, **kwargs) return self.__client._retryable_read( diff --git a/pymongo/message.py b/pymongo/message.py index bccf0a9f51..2e09df457e 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -316,9 +316,9 @@ def as_command(self, sock_info): self._as_command = cmd, self.db return self._as_command - def get_message(self, set_secondary_ok, sock_info, use_cmd=False): + def get_message(self, read_preference, sock_info, use_cmd=False): """Get a query message, possibly setting the secondaryOk bit.""" - if set_secondary_ok: + if read_preference.mode: # Set the secondaryOk bit. flags = self.flags | 4 else: @@ -330,8 +330,7 @@ def get_message(self, set_secondary_ok, sock_info, use_cmd=False): if use_cmd: spec = self.as_command(sock_info)[0] request_id, msg, size, _ = _op_msg( - 0, spec, self.db, self.read_preference, - set_secondary_ok, self.codec_options, + 0, spec, self.db, read_preference, self.codec_options, ctx=sock_info.compression_context) return request_id, msg, size @@ -346,8 +345,7 @@ def get_message(self, set_secondary_ok, sock_info, use_cmd=False): ntoreturn = self.limit if sock_info.is_mongos: - spec = _maybe_add_read_preference(spec, - self.read_preference) + spec = _maybe_add_read_preference(spec, read_preference) return _query(flags, ns, self.ntoskip, ntoreturn, spec, None if use_cmd else self.fields, @@ -429,8 +427,7 @@ def get_message(self, dummy0, sock_info, use_cmd=False): else: flags = 0 request_id, msg, size, _ = _op_msg( - flags, spec, self.db, None, - False, self.codec_options, + flags, spec, self.db, None, self.codec_options, ctx=sock_info.compression_context) return request_id, msg, size @@ -572,16 +569,13 @@ def _op_msg_uncompressed(flags, command, identifier, docs, opts): _op_msg_uncompressed = _cmessage._op_msg -def _op_msg(flags, command, dbname, read_preference, secondary_ok, - opts, ctx=None): +def _op_msg(flags, command, dbname, read_preference, opts, ctx=None): """Get a OP_MSG message.""" command['$db'] = dbname # getMore commands do not send $readPreference. if read_preference is not None and "$readPreference" not in command: - if secondary_ok and not read_preference.mode: - command["$readPreference"] = ( - ReadPreference.PRIMARY_PREFERRED.document) - else: + # Only send $readPreference if it's not primary (the default). + if read_preference.mode: command["$readPreference"] = read_preference.document name = next(iter(command)) try: diff --git a/pymongo/mongo_client.py b/pymongo/mongo_client.py index 87c87c0241..a133c96a7f 100644 --- a/pymongo/mongo_client.py +++ b/pymongo/mongo_client.py @@ -1025,7 +1025,7 @@ def _end_sessions(self, session_ids): # another session. with self._socket_for_reads( ReadPreference.PRIMARY_PREFERRED, - None) as (sock_info, secondary_ok): + None) as (sock_info, read_pref): if not sock_info.supports_sessions: return @@ -1033,7 +1033,7 @@ def _end_sessions(self, session_ids): spec = SON([('endSessions', session_ids[i:i + common._MAX_END_SESSIONS])]) sock_info.command( - 'admin', spec, secondary_ok=secondary_ok, client=self) + 'admin', spec, read_preference=read_pref, client=self) except PyMongoError: # Drivers MUST ignore any errors returned by the endSessions # command. @@ -1136,39 +1136,33 @@ def _socket_for_writes(self, session): return self._get_socket(server, session) @contextlib.contextmanager - def _secondaryok_for_server(self, read_preference, server, session): + def _socket_from_server(self, read_preference, server, session): assert read_preference is not None, "read_preference must not be None" # Get a socket for a server matching the read preference, and yield - # sock_info, secondary_ok. Server Selection Spec: "SecondaryOK must - # be sent to mongods with topology type Single. If the server type is - # Mongos, follow the rules for passing read preference to mongos, even - # for topology type Single." + # sock_info with the effective read preference. The Server Selection + # Spec says not to send any $readPreference to standalones and to + # always send primaryPreferred when directly connected to a repl set + # member. # Thread safe: if the type is single it cannot change. topology = self._get_topology() single = topology.description.topology_type == TOPOLOGY_TYPE.Single with self._get_socket(server, session) as sock_info: - secondary_ok = (single and not sock_info.is_mongos) or ( - read_preference.mode != ReadPreference.PRIMARY.mode) - yield sock_info, secondary_ok + if single: + if sock_info.is_repl: + # Use primary preferred to ensure any repl set member + # can handle the request. + read_preference = ReadPreference.PRIMARY_PREFERRED + elif sock_info.is_standalone: + # Don't send read preference to standalones. + read_preference = ReadPreference.PRIMARY + yield sock_info, read_preference - @contextlib.contextmanager def _socket_for_reads(self, read_preference, session): assert read_preference is not None, "read_preference must not be None" - # Get a socket for a server matching the read preference, and yield - # sock_info, secondary_ok. Server Selection Spec: "SecondaryOK must be - # sent to mongods with topology type Single. If the server type is - # Mongos, follow the rules for passing read preference to mongos, even - # for topology type Single." - # Thread safe: if the type is single it cannot change. topology = self._get_topology() server = self._select_server(read_preference, session) - single = topology.description.topology_type == TOPOLOGY_TYPE.Single - - with self._get_socket(server, session) as sock_info: - secondary_ok = (single and not sock_info.is_mongos) or ( - read_preference != ReadPreference.PRIMARY) - yield sock_info, secondary_ok + return self._socket_from_server(read_preference, server, session) def _should_pin_cursor(self, session): return (self.__options.load_balanced and @@ -1195,9 +1189,9 @@ def _run_operation(self, operation, unpack_res, address=None): operation.sock_mgr.sock, operation, True, self._event_listeners, unpack_res) - def _cmd(session, server, sock_info, secondary_ok): + def _cmd(session, server, sock_info, read_preference): return server.run_operation( - sock_info, operation, secondary_ok, self._event_listeners, + sock_info, operation, read_preference, self._event_listeners, unpack_res) return self._retryable_read( @@ -1292,13 +1286,13 @@ def _retryable_read(self, func, read_pref, session, address=None, try: server = self._select_server( read_pref, session, address=address) - with self._secondaryok_for_server(read_pref, server, session) as ( - sock_info, secondary_ok): + with self._socket_from_server(read_pref, server, session) as ( + sock_info, read_pref): if retrying and not retryable: # A retry is not possible because this server does # not support retryable reads, raise the last error. raise last_error - return func(session, server, sock_info, secondary_ok) + return func(session, server, sock_info, read_pref) except ServerSelectionTimeoutError: if retrying: # The application may think the write was never attempted diff --git a/pymongo/network.py b/pymongo/network.py index 10d71308f6..a14e9924a4 100644 --- a/pymongo/network.py +++ b/pymongo/network.py @@ -38,7 +38,7 @@ _UNPACK_HEADER = struct.Struct(" max_bson_size + message._COMMAND_OVERHEAD): diff --git a/pymongo/pool.py b/pymongo/pool.py index 99e64d8b2b..88b0e09737 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -608,6 +608,10 @@ def _hello(self, cluster_time, topology_version, heartbeat_frequency): self.supports_sessions = ( hello.logical_session_timeout_minutes is not None) self.hello_ok = hello.hello_ok + self.is_repl = hello.server_type in ( + SERVER_TYPE.RSPrimary, SERVER_TYPE.RSSecondary, + SERVER_TYPE.RSArbiter, SERVER_TYPE.RSOther, SERVER_TYPE.RSGhost) + self.is_standalone = hello.server_type == SERVER_TYPE.Standalone self.is_mongos = hello.server_type == SERVER_TYPE.Mongos if performing_handshake and self.compression_settings: ctx = self.compression_settings.get_compression_context( @@ -641,7 +645,7 @@ def _next_reply(self): response_doc.pop('serviceId', None) return response_doc - def command(self, dbname, spec, secondary_ok=False, + def command(self, dbname, spec, read_preference=ReadPreference.PRIMARY, codec_options=DEFAULT_CODEC_OPTIONS, check=True, allowable_errors=None, @@ -660,7 +664,6 @@ def command(self, dbname, spec, secondary_ok=False, :Parameters: - `dbname`: name of the database on which to run the command - `spec`: a command document as a dict, SON, or mapping object - - `secondary_ok`: whether to set the secondaryOkay wire protocol bit - `read_preference`: a read preference - `codec_options`: a CodecOptions instance - `check`: raise OperationFailure if there are errors @@ -703,7 +706,7 @@ def command(self, dbname, spec, secondary_ok=False, if self.op_msg_enabled: self._raise_if_not_writable(unacknowledged) try: - return command(self, dbname, spec, secondary_ok, + return command(self, dbname, spec, self.is_mongos, read_preference, codec_options, session, client, check, allowable_errors, self.address, listeners, diff --git a/pymongo/server.py b/pymongo/server.py index 2a0a7267b7..8464cbbc6e 100644 --- a/pymongo/server.py +++ b/pymongo/server.py @@ -68,7 +68,7 @@ def request_check(self): """Check the server's state soon.""" self._monitor.request_check() - def run_operation(self, sock_info, operation, set_secondary_okay, listeners, + def run_operation(self, sock_info, operation, read_preference, listeners, unpack_res): """Run a _Query or _GetMore operation and return a Response object. @@ -95,7 +95,7 @@ def run_operation(self, sock_info, operation, set_secondary_okay, listeners, request_id = 0 else: message = operation.get_message( - set_secondary_okay, sock_info, use_cmd) + read_preference, sock_info, use_cmd) request_id, data, max_doc_size = self._split_message(message) if publish: diff --git a/test/crud/unified/aggregate-write-readPreference.json b/test/crud/unified/aggregate-write-readPreference.json index 28327e8d83..bc887e83cb 100644 --- a/test/crud/unified/aggregate-write-readPreference.json +++ b/test/crud/unified/aggregate-write-readPreference.json @@ -237,7 +237,7 @@ } ], "$readPreference": { - "mode": "primary" + "$$exists": false }, "readConcern": { "level": "local" @@ -425,7 +425,7 @@ } ], "$readPreference": { - "mode": "primary" + "$$exists": false }, "readConcern": { "level": "local" diff --git a/test/crud/unified/db-aggregate-write-readPreference.json b/test/crud/unified/db-aggregate-write-readPreference.json index 269299e3c7..2a81282de8 100644 --- a/test/crud/unified/db-aggregate-write-readPreference.json +++ b/test/crud/unified/db-aggregate-write-readPreference.json @@ -222,7 +222,7 @@ } ], "$readPreference": { - "mode": "primary" + "$$exists": false }, "readConcern": { "level": "local" @@ -416,7 +416,7 @@ } ], "$readPreference": { - "mode": "primary" + "$$exists": false }, "readConcern": { "level": "local" diff --git a/test/mockupdb/test_mongos_command_read_mode.py b/test/mockupdb/test_mongos_command_read_mode.py index ccd40c2cd7..49aee27047 100644 --- a/test/mockupdb/test_mongos_command_read_mode.py +++ b/test/mockupdb/test_mongos_command_read_mode.py @@ -79,8 +79,11 @@ def test(self): slave_ok = False elif operation.op_type == 'may-use-secondary': slave_ok = mode != 'primary' - self.assertEqual(pref.document, - request.doc.get('$readPreference')) + actual_pref = request.doc.get('$readPreference') + if mode == 'primary': + self.assertIsNone(actual_pref) + else: + self.assertEqual(pref.document, actual_pref) else: self.fail('unrecognized op_type %r' % operation.op_type) diff --git a/test/mockupdb/test_op_msg_read_preference.py b/test/mockupdb/test_op_msg_read_preference.py index 6ecc229ea1..d9adfe17eb 100644 --- a/test/mockupdb/test_op_msg_read_preference.py +++ b/test/mockupdb/test_op_msg_read_preference.py @@ -148,24 +148,31 @@ def test(self): expected_pref = ReadPreference.SECONDARY elif operation.op_type == 'must-use-primary': expected_server = self.primary - expected_pref = ReadPreference.PRIMARY + expected_pref = None elif operation.op_type == 'may-use-secondary': - if mode in ('primary', 'primaryPreferred'): + if mode == 'primary': expected_server = self.primary + expected_pref = None + elif mode == 'primaryPreferred': + expected_server = self.primary + expected_pref = pref else: expected_server = self.secondary - expected_pref = pref + expected_pref = pref else: self.fail('unrecognized op_type %r' % operation.op_type) - # For single mongod we send primaryPreferred instead of primary. - if expected_pref == ReadPreference.PRIMARY and self.single_mongod: - expected_pref = ReadPreference.PRIMARY_PREFERRED + # For single mongod we omit the read preference. + if self.single_mongod: + expected_pref = None with going(operation.function, client): request = expected_server.receive() request.reply(operation.reply) - self.assertEqual(expected_pref.document, - request.doc.get('$readPreference')) + actual_pref = request.doc.get('$readPreference') + if expected_pref: + self.assertEqual(expected_pref.document, actual_pref) + else: + self.assertIsNone(actual_pref) self.assertNotIn('$query', request.doc) return test diff --git a/test/mockupdb/test_query_read_pref_sharded.py b/test/mockupdb/test_query_read_pref_sharded.py index 21813f7b8e..88dcdd8351 100644 --- a/test/mockupdb/test_query_read_pref_sharded.py +++ b/test/mockupdb/test_query_read_pref_sharded.py @@ -47,17 +47,18 @@ def test_query_and_read_mode_sharded_op_msg(self): SecondaryPreferred([{'tag': 'value'}]),) for query in ({'a': 1}, {'$query': {'a': 1}},): - for mode in read_prefs: + for pref in read_prefs: collection = client.db.get_collection('test', - read_preference=mode) + read_preference=pref) cursor = collection.find(query.copy()) with going(next, cursor): request = server.receives() # Command is not nested in $query. - request.assert_matches(OpMsg( - SON([('find', 'test'), - ('filter', {'a': 1}), - ('$readPreference', mode.document)]))) + expected_cmd = SON([('find', 'test'), + ('filter', {'a': 1})]) + if pref.mode: + expected_cmd['$readPreference'] = pref.document + request.assert_matches(OpMsg(expected_cmd)) request.replies({'cursor': {'id': 0, 'firstBatch': [{}]}}) diff --git a/test/test_cursor.py b/test/test_cursor.py index 8c27544b80..8bea12228d 100644 --- a/test/test_cursor.py +++ b/test/test_cursor.py @@ -36,6 +36,7 @@ InvalidOperation, OperationFailure) from pymongo.read_concern import ReadConcern +from pymongo.read_preferences import ReadPreference from pymongo.write_concern import WriteConcern from test import (client_context, unittest, @@ -1257,7 +1258,9 @@ def test_getMore_does_not_send_readPreference(self): client = rs_or_single_client( event_listeners=[listener]) self.addCleanup(client.close) - coll = client[self.db.name].test + # We never send primary read preference so override the default. + coll = client[self.db.name].get_collection( + 'test', read_preference=ReadPreference.PRIMARY_PREFERRED) coll.delete_many({}) coll.insert_many([{} for _ in range(5)]) @@ -1267,7 +1270,10 @@ def test_getMore_does_not_send_readPreference(self): started = listener.results['started'] self.assertEqual(2, len(started)) self.assertEqual('find', started[0].command_name) - self.assertIn('$readPreference', started[0].command) + if client_context.is_rs or client_context.is_mongos: + self.assertIn('$readPreference', started[0].command) + else: + self.assertNotIn('$readPreference', started[0].command) self.assertEqual('getMore', started[1].command_name) self.assertNotIn('$readPreference', started[1].command) diff --git a/test/test_read_preferences.py b/test/test_read_preferences.py index bbc89b9d14..a63df72545 100644 --- a/test/test_read_preferences.py +++ b/test/test_read_preferences.py @@ -309,17 +309,17 @@ def __init__(self, *args, **kwargs): def _socket_for_reads(self, read_preference, session): context = super(ReadPrefTester, self)._socket_for_reads( read_preference, session) - with context as (sock_info, secondary_ok): + with context as (sock_info, read_preference): self.record_a_read(sock_info.address) - yield sock_info, secondary_ok + yield sock_info, read_preference @contextlib.contextmanager - def _secondaryok_for_server(self, read_preference, server, session): - context = super(ReadPrefTester, self)._secondaryok_for_server( + def _socket_from_server(self, read_preference, server, session): + context = super(ReadPrefTester, self)._socket_from_server( read_preference, server, session) - with context as (sock_info, secondary_ok): + with context as (sock_info, read_preference): self.record_a_read(sock_info.address) - yield sock_info, secondary_ok + yield sock_info, read_preference def record_a_read(self, address): server = self._get_topology().select_server_by_address(address, 0) @@ -597,8 +597,11 @@ def test_send_hedge(self): started = listener.results['started'] self.assertEqual(len(started), 1, started) cmd = started[0].command - self.assertIn('$readPreference', cmd) - self.assertEqual(cmd['$readPreference'], pref.document) + if client_context.is_rs or client_context.is_mongos: + self.assertIn('$readPreference', cmd) + self.assertEqual(cmd['$readPreference'], pref.document) + else: + self.assertNotIn('$readPreference', cmd) def test_maybe_add_read_preference(self):