Commit c79179d

Fix pandas-dev#587: Add a best-effort cache option for list_libraries
list_libraries is a frequently called operation that can get quite slow with a large number of libraries; I noticed 4-5s+ for 7-8k libraries. The number of libraries is also fairly constant, so there is little point in refetching the entire list every time. This adds a new collection in mongo which acts as a global cache for all queries. The meta_db can be used for more accounting and metadata storage in the future, and this collection can be used for caching other things as well. It is best effort: if you don't have permissions, or the data does not exist in the cache, it simply falls back to the uncached list_libraries. The idea is to not require admin rights for reads, and to not expect the user to have write access to the collection on writes.
1 parent aefdfd6 commit c79179d
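
For context, the expiry here relies on MongoDB's TTL indexes rather than any application-side eviction: the server's TTL monitor periodically deletes documents whose indexed "date" field is older than expireAfterSeconds. A minimal standalone sketch of the mechanism (assuming a reachable mongod on localhost; the database, collection, and field names mirror this patch):

from datetime import datetime
from pymongo import MongoClient

client = MongoClient('localhost')
coll = client['meta_db'].create_collection('cache')
# The TTL monitor runs roughly once a minute and removes documents whose
# 'date' is more than expireAfterSeconds in the past.
coll.create_index("date", expireAfterSeconds=3600)
coll.insert_one({"type": "list_libraries", "date": datetime.utcnow(), "data": ["lib1"]})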

File tree

4 files changed: +168 −5 lines


arctic/_cache.py

+67
@@ -0,0 +1,67 @@
+import logging
+from datetime import datetime, timedelta
+
+from pymongo.errors import OperationFailure
+
+from ._config import CACHE_COLL, CACHE_DB
+
+logger = logging.getLogger(__name__)
+
+
+class Cache:
+    def __init__(self, client, cache_expiry=3600, cache_db=CACHE_DB, cache_col=CACHE_COLL):
+        self._client = client
+        self._cachedb = client[cache_db]
+        self._cachecol = None
+        try:
+            if cache_col not in self._cachedb.collection_names():
+                self._cachedb.create_collection(cache_col).create_index("date", expireAfterSeconds=cache_expiry)
+        except OperationFailure as op:
+            logger.debug("This is fine if you are not admin. The collection should already be created for you: %s", op)
+
+        self._cachecol = self._cachedb[cache_col]
+
+    def get(self, key, newer_than_secs=-1):
+        """
+        :param key: Key for the dataset, e.g. 'list_libraries'.
+        :param newer_than_secs: Maximum staleness in seconds that is tolerable; -1 means use the
+            cache whenever data is available.
+        :return: The cached data, or None if there is no non-stale data in the cache.
+        """
+        try:
+            if self._cachecol is None:
+                # Collection not created or no permissions to read from it.
+                return None
+            coll_data = self._cachecol.find_one({"type": key})
+            # Check that there is data in the cache and that it is not stale.
+            if coll_data and (
+                    newer_than_secs == -1 or
+                    datetime.utcnow() < coll_data['date'] + timedelta(seconds=newer_than_secs)
+            ):
+                return coll_data['data']
+        except OperationFailure as op:
+            logger.warning("Could not read from cache due to: %s. Ask your admin to give read permissions on %s:%s",
+                           op, CACHE_DB, CACHE_COLL)
+
+        return None
+
+    def set(self, key, data):
+        try:
+            self._cachecol.update_one(
+                {"type": key},
+                {"$set": {"type": key, "date": datetime.utcnow(), "data": data}},
+                upsert=True
+            )
+        except OperationFailure as op:
+            logger.debug("This operation is to be run with admin permissions. Should be fine: %s", op)
+
+    def append(self, key, append_data):
+        # Not upserting here as this is meant to be used when there is already data for this key.
+        try:
+            self._cachecol.update_one(
+                {'type': key},
+                {'$push': {'data': append_data}}
+            )
+        except OperationFailure as op:
+            logger.debug("Admin is required to append to the cache: %s", op)
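
A rough usage sketch of the class above (assumes a reachable mongod; the meta_db/cache names come from _config.py, and the printed results assume the writes succeed):

from pymongo import MongoClient

from arctic._cache import Cache

cache = Cache(MongoClient('localhost'))
cache.set('list_libraries', ['lib1', 'lib2'])            # needs write access to meta_db.cache
print(cache.get('list_libraries'))                       # ['lib1', 'lib2']
print(cache.get('list_libraries', newer_than_secs=60))   # None once the entry is over 60s old
cache.append('list_libraries', 'lib3')                   # $push onto the existing document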

arctic/_config.py

+8
@@ -92,3 +92,11 @@ class FwPointersCfg(Enum):
 # ---------------------------
 # Configures the size of the workers pools used for async arctic requests
 ARCTIC_ASYNC_NWORKERS = os.environ.get('ARCTIC_ASYNC_NWORKERS', 4)
+
+
+# -------------------------------
+# Flag used for indicating caching levels. For now just for list_libraries.
+# -------------------------------
+ENABLE_CACHE = not bool(os.environ.get('DISABLE_CACHE'))
+CACHE_COLL = 'cache'
+CACHE_DB = 'meta_db'
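
One behavioural note on the flag above (my reading of the code, not something the patch states): DISABLE_CACHE is only tested for emptiness, so any non-empty value disables caching, including '0' and 'false':

import os

os.environ['DISABLE_CACHE'] = '0'  # looks like "keep caching on", but...
ENABLE_CACHE = not bool(os.environ.get('DISABLE_CACHE'))
print(ENABLE_CACHE)  # False: bool('0') is True, since bool() on a string only checks emptiness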

arctic/arctic.py

+34 −5
@@ -7,6 +7,8 @@
 from pymongo.errors import OperationFailure, AutoReconnect
 from six import string_types

+from ._cache import Cache
+from ._config import ENABLE_CACHE
 from ._util import indent
 from .auth import authenticate, get_auth
 from .chunkstore import chunkstore
@@ -107,6 +109,7 @@ def __init__(self, mongo_host, app_name=APPLICATION_NAME, allow_secondary=False,
         self._lock = threading.RLock()
         self._pid = os.getpid()
         self._pymongo_kwargs = kwargs
+        self._cache = None

         if isinstance(mongo_host, string_types):
             self._given_instance = False
@@ -118,6 +121,7 @@ def __init__(self, mongo_host, app_name=APPLICATION_NAME, allow_secondary=False,
             mongo_host.server_info()
             self.mongo_host = ",".join(["{}:{}".format(x[0], x[1]) for x in mongo_host.nodes])
             self._adminDB = self._conn.admin
+            self._cache = Cache(self._conn)

     @property
     @mongo_retry
@@ -143,6 +147,7 @@ def _conn(self):
                                            serverSelectionTimeoutMS=self._server_selection_timeout,
                                            **self._pymongo_kwargs)
             self._adminDB = self.__conn.admin
+            self._cache = Cache(self.__conn)

             # Authenticate against admin for the user
             auth = get_auth(self.mongo_host, self._application_name, 'admin')
@@ -183,13 +188,16 @@ def __getstate__(self):
     def __setstate__(self, state):
         return Arctic.__init__(self, **state)

-    @mongo_retry
-    def list_libraries(self):
+    def list_libraries(self, newer_than_secs=-1):
         """
         Returns
         -------
         list of Arctic library names
         """
+        return self._list_libraries_cached(newer_than_secs) if ENABLE_CACHE else self._list_libraries()
+
+    @mongo_retry
+    def _list_libraries(self):
         libs = []
         for db in self._conn.list_database_names():
             if db.startswith(self.DB_PREFIX + '_'):
@@ -202,6 +210,25 @@ def list_libraries(self):
                 libs.append(coll[:-1 * len(self.METADATA_COLL) - 1])
         return libs

+    # Better to be pessimistic here and not retry.
+    def _list_libraries_cached(self, newer_than_secs=-1):
+        """
+        Returns
+        -------
+        List of Arctic library names from a cached collection (global per mongo cluster) in mongo.
+        Long term list_libraries should have a use_cached argument.
+        """
+        _ = self._conn  # Ensures the connection exists and the cache is initialized with it.
+        cache_data = self._cache.get('list_libraries', newer_than_secs)
+        if cache_data:
+            logger.debug('Library names are in cache.')
+            return cache_data
+
+        return self._list_libraries()
+
+    def reload_cache(self):
+        self._cache.set('list_libraries', self._list_libraries())
+
     def library_exists(self, library):
         """
         Check whether a given library exists.
@@ -260,6 +287,8 @@ def initialize_library(self, library, lib_type=VERSION_STORE, **kwargs):
         if not lib.get_quota():
             lib.set_quota(10 * 1024 * 1024 * 1024)

+        self._cache.append('list_libraries', library)
+
     @mongo_retry
     def delete_library(self, library):
         """
@@ -550,9 +579,9 @@ def to_gigabytes(bytes_):
         count = stats['totals']['count']
         if size >= self.quota:
             raise QuotaExceededException("Mongo Quota Exceeded: %s %.3f / %.0f GB used" % (
-                  '.'.join([self.database_name, self.library]),
-                  to_gigabytes(size),
-                  to_gigabytes(self.quota)))
+                '.'.join([self.database_name, self.library]),
+                to_gigabytes(size),
+                to_gigabytes(self.quota)))

         # Quota not exceeded, print an informational message and return
         try:
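
Putting the pieces together, the intended call pattern looks roughly like this (a sketch; the host and the five-minute staleness budget are illustrative):

from arctic import Arctic

store = Arctic('localhost')
store.reload_cache()                     # run by an admin: seeds meta_db.cache with a full scan
libs = store.list_libraries()            # served from the cache when ENABLE_CACHE is set and data exists
fresh = store.list_libraries(newer_than_secs=300)  # full scan if the cached entry is older than 5 minutes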

tests/integration/test_arctic.py

+59
@@ -233,3 +233,62 @@ def test_library_exists_no_auth(arctic):
     assert arctic.library_exists('test')
     assert AB.return_value.get_library_type.called
     assert not arctic.library_exists('nonexistentlib')
+
+
+def test_list_libraries_cached(arctic):
+    libs = ['test1', 'test2']
+    for lib in libs:
+        arctic.initialize_library(lib)
+
+    # Should use uncached data, as reload_cache is required to initialize the cache the first time.
+    assert sorted(libs) == sorted(arctic.list_libraries())
+
+    # Check that the cached collection does not have data right now.
+    cache = arctic._cache
+    assert not cache.get('list_libraries')
+
+    # Should call list_libraries if the cache is empty.
+    with patch('arctic.arctic.Arctic._list_libraries', return_value=libs) as uncached_list_libraries:
+        assert arctic._list_libraries_cached() == libs
+        uncached_list_libraries.assert_called()
+
+    # Reload the cache and check that it has data.
+    arctic.reload_cache()
+    assert sorted(cache.get('list_libraries')) == sorted(libs)
+
+    # Should fetch it from the cache the second time.
+    with patch('arctic.arctic.Arctic._list_libraries', return_value=libs) as uncached_list_libraries:
+        assert arctic._list_libraries_cached() == libs
+        uncached_list_libraries.assert_not_called()
+
+
+def test_initialize_library_adds_to_cache(arctic):
+    libs = ['test1', 'test2']
+
+    for lib in libs:
+        arctic.initialize_library(lib)
+
+    arctic.reload_cache()
+    assert arctic._list_libraries_cached() == arctic._list_libraries()
+
+    # Add another lib.
+    arctic.initialize_library('test3')
+
+    assert sorted(arctic._cache.get('list_libraries')) == ['test1', 'test2', 'test3']
+
+
+def test_cache_does_not_return_stale_data(arctic):
+    libs = ['test1', 'test2']
+
+    for lib in libs:
+        arctic.initialize_library(lib)
+
+    arctic.reload_cache()
+    assert arctic._list_libraries_cached() == arctic._list_libraries()
+
+    time.sleep(0.2)
+
+    # Should call the uncached list_libraries if the data is stale according to the caller.
+    with patch('arctic.arctic.Arctic._list_libraries', return_value=libs) as uncached_list_libraries:
+        assert arctic._list_libraries_cached(newer_than_secs=0.1) == libs
+        uncached_list_libraries.assert_called()
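
The staleness rule those tests exercise can be restated in isolation; a pure-Python sketch of the condition inside Cache.get (the helper name is_fresh is mine):

from datetime import datetime, timedelta

def is_fresh(cached_at, newer_than_secs):
    # -1 mirrors Cache.get's default: any cached data is acceptable.
    return newer_than_secs == -1 or datetime.utcnow() < cached_at + timedelta(seconds=newer_than_secs)

assert is_fresh(datetime.utcnow(), -1)
assert is_fresh(datetime.utcnow(), 60)
assert not is_fresh(datetime.utcnow() - timedelta(seconds=1), 0.1)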
