Skip to content

Commit aba157a

Browse files
authored
MetadataStore (pandas-dev#392)
* Initial commit on MetadataStore and preliminary unit tests * Fix bugs and add integration tests * Remove possible duplicate in write_metadata_history and update strings and names * Move _collection to top_level_collection.metadata * Fix unit tests * Push query methods to BSONStore * Change names of class methods * Remove unused _insert() * Fix magical return in read() for readability * Add prepend() and read_history() and change input for write_history() to agree with the output of read_history() * Fix typo and add integrity check for end_time in tests * Add mongo_retry at various places
1 parent 8e18bd2 commit aba157a

File tree

7 files changed

+438
-3
lines changed

7 files changed

+438
-3
lines changed

arctic/arctic.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,28 @@
99
from .decorators import mongo_retry
1010
from .exceptions import LibraryNotFoundException, ArcticException, QuotaExceededException
1111
from .hooks import get_mongodb_uri
12-
from .store import version_store, bson_store
12+
from .store import version_store, bson_store, metadata_store
1313
from .tickstore import tickstore, toplevel
1414
from .chunkstore import chunkstore
1515
from six import string_types
1616

1717

18-
__all__ = ['Arctic', 'VERSION_STORE', 'TICK_STORE', 'CHUNK_STORE', 'register_library_type']
18+
__all__ = ['Arctic', 'VERSION_STORE', 'METADATA_STORE', 'TICK_STORE', 'CHUNK_STORE', 'register_library_type']
1919

2020
logger = logging.getLogger(__name__)
2121

2222
# Default Arctic application name: 'arctic'
2323
APPLICATION_NAME = 'arctic'
2424
VERSION_STORE = version_store.VERSION_STORE_TYPE
25+
METADATA_STORE = metadata_store.METADATA_STORE_TYPE
2526
TICK_STORE = tickstore.TICK_STORE_TYPE
2627
CHUNK_STORE = chunkstore.CHUNK_STORE_TYPE
2728
LIBRARY_TYPES = {version_store.VERSION_STORE_TYPE: version_store.VersionStore,
2829
tickstore.TICK_STORE_TYPE: tickstore.TickStore,
2930
toplevel.TICK_STORE_TYPE: toplevel.TopLevelTickStore,
3031
chunkstore.CHUNK_STORE_TYPE: chunkstore.ChunkStore,
31-
bson_store.BSON_STORE_TYPE: bson_store.BSONStore
32+
bson_store.BSON_STORE_TYPE: bson_store.BSONStore,
33+
metadata_store.METADATA_STORE_TYPE: metadata_store.MetadataStore
3234
}
3335

3436

@@ -53,6 +55,7 @@ class Arctic(object):
5355
(other Python types are pickled)
5456
- arctic.TICK_STORE - Tick specific library. Supports 'snapshots', efficiently
5557
stores updates, not versioned.
58+
- arctic.METADATA_STORE - Stores metadata with timestamps
5659
5760
Arctic and ArcticLibrary are responsible for Connection setup, authentication,
5861
dispatch to the appropriate library implementation, and quotas.

arctic/fixtures/arctic.py

+6
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,9 @@ def tickstore_lib(arctic, library_name):
9696
def chunkstore_lib(arctic, library_name):
9797
arctic.initialize_library(library_name, CHUNK_STORE_TYPE)
9898
return arctic.get_library(library_name)
99+
100+
101+
@pytest.fixture(scope="function")
102+
def ms_lib(arctic, library_name):
103+
arctic.initialize_library(library_name, m.METADATA_STORE)
104+
return arctic.get_library(library_name)

arctic/store/bson_store.py

+21
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ def find(self, *args, **kwargs):
5858
"""
5959
return self._collection.find(*args, **kwargs)
6060

61+
@mongo_retry
62+
def find_one(self, *args, **kwargs):
63+
"""
64+
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.find_one
65+
"""
66+
return self._collection.find_one(*args, **kwargs)
67+
6168
@mongo_retry
6269
def insert_one(self, document, **kwargs):
6370
"""
@@ -119,6 +126,20 @@ def find_one_and_replace(self, filter, replacement, **kwargs):
119126
self._arctic_lib.check_quota()
120127
return self._collection.find_one_and_replace(filter, replacement, **kwargs)
121128

129+
@mongo_retry
130+
def find_one_and_update(self, filter, update, **kwargs):
131+
"""
132+
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.find_one_and_update
133+
"""
134+
self._arctic_lib.check_quota()
135+
return self._collection.find_one_and_update(filter, update, **kwargs)
136+
137+
def find_one_and_delete(self, filter, **kwargs):
138+
"""
139+
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.find_one_and_delete
140+
"""
141+
return self._collection.find_one_and_delete(filter, **kwargs)
142+
122143
@mongo_retry
123144
def bulk_write(self, requests, **kwargs):
124145
"""

arctic/store/metadata_store.py

+235
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
from datetime import datetime as dt
2+
import logging
3+
4+
import pandas as pd
5+
6+
import bson
7+
import pymongo
8+
from ..decorators import mongo_retry
9+
from ..exceptions import NoDataFoundException
10+
11+
from .bson_store import BSONStore
12+
13+
logger = logging.getLogger(__name__)
14+
15+
METADATA_STORE_TYPE = 'MetadataStore'
16+
17+
class MetadataStore(BSONStore):
18+
"""
19+
Metadata Store. This stores metadata with timestamps to allow temporal queries.
20+
21+
Entries are stored in the following format:
22+
'symbol': symbol name
23+
'metadata': metadata to be persisted
24+
'start_time': when entry becomes effective
25+
'end_time': (Optional) when entry expires. If not set, it is still in effect
26+
27+
For each symbol end_time of a entry should match start_time of the next one except for the current entry.
28+
"""
29+
30+
@classmethod
31+
def initialize_library(cls, arctic_lib, hashed=True, **kwargs):
32+
MetadataStore(arctic_lib)._ensure_index()
33+
BSONStore.initialize_library(arctic_lib, hashed, **kwargs)
34+
35+
@mongo_retry
36+
def _ensure_index(self):
37+
self.create_index([('symbol', pymongo.ASCENDING), ('start_time', pymongo.DESCENDING)],
38+
unique=True, background=True)
39+
40+
def __init__(self, arctic_lib):
41+
self._arctic_lib = arctic_lib
42+
self._reset()
43+
44+
def _reset(self):
45+
self._collection = self._arctic_lib.get_top_level_collection().metadata
46+
47+
@mongo_retry
48+
def list_symbols(self):
49+
return self.distinct('symbol')
50+
51+
@mongo_retry
52+
def has_symbol(self, symbol):
53+
return self.find_one({'symbol': symbol}) is not None
54+
55+
@mongo_retry
56+
def read_history(self, symbol):
57+
"""
58+
Return all metadata saved for `symbol`
59+
60+
Parameters
61+
----------
62+
symbol : `str`
63+
symbol name for the item
64+
65+
Returns
66+
-------
67+
pandas.DateFrame containing timestamps and metadata entries
68+
"""
69+
find = self.find({'symbol': symbol}, sort=[('start_time', pymongo.ASCENDING)])
70+
times = []
71+
entries = []
72+
for item in find:
73+
times.append(item['start_time'])
74+
entries.append(item['metadata'])
75+
return pd.DataFrame({symbol: entries}, times)
76+
77+
@mongo_retry
78+
def read(self, symbol):
79+
"""
80+
Return current metadata saved for `symbol`
81+
82+
Parameters
83+
----------
84+
symbol : `str`
85+
symbol name for the item
86+
87+
Returns
88+
-------
89+
metadata
90+
"""
91+
res = self.find_one({'symbol': symbol}, sort=[('start_time', pymongo.DESCENDING)])
92+
return res['metadata'] if res is not None else None
93+
94+
95+
def write_history(self, collection):
96+
"""
97+
Manually overwrite entire metadata history for symbols in `collection`
98+
99+
Parameters
100+
----------
101+
collection : `list of pandas.DataFrame`
102+
with symbol names as headers and timestamps as indices
103+
(the same format as output of read_history)
104+
Example:
105+
[pandas.DataFrame({'symbol': [{}]}, [datetime.datetime.utcnow()])]
106+
"""
107+
documents = []
108+
for dataframe in collection:
109+
if len(dataframe.columns) != 1:
110+
raise ValueError('More than one symbol found in a DataFrame')
111+
symbol = dataframe.columns[0]
112+
times = dataframe.index
113+
entries = dataframe[symbol].values
114+
if self.has_symbol(symbol):
115+
self.purge(symbol)
116+
doc = {'symbol': symbol, 'metadata': entries[0], 'start_time': times[0]}
117+
for metadata, start_time in zip(entries[1:], times[1:]):
118+
if metadata == doc['metadata']:
119+
continue
120+
doc['end_time'] = start_time
121+
documents.append(doc)
122+
doc = {'symbol': symbol, 'metadata': metadata, 'start_time': start_time}
123+
documents.append(doc)
124+
125+
self.insert_many(documents)
126+
127+
def append(self, symbol, metadata, start_time=None):
128+
"""
129+
Update metadata entry for `symbol`
130+
131+
Parameters
132+
----------
133+
symbol : `str`
134+
symbol name for the item
135+
metadata : `dict`
136+
to be persisted
137+
start_time : `datetime.datetime`
138+
when metadata becomes effective
139+
Default: datetime.datetime.utcnow()
140+
"""
141+
if start_time is None:
142+
start_time = dt.utcnow()
143+
old_metadata = self.find_one({'symbol': symbol}, sort=[('start_time', pymongo.DESCENDING)])
144+
if old_metadata is not None:
145+
if old_metadata['start_time'] >= start_time:
146+
raise ValueError('start_time={} is earlier than the last metadata @{}'.format(start_time,
147+
old_metadata['start_time']))
148+
if old_metadata['metadata'] == metadata:
149+
logger.warning('No change to metadata')
150+
return metadata
151+
elif metadata is None:
152+
return
153+
154+
self.find_one_and_update({'symbol': symbol}, {'$set': {'end_time': start_time}},
155+
sort=[('start_time', pymongo.DESCENDING)])
156+
document = {'_id': bson.ObjectId(), 'symbol': symbol, 'metadata': metadata, 'start_time': start_time}
157+
mongo_retry(self.insert_one)(document)
158+
159+
logger.debug('Finished writing metadata for %s', symbol)
160+
return document
161+
162+
def prepend(self, symbol, metadata, start_time=None):
163+
"""
164+
Prepend a metadata entry for `symbol`
165+
166+
Parameters
167+
----------
168+
symbol : `str`
169+
symbol name for the item
170+
metadata : `dict`
171+
to be persisted
172+
start_time : `datetime.datetime`
173+
when metadata becomes effective
174+
Default: datetime.datetime.min
175+
"""
176+
if metadata is None:
177+
return
178+
if start_time is None:
179+
start_time = dt.min
180+
old_metadata = self.find_one({'symbol': symbol}, sort=[('start_time', pymongo.ASCENDING)])
181+
if old_metadata is not None:
182+
if old_metadata['start_time'] <= start_time:
183+
raise ValueError('start_time={} is later than the first metadata @{}'.format(start_time,
184+
old_metadata['start_time']))
185+
if old_metadata['metadata'] == metadata:
186+
self.find_one_and_update({'symbol': symbol}, {'$set': {'start_time': start_time}},
187+
sort=[('start_time', pymongo.ASCENDING)])
188+
return metadata
189+
end_time = old_metadata.get('start_time')
190+
else:
191+
end_time = None
192+
193+
document = {'_id': bson.ObjectId(), 'symbol': symbol, 'metadata': metadata, 'start_time': start_time}
194+
if end_time is not None:
195+
document['end_time'] = end_time
196+
mongo_retry(self.insert_one)(document)
197+
198+
logger.debug('Finished writing metadata for %s', symbol)
199+
return document
200+
201+
def pop(self, symbol):
202+
"""
203+
Delete current metadata of `symbol`
204+
205+
Parameters
206+
----------
207+
symbol : `str`
208+
symbol name to delete
209+
210+
Returns
211+
-------
212+
Deleted metadata
213+
"""
214+
last_metadata = self.find_one({'symbol': symbol}, sort=[('start_time', pymongo.DESCENDING)])
215+
if last_metadata is None:
216+
raise NoDataFoundException('No metadata found for symbol {}'.format(symbol))
217+
218+
self.find_one_and_delete({'symbol': symbol}, sort=[('start_time', pymongo.DESCENDING)])
219+
mongo_retry(self.find_one_and_update)({'symbol': symbol}, {'$unset': {'end_time': ''}},
220+
sort=[('start_time', pymongo.DESCENDING)])
221+
222+
return last_metadata
223+
224+
@mongo_retry
225+
def purge(self, symbol):
226+
"""
227+
Delete all metadata of `symbol`
228+
229+
Parameters
230+
----------
231+
symbol : `str`
232+
symbol name to delete
233+
"""
234+
logger.warning("Deleting entire metadata history for %r from %r" % (symbol, self._arctic_lib.get_name()))
235+
self.delete_many({'symbol': symbol})

0 commit comments

Comments
 (0)