From f98e6f8a17a05d4c73657e970ba913b479bf5f3c Mon Sep 17 00:00:00 2001 From: Jeff Reback Date: Tue, 29 Mar 2016 23:31:37 -0400 Subject: [PATCH] ENH: allow .rolling / .expanding as groupby methods closes #12738 BUG: allow df.groupby(...).resample(...) to return a Resampler groupby object closes #12486 BUG: consistency of name of returned groupby closes #12363 --- doc/source/whatsnew/v0.18.1.txt | 54 +++++++++ pandas/core/base.py | 60 ++++++++++ pandas/core/generic.py | 5 +- pandas/core/groupby.py | 136 ++++++++++++++------- pandas/core/missing.py | 3 +- pandas/core/window.py | 123 +++++++++++++------ pandas/indexes/base.py | 4 +- pandas/tests/test_groupby.py | 26 +++-- pandas/tests/test_window.py | 129 ++++++++++++++++++++ pandas/tools/merge.py | 1 + pandas/tseries/resample.py | 148 +++++++++++++++++++---- pandas/tseries/tests/test_resample.py | 162 ++++++++++++++++++++++---- 12 files changed, 719 insertions(+), 132 deletions(-) diff --git a/doc/source/whatsnew/v0.18.1.txt b/doc/source/whatsnew/v0.18.1.txt index 928fefd6ce17e..702e5f2a57201 100644 --- a/doc/source/whatsnew/v0.18.1.txt +++ b/doc/source/whatsnew/v0.18.1.txt @@ -18,6 +18,8 @@ Highlights include: .. _whatsnew_0181.new_features: +- ``.groupby(...)`` has been enhanced to provide convenient syntax when working with ``.rolling(..)``, ``.expanding(..)`` and ``.resample(..)`` per group, see :ref:`here <whatsnew_0181.deferred_ops>` + New features ~~~~~~~~~~~~ @@ -48,6 +50,55 @@ see :ref:`Custom Business Hour ` (:issue:`11514`) Enhancements ~~~~~~~~~~~~ +.. _whatsnew_0181.deferred_ops: + +``.groupby(..)`` syntax with window and resample operations +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +``.groupby(...)`` has been enhanced to provide convenient syntax when working with ``.rolling(..)``, ``.expanding(..)`` and ``.resample(..)`` per group (:issue:`12486`, :issue:`12738`). + +You can now use ``.rolling(..)`` and ``.expanding(..)`` as methods on groupbys. These return another deferred object (similar to what ``.rolling()`` and ``.expanding()`` do on ungrouped pandas objects). You can then operate on these ``RollingGroupby`` objects in a similar manner. + +Previously you would have to do this to get a rolling window mean per group: + +.. ipython:: python + + df = pd.DataFrame({'A': [1] * 20 + [2] * 12 + [3] * 8, + 'B': np.arange(40)}) + df + +.. ipython:: python + + df.groupby('A').apply(lambda x: x.rolling(4).B.mean()) + +Now you can do: + +.. ipython:: python + + df.groupby('A').rolling(4).B.mean() + +For ``.resample(..)``-type operations, previously you would have to: + +.. ipython:: python + + df = pd.DataFrame({'date': pd.date_range(start='2016-01-01', + periods=4, + freq='W'), + 'group': [1, 1, 2, 2], + 'val': [5, 6, 7, 8]}).set_index('date') + + df + +.. ipython:: python + + df.groupby('group').apply(lambda x: x.resample('1D').ffill()) + +Now you can do: + +.. ipython:: python + + df.groupby('group').resample('1D').ffill() + ..
_whatsnew_0181.partial_string_indexing: Partial string indexing on ``DateTimeIndex`` when part of a ``MultiIndex`` @@ -282,6 +333,9 @@ Bug Fixes - Bug in ``.concat`` of datetime tz-aware and naive DataFrames (:issue:`12467`) - Bug in correctly raising a ``ValueError`` in ``.resample(..).fillna(..)`` when passing a non-string (:issue:`12952`) +- Bug in consistency of ``.name`` on ``.groupby(..).apply(..)`` cases (:issue:`12363`) + + - Bug in ``Timestamp.__repr__`` that caused ``pprint`` to fail in nested structures (:issue:`12622`) - Bug in ``Timedelta.min`` and ``Timedelta.max``, the properties now report the true minimum/maximum ``timedeltas`` as recognized by Pandas. See :ref:`documentation `. (:issue:`12727`) - Bug in ``.quantile()`` with interpolation may coerce to ``float`` unexpectedly (:issue:`12772`) diff --git a/pandas/core/base.py b/pandas/core/base.py index e14cdd88b50f7..ba9702f4b8f93 100644 --- a/pandas/core/base.py +++ b/pandas/core/base.py @@ -613,6 +613,19 @@ def _aggregate_multiple_funcs(self, arg, _level): return concat(results, keys=keys, axis=1) + def _shallow_copy(self, obj=None, obj_type=None, **kwargs): + """ return a new object with the replacement attributes """ + if obj is None: + obj = self._selected_obj.copy() + if obj_type is None: + obj_type = self._constructor + if isinstance(obj, obj_type): + obj = obj.obj + for attr in self._attributes: + if attr not in kwargs: + kwargs[attr] = getattr(self, attr) + return obj_type(obj, **kwargs) + def _is_cython_func(self, arg): """ if we define an internal function for this argument, return it """ return self._cython_table.get(arg) @@ -625,6 +638,53 @@ def _is_builtin_func(self, arg): return self._builtin_table.get(arg, arg) +class GroupByMixin(object): + """ provide the groupby facilities to the mixed object """ + + @staticmethod + def _dispatch(name, *args, **kwargs): + """ dispatch to apply """ + def outer(self, *args, **kwargs): + def f(x): + x = self._shallow_copy(x, groupby=self._groupby) + return getattr(x, name)(*args, **kwargs) + return self._groupby.apply(f) + outer.__name__ = name + return outer + + def _gotitem(self, key, ndim, subset=None): + """ + sub-classes to define + return a sliced object + + Parameters + ---------- + key : string / list of selections + ndim : 1,2 + requested ndim of result + subset : object, default None + subset to act on + """ + + # create a new object to prevent aliasing + if subset is None: + subset = self.obj + + # we need to make a shallow copy of ourselves + # with the same groupby + kwargs = dict([(attr, getattr(self, attr)) + for attr in self._attributes]) + self = self.__class__(subset, + groupby=self._groupby[key], + parent=self, + **kwargs) + self._reset_cache() + if subset.ndim == 2: + if lib.isscalar(key) and key in subset or com.is_list_like(key): + self._selection = key + return self + + class FrozenList(PandasObject, list): """ diff --git a/pandas/core/generic.py b/pandas/core/generic.py index 30252f7068424..8befa782d4a31 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -3705,7 +3705,7 @@ def clip_lower(self, threshold, axis=None): return self.where(subset, threshold, axis=axis) def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True, - group_keys=True, squeeze=False): + group_keys=True, squeeze=False, **kwargs): """ Group series using mapper (dict or key function, apply given function to group, return result as series) or by a series of columns. 
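To make the ``GroupByMixin._dispatch`` pattern above concrete, here is a minimal standalone sketch (not the patch's code; the frame and the fixed window of 4 are illustrative) of how a named method can be replayed per group through ``groupby.apply``. The real mixin additionally re-wraps each group via ``_shallow_copy`` so that the window/resample state travels with it:

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({'A': [1] * 20 + [2] * 12 + [3] * 8,
                       'B': np.arange(40)})

    def dispatch(grouped, name, *args, **kwargs):
        # replay the named window method on each group; apply then
        # stitches the per-group results back together under the keys
        def f(group):
            return getattr(group.rolling(4), name)(*args, **kwargs)
        return grouped.apply(f)

    # equivalent to the deferred spelling this patch adds:
    # df.groupby('A').rolling(4).mean()
    dispatch(df.groupby('A'), 'mean')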
@@ -3757,7 +3757,8 @@ def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True, raise TypeError("You have to supply one of 'by' and 'level'") axis = self._get_axis_number(axis) return groupby(self, by=by, axis=axis, level=level, as_index=as_index, - sort=sort, group_keys=group_keys, squeeze=squeeze) + sort=sort, group_keys=group_keys, squeeze=squeeze, + **kwargs) def asfreq(self, freq, method=None, how=None, normalize=False): """ diff --git a/pandas/core/groupby.py b/pandas/core/groupby.py index e2a4482404506..a79b4ae6e67f9 100644 --- a/pandas/core/groupby.py +++ b/pandas/core/groupby.py @@ -25,6 +25,8 @@ from pandas.util.decorators import (cache_readonly, Substitution, Appender, make_signature, deprecate_kwarg) from pandas.formats.printing import pprint_thing +from pandas.util.validators import validate_kwargs + import pandas.core.algorithms as algos import pandas.core.common as com from pandas.core.common import(_possibly_downcast_to_dtype, isnull, @@ -322,7 +324,8 @@ class _GroupBy(PandasObject, SelectionMixin): def __init__(self, obj, keys=None, axis=0, level=None, grouper=None, exclusions=None, selection=None, as_index=True, - sort=True, group_keys=True, squeeze=False): + sort=True, group_keys=True, squeeze=False, **kwargs): + self._selection = selection if isinstance(obj, NDFrame): @@ -341,16 +344,23 @@ def __init__(self, obj, keys=None, axis=0, level=None, self.sort = sort self.group_keys = group_keys self.squeeze = squeeze + self.mutated = kwargs.pop('mutated', False) if grouper is None: - grouper, exclusions, obj = _get_grouper(obj, keys, axis=axis, - level=level, sort=sort) + grouper, exclusions, obj = _get_grouper(obj, keys, + axis=axis, + level=level, + sort=sort, + mutated=self.mutated) self.obj = obj self.axis = obj._get_axis_number(axis) self.grouper = grouper self.exclusions = set(exclusions) if exclusions else set() + # we accept no other args + validate_kwargs('group', kwargs) + def __len__(self): return len(self.groups) @@ -644,8 +654,10 @@ def _python_apply_general(self, f): keys, values, mutated = self.grouper.apply(f, self._selected_obj, self.axis) - return self._wrap_applied_output(keys, values, - not_indexed_same=mutated) + return self._wrap_applied_output( + keys, + values, + not_indexed_same=mutated or self.mutated) def _iterate_slices(self): yield self.name, self._selected_obj @@ -790,6 +802,14 @@ def _wrap_applied_output(self, *args, **kwargs): def _concat_objects(self, keys, values, not_indexed_same=False): from pandas.tools.merge import concat + def reset_identity(values): + # reset the identities of the components + # of the values to prevent aliasing + for v in values: + ax = v._get_axis(self.axis) + ax._reset_identity() + return values + if not not_indexed_same: result = concat(values, axis=self.axis) ax = self._selected_obj._get_axis(self.axis) @@ -801,12 +821,14 @@ def _concat_objects(self, keys, values, not_indexed_same=False): elif self.group_keys: + values = reset_identity(values) if self.as_index: # possible MI return case group_keys = keys group_levels = self.grouper.levels group_names = self.grouper.names + result = concat(values, axis=self.axis, keys=group_keys, levels=group_levels, names=group_names) else: @@ -816,8 +838,14 @@ def _concat_objects(self, keys, values, not_indexed_same=False): keys = list(range(len(values))) result = concat(values, axis=self.axis, keys=keys) else: + values = reset_identity(values) result = concat(values, axis=self.axis) + if (isinstance(result, Series) and + getattr(self, 'name', None) is not None): + 
+ result.name = self.name + return result def _apply_filter(self, indices, dropna): @@ -1045,33 +1073,35 @@ def ohlc(self): @Substitution(name='groupby') @Appender(_doc_template) - def resample(self, rule, how=None, fill_method=None, limit=None, **kwargs): + def resample(self, rule, *args, **kwargs): """ Provide resampling when using a TimeGrouper Return a new grouper with our resampler appended """ - from pandas.tseries.resample import (TimeGrouper, - _maybe_process_deprecations) - gpr = TimeGrouper(axis=self.axis, freq=rule, **kwargs) - - # we by definition have at least 1 key as we are already a grouper - groupings = list(self.grouper.groupings) - groupings.append(gpr) - - result = self.__class__(self.obj, - keys=groupings, - axis=self.axis, - level=self.level, - as_index=self.as_index, - sort=self.sort, - group_keys=self.group_keys, - squeeze=self.squeeze, - selection=self._selection) - - return _maybe_process_deprecations(result, - how=how, - fill_method=fill_method, - limit=limit) + from pandas.tseries.resample import get_resampler_for_grouping + return get_resampler_for_grouping(self, rule, *args, **kwargs) + + @Substitution(name='groupby') + @Appender(_doc_template) + def rolling(self, *args, **kwargs): + """ + Return a rolling grouper, providing rolling + functionality per group + + """ + from pandas.core.window import RollingGroupby + return RollingGroupby(self, *args, **kwargs) + + @Substitution(name='groupby') + @Appender(_doc_template) + def expanding(self, *args, **kwargs): + """ + Return an expanding grouper, providing expanding + functionality per group + + """ + from pandas.core.window import ExpandingGroupby + return ExpandingGroupby(self, *args, **kwargs) @Substitution(name='groupby') @Appender(_doc_template) @@ -1239,7 +1269,8 @@ def nth(self, n, dropna=None): # object grouper, _, _ = _get_grouper(dropped, key=self.keys, axis=self.axis, level=self.level, - sort=self.sort) + sort=self.sort, + mutated=self.mutated) sizes = dropped.groupby(grouper).size() result = dropped.groupby(grouper).nth(n) @@ -1453,10 +1484,14 @@ class BaseGrouper(object): the generated groups """ - def __init__(self, axis, groupings, sort=True, group_keys=True): + def __init__(self, axis, groupings, sort=True, group_keys=True, + mutated=False): self._filter_empty_groups = self.compressed = len(groupings) != 1 - self.axis, self.groupings, self.sort, self.group_keys = \ axis, groupings, sort, group_keys + self.axis = axis + self.groupings = groupings + self.sort = sort + self.group_keys = group_keys + self.mutated = mutated @property def shape(self): @@ -1497,7 +1532,7 @@ def _get_group_keys(self): return [mapper.get_key(i) for i in range(ngroups)] def apply(self, f, data, axis=0): - mutated = False + mutated = self.mutated splitter = self._get_splitter(data, axis=axis) group_keys = self._get_group_keys() @@ -1959,10 +1994,11 @@ def generate_bins_generic(values, binner, closed): class BinGrouper(BaseGrouper): - def __init__(self, bins, binlabels, filter_empty=False): + def __init__(self, bins, binlabels, filter_empty=False, mutated=False): self.bins = com._ensure_int64(bins) self.binlabels = _ensure_index(binlabels) self._filter_empty_groups = filter_empty + self.mutated = mutated @cache_readonly def groups(self): @@ -2270,7 +2306,8 @@ def groups(self): return self.index.groupby(self.grouper) -def _get_grouper(obj, key=None, axis=0, level=None, sort=True): +def _get_grouper(obj, key=None, axis=0, level=None, sort=True, + mutated=False): """ create and return a BaseGrouper, which is an internal
mapping of how to create the grouper indexers. @@ -2404,7 +2441,7 @@ def is_in_obj(gpr): raise ValueError('No group keys passed!') # create the internals grouper - grouper = BaseGrouper(group_axis, groupings, sort=sort) + grouper = BaseGrouper(group_axis, groupings, sort=sort, mutated=mutated) return grouper, exclusions, obj @@ -2499,6 +2536,18 @@ class SeriesGroupBy(GroupBy): _series_apply_whitelist): exec(_def_str) + @property + def name(self): + """ + since we are a series, we by definition only have + a single name, but may be the result of a selection or + the name of our object + """ + if self._selection is None: + return self.obj.name + else: + return self._selection + def aggregate(self, func_or_funcs, *args, **kwargs): """ Apply aggregation function or functions to groups, yielding most likely @@ -2666,7 +2715,9 @@ def _get_index(): if isinstance(values[0], dict): # GH #823 index = _get_index() - return DataFrame(values, index=index).stack() + result = DataFrame(values, index=index).stack() + result.name = self.name + return result if isinstance(values[0], (Series, dict)): return self._concat_objects(keys, values, @@ -2955,7 +3006,7 @@ def value_counts(self, normalize=False, sort=True, ascending=False, if com.is_integer_dtype(out): out = com._ensure_int64(out) - return Series(out, index=mi) + return Series(out, index=mi, name=self.name) # for compat. with algos.value_counts need to ensure every # bin is present at every index level, null filled with zeros @@ -2986,7 +3037,7 @@ def value_counts(self, normalize=False, sort=True, ascending=False, if com.is_integer_dtype(out): out = com._ensure_int64(out) - return Series(out, index=mi) + return Series(out, index=mi, name=self.name) def count(self): """ Compute count of group, excluding missing values """ @@ -2999,7 +3050,8 @@ def count(self): return Series(out, index=self.grouper.result_index, - name=self.name, dtype='int64') + name=self.name, + dtype='int64') def _apply_to_column_groupbys(self, func): """ return a pass thru """ @@ -3332,7 +3384,7 @@ def _wrap_applied_output(self, keys, values, not_indexed_same=False): except (ValueError, AttributeError): # GH1738: values is list of arrays of unequal lengths fall # through to the outer else clause - return Series(values, index=key_index) + return Series(values, index=key_index, name=self.name) # if we have date/time like in the original, then coerce dates # as we are stacking can easily have object dtypes here @@ -3354,7 +3406,7 @@ def _wrap_applied_output(self, keys, values, not_indexed_same=False): # only coerce dates if we find at least 1 datetime coerce = True if any([isinstance(x, Timestamp) for x in values]) else False - return (Series(values, index=key_index) + return (Series(values, index=key_index, name=self.name) ._convert(datetime=True, coerce=coerce)) diff --git a/pandas/core/missing.py b/pandas/core/missing.py index 09e8e8e1401ca..634a04bbc2cdb 100644 --- a/pandas/core/missing.py +++ b/pandas/core/missing.py @@ -58,7 +58,8 @@ def mask_missing(arr, values_to_mask): def clean_fill_method(method, allow_nearest=False): - if method is None: + # asfreq is compat for resampling + if method in [None, 'asfreq']: return None if isinstance(method, string_types): diff --git a/pandas/core/window.py b/pandas/core/window.py index 31874a96f8111..1c2c6e4a04fe6 100644 --- a/pandas/core/window.py +++ b/pandas/core/window.py @@ -13,7 +13,8 @@ import pandas as pd from pandas.lib import isscalar -from pandas.core.base import PandasObject, SelectionMixin +from pandas.core.base import
(PandasObject, SelectionMixin, + GroupByMixin) import pandas.core.common as com import pandas.algos as algos from pandas import compat @@ -40,7 +41,7 @@ class _Window(PandasObject, SelectionMixin): exclusions = set() def __init__(self, obj, window=None, min_periods=None, freq=None, - center=False, win_type=None, axis=0): + center=False, win_type=None, axis=0, **kwargs): if freq is not None: warnings.warn("The freq kw is deprecated and will be removed in a " @@ -55,15 +56,11 @@ def __init__(self, obj, window=None, min_periods=None, freq=None, self.center = center self.win_type = win_type self.axis = axis - self._setup() @property def _constructor(self): return Window - def _setup(self): - pass - def _convert_freq(self, how=None): """ resample according to the how, return a new object """ @@ -137,17 +134,6 @@ def __unicode__(self): return "{klass} [{attrs}]".format(klass=self._window_type, attrs=','.join(attrs)) - def _shallow_copy(self, obj=None, **kwargs): - """ return a new object with the replacement attributes """ - if obj is None: - obj = self._selected_obj.copy() - if isinstance(obj, self.__class__): - obj = obj.obj - for attr in self._attributes: - if attr not in kwargs: - kwargs[attr] = getattr(self, attr) - return self._constructor(obj, **kwargs) - def _prep_values(self, values=None, kill_inf=True, how=None): if values is None: @@ -183,6 +169,8 @@ def _wrap_result(self, result, block=None, obj=None): if obj is None: obj = self._selected_obj + + index = obj.index if isinstance(result, np.ndarray): # coerce if necessary @@ -193,9 +181,9 @@ def _wrap_result(self, result, block=None, obj=None): if result.ndim == 1: from pandas import Series - return Series(result, obj.index, name=obj.name) + return Series(result, index, name=obj.name) - return type(obj)(result, index=obj.index, columns=block.columns) + return type(obj)(result, index=index, columns=block.columns) return result def _wrap_results(self, results, blocks, obj): @@ -411,13 +399,48 @@ def mean(self, **kwargs): return self._apply_window(mean=True, **kwargs) +class _GroupByMixin(GroupByMixin): + """ provide the groupby facilities """ + + def __init__(self, obj, *args, **kwargs): + parent = kwargs.pop('parent', None) # noqa + groupby = kwargs.pop('groupby', None) + if groupby is None: + groupby, obj = obj, obj.obj + self._groupby = groupby + self._groupby.mutated = True + self._groupby.grouper.mutated = True + super(GroupByMixin, self).__init__(obj, *args, **kwargs) + + count = GroupByMixin._dispatch('count') + corr = GroupByMixin._dispatch('corr', other=None, pairwise=None) + cov = GroupByMixin._dispatch('cov', other=None, pairwise=None) + + def _apply(self, func, name, window=None, center=None, + check_minp=None, how=None, **kwargs): + """ + dispatch to apply; we are stripping all of the _apply kwargs and + performing the original function call on the grouped object + """ + + def f(x, name=name, *args): + x = self._shallow_copy(x) + + if isinstance(name, compat.string_types): + return getattr(x, name)(*args, **kwargs) + + return x.apply(name, *args, **kwargs) + + return self._groupby.apply(f) + + class _Rolling(_Window): @property def _constructor(self): return Rolling - def _apply(self, func, window=None, center=None, check_minp=None, how=None, - **kwargs): + def _apply(self, func, name=None, window=None, center=None, + check_minp=None, how=None, **kwargs): """ Rolling statistical measure using supplied function. Designed to be used with passed-in Cython array-based functions. 
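The ``count``/``corr``/``cov`` methods above are wired through ``_dispatch`` rather than the Cython-backed ``_apply`` path, so they simply replay the public method on each group. A usage-level sketch of the equivalence involved, assuming a frame shaped like the one in the tests below (this is exactly the relationship those tests assert):

    import numpy as np
    import pandas as pd

    frame = pd.DataFrame({'A': [1] * 20 + [2] * 12 + [3] * 8,
                          'B': np.arange(40)})
    g = frame.groupby('A')

    # deferred, grouped form: dispatched per group under the hood
    result = g.rolling(window=4).count()

    # the expansion that the dispatch performs internally
    expected = g.apply(lambda x: x.rolling(4).count())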
@@ -425,6 +448,8 @@ def _apply(self, func, window=None, center=None, check_minp=None, how=None, Parameters ---------- func : string/callable to apply + name : string, optional + name of this function window : int/array, default to _get_window() center : boolean, default to self.center check_minp : function, default to _use_window @@ -546,10 +571,11 @@ def f(arg, window, min_periods): return algos.roll_generic(arg, window, minp, offset, func, args, kwargs) - return self._apply(f, center=False) + return self._apply(f, func, args=args, kwargs=kwargs, + center=False) def sum(self, **kwargs): - return self._apply('roll_sum', **kwargs) + return self._apply('roll_sum', 'sum', **kwargs) _shared_docs['max'] = dedent(""" %(name)s maximum @@ -562,7 +588,7 @@ def sum(self, **kwargs): def max(self, how=None, **kwargs): if self.freq is not None and how is None: how = 'max' - return self._apply('roll_max', how=how, **kwargs) + return self._apply('roll_max', 'max', how=how, **kwargs) _shared_docs['min'] = dedent(""" %(name)s minimum @@ -575,10 +601,10 @@ def max(self, how=None, **kwargs): def min(self, how=None, **kwargs): if self.freq is not None and how is None: how = 'min' - return self._apply('roll_min', how=how, **kwargs) + return self._apply('roll_min', 'min', how=how, **kwargs) def mean(self, **kwargs): - return self._apply('roll_mean', **kwargs) + return self._apply('roll_mean', 'mean', **kwargs) _shared_docs['median'] = dedent(""" %(name)s median @@ -591,7 +617,7 @@ def mean(self, **kwargs): def median(self, how=None, **kwargs): if self.freq is not None and how is None: how = 'median' - return self._apply('roll_median_c', how=how, **kwargs) + return self._apply('roll_median_c', 'median', how=how, **kwargs) _shared_docs['std'] = dedent(""" %(name)s standard deviation @@ -609,7 +635,8 @@ def f(arg, *args, **kwargs): minp = _require_min_periods(1)(self.min_periods, window) return _zsqrt(algos.roll_var(arg, window, minp, ddof)) - return self._apply(f, check_minp=_require_min_periods(1), **kwargs) + return self._apply(f, 'std', check_minp=_require_min_periods(1), + ddof=ddof, **kwargs) _shared_docs['var'] = dedent(""" %(name)s variance @@ -621,20 +648,21 @@ def f(arg, *args, **kwargs): is ``N - ddof``, where ``N`` represents the number of elements.""") def var(self, ddof=1, **kwargs): - return self._apply('roll_var', check_minp=_require_min_periods(1), - ddof=ddof, **kwargs) + return self._apply('roll_var', 'var', + check_minp=_require_min_periods(1), ddof=ddof, + **kwargs) _shared_docs['skew'] = """Unbiased %(name)s skewness""" def skew(self, **kwargs): - return self._apply('roll_skew', check_minp=_require_min_periods(3), - **kwargs) + return self._apply('roll_skew', 'skew', + check_minp=_require_min_periods(3), **kwargs) _shared_docs['kurt'] = """Unbiased %(name)s kurtosis""" def kurt(self, **kwargs): - return self._apply('roll_kurt', check_minp=_require_min_periods(4), - **kwargs) + return self._apply('roll_kurt', 'kurt', + check_minp=_require_min_periods(4), **kwargs) _shared_docs['quantile'] = dedent(""" %(name)s quantile @@ -651,7 +679,8 @@ def f(arg, *args, **kwargs): minp = _use_window(self.min_periods, window) return algos.roll_quantile(arg, window, minp, quantile) - return self._apply(f, **kwargs) + return self._apply(f, 'quantile', quantile=quantile, + **kwargs) _shared_docs['cov'] = dedent(""" %(name)s sample covariance @@ -857,6 +886,18 @@ def corr(self, other=None, pairwise=None, **kwargs): **kwargs) +class RollingGroupby(_GroupByMixin, Rolling): + """ + Provides a rolling groupby 
implementation + + .. versionadded:: 0.18.1 + + """ + @property + def _constructor(self): + return Rolling + + class Expanding(_Rolling_and_Expanding): """ Provides expanding transformations. @@ -1005,6 +1046,18 @@ def corr(self, other=None, pairwise=None, **kwargs): **kwargs) +class ExpandingGroupby(_GroupByMixin, Expanding): + """ + Provides an expanding groupby implementation + + .. versionadded:: 0.18.1 + + """ + @property + def _constructor(self): + return Expanding + + _bias_template = """ Parameters diff --git a/pandas/indexes/base.py b/pandas/indexes/base.py index 77e53f839f4f4..35d96170dec42 100644 --- a/pandas/indexes/base.py +++ b/pandas/indexes/base.py @@ -326,8 +326,7 @@ def _simple_new(cls, values, name=None, dtype=None, **kwargs): result.name = name for k, v in compat.iteritems(kwargs): setattr(result, k, v) - result._reset_identity() - return result + return result._reset_identity() _index_shared_docs['_shallow_copy'] = """ create a new Index with the same class as the caller, don't copy the @@ -402,6 +401,7 @@ def is_(self, other): def _reset_identity(self): """Initializes or resets ``_id`` attribute with new object""" self._id = _Identity() + return self # ndarray compat def __len__(self): diff --git a/pandas/tests/test_groupby.py b/pandas/tests/test_groupby.py index b964665ebe91b..5dec00d82e938 100644 --- a/pandas/tests/test_groupby.py +++ b/pandas/tests/test_groupby.py @@ -3095,6 +3095,20 @@ def test_seriesgroupby_name_attr(self): testFunc = lambda x: np.sum(x) * 2 self.assertEqual(result.agg(testFunc).name, 'C') + def test_consistency_name(self): + # GH 12363 + + df = DataFrame({'A': ['foo', 'bar', 'foo', 'bar', + 'foo', 'bar', 'foo', 'foo'], + 'B': ['one', 'one', 'two', 'two', + 'two', 'two', 'one', 'two'], + 'C': np.random.randn(8) + 1.0, + 'D': np.arange(8)}) + + expected = df.groupby(['A']).B.count() + result = df.B.groupby(df.A).count() + assert_series_equal(result, expected) + def test_groupby_name_propagation(self): # GH 6124 def summarize(df, name=None): @@ -3561,8 +3575,7 @@ def test_rank_apply(self): expected.append(piece.value.rank()) expected = concat(expected, axis=0) expected = expected.reindex(result.index) - assert_series_equal(result, expected, check_names=False) - self.assertTrue(result.name is None) + assert_series_equal(result, expected) result = df.groupby(['key1', 'key2']).value.rank(pct=True) @@ -3571,8 +3584,7 @@ def test_rank_apply(self): expected.append(piece.value.rank(pct=True)) expected = concat(expected, axis=0) expected = expected.reindex(result.index) - assert_series_equal(result, expected, check_names=False) - self.assertTrue(result.name is None) + assert_series_equal(result, expected) def test_dont_clobber_name_column(self): df = DataFrame({'key': ['a', 'a', 'a', 'b', 'b', 'b'], @@ -3604,8 +3616,7 @@ def test_skip_group_keys(self): pieces.append(group.sort_values()[:3]) expected = concat(pieces) - assert_series_equal(result, expected, check_names=False) - self.assertTrue(result.name is None) + assert_series_equal(result, expected) def test_no_nonsense_name(self): # GH #995 @@ -4131,6 +4142,7 @@ def test_groupby_multi_timezone(self): tz='America/Chicago'), Timestamp('2000-01-01 16:50:00-0500', tz='America/New_York')], + name='date', dtype=object) assert_series_equal(result, expected) @@ -5743,7 +5755,7 @@ def test_tab_completion(self): 'cumcount', 'all', 'shift', 'skew', 'bfill', 'ffill', 'take', 'tshift', 'pct_change', 'any', 'mad', 'corr', 'corrwith', 'cov', 'dtypes', 'ndim', 'diff', 'idxmax', 'idxmin', - 'ffill', 'bfill', 'pad',
'backfill']) + 'ffill', 'bfill', 'pad', 'backfill', 'rolling', 'expanding']) self.assertEqual(results, expected) def test_lexsort_indexer(self): diff --git a/pandas/tests/test_window.py b/pandas/tests/test_window.py index fb0e2ad2ca34e..b25727e083d37 100644 --- a/pandas/tests/test_window.py +++ b/pandas/tests/test_window.py @@ -2713,3 +2713,132 @@ def test_rolling_min_max_numeric_types(self): result = (DataFrame(np.arange(20, dtype=data_type)) .rolling(window=5).min()) self.assertEqual(result.dtypes[0], np.dtype("f8")) + + +class TestGrouperGrouping(tm.TestCase): + + def setUp(self): + self.series = Series(np.arange(10)) + self.frame = DataFrame({'A': [1] * 20 + [2] * 12 + [3] * 8, + 'B': np.arange(40)}) + + def test_mutated(self): + + def f(): + self.frame.groupby('A', foo=1) + self.assertRaises(TypeError, f) + + g = self.frame.groupby('A') + self.assertFalse(g.mutated) + g = self.frame.groupby('A', mutated=True) + self.assertTrue(g.mutated) + + def test_getitem(self): + g = self.frame.groupby('A') + g_mutated = self.frame.groupby('A', mutated=True) + + expected = g_mutated.B.apply(lambda x: x.rolling(2).mean()) + + result = g.rolling(2).mean().B + assert_series_equal(result, expected) + + result = g.rolling(2).B.mean() + assert_series_equal(result, expected) + + result = g.B.rolling(2).mean() + assert_series_equal(result, expected) + + result = self.frame.B.groupby(self.frame.A).rolling(2).mean() + assert_series_equal(result, expected) + + def test_rolling(self): + g = self.frame.groupby('A') + r = g.rolling(window=4) + + for f in ['sum', 'mean', 'min', 'max', 'count', 'kurt', 'skew']: + result = getattr(r, f)() + expected = g.apply(lambda x: getattr(x.rolling(4), f)()) + assert_frame_equal(result, expected) + + for f in ['std', 'var']: + result = getattr(r, f)(ddof=1) + expected = g.apply(lambda x: getattr(x.rolling(4), f)(ddof=1)) + assert_frame_equal(result, expected) + + result = r.quantile(0.5) + expected = g.apply(lambda x: x.rolling(4).quantile(0.5)) + assert_frame_equal(result, expected) + + def test_rolling_corr_cov(self): + g = self.frame.groupby('A') + r = g.rolling(window=4) + + for f in ['corr', 'cov']: + result = getattr(r, f)(self.frame) + + def func(x): + return getattr(x.rolling(4), f)(self.frame) + expected = g.apply(func) + assert_frame_equal(result, expected) + + result = getattr(r.B, f)(pairwise=True) + + def func(x): + return getattr(x.B.rolling(4), f)(pairwise=True) + expected = g.apply(func) + assert_series_equal(result, expected) + + def test_rolling_apply(self): + g = self.frame.groupby('A') + r = g.rolling(window=4) + + # reduction + result = r.apply(lambda x: x.sum()) + expected = g.apply(lambda x: x.rolling(4).apply(lambda y: y.sum())) + assert_frame_equal(result, expected) + + def test_expanding(self): + g = self.frame.groupby('A') + r = g.expanding() + + for f in ['sum', 'mean', 'min', 'max', 'count', 'kurt', 'skew']: + result = getattr(r, f)() + expected = g.apply(lambda x: getattr(x.expanding(), f)()) + assert_frame_equal(result, expected) + + for f in ['std', 'var']: + result = getattr(r, f)(ddof=0) + expected = g.apply(lambda x: getattr(x.expanding(), f)(ddof=0)) + assert_frame_equal(result, expected) + + result = r.quantile(0.5) + expected = g.apply(lambda x: x.expanding().quantile(0.5)) + assert_frame_equal(result, expected) + + def test_expanding_corr_cov(self): + g = self.frame.groupby('A') + r = g.expanding() + + for f in ['corr', 'cov']: + result = getattr(r, f)(self.frame) + + def func(x): + return getattr(x.expanding(), f)(self.frame) + expected 
= g.apply(func) + assert_frame_equal(result, expected) + + result = getattr(r.B, f)(pairwise=True) + + def func(x): + return getattr(x.B.expanding(), f)(pairwise=True) + expected = g.apply(func) + assert_series_equal(result, expected) + + def test_expanding_apply(self): + g = self.frame.groupby('A') + r = g.expanding() + + # reduction + result = r.apply(lambda x: x.sum()) + expected = g.apply(lambda x: x.expanding().apply(lambda y: y.sum())) + assert_frame_equal(result, expected) diff --git a/pandas/tools/merge.py b/pandas/tools/merge.py index 84a431393b0bf..08df9f1c998ef 100644 --- a/pandas/tools/merge.py +++ b/pandas/tools/merge.py @@ -1136,6 +1136,7 @@ def _concat_indexes(indexes): def _make_concat_multiindex(indexes, keys, levels=None, names=None): + if ((levels is None and isinstance(keys[0], tuple)) or (levels is not None and len(levels) > 1)): zipped = lzip(*keys) diff --git a/pandas/tseries/resample.py b/pandas/tseries/resample.py index 4a6592da0cb41..504f03b6bd53d 100644 --- a/pandas/tseries/resample.py +++ b/pandas/tseries/resample.py @@ -3,7 +3,7 @@ import warnings import pandas as pd -from pandas.core.base import AbstractMethodError +from pandas.core.base import AbstractMethodError, GroupByMixin from pandas.core.groupby import (BinGrouper, Grouper, _GroupBy, GroupBy, SeriesGroupBy, groupby, PanelGroupBy) @@ -57,12 +57,12 @@ class Resampler(_GroupBy): 'grouper', 'groupby', 'keys', 'sort', 'kind', 'squeeze', 'group_keys', 'as_index', - 'exclusions'] + 'exclusions', '_groupby'] # API compat of disallowed attributes _deprecated_invalids = ['iloc', 'loc', 'ix', 'iat', 'at'] - def __init__(self, obj, groupby, axis=0, kind=None, **kwargs): + def __init__(self, obj, groupby=None, axis=0, kind=None, **kwargs): self.groupby = groupby self.keys = None self.sort = True @@ -75,7 +75,8 @@ def __init__(self, obj, groupby, axis=0, kind=None, **kwargs): self.binner = None self.grouper = None - self.groupby._set_grouper(self._convert_obj(obj), sort=True) + if self.groupby is not None: + self.groupby._set_grouper(self._convert_obj(obj), sort=True) def __unicode__(self): """ provide a nice str repr of our resample object """ @@ -287,8 +288,7 @@ def aggregate(self, arg, *args, **kwargs): self._set_binner() result, how = self._aggregate(arg, *args, **kwargs) if result is None: - return self._groupby_and_aggregate(self.grouper, - arg, + return self._groupby_and_aggregate(arg, *args, **kwargs) @@ -349,7 +349,7 @@ def _gotitem(self, key, ndim, subset=None): except KeyError: return grouped - def _groupby_and_aggregate(self, grouper, how, *args, **kwargs): + def _groupby_and_aggregate(self, how, grouper=None, *args, **kwargs): """ re-evaluate the obj with a groupby aggregation """ if grouper is None: @@ -393,8 +393,14 @@ def _apply_loffset(self, result): return result + def _get_resampler_for_grouping(self, groupby, **kwargs): + """ return the correct class for resampling with groupby """ + return self._resampler_for_grouping(self, groupby=groupby, **kwargs) + def _wrap_result(self, result): """ potentially wrap any results """ + if isinstance(result, com.ABCSeries) and self._selection is not None: + result.name = self._selection return result def pad(self, limit=None): @@ -453,7 +459,7 @@ def asfreq(self): return the values at the new freq, essentially a reindex (with no filling) """ - return self._upsample(None) + return self._upsample('asfreq') def std(self, ddof=1): """ @@ -491,14 +497,14 @@ def f(self, _method=method): for method in ['count', 'size']: def f(self, _method=method): - return
self._groupby_and_aggregate(None, _method) + return self._downsample(_method) f.__doc__ = getattr(GroupBy, method).__doc__ setattr(Resampler, method, f) # series only methods for method in ['nunique']: def f(self, _method=method): - return self._groupby_and_aggregate(None, _method) + return self._downsample(_method) f.__doc__ = getattr(SeriesGroupBy, method).__doc__ setattr(Resampler, method, f) @@ -549,8 +555,55 @@ def _maybe_process_deprecations(r, how=None, fill_method=None, limit=None): return r +class _GroupByMixin(GroupByMixin): + """ provide the groupby facilities """ + + def __init__(self, obj, *args, **kwargs): + + parent = kwargs.pop('parent', None) + groupby = kwargs.pop('groupby', None) + if parent is None: + parent = obj + + # initialize our GroupByMixin object with + # the resampler attributes + for attr in self._attributes: + setattr(self, attr, kwargs.get(attr, getattr(parent, attr))) + + super(_GroupByMixin, self).__init__(None) + self._groupby = groupby + self._groupby.mutated = True + self._groupby.grouper.mutated = True + self.groupby = parent.groupby + + def _apply(self, f, **kwargs): + """ + dispatch to _upsample; we are stripping all of the _upsample kwargs and + performing the original function call on the grouped object + """ + + def func(x): + x = self._shallow_copy(x, groupby=self.groupby) + + if isinstance(f, compat.string_types): + return getattr(x, f)(**kwargs) + + return x.apply(f, **kwargs) + + result = self._groupby.apply(func) + return self._wrap_result(result) + + _upsample = _apply + _downsample = _apply + _groupby_and_aggregate = _apply + + class DatetimeIndexResampler(Resampler): + @property + def _resampler_for_grouping(self): + return DatetimeIndexResamplerGroupby + def _get_binner_for_time(self): # this is how we are actually creating the bins @@ -605,8 +658,8 @@ def _adjust_binner_for_upsample(self, binner): def _upsample(self, method, limit=None): """ - method : string {'backfill', 'bfill', 'pad', 'ffill'} - method for upsampling + method : string {'backfill', 'bfill', 'pad', + 'ffill', 'asfreq'} method for upsampling limit : int, default None Maximum size gap to fill when reindexing @@ -644,8 +697,24 @@ def _wrap_result(self, result): return result +class DatetimeIndexResamplerGroupby(_GroupByMixin, DatetimeIndexResampler): + """ + Provides a grouped resample implementation + + .. versionadded:: 0.18.1 + + """ + @property + def _constructor(self): + return DatetimeIndexResampler + + class PeriodIndexResampler(DatetimeIndexResampler): + @property + def _resampler_for_grouping(self): + return PeriodIndexResamplerGroupby + def _convert_obj(self, obj): obj = super(PeriodIndexResampler, self)._convert_obj(obj) @@ -713,7 +782,7 @@ def _downsample(self, how, **kwargs): rng = np.arange(memb.values[0], memb.values[-1] + 1) bins = memb.searchsorted(rng, side='right') grouper = BinGrouper(bins, new_index) - return self._groupby_and_aggregate(grouper, how) + return self._groupby_and_aggregate(how, grouper=grouper) elif is_superperiod(ax.freq, self.freq): return self.asfreq() elif ax.freq == self.freq: @@ -756,14 +825,24 @@ def _upsample(self, method, limit=None): return self._wrap_result(_take_new_index( obj, indexer, new_index, axis=self.axis)) - def _groupby_and_aggregate(self, grouper, how, *args, **kwargs): - if grouper is None: - return self._downsample(how, **kwargs) - return super(PeriodIndexResampler, self)._groupby_and_aggregate( - grouper, how, *args, **kwargs) +class PeriodIndexResamplerGroupby(_GroupByMixin, PeriodIndexResampler): + """ + Provides a grouped resample implementation + + .. versionadded:: 0.18.1 + + """ + @property + def _constructor(self): + return PeriodIndexResampler + + +class TimedeltaIndexResampler(DatetimeIndexResampler): -class TimedeltaResampler(DatetimeIndexResampler): + @property + def _resampler_for_grouping(self): + return TimedeltaIndexResamplerGroupby def _get_binner_for_time(self): return self.groupby._get_time_delta_bins(self.ax) @@ -783,6 +862,18 @@ def _adjust_binner_for_upsample(self, binner): return binner +class TimedeltaIndexResamplerGroupby(_GroupByMixin, TimedeltaIndexResampler): + """ + Provides a grouped resample implementation + + ..
versionadded:: 0.18.1 + + """ + @property + def _constructor(self): + return TimedeltaIndexResampler + + def resample(obj, kind=None, **kwds): """ create a TimeGrouper and return our resampler """ tg = TimeGrouper(**kwds) @@ -790,6 +881,19 @@ def resample(obj, kind=None, **kwds): resample.__doc__ = Resampler.__doc__ +def get_resampler_for_grouping(groupby, rule, how=None, fill_method=None, + limit=None, kind=None, **kwargs): + """ return our appropriate resampler when grouping as well """ + tg = TimeGrouper(freq=rule, **kwargs) + resampler = tg._get_resampler(groupby.obj, kind=kind) + r = resampler._get_resampler_for_grouping(groupby=groupby) + return _maybe_process_deprecations(r, + how=how, + fill_method=fill_method, + limit=limit, + **kwargs) + + class TimeGrouper(Grouper): """ Custom groupby class for time-interval grouping @@ -881,9 +985,9 @@ def _get_resampler(self, obj, kind=None): kind=kind, axis=self.axis) elif isinstance(ax, TimedeltaIndex): - return TimedeltaResampler(obj, - groupby=self, - axis=self.axis) + return TimedeltaIndexResampler(obj, + groupby=self, + axis=self.axis) raise TypeError("Only valid with DatetimeIndex, " "TimedeltaIndex or PeriodIndex, " diff --git a/pandas/tseries/tests/test_resample.py b/pandas/tseries/tests/test_resample.py index 091e36ad7c049..80123ecd4d217 100644 --- a/pandas/tseries/tests/test_resample.py +++ b/pandas/tseries/tests/test_resample.py @@ -207,14 +207,6 @@ def test_groupby_resample_api(self): lambda x: x.resample('1D').ffill())[['val']] assert_frame_equal(result, expected) - # deferred operations are currently disabled - # GH 12486 - # - # with tm.assert_produces_warning(FutureWarning, - # check_stacklevel=False): - # result = df.groupby('group').resample('1D').ffill() - # assert_frame_equal(result, expected) - def test_plot_api(self): tm._skip_if_no_mpl() @@ -1438,15 +1430,7 @@ def test_resample_segfault(self): columns=("ID", "timestamp", "A", "B") ).set_index("timestamp") result = df.groupby("ID").resample("5min").sum() - expected = DataFrame([[1, 1, 0], - [4, 2, 0], - [2, 1, 0]], - index=pd.MultiIndex.from_tuples([ - (1, pd.Timestamp('2013-10-01 16:20:00')), - (2, pd.Timestamp('2013-10-01 16:10:00')), - (2, pd.Timestamp('2013-10-01 18:15:00'))], - names=['ID', 'timestamp']), - columns=['ID', 'A', 'B']) + expected = df.groupby("ID").apply(lambda x: x.resample("5min").sum()) assert_frame_equal(result, expected) def test_resample_dtype_preservation(self): @@ -1742,10 +1726,6 @@ def test_resample_nunique(self): result = r.ID.nunique() assert_series_equal(result, expected) - # TODO - # this should have name - # https://github.com/pydata/pandas/issues/12363 - expected.name = None result = df.ID.resample('D').nunique() assert_series_equal(result, expected) @@ -2464,6 +2444,146 @@ def test_asfreq_bug(self): assert_frame_equal(result, expected) +class TestResamplerGrouper(tm.TestCase): + + def setUp(self): + self.frame = DataFrame({'A': [1] * 20 + [2] * 12 + [3] * 8, + 'B': np.arange(40)}, + index=date_range('1/1/2000', + freq='s', + periods=40)) + + def test_back_compat_v180(self): + + df = self.frame + for how in ['sum', 'mean', 'prod', 'min', 'max', 'var', 'std']: + with tm.assert_produces_warning(FutureWarning, + check_stacklevel=False): + result = df.groupby('A').resample('4s', how=how) + expected = getattr(df.groupby('A').resample('4s'), how)() + assert_frame_equal(result, expected) + + with tm.assert_produces_warning(FutureWarning, + check_stacklevel=False): + result = df.groupby('A').resample('4s', how='mean', + 
fill_method='ffill') + expected = df.groupby('A').resample('4s').mean().ffill() + assert_frame_equal(result, expected) + + def test_deferred_with_groupby(self): + + # GH 12486 + # support deferred resample ops with groupby + data = [['2010-01-01', 'A', 2], ['2010-01-02', 'A', 3], + ['2010-01-05', 'A', 8], ['2010-01-10', 'A', 7], + ['2010-01-13', 'A', 3], ['2010-01-01', 'B', 5], + ['2010-01-03', 'B', 2], ['2010-01-04', 'B', 1], + ['2010-01-11', 'B', 7], ['2010-01-14', 'B', 3]] + + df = DataFrame(data, columns=['date', 'id', 'score']) + df.date = pd.to_datetime(df.date) + f = lambda x: x.set_index('date').resample('D').asfreq() + expected = df.groupby('id').apply(f) + result = df.set_index('date').groupby('id').resample('D').asfreq() + assert_frame_equal(result, expected) + + df = DataFrame({'date': pd.date_range(start='2016-01-01', + periods=4, + freq='W'), + 'group': [1, 1, 2, 2], + 'val': [5, 6, 7, 8]}).set_index('date') + + f = lambda x: x.resample('1D').ffill() + expected = df.groupby('group').apply(f) + result = df.groupby('group').resample('1D').ffill() + assert_frame_equal(result, expected) + + def test_getitem(self): + g = self.frame.groupby('A') + + expected = g.B.apply(lambda x: x.resample('2s').mean()) + + result = g.resample('2s').B.mean() + assert_series_equal(result, expected) + + result = g.B.resample('2s').mean() + assert_series_equal(result, expected) + + result = g.resample('2s').mean().B + assert_series_equal(result, expected) + + def test_methods(self): + g = self.frame.groupby('A') + r = g.resample('2s') + + for f in ['first', 'last', 'median', 'sem', 'sum', 'mean', + 'min', 'max']: + result = getattr(r, f)() + expected = g.apply(lambda x: getattr(x.resample('2s'), f)()) + assert_frame_equal(result, expected) + + for f in ['size']: + result = getattr(r, f)() + expected = g.apply(lambda x: getattr(x.resample('2s'), f)()) + assert_series_equal(result, expected) + + for f in ['count']: + result = getattr(r, f)() + expected = g.apply(lambda x: getattr(x.resample('2s'), f)()) + assert_frame_equal(result, expected) + + # series only + for f in ['nunique']: + result = getattr(r.B, f)() + expected = g.B.apply(lambda x: getattr(x.resample('2s'), f)()) + assert_series_equal(result, expected) + + for f in ['backfill', 'ffill', 'asfreq']: + result = getattr(r, f)() + expected = g.apply(lambda x: getattr(x.resample('2s'), f)()) + assert_frame_equal(result, expected) + + result = r.ohlc() + expected = g.apply(lambda x: x.resample('2s').ohlc()) + assert_frame_equal(result, expected) + + for f in ['std', 'var']: + result = getattr(r, f)(ddof=1) + expected = g.apply(lambda x: getattr(x.resample('2s'), f)(ddof=1)) + assert_frame_equal(result, expected) + + def test_apply(self): + + g = self.frame.groupby('A') + r = g.resample('2s') + + # reduction + expected = g.resample('2s').sum() + + def f(x): + return x.resample('2s').sum() + result = r.apply(f) + assert_frame_equal(result, expected) + + def f(x): + return x.resample('2s').apply(lambda y: y.sum()) + result = g.apply(f) + assert_frame_equal(result, expected) + + def test_consistency_with_window(self): + + # consistent return values with window + df = self.frame + expected = pd.Int64Index([1, 2, 3], name='A') + result = df.groupby('A').resample('2s').mean() + self.assertEqual(result.index.nlevels, 2) + tm.assert_index_equal(result.index.levels[0], expected) + + result = df.groupby('A').rolling(20).mean() + self.assertEqual(result.index.nlevels, 2) + tm.assert_index_equal(result.index.levels[0], expected) + + class 
TestTimeGrouper(tm.TestCase): def setUp(self): self.ts = Series(np.random.randn(1000),