
Commit 8f1d873

Merge pull request pandas-dev#33 from manahl/pandas_daterange
Pandas DateRange query support
2 parents a3668f1 + ef79873 commit 8f1d873

7 files changed: +385 −24 lines changed

CHANGES.md

Lines changed: 5 additions & 0 deletions

@@ -1,6 +1,11 @@
 ## Changelog
 
+### 1.7 (2015-09-18)
+
+  * Feature: Add support for reading a subset of a pandas DataFrame
+    in VersionStore.read by passing in an arctic.date.DateRange
+
 ### 1.6 (2015-09-16)
 
   * Feature: Add support for multi-index Bitemporal DataFrame storage.
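A hedged usage sketch of the new feature (the Mongo host, library name 'user.eod', and symbol 'SYM' are illustrative, not taken from this commit):

from datetime import datetime as dt

from arctic import Arctic
from arctic.date import DateRange

library = Arctic('localhost')['user.eod']  # hypothetical library holding a DataFrame under 'SYM'
subset = library.read('SYM', date_range=DateRange(dt(2015, 1, 1), dt(2015, 6, 1))).data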

README.md

Lines changed: 1 addition & 1 deletion

@@ -120,7 +120,7 @@ It wouldn't be possible without the work of the AHL Data Engineering Team includ
 * [Richard Bounds](https://github.com/richardbounds)
 * [James Blackburn](https://github.com/jamesblackburn)
 * [Vlad Mereuta](https://github.com/vmereuta)
-* Tom Taylor
+* [Tom Taylor](https://github.com/TomTaylorLondon)
 * Tope Olukemi
 * Drake Siard
 * [Slavi Marinov](https://github.com/slavi)

arctic/store/_ndarray_store.py

Lines changed: 93 additions & 7 deletions

@@ -37,7 +37,70 @@ def _promote(type1, type2):
 
 
 class NdarrayStore(object):
-    """Chunked store for arbitrary ndarrays, supporting append."""
+    """Chunked store for arbitrary ndarrays, supporting append.
+
+    For the simple example:
+        dat = np.empty(10)
+        library.write('test', dat)   # version 1
+        library.append('test', dat)  # version 2
+
+    version documents:
+
+    [
+     {u'_id': ObjectId('55fa9a7781f12654382e58b8'),
+      u'symbol': u'test',
+      u'version': 1,
+      u'type': u'ndarray',
+      u'up_to': 10,  # no. of rows included in the data for this version
+      u'append_count': 0,
+      u'append_size': 0,
+      u'base_sha': Binary('........', 0),
+      u'dtype': u'float64',
+      u'dtype_metadata': {},
+      u'segment_count': 1,  # only 1 segment included in this version
+      u'sha': Binary('.........', 0),
+      u'shape': [-1],
+      },
+
+     {u'_id': ObjectId('55fa9aa981f12654382e58ba'),
+      u'symbol': u'test',
+      u'version': 2,
+      u'type': u'ndarray',
+      u'up_to': 20,  # no. of rows included in the data for this version
+      u'append_count': 1,  # 1 append operation so far
+      u'append_size': 80,  # 80 bytes appended
+      u'base_sha': Binary('.........', 0),  # equal to sha for version 1
+      u'base_version_id': ObjectId('55fa9a7781f12654382e58b8'),  # _id of version 1
+      u'dtype': u'float64',
+      u'dtype_metadata': {},
+      u'segment_count': 2,  # 2 segments included in this version
+      },
+    ]
+
+    segment documents:
+
+    [
+     # first chunk written:
+     {u'_id': ObjectId('55fa9a778b376a68efdd10e3'),
+      u'compressed': True,  # data is lz4 compressed on write()
+      u'data': Binary('...........', 0),
+      u'parent': [ObjectId('55fa9a7781f12654382e58b8')],
+      u'segment': 9,  # 10 rows in the data up to this segment, so last row is 9
+      u'sha': Binary('.............', 0),  # checksum of (symbol, {'data':.., 'compressed':.., 'segment':...})
+      u'symbol': u'test'},
+
+     # second chunk appended:
+     {u'_id': ObjectId('55fa9aa98b376a68efdd10e6'),
+      u'compressed': False,  # no initial compression for append()
+      u'data': Binary('...........', 0),
+      u'parent': [ObjectId('55fa9a7781f12654382e58b8')],
+      u'segment': 19,  # 20 rows in the data up to this segment, so last row is 19
+      u'sha': Binary('............', 0),  # checksum of (symbol, {'data':.., 'compressed':.., 'segment':...})
+      u'symbol': u'test'},
+    ]
+    """
     TYPE = 'ndarray'
 
     @classmethod
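A minimal sketch of the write/append walkthrough in the docstring above, assuming a mongod on localhost and the public Arctic API of this release (the library name 'user.test' is made up):

import numpy as np
from arctic import Arctic

store = Arctic('localhost')              # connects to mongod on localhost:27017
store.initialize_library('user.test')    # hypothetical library name
library = store['user.test']

dat = np.empty(10)
library.write('test', dat)    # version 1: up_to=10, segment_count=1
library.append('test', dat)   # version 2: up_to=20, append_count=1, segment_count=2
print(library.read('test').data.shape)   # -> (20,)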
@@ -117,16 +180,22 @@ def read(self, arctic_lib, version, symbol, read_preference=None, **kwargs):
         return self._do_read(collection, version, symbol, index_range=index_range)
 
     def _do_read(self, collection, version, symbol, index_range=None):
+        '''
+        index_range is a 2-tuple of integers - a [from, to) range of segments to be read.
+        Either from or to can be None, indicating no bound.
+        '''
         from_index = index_range[0] if index_range else None
-        to_index = index_range[1] if index_range and index_range[1] is not None \
-                                  and index_range[1] < version['up_to'] else version['up_to']
+        to_index = version['up_to']
+        if index_range and index_range[1] and index_range[1] < version['up_to']:
+            to_index = index_range[1]
         segment_count = None
 
         spec = {'symbol': symbol,
                 'parent': version.get('base_version_id', version['_id']),
-                'segment': {'$lt': to_index}}
+                'segment': {'$lt': to_index}
+                }
         if from_index:
-            spec['segment'] = {'$lt': version['up_to'], '$gte': from_index}
+            spec['segment']['$gte'] = from_index
         else:
             segment_count = version.get('segment_count', None)
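To make the query construction concrete, here is a hedged re-run of the logic above with made-up inputs (the symbol and ObjectIds mirror the docstring example; nothing is read from a real database):

from bson import ObjectId

# Hypothetical inputs: reading segments [5, 15) of version 2 of 'test'
version = {'_id': ObjectId('55fa9aa981f12654382e58ba'),
           'base_version_id': ObjectId('55fa9a7781f12654382e58b8'),
           'up_to': 20}
index_range = (5, 15)   # [from, to) segment range

from_index = index_range[0] if index_range else None
to_index = version['up_to']
if index_range and index_range[1] and index_range[1] < version['up_to']:
    to_index = index_range[1]

spec = {'symbol': 'test',
        'parent': version.get('base_version_id', version['_id']),
        'segment': {'$lt': to_index}}
if from_index:
    spec['segment']['$gte'] = from_index

# A single Mongo filter now covers the half-open segment range:
print(spec['segment'])   # -> {'$lt': 15, '$gte': 5}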

@@ -389,5 +458,22 @@ def _do_write(self, collection, version, symbol, item, previous_version, segment
 
         self.check_written(collection, symbol, version)
 
-    def _segment_index(self, item, existing_index, start, new_segments):
-        pass
+    def _segment_index(self, new_data, existing_index, start, new_segments):
+        """
+        Generate a segment index which can be used to subselect data in _index_range.
+        This function must handle both generation of the index and appending to an existing index.
+
+        Parameters:
+        -----------
+        new_data: new data being written (or appended)
+        existing_index: index field from the versions document of the previous version
+        start: first (0-based) offset of the new data
+        new_segments: list of offsets. Each offset is the row index of the
+            last row of a particular chunk relative to the start of the _original_ item.
+            array(new_segments) - start = array(offsets in new_data)
+
+        Returns:
+        --------
+        Library-specific index metadata to be stored in the version document.
+        """
+        pass  # numpy arrays have no index
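A tiny worked example of the start/new_segments convention, using hypothetical numbers for the append case from the docstring above (10 original rows, 10 appended in one chunk):

start = 10            # first 0-based offset of the appended data within the item
new_segments = [19]   # last row of the new chunk, relative to the original item

# Offsets of those rows within the appended data itself:
offsets_in_new_data = [s - start for s in new_segments]
print(offsets_in_new_data)   # -> [9]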

arctic/store/_pandas_ndarray_store.py

Lines changed: 108 additions & 2 deletions

@@ -1,18 +1,27 @@
 import logging
 
-from _ndarray_store import NdarrayStore
+from bson.binary import Binary
 from pandas import DataFrame, MultiIndex, Series, DatetimeIndex, Panel
 from pandas.tslib import Timestamp, get_timezone
 import numpy as np
 
+from .._compression import compress, decompress
+from ..exceptions import ArcticException
+from ._ndarray_store import NdarrayStore
+from ..date._util import to_pandas_closed_closed
+
 log = logging.getLogger(__name__)
 
+DTN64_DTYPE = 'datetime64[ns]'
+
+INDEX_DTYPE = [('datetime', DTN64_DTYPE), ('index', 'i8')]
+
 
 def _to_primitive(arr):
     if arr.dtype.hasobject:
         if len(arr) > 0:
             if isinstance(arr[0], Timestamp):
-                return arr.astype('datetime64[ns]')
+                return arr.astype(DTN64_DTYPE)
         return np.array(list(arr))
     return arr
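A minimal sketch of how INDEX_DTYPE round-trips through lz4 compression and BSON, using the arctic._compression helpers imported above (the values are illustrative; lz4 must be installed):

import numpy as np
from bson.binary import Binary
from arctic._compression import compress, decompress

INDEX_DTYPE = [('datetime', 'datetime64[ns]'), ('index', 'i8')]
arr = np.array([(np.datetime64('2015-01-10'), 9)], dtype=INDEX_DTYPE)

blob = Binary(compress(arr.tostring()))   # as stored in the version document
roundtrip = np.fromstring(decompress(blob), dtype=INDEX_DTYPE)
assert (roundtrip == arr).all()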

@@ -102,6 +111,103 @@ def can_convert_to_records_without_objects(self, df, symbol):
         else:
             return True
 
+    def _segment_index(self, recarr, existing_index, start, new_segments):
+        """
+        Generate an index of datetime64 -> item offset.
+
+        Parameters:
+        -----------
+        recarr: new data being written (or appended)
+        existing_index: index field from the versions document of the previous version
+        start: first (0-based) offset of the new data
+        new_segments: list of offsets. Each offset is the row index of the
+            last row of a particular chunk relative to the start of the _original_ item.
+            array(new_segments) - start = array(offsets in recarr)
+
+        Returns:
+        --------
+        Binary(compress(array([(datetime, index)])))
+        Where index is the 0-based index of the datetime in the DataFrame
+        """
+        # find the name of the first datetime64 column
+        idx_col = self._datetime64_index(recarr)
+        # if one exists let's create the index on it
+        if idx_col is not None:
+            new_segments = np.array(new_segments, dtype='i8')
+            last_rows = recarr[new_segments - start]
+            # create numpy index
+            index = np.core.records.fromarrays([last_rows[idx_col]]
+                                               + [new_segments, ],
+                                               dtype=INDEX_DTYPE)
+            # append to the existing index if one exists
+            if existing_index:
+                existing_index_arr = np.fromstring(decompress(existing_index), dtype=INDEX_DTYPE)
+                if start > 0:
+                    existing_index_arr = existing_index_arr[existing_index_arr['index'] < start]
+                index = np.concatenate((existing_index_arr, index))
+            return Binary(compress(index.tostring()))
+        elif existing_index:
+            raise ArcticException("Could not find datetime64 index in item but existing data contains one")
+        return None
+
+    def _datetime64_index(self, recarr):
+        """ Given a np.recarray find the name of the first datetime64 column """
+        # TODO: Handle multi-indexes
+        names = recarr.dtype.names
+        for name in names:
+            if recarr[name].dtype == DTN64_DTYPE:
+                return name
+        return None
+
+    def _index_range(self, version, symbol, date_range=None, **kwargs):
+        """ Given a version, read the segment_index and return the chunks associated
+        with the date_range. As the segment index is (id -> last datetime)
+        we need to take care in choosing the correct chunks. """
+        if date_range and 'segment_index' in version:
+            index = np.fromstring(decompress(version['segment_index']), dtype=INDEX_DTYPE)
+            dtcol = self._datetime64_index(index)
+            if dtcol and len(index):
+                dts = index[dtcol]
+                start, end = _start_end(date_range, dts)
+                if start > dts[-1]:
+                    return -1, -1
+                idxstart = min(np.searchsorted(dts, start), len(dts))
+                idxend = min(np.searchsorted(dts, end), len(dts))
+                return index['index'][idxstart], index['index'][idxend] + 1
+        return super(PandasStore, self)._index_range(version, symbol, **kwargs)
+
+    def _daterange(self, recarr, date_range):
+        """ Given a recarr, slice out the given arctic.date.DateRange if a
+        datetime64 index exists """
+        idx = self._datetime64_index(recarr)
+        if idx and len(recarr):
+            dts = recarr[idx]
+            mask = Series(np.zeros(len(dts)), index=dts)
+            start, end = _start_end(date_range, dts)
+            mask[start:end] = 1.0
+            return recarr[mask.values.astype(bool)]
+        return recarr
+
+    def read(self, arctic_lib, version, symbol, read_preference=None, date_range=None, **kwargs):
+        item = super(PandasStore, self).read(arctic_lib, version, symbol, read_preference,
+                                             date_range=date_range, **kwargs)
+        if date_range:
+            item = self._daterange(item, date_range)
+        return item
+
+
+def _start_end(date_range, dts):
+    """
+    Return tuple: [start, end] of np.datetime64 dates that are inclusive of the passed
+    in datetimes.
+    """
+    # FIXME: timezones
+    assert len(dts)
+    date_range = to_pandas_closed_closed(date_range)
+    start = np.datetime64(date_range.start) if date_range.start else dts[0]
+    end = np.datetime64(date_range.end) if date_range.end else dts[-1]
+    return start, end
+
 
 class PandasSeriesStore(PandasStore):
     TYPE = 'pandasseries'
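A worked sketch of the segment index and the range lookup above, with illustrative data (two 10-row chunks of daily timestamps; nothing here comes from a real library):

import numpy as np

INDEX_DTYPE = [('datetime', 'datetime64[ns]'), ('index', 'i8')]

# 20 daily timestamps split into two chunks whose last rows are 9 and 19
dts = np.arange('2015-01-01', '2015-01-21', dtype='datetime64[D]').astype('datetime64[ns]')
segments = np.array([9, 19], dtype='i8')
index = np.core.records.fromarrays([dts[segments], segments], dtype=INDEX_DTYPE)

# _index_range-style lookup for 2015-01-05 .. 2015-01-12:
start, end = np.datetime64('2015-01-05'), np.datetime64('2015-01-12')
col = index['datetime']
idxstart = min(np.searchsorted(col, start), len(col))
idxend = min(np.searchsorted(col, end), len(col))
print(index['index'][idxstart], index['index'][idxend] + 1)   # -> 9 20

# Both chunks are fetched (segment 9 covers rows 0-9, segment 19 rows 10-19);
# _daterange then masks the rows down to the requested dates.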

arctic/store/version_store.py

Lines changed: 7 additions & 2 deletions

@@ -285,7 +285,7 @@ def _write_handler(self, version, symbol, data, **kwargs):
             handler = self._bson_handler
         return handler
 
-    def read(self, symbol, as_of=None, from_version=None, allow_secondary=None, **kwargs):
+    def read(self, symbol, as_of=None, date_range=None, from_version=None, allow_secondary=None, **kwargs):
         """
         Read data for the named symbol. Returns a VersionedItem object with
         a data and metadata element (as passed into write).
@@ -299,6 +299,9 @@ def read(self, symbol, as_of=None, from_version=None, allow_secondary=None, **kw
             `int` : specific version number
             `str` : snapshot name which contains the version
             `datetime.datetime` : the version of the data that existed as_of the requested point in time
+        date_range: `arctic.date.DateRange`
+            DateRange to read data for. Applies to Pandas data with a DateTime index;
+            returns only the part of the data that falls in the DateRange.
         allow_secondary : `bool` or `None`
             Override the default behavior for allowing reads from secondary members of a cluster:
             `None` : use the settings from the top-level `Arctic` object used to query this version store.
@@ -312,7 +315,8 @@ def read(self, symbol, as_of=None, from_version=None, allow_secondary=None, **kw
         try:
             read_preference = self._read_preference(allow_secondary)
             _version = self._read_metadata(symbol, as_of=as_of, read_preference=read_preference)
-            return self._do_read(symbol, _version, from_version, read_preference=read_preference, **kwargs)
+            return self._do_read(symbol, _version, from_version,
+                                 date_range=date_range, read_preference=read_preference, **kwargs)
         except (OperationFailure, AutoReconnect) as e:
             # Log the exception so we know how often this is happening
             log_exception('read', e, 1)
@@ -321,6 +325,7 @@ def read(self, symbol, as_of=None, from_version=None, allow_secondary=None, **kw
             _version = mongo_retry(self._read_metadata)(symbol, as_of=as_of,
                                                         read_preference=ReadPreference.PRIMARY)
             return self._do_read_retry(symbol, _version, from_version,
+                                       date_range=date_range,
                                        read_preference=ReadPreference.PRIMARY,
                                        **kwargs)
         except Exception, e:
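Two more hedged variations on the usage sketch after the CHANGES.md entry (the same hypothetical library and symbol are assumed):

from datetime import datetime as dt

from arctic import Arctic
from arctic.date import DateRange

library = Arctic('localhost')['user.eod']   # hypothetical library holding 'SYM'

# An open-ended range: a missing bound means unbounded on that side.
since_march = library.read('SYM', date_range=DateRange(start=dt(2015, 3, 1))).data

# date_range composes with as_of: slice a historical version of the data.
feb_v1 = library.read('SYM', as_of=1, date_range=DateRange(dt(2015, 2, 1), dt(2015, 2, 28))).data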
