Skip to content

Commit c09d5bb

Browse files
authored
Merge pull request pandas-dev#173 from manahl/chunkstore-delete-range
Fixes pandas-dev#171 and pandas-dev#172
2 parents ca429c3 + 05d02e0 commit c09d5bb

File tree

7 files changed

+127
-11
lines changed

7 files changed

+127
-11
lines changed

CHANGES.md

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
* Bugfix: Periodically re-cache the library.quota to pick up any changes
99
* Bugfix: #166 Add index on SHA for ChunkStore
1010
* Bugfix: #169 Dtype mismatch in chunkstore updates
11+
* Feature: #171 allow deleting of values within a date range in ChunkStore
12+
* Bugfix: #172 Fix date range bug when querying dates in the middle of chunks
1113

1214
### 1.25 (2016-05-23)
1315

arctic/chunkstore/_chunker.py

+10
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,13 @@ def filter(self, data, range_obj):
6161
data, filtered by range_obj
6262
"""
6363
raise NotImplementedError
64+
65+
def exclude(self, data, range_obj):
66+
"""
67+
Removes data within the bounds of the range object (inclusive)
68+
69+
returns
70+
-------
71+
data, filtered by range_obj
72+
"""
73+
raise NotImplementedError

arctic/chunkstore/chunkstore.py

+24-8
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,31 @@ def __str__(self):
7373
def __repr__(self):
7474
return str(self)
7575

76-
def delete(self, symbol):
76+
def delete(self, symbol, chunk_range=None):
7777
"""
78-
Delete all chunks for a symbol.
78+
Delete all chunks for a symbol, or optionally, chunks within a range
7979
8080
Parameters
8181
----------
8282
symbol : `str`
8383
symbol name for the item
8484
"""
85-
query = {"symbol": symbol}
86-
self._collection.delete_many(query)
87-
self._collection.symbols.delete_many(query)
85+
if chunk_range:
86+
# read out chunks that fall within the range and filter out
87+
# data within the range
88+
df = self.read(symbol, chunk_range=chunk_range, filter_data=False)
89+
df = self.chunker.exclude(df, chunk_range)
90+
91+
# remove chunks, and update any remaining data
92+
query = {'symbol': symbol}
93+
query.update(self.chunker.to_mongo(chunk_range))
94+
self._collection.delete_many(query)
95+
self.update(symbol, df)
96+
97+
else:
98+
query = {"symbol": symbol}
99+
self._collection.delete_many(query)
100+
self._collection.symbols.delete_many(query)
88101

89102
def list_symbols(self):
90103
"""
@@ -99,7 +112,7 @@ def list_symbols(self):
99112
def _get_symbol_info(self, symbol):
100113
return self._symbols.find_one({'symbol': symbol})
101114

