Skip to content

Commit 8491532

Browse files
committed
1 parent ca429c3 commit 8491532

File tree

3 files changed

+83
-10
lines changed

3 files changed

+83
-10
lines changed

arctic/chunkstore/chunkstore.py

+25-8
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,32 @@ def __str__(self):
7373
def __repr__(self):
7474
return str(self)
7575

76-
def delete(self, symbol):
76+
def delete(self, symbol, chunk_range=None):
7777
"""
78-
Delete all chunks for a symbol.
78+
Delete all chunks for a symbol or, optionally, only the data that falls within chunk_range
7979
8080
Parameters
8181
----------
8282
symbol : `str`
8383
symbol name for the item
8484
"""
85-
query = {"symbol": symbol}
86-
self._collection.delete_many(query)
87-
self._collection.symbols.delete_many(query)
85+
if chunk_range:
86+
# read out chunks that fall within the range and filter out
87+
# data within the range
88+
df = self.read(symbol, chunk_range=chunk_range, no_filter=True)
89+
df = df[(df.index.get_level_values('date') < chunk_range[0]) | (df.index.get_level_values('date') > chunk_range[1])]
90+
91+
92+
# remove chunks, and update any remaining data
93+
query = {'symbol': symbol}
94+
query.update(self.chunker.to_mongo(chunk_range))
95+
self._collection.delete_many(query)
96+
self.update(symbol, df)
97+
98+
else:
99+
query = {"symbol": symbol}
100+
self._collection.delete_many(query)
101+
self._collection.symbols.delete_many(query)
88102

89103
def list_symbols(self):
90104
"""
@@ -99,7 +113,7 @@ def list_symbols(self):
99113
def _get_symbol_info(self, symbol):
100114
return self._symbols.find_one({'symbol': symbol})
101115

102-
def read(self, symbol, chunk_range=None):
116+
def read(self, symbol, chunk_range=None, no_filter=False):
103117
"""
104118
Reads data for a given symbol from the database.
105119
@@ -110,6 +124,9 @@ def read(self, symbol, chunk_range=None):
110124
chunk_range: object
111125
corresponding range object for the specified chunker (for
112126
DateChunker it is a DateRange object)
127+
no_filter: boolean
128+
if True, skip the chunk level filtering of the data (see filter() in _chunker);
129+
only applicable when chunk_range is specified
113130
114131
Returns
115132
-------
@@ -124,7 +141,7 @@ def read(self, symbol, chunk_range=None):
124141
}
125142

126143
if chunk_range:
127-
spec['start'] = self.chunker.to_mongo(chunk_range)
144+
spec.update(self.chunker.to_mongo(chunk_range))
128145

129146
segments = []
130147
for _, x in enumerate(self._collection.find(spec, sort=[('start', pymongo.ASCENDING)],)):
@@ -137,7 +154,7 @@ def read(self, symbol, chunk_range=None):
137154

138155
data = deserialize(records, sym['type'])
139156

140-
if chunk_range is None:
157+
if no_filter or chunk_range is None:
141158
return data
142159
return self.chunker.filter(data, chunk_range)
143160

arctic/chunkstore/date_chunker.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,15 @@ def to_start_end(self, data):
7474
return self._get_date_range(data)
7575

7676
def to_mongo(self, range_obj):
77-
return range_obj.mongo_query()
77+
if range_obj[0] and range_obj[1]:
78+
return {'$and': [{'start': {'$lte': range_obj[1]}}, {'end': {'$gte': range_obj[0]}}]}
79+
elif range_obj[0]:
80+
return {'end': {'$gte': range_obj[0]}}
81+
elif range_obj[1]:
82+
return {'start': {'$lte': range_obj[1]}}
83+
else:
84+
return {}
85+
7886

7987
def filter(self, data, range_obj):
8088
return data.ix[range_obj[0]:range_obj[1]]

tests/integration/chunkstore/test_chunkstore.py

