Skip to content

Commit 38574fc

Browse files
authored
Merge pull request pandas-dev#179 from manahl/append-fix
Fix issue pandas-dev#178
2 parents 9832f62 + 7be6bb3 commit 38574fc

File tree

5 files changed

+139
-141
lines changed

5 files changed

+139
-141
lines changed

CHANGES.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
* Feature: #171 allow deleting of values within a date range in ChunkStore
1212
* Bugfix: #172 Fix date range bug when querying dates in the middle of chunks
1313
* Bugfix: #176 Fix overwrite failures in ChunkStore
14+
* Bugfix: #178 - Change how start/end dates are populated in the DB, also fix append so it works as expected.
1415

1516
### 1.25 (2016-05-23)
1617

arctic/chunkstore/_chunker.py

+1-12
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,6 @@ def to_range(self, start, end):
2424
"""
2525
raise NotImplementedError
2626

27-
def to_start_end(self, item):
28-
"""
29-
turns the data in item to a start/end pair (same as is returned by
30-
to_chunks()
31-
32-
returns
33-
-------
34-
tuple - (start, end)
35-
"""
36-
raise NotImplementedError
37-
3827
def to_mongo(self, range_obj):
3928
"""
4029
takes the range object used for this chunker type
@@ -62,7 +51,7 @@ def filter(self, data, range_obj):
6251
"""
6352
raise NotImplementedError
6453

65-
def exclude(self, data, range_obj):
54+
def exclude(self, data, range_obj):
6655
"""
6756
Removes data within the bounds of the range object (inclusive)
6857

arctic/chunkstore/chunkstore.py

+58-104
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import ast
55

66
from bson.binary import Binary
7-
from pandas import Series, DataFrame
7+
from pandas import Series, DataFrame, concat
88

99
from ..store._version_store_utils import checksum
1010
from ..decorators import mongo_retry
@@ -245,49 +245,45 @@ def write(self, symbol, item, chunk_size):
245245
{'$set': doc},
246246
upsert=True)
247247

248-
def append(self, symbol, item):
249-
"""
250-
Appends data from item to symbol's data in the database
248+
def __concat(self, a, b):
249+
return concat([a, b]).sort()
251250

252-
Parameters
253-
----------
254-
symbol: str
255-
the symbol for the given item in the DB
256-
item:
257-
the data to append
258-
"""
251+
def __combine(self, a, b):
252+
return a.combine_first(b)
259253

254+
def __update(self, symbol, item, combine_method=None):
260255
sym = self._get_symbol_info(symbol)
261256
if not sym:
262-
raise NoDataFoundException("Symbol does not exist. Cannot append")
257+
raise NoDataFoundException("Symbol does not exist.")
263258

264259
if isinstance(item, Series) and sym['type'] == 'df':
265-
raise Exception("cannot append a series to a dataframe")
260+
raise Exception("Symbol types do not match")
266261
if isinstance(item, DataFrame) and sym['type'] == 'series':
267-
raise Exception("cannot append a dataframe to a series")
262+
raise Exception("Symbol types do not match")
268263

269264
records = []
270265
ranges = []
271-
dtype = None
272-
266+
new_chunks = []
273267
for start, end, record in self.chunker.to_chunks(item, sym['chunk_size']):
274-
'''
275-
if we have a multiindex there is a chance that part of the append
276-
will overlap an already written chunk, so we need to update
277-
where the date part of the index overlaps
278-
'''
279-
if item.index.nlevels > 1:
280-
df = self.read(symbol, chunk_range=self.chunker.to_range(start, end))
281-
if not df.empty:
282-
if df.equals(record):
283-
continue
284-
record = record.combine_first(df)
285-
self.update(symbol, record)
286-
sym = self._get_symbol_info(symbol)
268+
# read out matching chunks
269+
df = self.read(symbol, chunk_range=self.chunker.to_range(start, end), filter_data=False)
270+
# assuming they exist, update them and store the original chunk
271+
# range for later use
272+
if not df.empty:
273+
record = combine_method(record, df)
274+
if record is None or record.equals(df):
287275
continue
276+
277+
new_chunks.append(False)
278+
sym['append_count'] += len(record)
279+
sym['len'] -= len(df)
280+
else:
281+
new_chunks.append(True)
282+
sym['chunk_count'] += 1
283+
288284
r, dtype = serialize(record, string_max_len=self.STRING_MAX)
289285
if str(dtype) != sym['dtype']:
290-
raise Exception("Dtype mismatch - cannot append")
286+
raise Exception('Dtype mismatch.')
291287
records.append(r)
292288
ranges.append((start, end))
293289

