Skip to content

Commit cb0bee6

Browse files
committed
Merge pull request pandas-dev#146 from manahl/tickstore-querying
Make sure we don't fetch chunks that don't span the start point
2 parents a600275 + c00a533 commit cb0bee6

File tree

3 files changed

+117
-11
lines changed

3 files changed

+117
-11
lines changed

arctic/tickstore/tickstore.py

+16-6
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ def _mongo_date_range_query(self, symbol, date_range):
165165
if date_range.start:
166166
assert date_range.start.tzinfo
167167
start = date_range.start
168+
169+
# If all chunks start inside of the range, we default to capping to our
170+
# range so that we don't fetch any chunks from the beginning of time
171+
start_range['$gte'] = start
172+
168173
match = self._symbol_query(symbol)
169174
match.update({'s': {'$lte': start}})
170175

@@ -175,17 +180,22 @@ def _mongo_date_range_query(self, symbol, date_range):
175180
# Throw away everything but the start of every chunk and the symbol
176181
{'$project': {'_id': 0, 's': 1, 'sy': 1}},
177182
# For every symbol, get the latest chunk start (that is still before
178-
# our sought start, so all of these chunks span the start point)
183+
# our sought start)
179184
{'$group': {'_id': '$sy', 'start': {'$max': '$s'}}},
180-
# Take the earliest one of these chunk starts
181185
{'$sort': {'start': 1}},
182-
{'$limit': 1}])
186+
])
187+
# Now we need to get the earliest start of the chunk that still spans the start point.
188+
# Since we got them sorted by start, we just need to fetch their ends as well and stop
189+
# when we've seen the first such chunk
183190
try:
184-
first_dt = next(result)['start']
191+
for candidate in result:
192+
chunk = self._collection.find_one({'s': candidate['start'], 'sy': candidate['_id']}, {'e': 1})
193+
if chunk['e'].replace(tzinfo=mktz('UTC')) >= start:
194+
start_range['$gte'] = candidate['start'].replace(tzinfo=mktz('UTC'))
195+
break
185196
except StopIteration:
186197
pass
187-
if first_dt:
188-
start_range['$gte'] = first_dt
198+
189199

190200
# Find the end bound
191201
if date_range.end:

tests/integration/tickstore/test_ts_read.py

+87
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,93 @@ def test_read_chunk_boundaries(tickstore_lib):
401401
assert len(tickstore_lib.read(['SYM1', 'SYM2'], columns=None, date_range=DateRange(dt(2013, 6, 1, 12, 45, tzinfo=mktz('UTC')), dt(2013, 6, 1, 15, 00, tzinfo=mktz('UTC'))))) == 4
402402

403403

404+
def test_read_spanning_chunks(tickstore_lib):
    """Check that the start bound ignores a chunk that ends before the range start.

    Two symbols are written with ``_chunk_size`` forced to 2, so each symbol's
    three ticks are expected to split into a two-tick chunk followed by a
    one-tick chunk (see the "Chunk boundary here" markers below).

    For a range starting at 12:45, SYM1's closest preceding chunk starts at
    11:00 but its last tick is at 12:00, i.e. before the range start, so it
    must be ignored.  SYM2's chunk starting at 12:30 runs to 13:30 and does
    span the start point, so the query's ``$gte`` bound must be 12:30.
    """
    utc = mktz('UTC')

    def at(hour, minute):
        # Every timestamp in this test falls on 2013-06-01 (UTC).
        return dt(2013, 6, 1, hour, minute, tzinfo=utc)

    sym1_ticks = [
        {'a': 1., 'b': 2., 'index': at(11, 0)},
        {'a': 3., 'b': 4., 'index': at(12, 0)},
        # Chunk boundary here
        {'a': 5., 'b': 6., 'index': at(14, 0)},
    ]
    sym2_ticks = [
        {'a': 7., 'b': 8., 'index': at(12, 30)},
        {'a': 9., 'b': 10., 'index': at(13, 30)},
        # Chunk boundary here
        {'a': 11., 'b': 12., 'index': at(14, 30)},
    ]

    # Force tiny chunks before writing so the boundaries land where marked above.
    tickstore_lib._chunk_size = 2
    for symbol, ticks in (('SYM1', sym1_ticks), ('SYM2', sym2_ticks)):
        tickstore_lib.write(symbol, ticks)

    # Even though the latest chunk that's the closest to the start point for
    # SYM1 starts at 11:00, it ends before the start point, so we want to
    # ignore it and start from SYM2 (12:30) instead.
    query = tickstore_lib._mongo_date_range_query(
        ['SYM1', 'SYM2'],
        date_range=DateRange(at(12, 45), at(15, 0)))

    expected = {'s': {'$gte': at(12, 30), '$lte': at(15, 0)}}
    assert query == expected
