
Commit 006e251

Merge pull request pandas-dev#30 from manahl/feature/multi-index-dataframe
Feature/multi index dataframe
2 parents ed0e729 + 3e6c98c commit 006e251

8 files changed: +802 -16 lines

arctic/arctic.py

Lines changed: 5 additions & 6 deletions
@@ -3,15 +3,14 @@
 import pymongo
 from pymongo.errors import OperationFailure, AutoReconnect
 
+from ._util import indent
 from .auth import authenticate, get_auth
-from .hooks import get_mongodb_uri
 from .decorators import mongo_retry
-from ._util import indent
-
 from .exceptions import LibraryNotFoundException, ArcticException, QuotaExceededException
+from .hooks import get_mongodb_uri
 from .store import version_store
-from .tickstore import tickstore
-from .tickstore import toplevel
+from .tickstore import tickstore, toplevel
+
 
 __all__ = ['Arctic', 'VERSION_STORE', 'TICK_STORE', 'register_library_type']
 
@@ -23,7 +22,7 @@
 TICK_STORE = tickstore.TICK_STORE_TYPE
 LIBRARY_TYPES = {version_store.VERSION_STORE_TYPE: version_store.VersionStore,
                  tickstore.TICK_STORE_TYPE: tickstore.TickStore,
-                 toplevel.TICK_STORE_TYPE: toplevel.TopLevelTickStore
+                 toplevel.TICK_STORE_TYPE: toplevel.TopLevelTickStore,
                  }

arctic/fixtures/arctic.py

Lines changed: 8 additions & 1 deletion
@@ -4,10 +4,11 @@
 import pytest as pytest
 
 from .. import arctic as m
+from ..store.bitemporal_store import BitemporalStore
 from ..tickstore.tickstore import TICK_STORE_TYPE
-
 from .mongo import mongo_proc, mongodb
 
+
 logger = logging.getLogger(__name__)
 
 mongo_proc2 = mongo_proc(executable="mongod", port="?",
@@ -73,6 +74,12 @@ def library(arctic, library_name):
     return arctic.get_library(library_name)
 
 
+@pytest.fixture(scope="function")
+def bitemporal_library(arctic, library_name):
+    arctic.initialize_library(library_name, m.VERSION_STORE, segment='month')
+    return BitemporalStore(arctic.get_library(library_name))
+
+
 @pytest.fixture(scope="function")
 def library_secondary(arctic_secondary, library_name):
     arctic_secondary.initialize_library(library_name, m.VERSION_STORE, segment='month')
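
The new bitemporal_library fixture hands tests a BitemporalStore wrapped around a freshly initialised version-store library. A minimal sketch of how a test might consume it (not part of this commit; the symbol name and values are illustrative):

    import pandas as pd

    def test_bitemporal_roundtrip(bitemporal_library):
        # Hypothetical one-column timeseries indexed by sample date
        ts = pd.DataFrame({'near': [1.0, 2.0]},
                          index=pd.date_range('2015-01-01', periods=2, name='sample_dt'))
        # update() stamps each row with an observed_dt of now()
        bitemporal_library.update('SYM', ts)
        assert len(bitemporal_library.read('SYM').data) == 2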

arctic/multi_index.py

Lines changed: 123 additions & 0 deletions
@@ -0,0 +1,123 @@
'''
Utility functions for multi-index dataframes. Useful for creating bi-temporal timeseries.
'''
from datetime import datetime
import logging
import types

from pandas.tseries.tools import to_datetime as dt

import numpy as np
import pandas as pd


logger = logging.getLogger(__name__)


# ----------------------- Grouping and Aggregating ---------------------------- #

def fancy_group_by(df, grouping_level=0, aggregate_level=1, method='last', max_=None, min_=None, within=None):
    """ Dataframe group-by operation that supports aggregating by different methods on the index.

    Parameters
    ----------
    df: ``DataFrame``
        Pandas dataframe with a MultiIndex
    grouping_level: ``int`` or ``str`` or ``list`` of ``str``
        Index level to group by. Defaults to 0.
    aggregate_level: ``int`` or ``str``
        Index level to aggregate by. Defaults to 1.
    method: ``str``
        Aggregation method. One of
            last: Use the last (lexicographically) value from each group
            first: Use the first value from each group
    max_: <any>
        If set, will limit results to those having aggregate level values <= this value
    min_: <any>
        If set, will limit results to those having aggregate level values >= this value
    within: Any type supported by the index, or ``DateOffset``/timedelta-like for ``DatetimeIndex``.
        If set, will limit results to those having aggregate level values within this range of the group value.
        Note that this is currently unsupported for Multi-index of depth > 2
    """
    if method not in ('first', 'last'):
        raise ValueError('Invalid method')

    if isinstance(aggregate_level, basestring):
        aggregate_level = df.index.names.index(aggregate_level)

    # Trim any rows outside the aggregate value bounds
    if max_ is not None or min_ is not None or within is not None:
        agg_idx = df.index.get_level_values(aggregate_level)
        mask = np.full(len(agg_idx), True, dtype='b1')
        if max_ is not None:
            mask &= (agg_idx <= max_)
        if min_ is not None:
            mask &= (agg_idx >= min_)
        if within is not None:
            group_idx = df.index.get_level_values(grouping_level)
            if isinstance(agg_idx, pd.DatetimeIndex):
                mask &= (group_idx >= agg_idx.shift(-1, freq=within))
            else:
                mask &= (group_idx >= (agg_idx - within))
        df = df.loc[mask]

    # The sort order must be correct in order of grouping_level -> aggregate_level for the aggregation methods
    # to work properly. We can check the sortdepth to see if this is in fact the case and resort if necessary.
    # TODO: this might need tweaking if the levels are around the wrong way
    if df.index.lexsort_depth < (aggregate_level + 1):
        df = df.sortlevel(level=grouping_level)

    gb = df.groupby(level=grouping_level)
    if method == 'last':
        return gb.last()
    return gb.first()


# --------- Common as-of-date use case -------------- #

def groupby_asof(df, as_of=None, dt_col='sample_dt', asof_col='observed_dt'):
    ''' Common use case for selecting the latest rows from a bitemporal dataframe as-of a certain date.

    Parameters
    ----------
    df: ``pd.DataFrame``
        Dataframe with a MultiIndex index
    as_of: ``datetime``
        Return a timeseries with values observed <= this as-of date. By default, the latest observed
        values will be returned.
    dt_col: ``str`` or ``int``
        Name or index of the column in the MultiIndex that is the sample date
    asof_col: ``str`` or ``int``
        Name or index of the column in the MultiIndex that is the observed date
    '''
    return fancy_group_by(df,
                          grouping_level=dt_col,
                          aggregate_level=asof_col,
                          method='last',
                          max_=as_of)


# ----------------------- Insert/Append ---------------------------- #


def multi_index_insert_row(df, index_row, values_row):
    """ Return a new dataframe with a row inserted for a multi-index dataframe.
    This will sort the rows according to the ordered multi-index levels.
    """
    row_index = pd.MultiIndex(levels=[[i] for i in index_row],
                              labels=[[0] for i in index_row])
    row = pd.DataFrame(values_row, index=row_index, columns=df.columns)
    df = pd.concat((df, row))
    if df.index.lexsort_depth == len(index_row) and df.index[-2] < df.index[-1]:
        # We've just appended a row to an already-sorted dataframe
        return df
    # The df wasn't sorted or the row has to be put in the middle somewhere
    return df.sortlevel()


def insert_at(df, sample_date, values):
    """ Insert some values into a bi-temporal dataframe.
    This is like what would happen when we get a price correction.
    """
    observed_dt = dt(datetime.now())
    return multi_index_insert_row(df, [sample_date, observed_dt], values)
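
To make the as-of semantics concrete, here is a small worked sketch (not part of the commit) on a toy bitemporal frame; it assumes the same Python-2-era pandas APIs the module itself relies on (basestring, sortlevel):

    import pandas as pd
    from arctic.multi_index import groupby_asof, insert_at

    # Two sample dates, each observed once on 2015-01-02
    idx = pd.MultiIndex.from_tuples(
        [(pd.Timestamp('2015-01-01'), pd.Timestamp('2015-01-02')),
         (pd.Timestamp('2015-01-02'), pd.Timestamp('2015-01-02'))],
        names=['sample_dt', 'observed_dt'])
    df = pd.DataFrame({'price': [100.0, 101.0]}, index=idx)

    # A later correction for 2015-01-01; insert_at stamps observed_dt with now()
    df = insert_at(df, pd.Timestamp('2015-01-01'), [[100.5]])

    # Latest view: the correction wins -> prices [100.5, 101.0]
    latest = groupby_asof(df)

    # View as-of the original observation date: the correction is not yet
    # visible -> prices [100.0, 101.0]
    original = groupby_asof(df, as_of=pd.Timestamp('2015-01-02'))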

arctic/store/bitemporal_store.py

Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
from collections import namedtuple
from datetime import datetime as dt

from arctic.date._mktz import mktz
from arctic.multi_index import groupby_asof
import pandas as pd


BitemporalItem = namedtuple('BitemporalItem', 'symbol, library, data, metadata, last_updated')


class BitemporalStore(object):
    """ A versioned pandas DataFrame store.

    As the name hints, this holds versions of DataFrames by maintaining an extra 'insert time' index internally.
    """

    def __init__(self, version_store, observe_column='observed_dt'):
        """
        Parameters
        ----------
        version_store : `VersionStore`
            The version store that keeps the underlying data frames
        observe_column : `str`
            Column name for the datetime index that represents the insertion time of a row of data. Unless you
            intend to read raw data out, this column is internal to this store.
        """
        self._store = version_store
        self.observe_column = observe_column

    def read(self, symbol, as_of=None, raw=False, **kwargs):
        # TODO: shall we block from_version from getting into super.read?
        """Read data for the named symbol. Returns a BitemporalItem object with
        a data and metadata element (as passed into update).

        Parameters
        ----------
        symbol : `str`
            symbol name for the item
        as_of : `datetime.datetime`
            Return the data as it was as_of the point in time.
        raw : `bool`
            If True, will return the full bitemporal dataframe (i.e. all versions of the data). This also means
            as_of is ignored.

        Returns
        -------
        BitemporalItem namedtuple which contains a .data and .metadata element
        """
        item = self._store.read(symbol, **kwargs)
        last_updated = max(item.data.index.get_level_values(self.observe_column))
        if raw:
            return BitemporalItem(symbol=symbol, library=self._store._arctic_lib.get_name(), data=item.data,
                                  metadata=item.metadata,
                                  last_updated=last_updated)
        else:
            index_names = list(item.data.index.names)
            index_names.remove(self.observe_column)
            return BitemporalItem(symbol=symbol, library=self._store._arctic_lib.get_name(),
                                  data=groupby_asof(item.data, as_of=as_of, dt_col=index_names,
                                                    asof_col=self.observe_column),
                                  metadata=item.metadata, last_updated=last_updated)

    def update(self, symbol, data, metadata=None, upsert=True, as_of=None, **kwargs):
        """ Append 'data' under the specified 'symbol' name to this library.

        Parameters
        ----------
        symbol : `str`
            symbol name for the item
        data : `pd.DataFrame`
            to be persisted
        metadata : `dict`
            An optional dictionary of metadata to persist along with the symbol. If None and there is existing
            metadata, the current metadata will be maintained
        upsert : `bool`
            Write 'data' if no previous version exists.
        as_of : `datetime.datetime`
            The "insert time". Defaults to datetime.now()
        """
        local_tz = mktz()
        if not as_of:
            as_of = dt.now()
        if as_of.tzinfo is None:
            as_of = as_of.replace(tzinfo=local_tz)
        data = self._add_observe_dt_index(data, as_of)
        if upsert and not self._store.has_symbol(symbol):
            df = data
        else:
            existing_item = self._store.read(symbol, **kwargs)
            if metadata is None:
                metadata = existing_item.metadata
            df = existing_item.data.append(data).sort()
        self._store.write(symbol, df, metadata=metadata, prune_previous_version=True)

    def write(self, *args, **kwargs):
        # TODO: may be diff + append?
        raise NotImplementedError('Direct write for BitemporalStore is not supported. Use append instead '
                                  'to add / modify timeseries.')

    def _add_observe_dt_index(self, df, as_of):
        index_names = list(df.index.names)
        index_names.append(self.observe_column)
        index = [x + (as_of,) if df.index.nlevels > 1 else (x, as_of) for x in df.index.tolist()]
        df = df.set_index(pd.MultiIndex.from_tuples(index, names=index_names), inplace=False)
        return df
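
End to end, the intended call pattern looks roughly like this sketch (not from the commit; the Mongo host, library name and symbol are illustrative):

    from datetime import datetime as dt

    import pandas as pd

    from arctic import Arctic
    from arctic.date._mktz import mktz
    from arctic.store.bitemporal_store import BitemporalStore

    store = Arctic('localhost')
    store.initialize_library('user.bitemp')  # a plain VERSION_STORE underneath
    bitemp = BitemporalStore(store['user.bitemp'])

    ts = pd.DataFrame({'price': [100.0, 101.0]},
                      index=pd.date_range('2015-01-01', periods=2, name='sample_dt'))
    bitemp.update('SYM', ts)                   # rows stamped with observed_dt = now()
    bitemp.update('SYM', ts.iloc[[0]] + 0.5)   # a price correction, stored alongside the original

    latest = bitemp.read('SYM').data                                       # sees the correction
    as_was = bitemp.read('SYM', as_of=dt(2015, 6, 1, tzinfo=mktz())).data  # view as of an earlier date
    history = bitemp.read('SYM', raw=True).data                            # full bitemporal index

Note that update() never overwrites: corrections land as new rows under a later observed_dt, which is what makes the as-of reads possible.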

0 commit comments