Skip to content

Commit 58e29a8

Browse files
dimospedbmoscon
authored andcommitted
Optimize auth (fix for pandas-dev#464 , also potential fix for pandas-dev#250 ) (pandas-dev#460)
* Don't re-authenticate with every call to read/access the top level collection, unless connection is reset. Thread-safe. Add arctic_reset method which is called by Store implementation upon init, to force reconnect/re-auth Instead of resetting Arctic connection everytime we instantiate a new Store, only trigger re-authentication for the Databases added multiprocessing safety for Arctic connection and tests make only one getpid call simplified the design, auth is called either at instantiation of the artict top level lib, or if Arctic is reset (explicitly or afer fork) * enhanced the integration tests for arctic, excercised also the reset() * incorporating PR comment, print is not necessary here * PR comment change: don't require/expect from Store implementation to trigger _reset_auth(), trigger it on ArcticLibraryBinding instantiation and whenever Acttic's connection is reset * added integration test to verify re-auth is triggered when Arctic is reset * leftover comment was removed * fixed bug with ArcticLibraryBinding not picking up the arctic reset, improved test coverage, split the multithreading tests to separate files * nicer call count handling in test * added the changes to the change log
1 parent 8690f27 commit 58e29a8

File tree

6 files changed

+145
-11
lines changed

6 files changed

+145
-11
lines changed

CHANGES.md

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
* Bugfix: #385 exceptions during quota statistics no longer kill a write
99
* Feature: PR#161 TickStore.max_date now returns a datetime in the 'local' timezone
1010
* Feature: #425 user defined metadata for tickstore
11+
* Feature: #464 performance optimisation by avoiding unnecessary re-authentication
12+
* Bugfix: #250 Added multiprocessing safety, check for initialized MongoClient after fork.
1113

1214
### 1.54 (2017-10-18)
1315
* Bugfix: #440 Fix read empty MultiIndex+tz Series

arctic/arctic.py

+23-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import os
23

34
import pymongo
45
from pymongo.errors import OperationFailure, AutoReconnect
@@ -99,6 +100,7 @@ def __init__(self, mongo_host, app_name=APPLICATION_NAME, allow_secondary=False,
99100
self._connect_timeout = connectTimeoutMS
100101
self._server_selection_timeout = serverSelectionTimeoutMS
101102
self._lock = threading.RLock()
103+
self._pid = os.getpid()
102104

103105
if isinstance(mongo_host, string_types):
104106
self.mongo_host = mongo_host
@@ -113,6 +115,13 @@ def __init__(self, mongo_host, app_name=APPLICATION_NAME, allow_secondary=False,
113115
@mongo_retry
114116
def _conn(self):
115117
with self._lock:
118+
# We must make sure that no MongoClient instances are used from parent after fork:
119+
# http://api.mongodb.com/python/current/faq.html#using-pymongo-with-multiprocessing
120+
curr_pid = os.getpid()
121+
if curr_pid != self._pid:
122+
self._pid = curr_pid # this line has to precede reset() otherwise we get to eternal recursion
123+
self.reset() # also triggers re-auth
124+
116125
if self.__conn is None:
117126
host = get_mongodb_uri(self.mongo_host)
118127
logger.info("Connecting to mongo: {0} ({1})".format(self.mongo_host, host))
@@ -140,7 +149,8 @@ def reset(self):
140149
self.__conn.close()
141150
self.__conn = None
142151
for _, l in self._library_cache.items():
143-
l._reset()
152+
if hasattr(l, '_reset') and callable(l._reset):
153+
l._reset() # the existence of _reset() is not guaranteed/enforced, it also triggers re-auth
144154

145155
def __str__(self):
146156
return "<Arctic at %s, connected to %s>" % (hex(id(self)), str(self._conn))
@@ -393,16 +403,21 @@ def _parse_db_lib(clz, library):
393403

394404
def __init__(self, arctic, library):
395405
self.arctic = arctic
406+
self._curr_conn = self.arctic._conn
407+
self._lock = threading.RLock()
396408
database_name, library = self._parse_db_lib(library)
397409
self.library = library
398410
self.database_name = database_name
399-
self.get_top_level_collection() # Eagerly trigger auth
411+
self._auth(self.arctic._conn[self.database_name])
400412

401413
@property
402414
def _db(self):
403-
db = self.arctic._conn[self.database_name]
404-
self._auth(db)
405-
return db
415+
with self._lock:
416+
arctic_conn = self.arctic._conn
417+
if arctic_conn is self._curr_conn:
418+
self._auth(arctic_conn[self.database_name]) # trigger re-authentication if Arctic has been reset
419+
self._curr_conn = arctic_conn
420+
return self.arctic._conn[self.database_name]
406421

407422
@property
408423
def _library_coll(self):
@@ -431,6 +446,9 @@ def _auth(self, database):
431446
if auth:
432447
authenticate(database, auth.user, auth.password)
433448

449+
def reset_auth(self):
450+
self._auth(self._db)
451+
434452
def get_name(self):
435453
return self._db.name + '.' + self._library_coll.name
436454

arctic/store/bson_store.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class BSONStore(object):
2424

2525
def __init__(self, arctic_lib):
2626
self._arctic_lib = arctic_lib
27-
self._collection = self._arctic_lib.get_top_level_collection()
27+
self._reset()
2828

2929
@classmethod
3030
def initialize_library(cls, arctic_lib, hashed=True, **kwargs):
@@ -38,6 +38,11 @@ def initialize_library(cls, arctic_lib, hashed=True, **kwargs):
3838
logger.warning(("Library created, but couldn't enable sharding: "
3939
"%s. This is OK if you're not 'admin'"), exception)
4040

41+
@mongo_retry
42+
def _reset(self):
43+
# The default collection
44+
self._collection = self._arctic_lib.get_top_level_collection()
45+
4146
@mongo_retry
4247
def stats(self):
4348
"""

arctic/tickstore/tickstore.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def __init__(self, arctic_lib, chunk_size=100000):
9898
"""
9999
Parameters
100100
----------
101-
arctic_lib : TickStore
101+
arctic_lib : ArcticLibraryBinding
102102
Arctic Library
103103
chunk_size : int
104104
Number of ticks to store in a document before splitting to another document.

tests/integration/test_arctic.py

+25-4
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
from datetime import datetime as dt
2-
from mock import patch
3-
from pandas.util.testing import assert_frame_equal
41
import pytest
52
import time
63

4+
from datetime import datetime as dt
5+
from mock import patch
6+
from pandas.util.testing import assert_frame_equal
77

88
from arctic.arctic import Arctic, VERSION_STORE
99
from arctic.exceptions import LibraryNotFoundException, QuotaExceededException
@@ -27,10 +27,31 @@ def test_reset_Arctic(mongo_host, library_name):
2727
arctic = Arctic(mongo_host=mongo_host)
2828
arctic.list_libraries()
2929
arctic.initialize_library(library_name, VERSION_STORE)
30-
arctic[library_name]
3130
c = arctic._conn
31+
assert arctic[library_name]._arctic_lib._curr_conn is c
3232
arctic.reset()
33+
assert c is not arctic._conn
3334
assert len(c.nodes) == 0
35+
assert arctic[library_name]._arctic_lib._curr_conn is c
36+
37+
38+
def test_re_authenticate_on_arctic_reset(mongo_host, library_name):
39+
from collections import namedtuple
40+
Cred = namedtuple('Cred', 'user, password')
41+
with patch('arctic.arctic.authenticate') as auth_mock, \
42+
patch('arctic.arctic.get_auth') as get_auth_mock:
43+
auth_mock.return_value = True
44+
get_auth_mock.return_value = Cred(user='a_username', password='a_passwd')
45+
arctic = Arctic(mongo_host=mongo_host)
46+
arctic.initialize_library(library_name, VERSION_STORE)
47+
vstore = arctic[library_name]
48+
vstore.list_symbols()
49+
auth_mock.reset_mock()
50+
arctic.reset()
51+
assert auth_mock.call_count > 0
52+
auth_mock.reset_mock()
53+
vstore.list_symbols()
54+
assert auth_mock.call_count == 0
3455

3556

3657
def test_simple(library):
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import os
2+
import pytest
3+
import time
4+
from multiprocessing import Process
5+
from random import random
6+
7+
from arctic import Arctic, VERSION_STORE
8+
from arctic.exceptions import LibraryNotFoundException
9+
from arctic.store.version_store import VersionStore
10+
11+
MY_ARCTIC = None # module-level Arctic singleton
12+
13+
14+
def f(library_name, total_writes):
15+
my_pid = os.getpid()
16+
data = [str(my_pid)] * 100
17+
while True:
18+
try:
19+
vstore = MY_ARCTIC[library_name] # wait for parent to initialize
20+
break
21+
except LibraryNotFoundException:
22+
pass
23+
time.sleep(random() * 0.2)
24+
for i in range(total_writes):
25+
if i % 20 == 0: # add some randomisation, make sure that processes are multiplexed across time
26+
time.sleep(random())
27+
key = "{}_{}".format(my_pid, i)
28+
vstore.write(key, data + [key])
29+
for i in range(total_writes):
30+
key = "{}_{}".format(my_pid, i)
31+
assert vstore.read(key).data == data + [key]
32+
33+
34+
@pytest.mark.timeout(300)
35+
def test_multiprocessing_safety(mongo_host, library_name):
36+
# Create/initialize library at the parent process, then spawn children, and start them aligned in time
37+
total_processes = 64
38+
total_writes_per_child = 100
39+
40+
global MY_ARCTIC
41+
MY_ARCTIC = Arctic(mongo_host=mongo_host)
42+
43+
MY_ARCTIC.initialize_library(library_name, VERSION_STORE)
44+
assert isinstance(MY_ARCTIC.get_library(library_name), VersionStore)
45+
46+
processes = [Process(target=f, args=(library_name, total_writes_per_child)) for _ in range(total_processes)]
47+
48+
for p in processes:
49+
p.start()
50+
51+
for p in processes:
52+
p.join()
53+
54+
for p in processes:
55+
assert p.exitcode == 0
56+
57+
assert isinstance(MY_ARCTIC.get_library(library_name), VersionStore)
58+
59+
60+
@pytest.mark.timeout(300)
61+
def test_multiprocessing_safety_parent_children_race(mongo_host, library_name):
62+
# Create Arctic and directly fork/start children (no wait)
63+
total_iterations = 12
64+
total_processes = 6
65+
total_writes_per_child = 20
66+
67+
global MY_ARCTIC
68+
69+
for i in range(total_iterations):
70+
processes = list()
71+
72+
MY_ARCTIC = Arctic(mongo_host=mongo_host)
73+
for j in range(total_processes):
74+
p = Process(target=f, args=(library_name, total_writes_per_child))
75+
p.start() # start directly, don't wait to create first all children procs
76+
processes.append(p)
77+
78+
MY_ARCTIC.initialize_library(library_name, VERSION_STORE) # this will unblock spinning children
79+
80+
for p in processes:
81+
p.join()
82+
83+
for p in processes:
84+
assert p.exitcode == 0
85+
86+
MY_ARCTIC.reset()
87+
88+
assert isinstance(MY_ARCTIC.get_library(library_name), VersionStore)

0 commit comments

Comments
 (0)