diff --git a/pandas/core/aggregation.py b/pandas/core/aggregation.py index ad69e9f31e065..74359c8831745 100644 --- a/pandas/core/aggregation.py +++ b/pandas/core/aggregation.py @@ -29,10 +29,11 @@ Label, ) +from pandas.core.dtypes.cast import is_nested_object from pandas.core.dtypes.common import is_dict_like, is_list_like from pandas.core.dtypes.generic import ABCDataFrame, ABCSeries -from pandas.core.base import SpecificationError +from pandas.core.base import DataError, SpecificationError import pandas.core.common as com from pandas.core.indexes.api import Index @@ -525,3 +526,297 @@ def transform_str_or_callable( return obj.apply(func, args=args, **kwargs) except Exception: return func(obj, *args, **kwargs) + + +def aggregate(obj, arg: AggFuncType, *args, **kwargs): + """ + provide an implementation for the aggregators + + Parameters + ---------- + arg : string, dict, function + *args : args to pass on to the function + **kwargs : kwargs to pass on to the function + + Returns + ------- + tuple of result, how + + Notes + ----- + how can be a string describe the required post-processing, or + None if not required + """ + is_aggregator = lambda x: isinstance(x, (list, tuple, dict)) + + _axis = kwargs.pop("_axis", None) + if _axis is None: + _axis = getattr(obj, "axis", 0) + + if isinstance(arg, str): + return obj._try_aggregate_string_function(arg, *args, **kwargs), None + + if isinstance(arg, dict): + # aggregate based on the passed dict + if _axis != 0: # pragma: no cover + raise ValueError("Can only pass dict with axis=0") + + selected_obj = obj._selected_obj + + # if we have a dict of any non-scalars + # eg. {'A' : ['mean']}, normalize all to + # be list-likes + if any(is_aggregator(x) for x in arg.values()): + new_arg: Dict[Label, Union[AggFuncTypeBase, List[AggFuncTypeBase]]] = {} + for k, v in arg.items(): + if not isinstance(v, (tuple, list, dict)): + new_arg[k] = [v] + else: + new_arg[k] = v + + # the keys must be in the columns + # for ndim=2, or renamers for ndim=1 + + # ok for now, but deprecated + # {'A': { 'ra': 'mean' }} + # {'A': { 'ra': ['mean'] }} + # {'ra': ['mean']} + + # not ok + # {'ra' : { 'A' : 'mean' }} + if isinstance(v, dict): + raise SpecificationError("nested renamer is not supported") + elif isinstance(selected_obj, ABCSeries): + raise SpecificationError("nested renamer is not supported") + elif ( + isinstance(selected_obj, ABCDataFrame) + and k not in selected_obj.columns + ): + raise KeyError(f"Column '{k}' does not exist!") + + arg = new_arg + + else: + # deprecation of renaming keys + # GH 15931 + keys = list(arg.keys()) + if isinstance(selected_obj, ABCDataFrame) and len( + selected_obj.columns.intersection(keys) + ) != len(keys): + cols = sorted(set(keys) - set(selected_obj.columns.intersection(keys))) + raise SpecificationError(f"Column(s) {cols} do not exist") + + from pandas.core.reshape.concat import concat + + def _agg_1dim(name, how, subset=None): + """ + aggregate a 1-dim with how + """ + colg = obj._gotitem(name, ndim=1, subset=subset) + if colg.ndim != 1: + raise SpecificationError( + "nested dictionary is ambiguous in aggregation" + ) + return colg.aggregate(how) + + def _agg_2dim(how): + """ + aggregate a 2-dim with how + """ + colg = obj._gotitem(obj._selection, ndim=2, subset=selected_obj) + return colg.aggregate(how) + + def _agg(arg, func): + """ + run the aggregations over the arg with func + return a dict + """ + result = {} + for fname, agg_how in arg.items(): + result[fname] = func(fname, agg_how) + return result + + # set the final 
keys + keys = list(arg.keys()) + + if obj._selection is not None: + + sl = set(obj._selection_list) + + # we are a Series like object, + # but may have multiple aggregations + if len(sl) == 1: + + result = _agg( + arg, lambda fname, agg_how: _agg_1dim(obj._selection, agg_how) + ) + + # we are selecting the same set as we are aggregating + elif not len(sl - set(keys)): + + result = _agg(arg, _agg_1dim) + + # we are a DataFrame, with possibly multiple aggregations + else: + + result = _agg(arg, _agg_2dim) + + # no selection + else: + + try: + result = _agg(arg, _agg_1dim) + except SpecificationError: + + # we are aggregating expecting all 1d-returns + # but we have 2d + result = _agg(arg, _agg_2dim) + + # combine results + + def is_any_series() -> bool: + # return a boolean if we have *any* nested series + return any(isinstance(r, ABCSeries) for r in result.values()) + + def is_any_frame() -> bool: + # return a boolean if we have *any* nested series + return any(isinstance(r, ABCDataFrame) for r in result.values()) + + if isinstance(result, list): + return concat(result, keys=keys, axis=1, sort=True), True + + elif is_any_frame(): + # we have a dict of DataFrames + # return a MI DataFrame + + keys_to_use = [k for k in keys if not result[k].empty] + # Have to check, if at least one DataFrame is not empty. + keys_to_use = keys_to_use if keys_to_use != [] else keys + return ( + concat([result[k] for k in keys_to_use], keys=keys_to_use, axis=1), + True, + ) + + elif isinstance(obj, ABCSeries) and is_any_series(): + + # we have a dict of Series + # return a MI Series + try: + result = concat(result) + except TypeError as err: + # we want to give a nice error here if + # we have non-same sized objects, so + # we don't automatically broadcast + + raise ValueError( + "cannot perform both aggregation " + "and transformation operations " + "simultaneously" + ) from err + + return result, True + + # fall thru + from pandas import DataFrame, Series + + try: + result = DataFrame(result) + except ValueError: + # we have a dict of scalars + + # GH 36212 use name only if obj is a series + if obj.ndim == 1: + obj = cast("Series", obj) + name = obj.name + else: + name = None + + result = Series(result, name=name) + + return result, True + elif is_list_like(arg): + # we require a list, but not an 'str' + return aggregate_multiple_funcs(obj, arg, _axis=_axis), None + else: + result = None + + if callable(arg): + f = obj._get_cython_func(arg) + if f and not args and not kwargs: + return getattr(obj, f)(), None + + # caller can react + return result, True + + +def aggregate_multiple_funcs(obj, arg, _axis): + from pandas.core.reshape.concat import concat + + if _axis != 0: + raise NotImplementedError("axis other than 0 is not supported") + + if obj._selected_obj.ndim == 1: + selected_obj = obj._selected_obj + else: + selected_obj = obj._obj_with_exclusions + + results = [] + keys = [] + + # degenerate case + if selected_obj.ndim == 1: + for a in arg: + colg = obj._gotitem(selected_obj.name, ndim=1, subset=selected_obj) + try: + new_res = colg.aggregate(a) + + except TypeError: + pass + else: + results.append(new_res) + + # make sure we find a good name + name = com.get_callable_name(a) or a + keys.append(name) + + # multiples + else: + for index, col in enumerate(selected_obj): + colg = obj._gotitem(col, ndim=1, subset=selected_obj.iloc[:, index]) + try: + new_res = colg.aggregate(arg) + except (TypeError, DataError): + pass + except ValueError as err: + # cannot aggregate + if "Must produce aggregated value" in 
str(err): + # raised directly in _aggregate_named + pass + elif "no results" in str(err): + # raised directly in _aggregate_multiple_funcs + pass + else: + raise + else: + results.append(new_res) + keys.append(col) + + # if we are empty + if not len(results): + raise ValueError("no results") + + try: + return concat(results, keys=keys, axis=1, sort=False) + except TypeError as err: + + # we are concatting non-NDFrame objects, + # e.g. a list of scalars + + from pandas import Series + + result = Series(results, index=keys, name=obj.name) + if is_nested_object(result): + raise ValueError( + "cannot combine transform and aggregation operations" + ) from err + return result diff --git a/pandas/core/base.py b/pandas/core/base.py index 1063e742e38c8..10b83116dee58 100644 --- a/pandas/core/base.py +++ b/pandas/core/base.py @@ -4,30 +4,28 @@ import builtins import textwrap -from typing import Any, Callable, Dict, FrozenSet, List, Optional, TypeVar, Union, cast +from typing import Any, Callable, Dict, FrozenSet, Optional, TypeVar, Union import numpy as np import pandas._libs.lib as lib -from pandas._typing import AggFuncType, AggFuncTypeBase, IndexLabel, Label +from pandas._typing import IndexLabel from pandas.compat import PYPY from pandas.compat.numpy import function as nv from pandas.errors import AbstractMethodError from pandas.util._decorators import cache_readonly, doc -from pandas.core.dtypes.cast import is_nested_object from pandas.core.dtypes.common import ( is_categorical_dtype, is_dict_like, is_extension_array_dtype, - is_list_like, is_object_dtype, is_scalar, ) from pandas.core.dtypes.generic import ABCDataFrame, ABCIndexClass, ABCSeries from pandas.core.dtypes.missing import isna -from pandas.core import algorithms, common as com +from pandas.core import algorithms from pandas.core.accessor import DirNamesMixin from pandas.core.algorithms import duplicated, unique1d, value_counts from pandas.core.arraylike import OpsMixin @@ -282,300 +280,6 @@ def _try_aggregate_string_function(self, arg: str, *args, **kwargs): f"'{arg}' is not a valid function for '{type(self).__name__}' object" ) - def _aggregate(self, arg: AggFuncType, *args, **kwargs): - """ - provide an implementation for the aggregators - - Parameters - ---------- - arg : string, dict, function - *args : args to pass on to the function - **kwargs : kwargs to pass on to the function - - Returns - ------- - tuple of result, how - - Notes - ----- - how can be a string describe the required post-processing, or - None if not required - """ - is_aggregator = lambda x: isinstance(x, (list, tuple, dict)) - - _axis = kwargs.pop("_axis", None) - if _axis is None: - _axis = getattr(self, "axis", 0) - - if isinstance(arg, str): - return self._try_aggregate_string_function(arg, *args, **kwargs), None - - if isinstance(arg, dict): - # aggregate based on the passed dict - if _axis != 0: # pragma: no cover - raise ValueError("Can only pass dict with axis=0") - - selected_obj = self._selected_obj - - # if we have a dict of any non-scalars - # eg. 
{'A' : ['mean']}, normalize all to - # be list-likes - if any(is_aggregator(x) for x in arg.values()): - new_arg: Dict[Label, Union[AggFuncTypeBase, List[AggFuncTypeBase]]] = {} - for k, v in arg.items(): - if not isinstance(v, (tuple, list, dict)): - new_arg[k] = [v] - else: - new_arg[k] = v - - # the keys must be in the columns - # for ndim=2, or renamers for ndim=1 - - # ok for now, but deprecated - # {'A': { 'ra': 'mean' }} - # {'A': { 'ra': ['mean'] }} - # {'ra': ['mean']} - - # not ok - # {'ra' : { 'A' : 'mean' }} - if isinstance(v, dict): - raise SpecificationError("nested renamer is not supported") - elif isinstance(selected_obj, ABCSeries): - raise SpecificationError("nested renamer is not supported") - elif ( - isinstance(selected_obj, ABCDataFrame) - and k not in selected_obj.columns - ): - raise KeyError(f"Column '{k}' does not exist!") - - arg = new_arg - - else: - # deprecation of renaming keys - # GH 15931 - keys = list(arg.keys()) - if isinstance(selected_obj, ABCDataFrame) and len( - selected_obj.columns.intersection(keys) - ) != len(keys): - cols = sorted( - set(keys) - set(selected_obj.columns.intersection(keys)) - ) - raise SpecificationError(f"Column(s) {cols} do not exist") - - from pandas.core.reshape.concat import concat - - def _agg_1dim(name, how, subset=None): - """ - aggregate a 1-dim with how - """ - colg = self._gotitem(name, ndim=1, subset=subset) - if colg.ndim != 1: - raise SpecificationError( - "nested dictionary is ambiguous in aggregation" - ) - return colg.aggregate(how) - - def _agg_2dim(how): - """ - aggregate a 2-dim with how - """ - colg = self._gotitem(self._selection, ndim=2, subset=selected_obj) - return colg.aggregate(how) - - def _agg(arg, func): - """ - run the aggregations over the arg with func - return a dict - """ - result = {} - for fname, agg_how in arg.items(): - result[fname] = func(fname, agg_how) - return result - - # set the final keys - keys = list(arg.keys()) - - if self._selection is not None: - - sl = set(self._selection_list) - - # we are a Series like object, - # but may have multiple aggregations - if len(sl) == 1: - - result = _agg( - arg, lambda fname, agg_how: _agg_1dim(self._selection, agg_how) - ) - - # we are selecting the same set as we are aggregating - elif not len(sl - set(keys)): - - result = _agg(arg, _agg_1dim) - - # we are a DataFrame, with possibly multiple aggregations - else: - - result = _agg(arg, _agg_2dim) - - # no selection - else: - - try: - result = _agg(arg, _agg_1dim) - except SpecificationError: - - # we are aggregating expecting all 1d-returns - # but we have 2d - result = _agg(arg, _agg_2dim) - - # combine results - - def is_any_series() -> bool: - # return a boolean if we have *any* nested series - return any(isinstance(r, ABCSeries) for r in result.values()) - - def is_any_frame() -> bool: - # return a boolean if we have *any* nested series - return any(isinstance(r, ABCDataFrame) for r in result.values()) - - if isinstance(result, list): - return concat(result, keys=keys, axis=1, sort=True), True - - elif is_any_frame(): - # we have a dict of DataFrames - # return a MI DataFrame - - keys_to_use = [k for k in keys if not result[k].empty] - # Have to check, if at least one DataFrame is not empty. 
- keys_to_use = keys_to_use if keys_to_use != [] else keys - return ( - concat([result[k] for k in keys_to_use], keys=keys_to_use, axis=1), - True, - ) - - elif isinstance(self, ABCSeries) and is_any_series(): - - # we have a dict of Series - # return a MI Series - try: - result = concat(result) - except TypeError as err: - # we want to give a nice error here if - # we have non-same sized objects, so - # we don't automatically broadcast - - raise ValueError( - "cannot perform both aggregation " - "and transformation operations " - "simultaneously" - ) from err - - return result, True - - # fall thru - from pandas import DataFrame, Series - - try: - result = DataFrame(result) - except ValueError: - # we have a dict of scalars - - # GH 36212 use name only if self is a series - if self.ndim == 1: - self = cast("Series", self) - name = self.name - else: - name = None - - result = Series(result, name=name) - - return result, True - elif is_list_like(arg): - # we require a list, but not an 'str' - return self._aggregate_multiple_funcs(arg, _axis=_axis), None - else: - result = None - - if callable(arg): - f = self._get_cython_func(arg) - if f and not args and not kwargs: - return getattr(self, f)(), None - - # caller can react - return result, True - - def _aggregate_multiple_funcs(self, arg, _axis): - from pandas.core.reshape.concat import concat - - if _axis != 0: - raise NotImplementedError("axis other than 0 is not supported") - - if self._selected_obj.ndim == 1: - selected_obj = self._selected_obj - else: - selected_obj = self._obj_with_exclusions - - results = [] - keys = [] - - # degenerate case - if selected_obj.ndim == 1: - for a in arg: - colg = self._gotitem(selected_obj.name, ndim=1, subset=selected_obj) - try: - new_res = colg.aggregate(a) - - except TypeError: - pass - else: - results.append(new_res) - - # make sure we find a good name - name = com.get_callable_name(a) or a - keys.append(name) - - # multiples - else: - for index, col in enumerate(selected_obj): - colg = self._gotitem(col, ndim=1, subset=selected_obj.iloc[:, index]) - try: - new_res = colg.aggregate(arg) - except (TypeError, DataError): - pass - except ValueError as err: - # cannot aggregate - if "Must produce aggregated value" in str(err): - # raised directly in _aggregate_named - pass - elif "no results" in str(err): - # raised directly in _aggregate_multiple_funcs - pass - else: - raise - else: - results.append(new_res) - keys.append(col) - - # if we are empty - if not len(results): - raise ValueError("no results") - - try: - return concat(results, keys=keys, axis=1, sort=False) - except TypeError as err: - - # we are concatting non-NDFrame objects, - # e.g. 
a list of scalars - - from pandas import Series - - result = Series(results, index=keys, name=self.name) - if is_nested_object(result): - raise ValueError( - "cannot combine transform and aggregation operations" - ) from err - return result - def _get_cython_func(self, arg: Callable) -> Optional[str]: """ if we define an internal function for this argument, return it diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 80e9ec5076610..1e3ae3eb41b8d 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -118,7 +118,12 @@ from pandas.core import algorithms, common as com, nanops, ops from pandas.core.accessor import CachedAccessor -from pandas.core.aggregation import reconstruct_func, relabel_result, transform +from pandas.core.aggregation import ( + aggregate, + reconstruct_func, + relabel_result, + transform, +) from pandas.core.arrays import Categorical, ExtensionArray from pandas.core.arrays.datetimelike import DatetimeLikeArrayMixin as DatetimeLikeArray from pandas.core.arrays.sparse import SparseFrameAccessor @@ -7434,10 +7439,10 @@ def _aggregate(self, arg, axis=0, *args, **kwargs): if axis == 1: # NDFrame.aggregate returns a tuple, and we need to transpose # only result - result, how = self.T._aggregate(arg, *args, **kwargs) + result, how = aggregate(self.T, arg, *args, **kwargs) result = result.T if result is not None else result return result, how - return super()._aggregate(arg, *args, **kwargs) + return aggregate(self, arg, *args, **kwargs) agg = aggregate diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index e7e812737d48e..af3aa5d121391 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -54,6 +54,8 @@ from pandas.core.dtypes.missing import isna, notna from pandas.core.aggregation import ( + aggregate, + aggregate_multiple_funcs, maybe_mangle_lambdas, reconstruct_func, validate_func_kwargs, @@ -946,7 +948,7 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) relabeling, func, columns, order = reconstruct_func(func, **kwargs) func = maybe_mangle_lambdas(func) - result, how = self._aggregate(func, *args, **kwargs) + result, how = aggregate(self, func, *args, **kwargs) if how is None: return result @@ -966,7 +968,7 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) # try to treat as if we are passing a list try: - result = self._aggregate_multiple_funcs([func], _axis=self.axis) + result = aggregate_multiple_funcs(self, [func], _axis=self.axis) # select everything except for the last level, which is the one # containing the name of the function(s), see GH 32040 diff --git a/pandas/core/resample.py b/pandas/core/resample.py index f881f79cb5c1d..3f1b1dac080a7 100644 --- a/pandas/core/resample.py +++ b/pandas/core/resample.py @@ -21,6 +21,7 @@ from pandas.core.dtypes.generic import ABCDataFrame, ABCSeries +from pandas.core.aggregation import aggregate import pandas.core.algorithms as algos from pandas.core.base import DataError from pandas.core.generic import NDFrame, _shared_docs @@ -288,7 +289,7 @@ def pipe(self, func, *args, **kwargs): def aggregate(self, func, *args, **kwargs): self._set_binner() - result, how = self._aggregate(func, *args, **kwargs) + result, how = aggregate(self, func, *args, **kwargs) if result is None: how = func grouper = None diff --git a/pandas/core/series.py b/pandas/core/series.py index 9bd41ca0e76db..a2a6023bf4626 100644 --- a/pandas/core/series.py +++ b/pandas/core/series.py @@ -70,7 +70,7 @@ import pandas as pd 
 from pandas.core import algorithms, base, generic, nanops, ops
 from pandas.core.accessor import CachedAccessor
-from pandas.core.aggregation import transform
+from pandas.core.aggregation import aggregate, transform
 from pandas.core.arrays import ExtensionArray
 from pandas.core.arrays.categorical import CategoricalAccessor
 from pandas.core.arrays.sparse import SparseAccessor
@@ -4019,7 +4019,7 @@ def aggregate(self, func=None, axis=0, *args, **kwargs):
         if func is None:
             func = dict(kwargs.items())
 
-        result, how = self._aggregate(func, *args, **kwargs)
+        result, how = aggregate(self, func, *args, **kwargs)
         if result is None:
 
             # we can be called from an inner function which
diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py
index af3bba4edf343..466b320f1771f 100644
--- a/pandas/core/window/rolling.py
+++ b/pandas/core/window/rolling.py
@@ -47,6 +47,7 @@
 )
 from pandas.core.dtypes.missing import notna
 
+from pandas.core.aggregation import aggregate
 from pandas.core.base import DataError, SelectionMixin
 import pandas.core.common as com
 from pandas.core.construction import extract_array
@@ -618,7 +619,7 @@ def calc(x):
         return self._apply_blockwise(homogeneous_func, name)
 
     def aggregate(self, func, *args, **kwargs):
-        result, how = self._aggregate(func, *args, **kwargs)
+        result, how = aggregate(self, func, *args, **kwargs)
         if result is None:
             return self.apply(func, raw=False, args=args, kwargs=kwargs)
         return result
@@ -1183,7 +1184,7 @@ def _get_window(
         axis="",
     )
     def aggregate(self, func, *args, **kwargs):
-        result, how = self._aggregate(func, *args, **kwargs)
+        result, how = aggregate(self, func, *args, **kwargs)
         if result is None:
 
             # these must apply directly
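
The change is intended to be purely mechanical: the former SelectionMixin._aggregate and _aggregate_multiple_funcs methods in pandas/core/base.py become the module-level helpers aggregate() and aggregate_multiple_funcs() in pandas/core/aggregation.py, and each call site (DataFrame, groupby, Resampler, Series, and the window classes) now passes the object explicitly, i.e. aggregate(self, func, *args, **kwargs) instead of self._aggregate(func, *args, **kwargs). Below is a minimal sketch of the public behavior that is routed through the moved code; the frame and column names are made up for illustration, and SpecificationError is imported from pandas.core.base because that is the import path the diff itself uses.

import pandas as pd
from pandas.core.base import SpecificationError

df = pd.DataFrame({"A": [1, 2, 3], "B": [4.0, 5.0, 6.0]})

# dict of column -> aggregation(s): the dict branch of aggregate() normalizes
# scalar entries to list-likes and checks that the keys exist as columns
df.agg({"A": ["mean", "sum"], "B": "max"})

# list of functions on a single column: routed through aggregate_multiple_funcs()
df["A"].agg(["min", "max"])

# nested renamers are still rejected, exactly as before the move
try:
    df.agg({"A": {"ra": "mean"}})
except SpecificationError:
    pass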