Skip to content

Commit 7a2a0db

Browse files
authored
1 parent 882620b commit 7a2a0db

File tree

3 files changed

+49
-4
lines changed

3 files changed

+49
-4
lines changed

CHANGES.md

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
* Bugfix: #187 Compatibility with latest version of pytest-dbfixtures
66
* Feature: #182 Improve ChunkStore read/write performance
77
* Feature: #162 Rename API for ChunkStore
8+
* Feature: #186 chunk_range on update
9+
* Bugfix: #189 range delete does not update symbol metadata
810

911
### 1.26 (2016-07-20)
1012

arctic/chunkstore/chunkstore.py

+25-3
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ def delete(self, symbol, chunk_range=None):
9898
# read out chunks that fall within the range and filter out
9999
# data within the range
100100
df = self.read(symbol, chunk_range=chunk_range, filter_data=False)
101+
row_adjust = len(df)
101102
df = self.chunker.exclude(df, chunk_range)
102103

103104
# remove chunks, and update any remaining data
@@ -106,6 +107,12 @@ def delete(self, symbol, chunk_range=None):
106107
self._collection.delete_many(query)
107108
self.update(symbol, df)
108109

110+
# update symbol metadata (rows and chunk count)
111+
sym = self._get_symbol_info(symbol)
112+
sym[ROWS] -= row_adjust
113+
sym[CHUNK_COUNT] = self._collection.count({SYMBOL: symbol})
114+
self._symbols.replace_one({SYMBOL: symbol}, sym)
115+
109116
else:
110117
query = {SYMBOL: symbol}
111118
self._collection.delete_many(query)
@@ -264,7 +271,7 @@ def __concat(self, a, b):
264271
def __take_new(self, a, b):
265272
return a
266273

267-
def __update(self, symbol, item, combine_method=None):
274+
def __update(self, symbol, item, combine_method=None, chunk_range=None):
268275
if not isinstance(item, (DataFrame, Series)):
269276
raise Exception("Can only chunk DataFrames and Series")
270277

@@ -277,11 +284,16 @@ def __update(self, symbol, item, combine_method=None):
277284
if sym[TYPE] == 'dataframe' and not isinstance(item, DataFrame):
278285
raise Exception("Cannot combine DataFrame and Series")
279286

287+
if chunk_range:
288+
self.delete(symbol, chunk_range)
289+
sym = self._get_symbol_info(symbol)
290+
280291
bulk = self._collection.initialize_unordered_bulk_op()
281292
op = False
282293
for start, end, record in self.chunker.to_chunks(item, sym[CHUNK_SIZE]):
283294
# read out matching chunks
284295
df = self.read(symbol, chunk_range=self.chunker.to_range(start, end), filter_data=False)
296+
285297
# assuming they exist, update them and store the original chunk
286298
# range for later use
287299
if not df.empty:
@@ -334,7 +346,7 @@ def append(self, symbol, item):
334346
"""
335347
self.__update(symbol, item, combine_method=self.__concat)
336348

337-
def update(self, symbol, item):
349+
def update(self, symbol, item, chunk_range=None):
338350
"""
339351
Overwrites data in DB with data in item for the given symbol.
340352
@@ -346,9 +358,19 @@ def update(self, symbol, item):
346358
the symbol for the given item in the DB
347359
item: DataFrame or Series
348360
the data to update
361+
chunk_range: None, or a range object
362+
If a range is specified, it will clear/delete the data within the
363+
range and overwrite it with the data in item. This allows the user
364+
to update with data that might only be a subset of the
365+
original data.
349366
"""
350367

351-
self.__update(symbol, item, combine_method=self.__take_new)
368+
if chunk_range:
369+
if self.chunker.filter(item, chunk_range).empty:
370+
raise Exception('Range must be inclusive of data')
371+
self.__update(symbol, item, combine_method=self.__concat, chunk_range=chunk_range)
372+
else:
373+
self.__update(symbol, item, combine_method=self.__take_new, chunk_range=chunk_range)
352374

353375
def get_info(self, symbol):
354376
sym = self._get_symbol_info(symbol)

tests/integration/chunkstore/test_chunkstore.py

+22-1
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,8 @@ def test_update(chunkstore_lib):
342342
chunkstore_lib.write('chunkstore_test', df, chunk_size='D')
343343
chunkstore_lib.update('chunkstore_test', df2)
344344
assert_frame_equal(chunkstore_lib.read('chunkstore_test'), equals)
345+
assert(chunkstore_lib.get_info('chunkstore_test')['rows'] == len(equals))
346+
assert(chunkstore_lib.get_info('chunkstore_test')['chunk_count'] == len(equals))
345347

346348

347349
def test_update_no_overlap(chunkstore_lib):
@@ -364,8 +366,25 @@ def test_update_no_overlap(chunkstore_lib):
364366

365367
chunkstore_lib.write('chunkstore_test', df, chunk_size='D')
366368
chunkstore_lib.update('chunkstore_test', df2)
367-
assert_frame_equal(chunkstore_lib.read('chunkstore_test') , equals)
369+
assert_frame_equal(chunkstore_lib.read('chunkstore_test'), equals)
370+
371+
372+
def test_update_chunk_range(chunkstore_lib):
373+
df = DataFrame(data={'data': [1, 2, 3]},
374+
index=pd.Index(data=[dt(2015, 1, 1),
375+
dt(2015, 1, 2),
376+
dt(2015, 1, 3)], name='date'))
377+
df2 = DataFrame(data={'data': [30]},
378+
index=pd.Index(data=[dt(2015, 1, 2)],
379+
name='date'))
380+
equals = DataFrame(data={'data': [30, 3]},
381+
index=pd.Index(data=[dt(2015, 1, 2),
382+
dt(2015, 1, 3)],
383+
name='date'))
368384

385+
chunkstore_lib.write('chunkstore_test', df, chunk_size='M')
386+
chunkstore_lib.update('chunkstore_test', df2, chunk_range=DateRange(dt(2015, 1, 1), dt(2015, 1, 2)))
387+
assert_frame_equal(chunkstore_lib.read('chunkstore_test'), equals)
369388

370389
def test_append_before(chunkstore_lib):
371390
df = DataFrame(data={'data': [1, 2, 3]},
@@ -664,6 +683,8 @@ def test_delete_range(chunkstore_lib):
664683
chunkstore_lib.write('test', df, 'M')
665684
chunkstore_lib.delete('test', chunk_range=DateRange(dt(2016, 1, 2), dt(2016, 3, 1)))
666685
assert_frame_equal(chunkstore_lib.read('test'), df_result)
686+
assert(chunkstore_lib.get_info('test')['rows'] == len(df_result))
687+
assert(chunkstore_lib.get_info('test')['chunk_count'] == 2)
667688

668689

669690
def test_delete_range_noindex(chunkstore_lib):

0 commit comments

Comments
 (0)