+49-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ def test_monthly_df(chunkstore_lib):
135135
chunkstore_lib.write('chunkstore_test', df, chunk_size='M')
136136
ret = chunkstore_lib.read('chunkstore_test', chunk_range=DateRange(dt(2016, 1, 1), dt(2016, 1, 2)))
137137
assert len(ret) == 2
138-
# assert_frame_equal(df, chunkstore_lib.read('chunkstore_test') )
138+
assert_frame_equal(df, chunkstore_lib.read('chunkstore_test'))
139139

140140

141141
def test_yearly_df(chunkstore_lib):
@@ -638,3 +638,51 @@ def test_dtype_mismatch_error(chunkstore_lib):
638638
assert('Dtype mismatch' in str(e))
639639

640640
assert_series_equal(s, chunkstore_lib.read('test'))
641+
642+
643+
def test_delete_range(chunkstore_lib):
644+
df = DataFrame(data={'data': [1, 2, 3, 4, 5, 6]},
645+
index=MultiIndex.from_tuples([(dt(2016, 1, 1), 1),
646+
(dt(2016, 1, 2), 1),
647+
(dt(2016, 2, 1), 1),
648+
(dt(2016, 2, 2), 1),
649+
(dt(2016, 3, 1), 1),
650+
(dt(2016, 3, 2), 1)],
651+
names=['date', 'id'])
652+
)
653+
654+
df_result = DataFrame(data={'data': [1, 6]},
655+
index=MultiIndex.from_tuples([(dt(2016, 1, 1), 1),
656+
(dt(2016, 3, 2), 1)],
657+
names=['date', 'id'])
658+
)
659+
660+
chunkstore_lib.write('test', df, 'M')
661+
chunkstore_lib.delete('test', chunk_range=DateRange(dt(2016, 1, 2), dt(2016, 3, 1)))
662+
assert_frame_equal(chunkstore_lib.read('test'), df_result)
663+
664+
665+
def test_read_chunk_range(chunkstore_lib):
666+
df = DataFrame(data={'data': [1, 2, 3, 4, 5, 6, 7, 8, 9]},
667+
index=MultiIndex.from_tuples([(dt(2016, 1, 1), 1),
668+
(dt(2016, 1, 2), 1),
669+
(dt(2016, 1, 3), 1),
670+
(dt(2016, 2, 1), 1),
671+
(dt(2016, 2, 2), 1),
672+
(dt(2016, 2, 3), 1),
673+
(dt(2016, 3, 1), 1),
674+
(dt(2016, 3, 2), 1),
675+
(dt(2016, 3, 3), 1)],
676+
names=['date', 'id'])
677+
)
678+
679+
chunkstore_lib.write('test', df, 'M')
680+
assert(chunkstore_lib.read('test', chunk_range=DateRange(dt(2016, 1, 1), dt(2016, 1, 1))).index.get_level_values('date')[0] == dt(2016,1,1))
681+
assert(chunkstore_lib.read('test', chunk_range=DateRange(dt(2016, 1, 2), dt(2016, 1, 2))).index.get_level_values('date')[0] == dt(2016, 1, 2))
682+
assert(chunkstore_lib.read('test', chunk_range=DateRange(dt(2016, 1, 3), dt(2016, 1, 3))).index.get_level_values('date')[0] == dt(2016, 1, 3))
683+
assert(chunkstore_lib.read('test', chunk_range=DateRange(dt(2016, 2, 2), dt(2016, 2, 2))).index.get_level_values('date')[0] == dt(2016, 2, 2))
684+
685+
assert(len(chunkstore_lib.read('test', chunk_range=DateRange(dt(2016, 2, 2), dt(2016, 2, 2)), no_filter=True)) == 3)
686+
687+
df2 = chunkstore_lib.read('test', chunk_range=DateRange(None, None))
688+
assert_frame_equal(df, df2)

0 commit comments

Comments
 (0)