102-
def read(self, symbol, chunk_range=None):
115+
def read(self, symbol, chunk_range=None, filter_data=True):
103116
"""
104117
Reads data for a given symbol from the database.
105118
@@ -110,6 +123,9 @@ def read(self, symbol, chunk_range=None):
110123
chunk_range: object
111124
corresponding range object for the specified chunker (for
112125
DateChunker it is a DateRange object)
126+
filter: boolean
127+
perform chunk level filtering on the data (see filter() in _chunker)
128+
only applicable when chunk_range is specified
113129
114130
Returns
115131
-------
@@ -124,7 +140,7 @@ def read(self, symbol, chunk_range=None):
124140
}
125141

126142
if chunk_range:
127-
spec['start'] = self.chunker.to_mongo(chunk_range)
143+
spec.update(self.chunker.to_mongo(chunk_range))
128144

129145
segments = []
130146
for _, x in enumerate(self._collection.find(spec, sort=[('start', pymongo.ASCENDING)],)):
@@ -137,7 +153,7 @@ def read(self, symbol, chunk_range=None):
137153

138154
data = deserialize(records, sym['type'])
139155

140-
if chunk_range is None:
156+
if not filter_data or chunk_range is None:
141157
return data
142158
return self.chunker.filter(data, chunk_range)
143159

arctic/chunkstore/date_chunker.py

+12-2
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,17 @@ def to_start_end(self, data):
7474
return self._get_date_range(data)
7575

7676
def to_mongo(self, range_obj):
77-
return range_obj.mongo_query()
77+
if range_obj.start and range_obj.end:
78+
return {'$and': [{'start': {'$lte': range_obj.end}}, {'end': {'$gte': range_obj.start}}]}
79+
elif range_obj.start:
80+
return {'end': {'$gte': range_obj.start}}
81+
elif range_obj.end:
82+
return {'start': {'$lte': range_obj.end}}
83+
else:
84+
return {}
7885

7986
def filter(self, data, range_obj):
80-
return data.ix[range_obj[0]:range_obj[1]]
87+
return data.ix[range_obj.start:range_obj.end]
88+
89+
def exclude(self, data, range_obj):
90+
return data[(data.index.get_level_values('date') < range_obj.start) | (data.index.get_level_values('date') > range_obj.end)]

tests/integration/chunkstore/test_chunkstore.py

+49-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ def test_monthly_df(chunkstore_lib):
135135
chunkstore_lib.write('chunkstore_test', df, chunk_size='M')
136136
ret = chunkstore_lib.read('chunkstore_test', chunk_range=DateRange(dt(2016, 1, 1), dt(2016, 1, 2)))
137137
assert len(ret) == 2
138-
# assert_frame_equal(df, chunkstore_lib.read('chunkstore_test') )
138+
assert_frame_equal(df, chunkstore_lib.read('chunkstore_test'))
139139

140140

141141
def test_yearly_df(chunkstore_lib):
@@ -638,3 +638,51 @@ def test_dtype_mismatch_error(chunkstore_lib):
638638
assert('Dtype mismatch' in str(e))
639639

640640
assert_series_equal(s, chunkstore_lib.read('test'))
641+
642+
643+
def test_delete_range(chunkstore_lib):
644+
df = DataFrame(data={'data': [1, 2, 3, 4, 5, 6]},
645+
index=MultiIndex.from_tuples([(dt(2016, 1, 1), 1),
646+
(dt(2016, 1, 2), 1),
647+
(dt(2016, 2, 1), 1),
648+
(dt(2016, 2, 2), 1),
649+
(dt(2016, 3, 1), 1),
650+
(dt(2016, 3, 2), 1)],
651+
names=['date', 'id'])
652+
)
653+
654+
df_result = DataFrame(data={'data': [1, 6]},
655+
index=MultiIndex.from_tuples([(dt(2016, 1, 1), 1),
656+
(dt(2016, 3, 2), 1)],
657+
names=['date', 'id'])
658+
)
659+
660+
chunkstore_lib.write('test', df, 'M')
661+
chunkstore_lib.delete('test', chunk_range=DateRange(dt(2016, 1, 2), dt(2016, 3, 1)))
662+
assert_frame_equal(chunkstore_lib.read('test'), df_result)
663+
664+
665+
def test_read_chunk_range(chunkstore_lib):
666+
df = DataFrame(data={'data': [1, 2, 3, 4, 5, 6, 7, 8, 9]},
667+
index=MultiIndex.from_tuples([(dt(2016, 1, 1), 1),
668+
(dt(2016, 1, 2), 1),
669+
(dt(2016, 1, 3), 1),
670+
(dt(2016, 2, 1), 1),
671+
(dt(2016, 2, 2), 1),
672+
(dt(2016, 2, 3), 1),
673+
(dt(2016, 3, 1), 1),
674+
(dt(2016, 3, 2), 1),
675+
(dt(2016, 3, 3), 1)],
676+
names=['date', 'id'])
677+
)
678+
679+
chunkstore_lib.write('test', df, 'M')
680+
assert(chunkstore_lib.read('test', chunk_range=DateRange(dt(2016, 1, 1), dt(2016, 1, 1))).index.get_level_values('date')[0] == dt(2016,1,1))
681+
assert(chunkstore_lib.read('test', chunk_range=DateRange(dt(2016, 1, 2), dt(2016, 1, 2))).index.get_level_values('date')[0] == dt(2016, 1, 2))
682+
assert(chunkstore_lib.read('test', chunk_range=DateRange(dt(2016, 1, 3), dt(2016, 1, 3))).index.get_level_values('date')[0] == dt(2016, 1, 3))
683+
assert(chunkstore_lib.read('test', chunk_range=DateRange(dt(2016, 2, 2), dt(2016, 2, 2))).index.get_level_values('date')[0] == dt(2016, 2, 2))
684+
685+
assert(len(chunkstore_lib.read('test', chunk_range=DateRange(dt(2016, 2, 2), dt(2016, 2, 2)), filter_data=False)) == 3)
686+
687+
df2 = chunkstore_lib.read('test', chunk_range=DateRange(None, None))
688+
assert_frame_equal(df, df2)

tests/unit/chunkstore/__init__.py

Whitespace-only changes.
+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from arctic.chunkstore.date_chunker import DateChunker
2+
from pandas import DataFrame, MultiIndex
3+
from datetime import datetime as dt
4+
from arctic.date import DateRange
5+
from pandas.util.testing import assert_frame_equal
6+
7+
8+
def test_date_filter():
9+
c = DateChunker()
10+
df = DataFrame(data={'data': [1, 2, 3]},
11+
index=MultiIndex.from_tuples([(dt(2016, 1, 1), 1),
12+
(dt(2016, 1, 2), 1),
13+
(dt(2016, 1, 3), 1)],
14+
names=['date', 'id'])
15+
)
16+
17+
# OPEN - CLOSED
18+
assert_frame_equal(c.filter(df, DateRange(None, dt(2016, 1, 3))), df)
19+
# CLOSED - OPEN
20+
assert_frame_equal(c.filter(df, DateRange(dt(2016, 1, 1), None)), df)
21+
# OPEN - OPEN
22+
assert_frame_equal(c.filter(df, DateRange(None, None)), df)
23+
# CLOSED - OPEN (far before data range)
24+
assert_frame_equal(c.filter(df, DateRange(dt(2000, 1, 1), None)), df)
25+
# CLOSED - OPEN (far after range)
26+
assert(c.filter(df, DateRange(dt(2020, 1, 2), None)).empty)
27+
# OPEN - CLOSED
28+
assert_frame_equal(c.filter(df, DateRange(None, dt(2020, 1, 1))), df)
29+
# CLOSED - CLOSED (after range)
30+
assert(c.filter(df, DateRange(dt(2017, 1, 1), dt(2018, 1, 1))).empty)

0 commit comments

Comments
 (0)