446+
447+
448+
def test_read_inside_range(tickstore_lib):
    """Check that the start bound is capped to the range start when no chunk spans it.

    Two symbols are written with ``_chunk_size`` forced to 2, so each symbol's
    three ticks are expected to split into a two-tick chunk followed by a
    one-tick chunk (see the "Chunk boundary here" markers below).

    For a range starting at 10:00, SYM1's only earlier chunk covers
    00:00--01:00 and ends well before the range start, while SYM2 has no chunk
    starting at or before 10:00.  Nothing spans the start point, so the
    query's ``$gte`` bound must stay at the requested 10:00 rather than reach
    back to SYM1's 00:00 chunk.
    """
    utc = mktz('UTC')

    def at(hour, minute):
        # Every timestamp in this test falls on 2013-06-01 (UTC).
        return dt(2013, 6, 1, hour, minute, tzinfo=utc)

    sym1_ticks = [
        {'a': 1., 'b': 2., 'index': at(0, 0)},
        {'a': 3., 'b': 4., 'index': at(1, 0)},
        # Chunk boundary here
        {'a': 5., 'b': 6., 'index': at(14, 0)},
    ]
    sym2_ticks = [
        {'a': 7., 'b': 8., 'index': at(12, 30)},
        {'a': 9., 'b': 10., 'index': at(13, 30)},
        # Chunk boundary here
        {'a': 11., 'b': 12., 'index': at(14, 30)},
    ]

    # Force tiny chunks before writing so the boundaries land where marked above.
    tickstore_lib._chunk_size = 2
    for symbol, ticks in (('SYM1', sym1_ticks), ('SYM2', sym2_ticks)):
        tickstore_lib.write(symbol, ticks)

    # If there are no chunks spanning the range, we still cap the start range
    # so that we don't fetch SYM1's 0am--1am chunk.
    query = tickstore_lib._mongo_date_range_query(
        ['SYM1', 'SYM2'],
        date_range=DateRange(at(10, 0), at(15, 0)))

    expected = {'s': {'$gte': at(10, 0), '$lte': at(15, 0)}}
    assert query == expected
490+
404491
def test_read_longs(tickstore_lib):
405492
DUMMY_DATA = [
406493
{'a': 1,

tests/unit/tickstore/test_tickstore.py

+14-5
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,27 @@ def test_mongo_date_range_query():
1919
self = create_autospec(TickStore)
2020
self._collection = create_autospec(Collection)
2121
self._symbol_query.return_value = {"sy": { "$in" : [ "s1" , "s2"]}}
22-
self._collection.aggregate.return_value = iter([{"_id": "s1", "start": dt(2014, 1, 1, 0, 0, tzinfo=mktz())}])
22+
self._collection.aggregate.return_value = iter([{"_id": "s1", "start": dt(2014, 1, 1, 0, 0, tzinfo=mktz())},
23+
{"_id": "s2", "start": dt(2014, 1, 1, 12, 0, tzinfo=mktz())}])
24+
25+
self._collection.find_one.side_effect = [
26+
{'e': dt(2014, 1, 1, 15, 0, tzinfo=mktz())},
27+
{'e': dt(2014, 1, 2, 12, 0, tzinfo=mktz())}]
2328

2429
query = TickStore._mongo_date_range_query(self, 'sym', DateRange(dt(2014, 1, 2, 0, 0, tzinfo=mktz()),
2530
dt(2014, 1, 3, 0, 0, tzinfo=mktz())))
26-
31+
2732
assert self._collection.aggregate.call_args_list == [call([
2833
{"$match": {"s": {"$lte": dt(2014, 1, 2, 0, 0, tzinfo=mktz())}, "sy": { "$in" : [ "s1" , "s2"]}}},
2934
{"$project": {"_id": 0, "s": 1, "sy": 1}},
3035
{"$group": {"_id": "$sy", "start": {"$max": "$s"}}},
31-
{"$sort": {"start": 1}},
32-
{"$limit": 1}])]
33-
assert query == {'s': {'$gte': dt(2014, 1, 1, 0, 0, tzinfo=mktz()), '$lte': dt(2014, 1, 3, 0, 0, tzinfo=mktz())}}
36+
{"$sort": {"start": 1}}])]
37+
38+
assert self._collection.find_one.call_args_list == [
39+
call({'sy': 's1', 's': dt(2014, 1, 1, 0, 0, tzinfo=mktz())}, {'e': 1}),
40+
call({'sy': 's2', 's': dt(2014, 1, 1, 12, 0, tzinfo=mktz())}, {'e': 1})]
41+
42+
assert query == {'s': {'$gte': dt(2014, 1, 1, 12, 0, tzinfo=mktz()), '$lte': dt(2014, 1, 3, 0, 0, tzinfo=mktz())}}
3443

3544

3645
def test_mongo_date_range_query_asserts():

0 commit comments

Comments
 (0)