@@ -299,38 +295,59 @@ def append(self, symbol, item):
299295

300296
item = item.astype(dtype)
301297

302-
if str(dtype) != sym['dtype']:
303-
raise Exception("Dtype mismatch - cannot append")
304-
305298
data = item.tostring()
306299
sym['len'] += len(item)
307300
if len(item) > 0:
308-
sym['chunk_count'] += len(records)
309-
sym['append_count'] += len(records)
310301
sym['append_size'] += len(data)
311302

312303
chunks = [r.tostring() for r in records]
313304
chunks = compress_array(chunks)
314305

315-
for chunk, rng in zip(chunks, ranges):
306+
bulk = self._collection.initialize_unordered_bulk_op()
307+
for chunk, rng, new_chunk in zip(chunks, ranges, new_chunks):
316308
start = rng[0]
317-
end = rng[-1]
309+
end = rng[1]
318310

319311
segment = {'data': Binary(chunk)}
320312
segment['start'] = start
321313
segment['end'] = end
322-
self._collection.update_one({'symbol': symbol, 'sha': checksum(symbol, segment)},
323-
{'$set': segment},
324-
upsert=True)
314+
sha = checksum(symbol, segment)
315+
segment['sha'] = sha
316+
if new_chunk:
317+
# new chunk
318+
bulk.find({'symbol': symbol, 'sha': sha}
319+
).upsert().update_one({'$set': segment})
320+
else:
321+
bulk.find({'symbol': symbol, 'start': start, 'end': end}
322+
).update_one({'$set': segment})
323+
if len(chunks) > 0:
324+
bulk.execute()
325325

326326
self._symbols.replace_one({'symbol': symbol}, sym)
327327

328+
def append(self, symbol, item):
329+
"""
330+
Appends data from item to symbol's data in the database.
331+
332+
Is not idempotent
333+
334+
Parameters
335+
----------
336+
symbol: str
337+
the symbol for the given item in the DB
338+
item:
339+
the data to append
340+
"""
341+
self.__update(symbol, item, combine_method=self.__concat)
342+
328343
def update(self, symbol, item):
329344
"""
330345
Merges data from item onto existing data in the database for symbol
331346
data that exists in symbol and item for the same index/multiindex will
332347
be overwritten by the data in item.
333348
349+
Is idempotent
350+
334351
Parameters
335352
----------
336353
symbol: str
@@ -339,70 +356,7 @@ def update(self, symbol, item):
339356
the data to update
340357
"""
341358

