From 3f4372c0bc4db1bded7f82da5d6d60a7b0f9b1b4 Mon Sep 17 00:00:00 2001 From: julius Date: Thu, 27 Jan 2022 20:38:35 -0800 Subject: [PATCH 01/18] first commit, lots of bugs --- pymongo/pool.py | 3 +- test/mockupdb/test_handshake.py | 49 ++++++++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/pymongo/pool.py b/pymongo/pool.py index a0868c9916..39b5134f06 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -552,7 +552,8 @@ def unpin(self): def hello_cmd(self): # Handshake spec requires us to use OP_MSG+hello command for the # initial handshake in load balanced or versioned api mode. - if self.opts.server_api or self.hello_ok or self.opts.load_balanced: + if (self.opts.server_api or self.hello_ok or self.opts.load_balanced + or self.max_wire_version >= 6): self.op_msg_enabled = True return SON([(HelloCompat.CMD, 1)]) else: diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index 34028a637f..6899c2c937 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -16,13 +16,14 @@ Command, go) from pymongo import MongoClient, version as pymongo_version -from pymongo.errors import OperationFailure +from pymongo.errors import OperationFailure, AutoReconnect from pymongo.server_api import ServerApi, ServerApiVersion from bson.objectid import ObjectId import unittest + def test_hello_with_option(self, protocol, **kwargs): hello = "ismaster" if isinstance(protocol(), OpQuery) else "hello" # `db.command("hello"|"ismaster")` commands are the same for primaries and @@ -217,5 +218,51 @@ def test_handshake_not_either(self): with self.assertRaisesRegex(AssertionError, "does not match"): test_hello_with_option(self, OpMsg) + def test_handshake_max_wire(self): + server = MockupDB() + server.run() + self.addCleanup(server.stop) + client = MongoClient(server.uri, + username='username', + password='password', + appname='my app', + heartBeatFrequencyMS=500, + ) + primary_response = OpMsgReply(hello=1, + minWireVersion=0, maxWireVersion=6) + + # New monitoring sockets send data during handshake. + heartbeat = server.receives(Command('ismaster')) + heartbeat.ok(primary_response) + + future = go(client.db.command, 'whatever') + for request in server: + if request.matches('hello'): + if request.client_port == heartbeat.client_port: + # This is the monitor again, keep going. + print("got the monitor") + request.ok(primary_response) + elif request.matches(OpMsg({})): + # Handshaking a new application socket should send + # saslSupportedMechs and speculativeAuthenticate. + self.assertEqual(request['saslSupportedMechs'], + 'admin.username') + self.assertIn( + 'saslStart', request['speculativeAuthenticate']) + auth = {'conversationId': 1, 'done': False, + 'payload': b'r=wPleNM8S5p8gMaffMDF7Py4ru9bnmmoqb0' + b'1WNPsil6o=pAvr6B1garhlwc6MKNQ93ZfFky' + b'tXdF9r,s=4dcxugMJq2P4hQaDbGXZR8uR3ei' + b'PHrSmh4uhkg==,i=15000'} + request.ok('ismaster', True, + saslSupportedMechs=['SCRAM-SHA-256'], + speculativeAuthenticate=auth, + minWireVersion=2, maxWireVersion=6) + # Authentication should immediately fail with: + # OperationFailure: Server returned an invalid nonce. + with self.assertRaises(OperationFailure): + future() + return + if __name__ == '__main__': unittest.main() From 7d9dbed3ad9e59ebc608f2a93f755dcec5b1a145 Mon Sep 17 00:00:00 2001 From: julius Date: Fri, 28 Jan 2022 12:20:32 -0800 Subject: [PATCH 02/18] the main test working, but now stuff with the heartbeat is messed up --- .gitignore | 2 +- pymongo/pool.py | 1 - test/mockupdb/test_handshake.py | 104 +++++++++++++++----------------- 3 files changed, 51 insertions(+), 56 deletions(-) diff --git a/.gitignore b/.gitignore index de435d109e..696d9f2062 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,4 @@ pymongo.egg-info/ *.egg .tox mongocryptd.pid -.idea/ +.idea/ \ No newline at end of file diff --git a/pymongo/pool.py b/pymongo/pool.py index 39b5134f06..dcb7b8a9f6 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -691,7 +691,6 @@ def command(self, dbname, spec, # Ensure command name remains in first place. if not isinstance(spec, ORDERED_TYPES): spec = SON(spec) - if not (write_concern is None or write_concern.acknowledged or collation is None): raise ConfigurationError( diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index 6899c2c937..e5418ced25 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -94,7 +94,7 @@ def test_client_handshake_data(self): secondary_response = OpReply('ismaster', False, setName='rs', hosts=hosts, secondary=True, - minWireVersion=2, maxWireVersion=6) + minWireVersion=2, maxWireVersion=5) client = MongoClient(primary.uri, replicaSet='rs', @@ -104,57 +104,67 @@ def test_client_handshake_data(self): self.addCleanup(client.close) # New monitoring sockets send data during handshake. - heartbeat = primary.receives('ismaster') + heartbeat = primary.receives(Command('ismaster')) _check_handshake_data(heartbeat) heartbeat.ok(primary_response) - heartbeat = secondary.receives('ismaster') + heartbeat = secondary.receives(Command('ismaster')) _check_handshake_data(heartbeat) heartbeat.ok(secondary_response) # Subsequent heartbeats have no client data. - primary.receives('ismaster', 1, client=absent).ok(error_response) - secondary.receives('ismaster', 1, client=absent).ok(error_response) + primary.receives(OpMsg('hello', 1, client=absent)).ok(error_response) + secondary.receives(OpMsg('ismaster', 1, client=absent)).ok( + error_response) # The heartbeat retry (on a new connection) does have client data. - heartbeat = primary.receives('ismaster') + heartbeat = secondary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.ok(primary_response) + heartbeat.ok(secondary_response) - heartbeat = secondary.receives('ismaster') + heartbeat = primary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.ok(secondary_response) + heartbeat.ok(primary_response) + + # Still no client data. - primary.receives('ismaster', 1, client=absent).ok(primary_response) - secondary.receives('ismaster', 1, client=absent).ok(secondary_response) + primary.receives(OpMsg('hello', 1, client=absent)).ok(primary_response) + secondary.receives(Command('ismaster', 1, client=absent)).ok( + secondary_response) # After a disconnect, next ismaster has client data again. - primary.receives('ismaster', 1, client=absent).hangup() + primary.receives('hello', 1, client=absent).hangup() heartbeat = primary.receives('ismaster') _check_handshake_data(heartbeat) heartbeat.ok(primary_response) - secondary.autoresponds('ismaster', secondary_response) + secondary.autoresponds('hello', secondary_response) # Start a command, so the client opens an application socket. future = go(client.db.command, 'whatever') for request in primary: - if request.matches(Command('ismaster')): + if request.matches(OpMsg('ismaster')): if request.client_port == heartbeat.client_port: + print("found monitor") # This is the monitor again, keep going. request.ok(primary_response) else: # Handshaking a new application socket. + print("handshaking socket") _check_handshake_data(request) request.ok(primary_response) - else: + elif request.matches(OpMsg('whatever')): # Command succeeds. - request.assert_matches(OpMsg('whatever')) request.ok() assert future() - return + else: + request.ok(primary_response) + is_success = lambda f: (f.done() and not f.cancelled() and + not f.exception) + print(is_success(future)) + assert is_success(future) def test_client_handshake_saslSupportedMechs(self): server = MockupDB() @@ -220,49 +230,35 @@ def test_handshake_not_either(self): def test_handshake_max_wire(self): server = MockupDB() - server.run() + primary_response = {"hello":1, "ok":1, + "minWireVersion":0, "maxWireVersion":6} + self.found_auth_msg = False + def responder(request): + if request.matches(OpMsg, saslStart=1): + self.found_auth_msg = True + request.reply(OpMsgReply(**primary_response, + **{'payload': + b'r=wPleNM8S5p8gMaffMDF7Py4ru9bnmmoqb0' + b'1WNPsil6o=pAvr6B1garhlwc6MKNQ93ZfFky' + b'tXdF9r,' + b's=4dcxugMJq2P4hQaDbGXZR8uR3ei' + b'PHrSmh4uhkg==,i=15000', + "saslSupportedMechs": ["SCRAM-SHA-1"]})) + else: + return request.reply(OpMsgReply(**primary_response)) + + server.autoresponds(responder) self.addCleanup(server.stop) + server.run() client = MongoClient(server.uri, username='username', password='password', appname='my app', - heartBeatFrequencyMS=500, ) - primary_response = OpMsgReply(hello=1, - minWireVersion=0, maxWireVersion=6) - - # New monitoring sockets send data during handshake. - heartbeat = server.receives(Command('ismaster')) - heartbeat.ok(primary_response) - - future = go(client.db.command, 'whatever') - for request in server: - if request.matches('hello'): - if request.client_port == heartbeat.client_port: - # This is the monitor again, keep going. - print("got the monitor") - request.ok(primary_response) - elif request.matches(OpMsg({})): - # Handshaking a new application socket should send - # saslSupportedMechs and speculativeAuthenticate. - self.assertEqual(request['saslSupportedMechs'], - 'admin.username') - self.assertIn( - 'saslStart', request['speculativeAuthenticate']) - auth = {'conversationId': 1, 'done': False, - 'payload': b'r=wPleNM8S5p8gMaffMDF7Py4ru9bnmmoqb0' - b'1WNPsil6o=pAvr6B1garhlwc6MKNQ93ZfFky' - b'tXdF9r,s=4dcxugMJq2P4hQaDbGXZR8uR3ei' - b'PHrSmh4uhkg==,i=15000'} - request.ok('ismaster', True, - saslSupportedMechs=['SCRAM-SHA-256'], - speculativeAuthenticate=auth, - minWireVersion=2, maxWireVersion=6) - # Authentication should immediately fail with: - # OperationFailure: Server returned an invalid nonce. - with self.assertRaises(OperationFailure): - future() - return + self.addCleanup(client.close) + self.assertRaises(OperationFailure, client.db.collection.find_one, {"a":1}) + assert self.found_auth_msg, """Could not find authentication command + with correct protocol""" if __name__ == '__main__': unittest.main() From 6c20dd55f527caae808bcb1665dc1c19c6b45d27 Mon Sep 17 00:00:00 2001 From: julius Date: Fri, 28 Jan 2022 15:39:20 -0800 Subject: [PATCH 03/18] still not sending anything on application socket, not sure why --- pymongo/pool.py | 2 +- test/mockupdb/test_handshake.py | 77 +++++++++++++++++---------------- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/pymongo/pool.py b/pymongo/pool.py index dcb7b8a9f6..a93b65479a 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -553,7 +553,7 @@ def hello_cmd(self): # Handshake spec requires us to use OP_MSG+hello command for the # initial handshake in load balanced or versioned api mode. if (self.opts.server_api or self.hello_ok or self.opts.load_balanced - or self.max_wire_version >= 6): + or self.max_wire_version >= 6): self.op_msg_enabled = True return SON([(HelloCompat.CMD, 1)]) else: diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index e5418ced25..a10ebba575 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -21,7 +21,7 @@ from bson.objectid import ObjectId import unittest - +from copy import deepcopy def test_hello_with_option(self, protocol, **kwargs): @@ -85,16 +85,16 @@ def test_client_handshake_data(self): self.addCleanup(server.stop) hosts = [server.address_string for server in (primary, secondary)] - primary_response = OpReply('ismaster', True, - setName='rs', hosts=hosts, - minWireVersion=2, maxWireVersion=6) - error_response = OpReply( + primary_response = {'hello': 1, + "setName":'rs', "hosts":hosts, + "minWireVersion":2, "maxWireVersion":13} + error_response = OpMsgReply( 0, errmsg='Cache Reader No keys found for HMAC ...', code=211) - secondary_response = OpReply('ismaster', False, - setName='rs', hosts=hosts, - secondary=True, - minWireVersion=2, maxWireVersion=5) + secondary_response = {'hello': 1, + "setName":'rs', "hosts":hosts, + "secondary": True, + "minWireVersion":2, "maxWireVersion":13} client = MongoClient(primary.uri, replicaSet='rs', @@ -106,65 +106,67 @@ def test_client_handshake_data(self): # New monitoring sockets send data during handshake. heartbeat = primary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.ok(primary_response) + heartbeat.ok(**primary_response) heartbeat = secondary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.ok(secondary_response) + heartbeat.ok(**secondary_response) # Subsequent heartbeats have no client data. primary.receives(OpMsg('hello', 1, client=absent)).ok(error_response) - secondary.receives(OpMsg('ismaster', 1, client=absent)).ok( + secondary.receives(OpMsg('hello', 1, client=absent)).ok( error_response) # The heartbeat retry (on a new connection) does have client data. - heartbeat = secondary.receives(Command('ismaster')) - _check_handshake_data(heartbeat) - heartbeat.ok(secondary_response) - heartbeat = primary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.ok(primary_response) + heartbeat.ok(**primary_response) + heartbeat = secondary.receives(Command('ismaster')) + _check_handshake_data(heartbeat) + heartbeat.ok(**secondary_response) # Still no client data. - primary.receives(OpMsg('hello', 1, client=absent)).ok(primary_response) - secondary.receives(Command('ismaster', 1, client=absent)).ok( - secondary_response) + primary.receives(OpMsg('hello', 1, client=absent)).ok(**primary_response) + secondary.receives(OpMsg('hello', 1, client=absent)).ok( + **secondary_response) # After a disconnect, next ismaster has client data again. primary.receives('hello', 1, client=absent).hangup() heartbeat = primary.receives('ismaster') _check_handshake_data(heartbeat) - heartbeat.ok(primary_response) + heartbeat.ok(**primary_response) - secondary.autoresponds('hello', secondary_response) + secondary.autoresponds('ismaster', **secondary_response) + secondary.autoresponds('hello', **secondary_response) # Start a command, so the client opens an application socket. - future = go(client.db.command, 'whatever') + future = go(client.db.collection.find_one, {"_id":1}) for request in primary: - if request.matches(OpMsg('ismaster')): + print(request) + if request.matches(OpMsg('hello')): if request.client_port == heartbeat.client_port: - print("found monitor") # This is the monitor again, keep going. - request.ok(primary_response) + print("Monitor message:", type(request), request) + request.ok(**primary_response) else: - # Handshaking a new application socket. - print("handshaking socket") - _check_handshake_data(request) - request.ok(primary_response) - elif request.matches(OpMsg('whatever')): + print("Found an op_msg hello") + with self.assertRaises(AssertionError): + _check_handshake_data(request) + request.ok() + elif request.matches(Command('ismaster')): + print(request) + # Handshaking a new application socket. + _check_handshake_data(request) + request.ok(**primary_response) + else: # Command succeeds. + request.assert_matches(OpMsg('whatever')) request.ok() assert future() - else: - request.ok(primary_response) - is_success = lambda f: (f.done() and not f.cancelled() and - not f.exception) - print(is_success(future)) - assert is_success(future) + return def test_client_handshake_saslSupportedMechs(self): server = MockupDB() @@ -236,6 +238,7 @@ def test_handshake_max_wire(self): def responder(request): if request.matches(OpMsg, saslStart=1): self.found_auth_msg = True + # Immediately closes the connection with request.reply(OpMsgReply(**primary_response, **{'payload': b'r=wPleNM8S5p8gMaffMDF7Py4ru9bnmmoqb0' From feb7f408788178b8b6c17d8a59c54305c7a706d1 Mon Sep 17 00:00:00 2001 From: julius Date: Fri, 28 Jan 2022 15:51:02 -0800 Subject: [PATCH 04/18] add some debugging, still isn't producing any non monitor messages --- test/mockupdb/test_handshake.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index a10ebba575..513da05fe4 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -87,6 +87,7 @@ def test_client_handshake_data(self): hosts = [server.address_string for server in (primary, secondary)] primary_response = {'hello': 1, "setName":'rs', "hosts":hosts, + "secondary": True, "minWireVersion":2, "maxWireVersion":13} error_response = OpMsgReply( 0, errmsg='Cache Reader No keys found for HMAC ...', code=211) @@ -136,37 +137,40 @@ def test_client_handshake_data(self): primary.receives('hello', 1, client=absent).hangup() heartbeat = primary.receives('ismaster') _check_handshake_data(heartbeat) + heartbeat_client_port = deepcopy(heartbeat.client_port) heartbeat.ok(**primary_response) secondary.autoresponds('ismaster', **secondary_response) secondary.autoresponds('hello', **secondary_response) # Start a command, so the client opens an application socket. - future = go(client.db.collection.find_one, {"_id":1}) + future = go(client.db.command, "whatever") for request in primary: - print(request) + print("Message from port", request.client_port, ":", type(request), + request) if request.matches(OpMsg('hello')): if request.client_port == heartbeat.client_port: # This is the monitor again, keep going. - print("Monitor message:", type(request), request) request.ok(**primary_response) else: print("Found an op_msg hello") with self.assertRaises(AssertionError): _check_handshake_data(request) - request.ok() + request.ok(**primary_response) elif request.matches(Command('ismaster')): print(request) # Handshaking a new application socket. _check_handshake_data(request) request.ok(**primary_response) - else: + elif request.matches(OpMsg("whatever")): # Command succeeds. request.assert_matches(OpMsg('whatever')) - request.ok() + request.ok(**primary_response) assert future() return + else: + request.ok(**primary_response) def test_client_handshake_saslSupportedMechs(self): server = MockupDB() From 61d2b19c241d18e93ee3c8eef288fa08c9a9429e Mon Sep 17 00:00:00 2001 From: julius Date: Mon, 31 Jan 2022 12:32:23 -0800 Subject: [PATCH 05/18] fixed up a few things, still not working --- test/mockupdb/test_handshake.py | 74 ++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 34 deletions(-) diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index 513da05fe4..2a25f7e0b2 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -31,6 +31,7 @@ def test_hello_with_option(self, protocol, **kwargs): primary = MockupDB() # Set up a custom handler to save the first request from the driver. self.handshake_req = None + def respond(r): # Only save the very first request from the driver. if self.handshake_req == None: @@ -39,19 +40,20 @@ def respond(r): "loadBalanced") else {} return r.reply(OpMsgReply(minWireVersion=0, maxWireVersion=13, **kwargs, **load_balanced_kwargs)) + primary.autoresponds(respond) primary.run() self.addCleanup(primary.stop) # We need a special dict because MongoClient uses "server_api" and all # of the commands use "apiVersion". - k_map = {("apiVersion", "1"):("server_api", ServerApi( - ServerApiVersion.V1))} - client = MongoClient("mongodb://"+primary.address_string, - appname='my app', # For _check_handshake_data() + k_map = {("apiVersion", "1"): ("server_api", ServerApi( + ServerApiVersion.V1))} + client = MongoClient("mongodb://" + primary.address_string, + appname='my app', # For _check_handshake_data() **dict([k_map.get((k, v), (k, v)) for k, v in kwargs.items()])) - + self.addCleanup(client.close) # We have an autoresponder luckily, so no need for `go()`. @@ -86,16 +88,16 @@ def test_client_handshake_data(self): hosts = [server.address_string for server in (primary, secondary)] primary_response = {'hello': 1, - "setName":'rs', "hosts":hosts, - "secondary": True, - "minWireVersion":2, "maxWireVersion":13} + "setName": 'rs', "hosts": hosts, + "secondary": True, + "minWireVersion": 2, "maxWireVersion": 13} error_response = OpMsgReply( 0, errmsg='Cache Reader No keys found for HMAC ...', code=211) secondary_response = {'hello': 1, - "setName":'rs', "hosts":hosts, - "secondary": True, - "minWireVersion":2, "maxWireVersion":13} + "setName": 'rs', "hosts": hosts, + "secondary": True, + "minWireVersion": 2, "maxWireVersion": 13} client = MongoClient(primary.uri, replicaSet='rs', @@ -127,17 +129,16 @@ def test_client_handshake_data(self): _check_handshake_data(heartbeat) heartbeat.ok(**secondary_response) - # Still no client data. - primary.receives(OpMsg('hello', 1, client=absent)).ok(**primary_response) + primary.receives(OpMsg('hello', 1, client=absent)).ok( + **primary_response) secondary.receives(OpMsg('hello', 1, client=absent)).ok( - **secondary_response) + **secondary_response) # After a disconnect, next ismaster has client data again. primary.receives('hello', 1, client=absent).hangup() heartbeat = primary.receives('ismaster') _check_handshake_data(heartbeat) - heartbeat_client_port = deepcopy(heartbeat.client_port) heartbeat.ok(**primary_response) secondary.autoresponds('ismaster', **secondary_response) @@ -152,25 +153,26 @@ def test_client_handshake_data(self): if request.matches(OpMsg('hello')): if request.client_port == heartbeat.client_port: # This is the monitor again, keep going. - request.ok(**primary_response) + request.ok(OpMsgReply(**primary_response)) else: print("Found an op_msg hello") with self.assertRaises(AssertionError): _check_handshake_data(request) - request.ok(**primary_response) + request.ok(OpMsgReply(**primary_response)) elif request.matches(Command('ismaster')): print(request) + print("found a new application socket") # Handshaking a new application socket. _check_handshake_data(request) - request.ok(**primary_response) + request.ok(OpMsgReply(**primary_response)) elif request.matches(OpMsg("whatever")): # Command succeeds. request.assert_matches(OpMsg('whatever')) - request.ok(**primary_response) + request.ok(OpMsgReply(**primary_response)) assert future() return else: - request.ok(**primary_response) + request.ok(OpMsgReply(**primary_response)) def test_client_handshake_saslSupportedMechs(self): server = MockupDB() @@ -236,36 +238,40 @@ def test_handshake_not_either(self): def test_handshake_max_wire(self): server = MockupDB() - primary_response = {"hello":1, "ok":1, - "minWireVersion":0, "maxWireVersion":6} + primary_response = {"hello": 1, "ok": 1, + "minWireVersion": 0, "maxWireVersion": 6} self.found_auth_msg = False + def responder(request): if request.matches(OpMsg, saslStart=1): self.found_auth_msg = True # Immediately closes the connection with request.reply(OpMsgReply(**primary_response, **{'payload': - b'r=wPleNM8S5p8gMaffMDF7Py4ru9bnmmoqb0' - b'1WNPsil6o=pAvr6B1garhlwc6MKNQ93ZfFky' - b'tXdF9r,' - b's=4dcxugMJq2P4hQaDbGXZR8uR3ei' - b'PHrSmh4uhkg==,i=15000', - "saslSupportedMechs": ["SCRAM-SHA-1"]})) + b'r=wPleNM8S5p8gMaffMDF7Py4ru9bnmmoqb0' + b'1WNPsil6o=pAvr6B1garhlwc6MKNQ93ZfFky' + b'tXdF9r,' + b's=4dcxugMJq2P4hQaDbGXZR8uR3ei' + b'PHrSmh4uhkg==,i=15000', + "saslSupportedMechs": [ + "SCRAM-SHA-1"]})) else: return request.reply(OpMsgReply(**primary_response)) server.autoresponds(responder) self.addCleanup(server.stop) server.run() - client = MongoClient(server.uri, - username='username', - password='password', - appname='my app', - ) + client = MongoClient(server.uri, + username='username', + password='password', + appname='my app', + ) self.addCleanup(client.close) - self.assertRaises(OperationFailure, client.db.collection.find_one, {"a":1}) + self.assertRaises(OperationFailure, client.db.collection.find_one, + {"a": 1}) assert self.found_auth_msg, """Could not find authentication command with correct protocol""" + if __name__ == '__main__': unittest.main() From 88e41007a89b779a0f9e2a58ca74030589d2d2cc Mon Sep 17 00:00:00 2001 From: julius Date: Mon, 31 Jan 2022 12:56:19 -0800 Subject: [PATCH 06/18] got it working, removed comments --- test/mockupdb/test_handshake.py | 55 ++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index 2a25f7e0b2..e64bdabdc0 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -88,6 +88,7 @@ def test_client_handshake_data(self): hosts = [server.address_string for server in (primary, secondary)] primary_response = {'hello': 1, + 'ok': 1, "setName": 'rs', "hosts": hosts, "secondary": True, "minWireVersion": 2, "maxWireVersion": 13} @@ -95,6 +96,7 @@ def test_client_handshake_data(self): 0, errmsg='Cache Reader No keys found for HMAC ...', code=211) secondary_response = {'hello': 1, + 'ok': 1, "setName": 'rs', "hosts": hosts, "secondary": True, "minWireVersion": 2, "maxWireVersion": 13} @@ -109,11 +111,11 @@ def test_client_handshake_data(self): # New monitoring sockets send data during handshake. heartbeat = primary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.ok(**primary_response) + heartbeat.ok(OpMsgReply(**primary_response)) heartbeat = secondary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.ok(**secondary_response) + heartbeat.ok(OpMsgReply(**secondary_response)) # Subsequent heartbeats have no client data. primary.receives(OpMsg('hello', 1, client=absent)).ok(error_response) @@ -123,56 +125,59 @@ def test_client_handshake_data(self): # The heartbeat retry (on a new connection) does have client data. heartbeat = primary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.ok(**primary_response) + heartbeat.reply(OpMsgReply(**primary_response)) heartbeat = secondary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.ok(**secondary_response) + heartbeat.reply(OpMsgReply(**secondary_response)) # Still no client data. - primary.receives(OpMsg('hello', 1, client=absent)).ok( - **primary_response) - secondary.receives(OpMsg('hello', 1, client=absent)).ok( - **secondary_response) + primary.receives(OpMsg('hello', 1, client=absent)).reply( + OpMsgReply(**primary_response)) + secondary.receives(OpMsg('hello', 1, client=absent)).reply( + OpMsgReply(**secondary_response)) # After a disconnect, next ismaster has client data again. primary.receives('hello', 1, client=absent).hangup() heartbeat = primary.receives('ismaster') _check_handshake_data(heartbeat) - heartbeat.ok(**primary_response) - - secondary.autoresponds('ismaster', **secondary_response) - secondary.autoresponds('hello', **secondary_response) - + hb_port = deepcopy(heartbeat.client_port) + heartbeat.reply(OpMsgReply(**primary_response)) + primary.receives(OpMsg('hello')).hangup() + secondary.receives(OpMsg('hello')).hangup() + secondary.autoresponds('ismaster', OpMsgReply(**secondary_response)) + secondary.autoresponds('hello', OpMsgReply(**secondary_response)) + secondary.autoresponds('whatever', OpMsgReply(**secondary_response)) # Start a command, so the client opens an application socket. future = go(client.db.command, "whatever") - + message_counter = 0 for request in primary: - print("Message from port", request.client_port, ":", type(request), - request) if request.matches(OpMsg('hello')): - if request.client_port == heartbeat.client_port: + if request.client_port == hb_port: # This is the monitor again, keep going. - request.ok(OpMsgReply(**primary_response)) + request.reply(OpMsgReply(**primary_response)) else: - print("Found an op_msg hello") + # Subsequent hellos do not have client data + message_counter += 1 with self.assertRaises(AssertionError): _check_handshake_data(request) - request.ok(OpMsgReply(**primary_response)) + request.reply(OpMsgReply(isWritablePrimary=True, + **primary_response)) elif request.matches(Command('ismaster')): - print(request) - print("found a new application socket") + message_counter += 1 # Handshaking a new application socket. _check_handshake_data(request) - request.ok(OpMsgReply(**primary_response)) + request.reply(OpMsgReply(**primary_response)) elif request.matches(OpMsg("whatever")): + message_counter +=1 # Command succeeds. request.assert_matches(OpMsg('whatever')) - request.ok(OpMsgReply(**primary_response)) + request.reply(OpMsgReply(**primary_response)) assert future() return else: - request.ok(OpMsgReply(**primary_response)) + request.reply(OpMsgReply(**primary_response)) + assert message_counter == 3 def test_client_handshake_saslSupportedMechs(self): server = MockupDB() From 07457644499cf7bc3177a927cd3d8a759fb1fb23 Mon Sep 17 00:00:00 2001 From: julius Date: Wed, 2 Feb 2022 12:29:05 -0800 Subject: [PATCH 07/18] fixed up test a bit --- test/mockupdb/test_handshake.py | 39 ++++++++++++--------------------- 1 file changed, 14 insertions(+), 25 deletions(-) diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index e64bdabdc0..9959155c96 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -90,7 +90,7 @@ def test_client_handshake_data(self): primary_response = {'hello': 1, 'ok': 1, "setName": 'rs', "hosts": hosts, - "secondary": True, + "secondary": False, "minWireVersion": 2, "maxWireVersion": 13} error_response = OpMsgReply( 0, errmsg='Cache Reader No keys found for HMAC ...', code=211) @@ -137,47 +137,36 @@ def test_client_handshake_data(self): secondary.receives(OpMsg('hello', 1, client=absent)).reply( OpMsgReply(**secondary_response)) + primary.receives(OpMsg('hello', 1, client=absent)).hangup() + secondary.receives(OpMsg('hello')).hangup() # After a disconnect, next ismaster has client data again. - primary.receives('hello', 1, client=absent).hangup() heartbeat = primary.receives('ismaster') _check_handshake_data(heartbeat) hb_port = deepcopy(heartbeat.client_port) heartbeat.reply(OpMsgReply(**primary_response)) - primary.receives(OpMsg('hello')).hangup() - secondary.receives(OpMsg('hello')).hangup() - secondary.autoresponds('ismaster', OpMsgReply(**secondary_response)) - secondary.autoresponds('hello', OpMsgReply(**secondary_response)) - secondary.autoresponds('whatever', OpMsgReply(**secondary_response)) + # Start a command, so the client opens an application socket. future = go(client.db.command, "whatever") - message_counter = 0 + handshook = None for request in primary: - if request.matches(OpMsg('hello')): - if request.client_port == hb_port: - # This is the monitor again, keep going. - request.reply(OpMsgReply(**primary_response)) + if request.matches(Command('ismaster')): + if handshook is None: + handshook = True else: - # Subsequent hellos do not have client data - message_counter += 1 - with self.assertRaises(AssertionError): - _check_handshake_data(request) - request.reply(OpMsgReply(isWritablePrimary=True, - **primary_response)) - elif request.matches(Command('ismaster')): - message_counter += 1 + handshook = False # Handshaking a new application socket. _check_handshake_data(request) request.reply(OpMsgReply(**primary_response)) elif request.matches(OpMsg("whatever")): - message_counter +=1 # Command succeeds. - request.assert_matches(OpMsg('whatever')) request.reply(OpMsgReply(**primary_response)) - assert future() + # If handshook is false it means it tried to handshake + # multiple times. If it's None it means it never tried. + assert future() and handshook return else: - request.reply(OpMsgReply(**primary_response)) - assert message_counter == 3 + request.reply(OpMsgReply(isWritablePrimary=True, + **primary_response)) def test_client_handshake_saslSupportedMechs(self): server = MockupDB() From 69a5e34ee6dec4a5d5b7a967e43d7e54ff0b48d0 Mon Sep 17 00:00:00 2001 From: julius Date: Wed, 2 Feb 2022 12:35:02 -0800 Subject: [PATCH 08/18] fix whitespace and erroneous imports --- .gitignore | 2 +- pymongo/pool.py | 1 + test/mockupdb/test_handshake.py | 7 +++---- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 696d9f2062..de435d109e 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,4 @@ pymongo.egg-info/ *.egg .tox mongocryptd.pid -.idea/ \ No newline at end of file +.idea/ diff --git a/pymongo/pool.py b/pymongo/pool.py index a93b65479a..d07d2eaaa3 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -691,6 +691,7 @@ def command(self, dbname, spec, # Ensure command name remains in first place. if not isinstance(spec, ORDERED_TYPES): spec = SON(spec) + if not (write_concern is None or write_concern.acknowledged or collation is None): raise ConfigurationError( diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index 9959155c96..d438c0ac41 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -16,7 +16,7 @@ Command, go) from pymongo import MongoClient, version as pymongo_version -from pymongo.errors import OperationFailure, AutoReconnect +from pymongo.errors import OperationFailure from pymongo.server_api import ServerApi, ServerApiVersion from bson.objectid import ObjectId @@ -47,10 +47,9 @@ def respond(r): # We need a special dict because MongoClient uses "server_api" and all # of the commands use "apiVersion". - k_map = {("apiVersion", "1"): ("server_api", ServerApi( - ServerApiVersion.V1))} + k_map = {("apiVersion", "1"): ("server_api", ServerApi(ServerApiVersion.V1))} client = MongoClient("mongodb://" + primary.address_string, - appname='my app', # For _check_handshake_data() + appname='my app',# For _check_handshake_data() **dict([k_map.get((k, v), (k, v)) for k, v in kwargs.items()])) From 29579d27af80af35081f369ab99f799e270fb0e4 Mon Sep 17 00:00:00 2001 From: julius Date: Wed, 2 Feb 2022 12:36:57 -0800 Subject: [PATCH 09/18] actually fix whitespace errors --- test/mockupdb/test_handshake.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index d438c0ac41..ddbc7b9d29 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -47,9 +47,10 @@ def respond(r): # We need a special dict because MongoClient uses "server_api" and all # of the commands use "apiVersion". - k_map = {("apiVersion", "1"): ("server_api", ServerApi(ServerApiVersion.V1))} + k_map = {("apiVersion", "1"): ("server_api", ServerApi( + ServerApiVersion.V1))} client = MongoClient("mongodb://" + primary.address_string, - appname='my app',# For _check_handshake_data() + appname='my app', # For _check_handshake_data() **dict([k_map.get((k, v), (k, v)) for k, v in kwargs.items()])) From 9f0a2f013fa863bf42d999100c416d2880dffece Mon Sep 17 00:00:00 2001 From: julius Date: Wed, 2 Feb 2022 12:38:21 -0800 Subject: [PATCH 10/18] more whitespace... --- pymongo/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymongo/pool.py b/pymongo/pool.py index d07d2eaaa3..c497744b7e 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -691,7 +691,7 @@ def command(self, dbname, spec, # Ensure command name remains in first place. if not isinstance(spec, ORDERED_TYPES): spec = SON(spec) - + if not (write_concern is None or write_concern.acknowledged or collation is None): raise ConfigurationError( From b354d6bed637cdf864d6264c4d1075f65ef7cb7f Mon Sep 17 00:00:00 2001 From: julius Date: Wed, 2 Feb 2022 12:49:48 -0800 Subject: [PATCH 11/18] should simplify test_handshake_args --- test/mockupdb/test_handshake.py | 34 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index ddbc7b9d29..c07b2e57f3 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -48,7 +48,7 @@ def respond(r): # We need a special dict because MongoClient uses "server_api" and all # of the commands use "apiVersion". k_map = {("apiVersion", "1"): ("server_api", ServerApi( - ServerApiVersion.V1))} + ServerApiVersion.V1))} client = MongoClient("mongodb://" + primary.address_string, appname='my app', # For _check_handshake_data() **dict([k_map.get((k, v), (k, v)) for k, v @@ -87,18 +87,12 @@ def test_client_handshake_data(self): self.addCleanup(server.stop) hosts = [server.address_string for server in (primary, secondary)] - primary_response = {'hello': 1, - 'ok': 1, - "setName": 'rs', "hosts": hosts, - "secondary": False, + primary_response = {"setName": 'rs', "hosts": hosts, "minWireVersion": 2, "maxWireVersion": 13} error_response = OpMsgReply( 0, errmsg='Cache Reader No keys found for HMAC ...', code=211) - secondary_response = {'hello': 1, - 'ok': 1, - "setName": 'rs', "hosts": hosts, - "secondary": True, + secondary_response = {"setName": 'rs', "hosts": hosts, "minWireVersion": 2, "maxWireVersion": 13} client = MongoClient(primary.uri, @@ -111,11 +105,11 @@ def test_client_handshake_data(self): # New monitoring sockets send data during handshake. heartbeat = primary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.ok(OpMsgReply(**primary_response)) + heartbeat.ok(**primary_response) heartbeat = secondary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.ok(OpMsgReply(**secondary_response)) + heartbeat.ok(**secondary_response) # Subsequent heartbeats have no client data. primary.receives(OpMsg('hello', 1, client=absent)).ok(error_response) @@ -125,17 +119,15 @@ def test_client_handshake_data(self): # The heartbeat retry (on a new connection) does have client data. heartbeat = primary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.reply(OpMsgReply(**primary_response)) + heartbeat.reply(primary_response) heartbeat = secondary.receives(Command('ismaster')) _check_handshake_data(heartbeat) - heartbeat.reply(OpMsgReply(**secondary_response)) + heartbeat.reply(secondary_response) # Still no client data. - primary.receives(OpMsg('hello', 1, client=absent)).reply( - OpMsgReply(**primary_response)) - secondary.receives(OpMsg('hello', 1, client=absent)).reply( - OpMsgReply(**secondary_response)) + primary.receives(OpMsg('hello', 1, client=absent)).reply(**primary_response) + secondary.receives(OpMsg('hello', 1, client=absent)).reply(**secondary_response) primary.receives(OpMsg('hello', 1, client=absent)).hangup() secondary.receives(OpMsg('hello')).hangup() @@ -143,7 +135,7 @@ def test_client_handshake_data(self): heartbeat = primary.receives('ismaster') _check_handshake_data(heartbeat) hb_port = deepcopy(heartbeat.client_port) - heartbeat.reply(OpMsgReply(**primary_response)) + heartbeat.reply(**primary_response) # Start a command, so the client opens an application socket. future = go(client.db.command, "whatever") @@ -156,7 +148,7 @@ def test_client_handshake_data(self): handshook = False # Handshaking a new application socket. _check_handshake_data(request) - request.reply(OpMsgReply(**primary_response)) + request.reply(**primary_response) elif request.matches(OpMsg("whatever")): # Command succeeds. request.reply(OpMsgReply(**primary_response)) @@ -165,8 +157,8 @@ def test_client_handshake_data(self): assert future() and handshook return else: - request.reply(OpMsgReply(isWritablePrimary=True, - **primary_response)) + request.reply(isWritablePrimary=True, + **primary_response) def test_client_handshake_saslSupportedMechs(self): server = MockupDB() From 18b126e1424b9e236c7aa8845fc228d9a7809dc3 Mon Sep 17 00:00:00 2001 From: julius Date: Wed, 2 Feb 2022 12:57:15 -0800 Subject: [PATCH 12/18] actually fix spacing stuff --- test/mockupdb/test_handshake.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index c07b2e57f3..b3eb55c05d 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -31,7 +31,6 @@ def test_hello_with_option(self, protocol, **kwargs): primary = MockupDB() # Set up a custom handler to save the first request from the driver. self.handshake_req = None - def respond(r): # Only save the very first request from the driver. if self.handshake_req == None: @@ -40,7 +39,6 @@ def respond(r): "loadBalanced") else {} return r.reply(OpMsgReply(minWireVersion=0, maxWireVersion=13, **kwargs, **load_balanced_kwargs)) - primary.autoresponds(respond) primary.run() self.addCleanup(primary.stop) @@ -48,9 +46,9 @@ def respond(r): # We need a special dict because MongoClient uses "server_api" and all # of the commands use "apiVersion". k_map = {("apiVersion", "1"): ("server_api", ServerApi( - ServerApiVersion.V1))} - client = MongoClient("mongodb://" + primary.address_string, - appname='my app', # For _check_handshake_data() + ServerApiVersion.V1))} + client = MongoClient("mongodb://"+primary.address_string, + appname='my app', # For _check_handshake_data() **dict([k_map.get((k, v), (k, v)) for k, v in kwargs.items()])) @@ -99,7 +97,7 @@ def test_client_handshake_data(self): replicaSet='rs', appname='my app', heartbeatFrequencyMS=500) # Speed up the test. - + self.addCleanup(client.close) # New monitoring sockets send data during handshake. @@ -135,7 +133,7 @@ def test_client_handshake_data(self): heartbeat = primary.receives('ismaster') _check_handshake_data(heartbeat) hb_port = deepcopy(heartbeat.client_port) - heartbeat.reply(**primary_response) + heartbeat.reply(primary_response) # Start a command, so the client opens an application socket. future = go(client.db.command, "whatever") From fcd048ec892a2cfa03582f5ba0f4d2f50d8f06e4 Mon Sep 17 00:00:00 2001 From: julius Date: Wed, 2 Feb 2022 13:24:37 -0800 Subject: [PATCH 13/18] more whitespace fixes --- test/mockupdb/test_handshake.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index b3eb55c05d..003d6fd2d4 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -45,13 +45,13 @@ def respond(r): # We need a special dict because MongoClient uses "server_api" and all # of the commands use "apiVersion". - k_map = {("apiVersion", "1"): ("server_api", ServerApi( - ServerApiVersion.V1))} + k_map = {("apiVersion", "1"):("server_api", ServerApi( + ServerApiVersion.V1))} client = MongoClient("mongodb://"+primary.address_string, appname='my app', # For _check_handshake_data() **dict([k_map.get((k, v), (k, v)) for k, v in kwargs.items()])) - + self.addCleanup(client.close) # We have an autoresponder luckily, so no need for `go()`. @@ -97,7 +97,7 @@ def test_client_handshake_data(self): replicaSet='rs', appname='my app', heartbeatFrequencyMS=500) # Speed up the test. - + self.addCleanup(client.close) # New monitoring sockets send data during handshake. From 8ca1f78d5f288b4e3aeb71203d4313116687c15b Mon Sep 17 00:00:00 2001 From: julius Date: Wed, 2 Feb 2022 13:46:28 -0800 Subject: [PATCH 14/18] revert test_client_handshake_data and make fixes to other test --- pymongo/pool.py | 2 +- test/mockupdb/test_handshake.py | 83 ++++++++++++++++----------------- 2 files changed, 41 insertions(+), 44 deletions(-) diff --git a/pymongo/pool.py b/pymongo/pool.py index c497744b7e..584d7ecd52 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -553,7 +553,7 @@ def hello_cmd(self): # Handshake spec requires us to use OP_MSG+hello command for the # initial handshake in load balanced or versioned api mode. if (self.opts.server_api or self.hello_ok or self.opts.load_balanced - or self.max_wire_version >= 6): + ): self.op_msg_enabled = True return SON([(HelloCompat.CMD, 1)]) else: diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index 003d6fd2d4..64dcf3a939 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -51,7 +51,7 @@ def respond(r): appname='my app', # For _check_handshake_data() **dict([k_map.get((k, v), (k, v)) for k, v in kwargs.items()])) - + self.addCleanup(client.close) # We have an autoresponder luckily, so no need for `go()`. @@ -85,13 +85,16 @@ def test_client_handshake_data(self): self.addCleanup(server.stop) hosts = [server.address_string for server in (primary, secondary)] - primary_response = {"setName": 'rs', "hosts": hosts, - "minWireVersion": 2, "maxWireVersion": 13} - error_response = OpMsgReply( + primary_response = OpReply('ismaster', True, + setName='rs', hosts=hosts, + minWireVersion=2, maxWireVersion=6) + error_response = OpReply( 0, errmsg='Cache Reader No keys found for HMAC ...', code=211) - secondary_response = {"setName": 'rs', "hosts": hosts, - "minWireVersion": 2, "maxWireVersion": 13} + secondary_response = OpReply('ismaster', False, + setName='rs', hosts=hosts, + secondary=True, + minWireVersion=2, maxWireVersion=6) client = MongoClient(primary.uri, replicaSet='rs', @@ -101,62 +104,57 @@ def test_client_handshake_data(self): self.addCleanup(client.close) # New monitoring sockets send data during handshake. - heartbeat = primary.receives(Command('ismaster')) + heartbeat = primary.receives('ismaster') _check_handshake_data(heartbeat) - heartbeat.ok(**primary_response) + heartbeat.ok(primary_response) - heartbeat = secondary.receives(Command('ismaster')) + heartbeat = secondary.receives('ismaster') _check_handshake_data(heartbeat) - heartbeat.ok(**secondary_response) + heartbeat.ok(secondary_response) # Subsequent heartbeats have no client data. - primary.receives(OpMsg('hello', 1, client=absent)).ok(error_response) - secondary.receives(OpMsg('hello', 1, client=absent)).ok( - error_response) + primary.receives('ismaster', 1, client=absent).ok(error_response) + secondary.receives('ismaster', 1, client=absent).ok(error_response) # The heartbeat retry (on a new connection) does have client data. - heartbeat = primary.receives(Command('ismaster')) + heartbeat = primary.receives('ismaster') _check_handshake_data(heartbeat) - heartbeat.reply(primary_response) + heartbeat.ok(primary_response) - heartbeat = secondary.receives(Command('ismaster')) + heartbeat = secondary.receives('ismaster') _check_handshake_data(heartbeat) - heartbeat.reply(secondary_response) + heartbeat.ok(secondary_response) # Still no client data. - primary.receives(OpMsg('hello', 1, client=absent)).reply(**primary_response) - secondary.receives(OpMsg('hello', 1, client=absent)).reply(**secondary_response) + primary.receives('ismaster', 1, client=absent).ok(primary_response) + secondary.receives('ismaster', 1, client=absent).ok(secondary_response) - primary.receives(OpMsg('hello', 1, client=absent)).hangup() - secondary.receives(OpMsg('hello')).hangup() # After a disconnect, next ismaster has client data again. + primary.receives('ismaster', 1, client=absent).hangup() heartbeat = primary.receives('ismaster') _check_handshake_data(heartbeat) - hb_port = deepcopy(heartbeat.client_port) - heartbeat.reply(primary_response) + heartbeat.ok(primary_response) + + secondary.autoresponds('ismaster', secondary_response) # Start a command, so the client opens an application socket. - future = go(client.db.command, "whatever") - handshook = None + future = go(client.db.command, 'whatever') + for request in primary: if request.matches(Command('ismaster')): - if handshook is None: - handshook = True + if request.client_port == heartbeat.client_port: + # This is the monitor again, keep going. + request.ok(primary_response) else: - handshook = False - # Handshaking a new application socket. - _check_handshake_data(request) - request.reply(**primary_response) - elif request.matches(OpMsg("whatever")): + # Handshaking a new application socket. + _check_handshake_data(request) + request.ok(primary_response) + else: # Command succeeds. - request.reply(OpMsgReply(**primary_response)) - # If handshook is false it means it tried to handshake - # multiple times. If it's None it means it never tried. - assert future() and handshook + request.assert_matches(OpMsg('whatever')) + request.ok() + assert future() return - else: - request.reply(isWritablePrimary=True, - **primary_response) def test_client_handshake_saslSupportedMechs(self): server = MockupDB() @@ -240,7 +238,7 @@ def responder(request): "saslSupportedMechs": [ "SCRAM-SHA-1"]})) else: - return request.reply(OpMsgReply(**primary_response)) + return request.reply(**primary_response) server.autoresponds(responder) self.addCleanup(server.stop) @@ -248,13 +246,12 @@ def responder(request): client = MongoClient(server.uri, username='username', password='password', - appname='my app', ) self.addCleanup(client.close) self.assertRaises(OperationFailure, client.db.collection.find_one, {"a": 1}) - assert self.found_auth_msg, """Could not find authentication command - with correct protocol""" + self.assertTrue(self.found_auth_msg, "Could not find authentication " + "command with correct protocol") if __name__ == '__main__': From 1b5792cb770d8b1ac8af7e92d4e5d8546b6c3429 Mon Sep 17 00:00:00 2001 From: julius Date: Wed, 2 Feb 2022 13:47:36 -0800 Subject: [PATCH 15/18] fix if statement with parentheses --- pymongo/pool.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pymongo/pool.py b/pymongo/pool.py index 584d7ecd52..a0868c9916 100644 --- a/pymongo/pool.py +++ b/pymongo/pool.py @@ -552,8 +552,7 @@ def unpin(self): def hello_cmd(self): # Handshake spec requires us to use OP_MSG+hello command for the # initial handshake in load balanced or versioned api mode. - if (self.opts.server_api or self.hello_ok or self.opts.load_balanced - ): + if self.opts.server_api or self.hello_ok or self.opts.load_balanced: self.op_msg_enabled = True return SON([(HelloCompat.CMD, 1)]) else: From 66cc32da015b6b82ff2961bc0f94cabdd4a054a4 Mon Sep 17 00:00:00 2001 From: julius Date: Wed, 2 Feb 2022 13:48:52 -0800 Subject: [PATCH 16/18] remove stuff that is no longer needed now that test is reverted --- test/mockupdb/test_handshake.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index 64dcf3a939..aebf83d616 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -21,7 +21,6 @@ from bson.objectid import ObjectId import unittest -from copy import deepcopy def test_hello_with_option(self, protocol, **kwargs): @@ -39,6 +38,7 @@ def respond(r): "loadBalanced") else {} return r.reply(OpMsgReply(minWireVersion=0, maxWireVersion=13, **kwargs, **load_balanced_kwargs)) + primary.autoresponds(respond) primary.run() self.addCleanup(primary.stop) From 4f253deb8910fea0b1c2a15a44d34e2acd140c28 Mon Sep 17 00:00:00 2001 From: julius Date: Wed, 2 Feb 2022 13:50:04 -0800 Subject: [PATCH 17/18] fixed the wrong whitespace --- test/mockupdb/test_handshake.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index aebf83d616..f5059752df 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -38,7 +38,6 @@ def respond(r): "loadBalanced") else {} return r.reply(OpMsgReply(minWireVersion=0, maxWireVersion=13, **kwargs, **load_balanced_kwargs)) - primary.autoresponds(respond) primary.run() self.addCleanup(primary.stop) @@ -51,7 +50,7 @@ def respond(r): appname='my app', # For _check_handshake_data() **dict([k_map.get((k, v), (k, v)) for k, v in kwargs.items()])) - + self.addCleanup(client.close) # We have an autoresponder luckily, so no need for `go()`. From 2c6315c9a7f761f8d33f5b324760530f50cb0d98 Mon Sep 17 00:00:00 2001 From: julius Date: Wed, 2 Feb 2022 15:18:16 -0800 Subject: [PATCH 18/18] fix missing comment line --- test/mockupdb/test_handshake.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/mockupdb/test_handshake.py b/test/mockupdb/test_handshake.py index f5059752df..c15aaff9b8 100644 --- a/test/mockupdb/test_handshake.py +++ b/test/mockupdb/test_handshake.py @@ -50,7 +50,7 @@ def respond(r): appname='my app', # For _check_handshake_data() **dict([k_map.get((k, v), (k, v)) for k, v in kwargs.items()])) - + self.addCleanup(client.close) # We have an autoresponder luckily, so no need for `go()`. @@ -227,6 +227,7 @@ def responder(request): if request.matches(OpMsg, saslStart=1): self.found_auth_msg = True # Immediately closes the connection with + # OperationFailure: Server returned an invalid nonce. request.reply(OpMsgReply(**primary_response, **{'payload': b'r=wPleNM8S5p8gMaffMDF7Py4ru9bnmmoqb0'