Skip to content

Commit aebfe81

Browse files
authored
Add advanced query for list_symbols() similar to VersionStore (pandas-dev#562)
* Add advanced query for list_symbols() similar to VersionStore * Optimize aggregation pipeline * Fix tests: - Arctic uses older 'block' level LZ4 compression - fix imports - move LZ4 related tests from unit to integration tests - pandas import for newer versions (>= 0.23.x)
1 parent 08d3648 commit aebfe81

File tree

6 files changed

+190
-39
lines changed

6 files changed

+190
-39
lines changed

arctic/serialization/numpy_arrays.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,15 @@
99
from pandas.lib import infer_dtype
1010

1111
try:
12-
from pandas._libs.lib import max_len_string_array
12+
# pandas >= 0.23.0
13+
from pandas._libs.writers import max_len_string_array
1314
except ImportError:
14-
from pandas.lib import max_len_string_array
15+
try:
16+
# pandas [0.20.0, 0.22.x]
17+
from pandas._libs.lib import max_len_string_array
18+
except ImportError:
19+
# pandas <= 0.19.x
20+
from pandas.lib import max_len_string_array
1521

1622
from bson import Binary, SON
1723

arctic/store/bson_store.py

+7
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,13 @@ def count(self, filter, **kwargs):
163163
"""
164164
return self._collection.count(filter, **kwargs)
165165

166+
@mongo_retry
167+
def aggregate(self, pipeline, **kwargs):
168+
"""
169+
See http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.aggregate
170+
"""
171+
return self._collection.aggregate(pipeline, **kwargs)
172+
166173
@mongo_retry
167174
def distinct(self, key, **kwargs):
168175
"""

arctic/store/metadata_store.py

+58-2
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@
99
from ..decorators import mongo_retry
1010
from ..exceptions import NoDataFoundException
1111
from .bson_store import BSONStore
12+
import six
1213

1314
logger = logging.getLogger(__name__)
1415

1516
METADATA_STORE_TYPE = 'MetadataStore'
1617

18+
1719
class MetadataStore(BSONStore):
1820
"""
1921
Metadata Store. This stores metadata with timestamps to allow temporal queries.
@@ -57,8 +59,62 @@ def __repr__(self):
5759
return str(self)
5860

5961
@mongo_retry
60-
def list_symbols(self):
61-
return self.distinct('symbol')
62+
def list_symbols(self, regex=None, as_of=None, **kwargs):
63+
"""
64+
Return the symbols in this library.
65+
66+
Parameters
67+
----------
68+
as_of : `datetime.datetime`
69+
filter symbols valid at given time
70+
regex : `str`
71+
filter symbols by the passed in regular expression
72+
kwargs :
73+
kwarg keys are used as fields to query for symbols with metadata matching
74+
the kwargs query
75+
76+
Returns
77+
-------
78+
String list of symbols in the library
79+
"""
80+
81+
# Skip aggregation pipeline
82+
if not (regex or as_of or kwargs):
83+
return self.distinct('symbol')
84+
85+
# Index-based query part
86+
index_query = {}
87+
if as_of is not None:
88+
index_query['start_time'] = {'$lte': as_of}
89+
90+
if regex or as_of:
91+
# make sure that symbol is present in query even if only as_of is specified to avoid document scans
92+
# see 'Pipeline Operators and Indexes' at https://docs.mongodb.com/manual/core/aggregation-pipeline/#aggregation-pipeline-operators-and-performance
93+
index_query['symbol'] = {'$regex': regex or '^'}
94+
95+
# Document query part
96+
data_query = {}
97+
if kwargs:
98+
for k, v in six.iteritems(kwargs):
99+
data_query['metadata.' + k] = v
100+
101+
# Sort using index, relying on https://docs.mongodb.com/manual/core/aggregation-pipeline-optimization/
102+
pipeline = [{'$sort': {'symbol': pymongo.ASCENDING,
103+
'start_time': pymongo.DESCENDING}}]
104+
105+
# Index-based filter on symbol and start_time
106+
if index_query:
107+
pipeline.append({'$match': index_query})
108+
# Group by 'symbol' and get the latest known data
109+
pipeline.append({'$group': {'_id': '$symbol',
110+
'metadata': {'$first': '$metadata'}}})
111+
# Match the data fields
112+
if data_query:
113+
pipeline.append({'$match': data_query})
114+
# Return only 'symbol' field value
115+
pipeline.append({'$project': {'_id': 0, 'symbol': '$_id'}})
116+
117+
return sorted(r['symbol'] for r in self.aggregate(pipeline))
62118

63119
@mongo_retry
64120
def has_symbol(self, symbol):

tests/integration/test_compress_integration.py

+14-2
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,28 @@
33
try:
44
from lz4 import compressHC as lz4_compress, decompress as lz4_decompress
55
except ImportError as e:
6-
from lz4.frame import compress as lz4_compress, decompress as lz4_decompress
6+
from lz4.block import compress as lz4_compress, decompress as lz4_decompress
77

88
import string
99
import pytest
1010
import six
1111
from datetime import datetime as dt
12-
1312
import arctic._compress as c
1413

1514

15+
@pytest.mark.parametrize("compress,decompress", [
16+
(c.compress, lz4_decompress),
17+
(c.compressHC, lz4_decompress),
18+
(lz4_compress, c.decompress)
19+
], ids=('arctic/lz4',
20+
'arcticHC/lz4',
21+
'lz4/arctic'))
22+
def test_roundtrip(compress, decompress):
23+
_str = b"hello world"
24+
cstr = compress(_str)
25+
assert _str == decompress(cstr)
26+
27+
1628
@pytest.mark.parametrize("n, length", [(300, 5e4), # micro TS
1729
(5, 2e6), # Futures TS
1830
(10, 2e6), # Futures TS
+98-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,105 @@
1+
import datetime as dt
2+
13
from arctic.store.metadata_store import MetadataStore
24
from mock import create_autospec, call
35

46

57
def test_ensure_index():
68
ms = create_autospec(MetadataStore)
79
MetadataStore._ensure_index(ms)
8-
assert ms.create_index.call_args_list == [call([('symbol', 1), ('start_time', -1)], unique=True, background=True)]
10+
assert ms.create_index.call_args_list == [call([('symbol', 1),
11+
('start_time', -1)],
12+
unique=True,
13+
background=True)]
14+
15+
16+
def test_list_symbols_simple():
17+
ms = create_autospec(MetadataStore)
18+
ms.distinct.return_value = []
19+
20+
MetadataStore.list_symbols(ms)
21+
ms.distinct.assert_called_once_with('symbol')
22+
23+
24+
def test_list_symbols_regex():
25+
ms = create_autospec(MetadataStore)
26+
ms.aggregate.return_value = []
27+
28+
expected_pipeline = [
29+
{'$sort': {'symbol': 1, 'start_time': -1}},
30+
{'$match': {'symbol': {'$regex': 'test.*'}}},
31+
{'$group': {'_id': '$symbol', 'metadata': {'$first': '$metadata'}}},
32+
{'$project': {'_id': 0, 'symbol': '$_id'}}
33+
]
34+
35+
MetadataStore.list_symbols(ms, regex='test.*')
36+
ms.aggregate.assert_called_once_with(expected_pipeline)
37+
38+
39+
def test_list_symbols_as_of():
40+
ms = create_autospec(MetadataStore)
41+
ms.aggregate.return_value = []
42+
43+
expected_pipeline = [
44+
{'$sort': {'symbol': 1, 'start_time': -1}},
45+
{'$match': {'symbol': {'$regex': '^'},
46+
'start_time': {'$lte': dt.datetime(2018, 5, 11)}}},
47+
{'$group': {'_id': '$symbol', 'metadata': {'$first': '$metadata'}}},
48+
{'$project': {'_id': 0, 'symbol': '$_id'}}
49+
]
50+
51+
MetadataStore.list_symbols(ms, as_of=dt.datetime(2018, 5, 11))
52+
ms.aggregate.assert_called_once_with(expected_pipeline)
53+
54+
55+
def test_list_symbols_as_of_regex():
56+
ms = create_autospec(MetadataStore)
57+
ms.aggregate.return_value = []
58+
59+
expected_pipeline = [
60+
{'$sort': {'symbol': 1, 'start_time': -1}},
61+
{'$match': {'symbol': {'$regex': 'test.*'},
62+
'start_time': {'$lte': dt.datetime(2018, 5, 11)}}},
63+
{'$group': {'_id': '$symbol', 'metadata': {'$first': '$metadata'}}},
64+
{'$project': {'_id': 0, 'symbol': '$_id'}}
65+
]
66+
67+
MetadataStore.list_symbols(ms,
68+
regex='test.*',
69+
as_of=dt.datetime(2018, 5, 11))
70+
ms.aggregate.assert_called_once_with(expected_pipeline)
71+
72+
73+
def test_list_symbols_metadata_query():
74+
ms = create_autospec(MetadataStore)
75+
ms.aggregate.return_value = []
76+
77+
expected_pipeline = [
78+
{'$sort': {'symbol': 1, 'start_time': -1}},
79+
{'$group': {'_id': '$symbol', 'metadata': {'$first': '$metadata'}}},
80+
{'$match': {'metadata.foo': 'bar'}},
81+
{'$project': {'_id': 0, 'symbol': '$_id'}}
82+
]
83+
84+
MetadataStore.list_symbols(ms, foo='bar')
85+
ms.aggregate.assert_called_once_with(expected_pipeline)
86+
87+
88+
def test_list_symbols_all_options():
89+
ms = create_autospec(MetadataStore)
90+
ms.aggregate.return_value = []
91+
92+
expected_pipeline = [
93+
{'$sort': {'symbol': 1, 'start_time': -1}},
94+
{'$match': {'symbol': {'$regex': 'test.*'},
95+
'start_time': {'$lte': dt.datetime(2018, 5, 11)}}},
96+
{'$group': {'_id': '$symbol', 'metadata': {'$first': '$metadata'}}},
97+
{'$match': {'metadata.foo': 'bar'}},
98+
{'$project': {'_id': 0, 'symbol': '$_id'}}
99+
]
100+
101+
MetadataStore.list_symbols(ms,
102+
regex='test.*',
103+
as_of=dt.datetime(2018, 5, 11),
104+
foo='bar')
105+
ms.aggregate.assert_called_once_with(expected_pipeline)

tests/unit/test_compress.py

+5-32
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1-
import lz4
21
import pytest
32
import random
43
import string
54

65
import arctic._compress as c
76

87

9-
def test_roundtrip():
8+
@pytest.mark.parametrize("compress",
9+
[c.compress, c.compressHC],
10+
ids=('arctic', 'arcticHC'))
11+
def test_roundtrip(compress):
1012
_str = b"hello world"
11-
cstr = c.compress(_str)
13+
cstr = compress(_str)
1214
assert _str == c.decompress(cstr)
1315

1416

@@ -19,35 +21,6 @@ def test_roundtrip_multi(n):
1921
assert _str == c.decompress(cstr)
2022

2123

22-
def test_roundtripHC():
23-
_str = b"hello world"
24-
cstr = c.compressHC(_str)
25-
assert _str == c.decompress(cstr)
26-
27-
28-
def test_roundtripLZ4():
29-
_str = b"hello world"
30-
cstr = lz4.compress(_str)
31-
assert _str == c.decompress(cstr)
32-
33-
34-
def test_roundtripLZ4Back():
35-
_str = b"hello world"
36-
cstr = c.compress(_str)
37-
assert _str == lz4.decompress(cstr)
38-
39-
40-
def test_roundtripLZ4HC():
41-
_str = b"hello world"
42-
cstr = lz4.compressHC(_str)
43-
assert _str == c.decompress(cstr)
44-
45-
46-
def test_roundtripLZ4HCBack():
47-
_str = b"hello world"
48-
cstr = c.compressHC(_str)
49-
assert _str == lz4.decompress(cstr)
50-
5124

5225
@pytest.mark.parametrize("n, length", [(1, 10), (100, 10), (1000, 10)])
5326
def test_roundtrip_arr(n, length):

0 commit comments

Comments
 (0)