342-
sym = self._get_symbol_info(symbol)
343-
if not sym:
344-
raise NoDataFoundException("Symbol does not exist. Cannot update")
345-
346-
347-
records = []
348-
ranges = []
349-
orig_ranges = []
350-
for start, end, record in self.chunker.to_chunks(item, sym['chunk_size']):
351-
# read out matching chunks
352-
df = self.read(symbol, chunk_range=self.chunker.to_range(start, end))
353-
# assuming they exist, update them and store the original chunk
354-
# range for later use
355-
if not df.empty:
356-
if df.equals(record):
357-
continue
358-
record = record.combine_first(df)
359-
orig_ranges.append((self.chunker.to_start_end(record)))
360-
else:
361-
orig_ranges.append((None, None))
362-
363-
r, dtype = serialize(record, string_max_len=self.STRING_MAX)
364-
if str(dtype) != sym['dtype']:
365-
raise Exception('Dtype mismatch - cannot update')
366-
records.append(r)
367-
ranges.append((start, end))
368-
369-
if len(records) > 0:
370-
chunks = [r.tostring() for r in records]
371-
lens = [len(i) for i in chunks]
372-
chunks = compress_array(chunks)
373-
374-
seg_count = 0
375-
seg_len = 0
376-
377-
bulk = self._collection.initialize_unordered_bulk_op()
378-
for chunk, rng, orig_rng, rec_len in zip(chunks, ranges, orig_ranges, lens):
379-
start = rng[0]
380-
end = rng[1]
381-
orig_start = orig_rng[0]
382-
if orig_start is None:
383-
sym['len'] += rec_len
384-
seg_count += 1
385-
seg_len += rec_len
386-
segment = {'data': Binary(chunk)}
387-
segment['start'] = start
388-
segment['end'] = end
389-
sha = checksum(symbol, segment)
390-
segment['sha'] = sha
391-
if orig_start is None:
392-
# new chunk
393-
bulk.find({'symbol': symbol, 'sha': sha, 'start': segment['start']}
394-
).upsert().update_one({'$set': segment})
395-
else:
396-
bulk.find({'symbol': symbol, 'start': orig_start}
397-
).update_one({'$set': segment})
398-
if len(chunks) > 0:
399-
bulk.execute()
400-
401-
if seg_count != 0:
402-
sym['chunk_count'] += seg_count
403-
sym['append_size'] += seg_len
404-
sym['append_count'] += seg_count
405-
self._symbols.replace_one({'symbol': symbol}, sym)
359+
self.__update(symbol, item, combine_method=self.__combine)
406360

407361
def get_info(self, symbol):
408362
sym = self._get_symbol_info(symbol)

arctic/chunkstore/date_chunker.py

+18-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import calendar
12
import pandas as pd
23
from pandas import Timestamp
4+
from datetime import datetime as dt
35

46
from ._chunker import Chunker
57
from ..date import DateRange
@@ -21,22 +23,26 @@ def _get_date_chunk(self, date, chunk_size):
2123
elif chunk_size == 'D':
2224
return date.strftime('%Y-%m-%d')
2325

24-
def _get_date_range(self, df):
26+
def _get_date_range(self, df, chunk_size):
2527
"""
26-
get min/max dates in the index of the dataframe
28+
get min/max dates for the chunk
2729
2830
returns
2931
-------
3032
A tuple (start date, end date)
3133
"""
32-
dates = df.index.get_level_values('date')
33-
start = dates.min()
34-
end = dates.max()
35-
if isinstance(start, Timestamp):
36-
start = start.to_pydatetime()
37-
if isinstance(end, Timestamp):
38-
end = end.to_pydatetime()
39-
return start, end
34+
date = df.index.get_level_values('date')[0]
35+
36+
if isinstance(date, Timestamp):
37+
date = date.to_pydatetime()
38+
39+
if chunk_size == 'M':
40+
_, end_day = calendar.monthrange(date.year, date.month)
41+
return dt(date.year, date.month, 1), dt(date.year, date.month, end_day)
42+
elif chunk_size == 'Y':
43+
return dt(date.year, 1, 1), dt(date.year, 12, 31)
44+
else:
45+
return date, date
4046

4147
def to_chunks(self, df, chunk_size):
4248
"""
@@ -64,15 +70,12 @@ def to_chunks(self, df, chunk_size):
6470
ret = df.xs(slice(date, date), level='date', drop_level=False)
6571
else:
6672
ret = df[date: date]
67-
start, end = self._get_date_range(ret)
73+
start, end = self._get_date_range(ret, chunk_size)
6874
yield start, end, ret
6975

7076
def to_range(self, start, end):
7177
return DateRange(start, end)
7278

73-
def to_start_end(self, data):
74-
return self._get_date_range(data)
75-
7679
def to_mongo(self, range_obj):
7780
if range_obj.start and range_obj.end:
7881
return {'$and': [{'start': {'$lte': range_obj.end}}, {'end': {'$gte': range_obj.start}}]}
@@ -84,7 +87,7 @@ def to_mongo(self, range_obj):
8487
return {}
8588

8689
def filter(self, data, range_obj):
87-
return data.ix[range_obj.start:range_obj.end]
90+
return data[range_obj.start:range_obj.end]
8891

8992
def exclude(self, data, range_obj):
9093
return data[(data.index.get_level_values('date') < range_obj.start) | (data.index.get_level_values('date') > range_obj.end)]

0 commit comments

Comments
 (0)