Skip to content

CLN: Move _aggregate and _aggregate_multiple_funcs to core.aggregation #36999

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
297 changes: 296 additions & 1 deletion pandas/core/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@
Label,
)

from pandas.core.dtypes.cast import is_nested_object
from pandas.core.dtypes.common import is_dict_like, is_list_like
from pandas.core.dtypes.generic import ABCDataFrame, ABCSeries

from pandas.core.base import SpecificationError
from pandas.core.base import DataError, SpecificationError
import pandas.core.common as com
from pandas.core.indexes.api import Index

Expand Down Expand Up @@ -525,3 +526,297 @@ def transform_str_or_callable(
return obj.apply(func, args=args, **kwargs)
except Exception:
return func(obj, *args, **kwargs)


def aggregate(obj, arg: AggFuncType, *args, **kwargs):
"""
provide an implementation for the aggregators

Parameters
----------
arg : string, dict, function
*args : args to pass on to the function
**kwargs : kwargs to pass on to the function

Returns
-------
tuple of result, how

Notes
-----
how can be a string describe the required post-processing, or
None if not required
"""
is_aggregator = lambda x: isinstance(x, (list, tuple, dict))

_axis = kwargs.pop("_axis", None)
if _axis is None:
_axis = getattr(obj, "axis", 0)

if isinstance(arg, str):
return obj._try_aggregate_string_function(arg, *args, **kwargs), None

if isinstance(arg, dict):
# aggregate based on the passed dict
if _axis != 0: # pragma: no cover
raise ValueError("Can only pass dict with axis=0")

selected_obj = obj._selected_obj

# if we have a dict of any non-scalars
# eg. {'A' : ['mean']}, normalize all to
# be list-likes
if any(is_aggregator(x) for x in arg.values()):
new_arg: Dict[Label, Union[AggFuncTypeBase, List[AggFuncTypeBase]]] = {}
for k, v in arg.items():
if not isinstance(v, (tuple, list, dict)):
new_arg[k] = [v]
else:
new_arg[k] = v

# the keys must be in the columns
# for ndim=2, or renamers for ndim=1

# ok for now, but deprecated
# {'A': { 'ra': 'mean' }}
# {'A': { 'ra': ['mean'] }}
# {'ra': ['mean']}

# not ok
# {'ra' : { 'A' : 'mean' }}
if isinstance(v, dict):
raise SpecificationError("nested renamer is not supported")
elif isinstance(selected_obj, ABCSeries):
raise SpecificationError("nested renamer is not supported")
elif (
isinstance(selected_obj, ABCDataFrame)
and k not in selected_obj.columns
):
raise KeyError(f"Column '{k}' does not exist!")

arg = new_arg

else:
# deprecation of renaming keys
# GH 15931
keys = list(arg.keys())
if isinstance(selected_obj, ABCDataFrame) and len(
selected_obj.columns.intersection(keys)
) != len(keys):
cols = sorted(set(keys) - set(selected_obj.columns.intersection(keys)))
raise SpecificationError(f"Column(s) {cols} do not exist")

from pandas.core.reshape.concat import concat

def _agg_1dim(name, how, subset=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

follown can make these module level / simpler

"""
aggregate a 1-dim with how
"""
colg = obj._gotitem(name, ndim=1, subset=subset)
if colg.ndim != 1:
raise SpecificationError(
"nested dictionary is ambiguous in aggregation"
)
return colg.aggregate(how)

def _agg_2dim(how):
"""
aggregate a 2-dim with how
"""
colg = obj._gotitem(obj._selection, ndim=2, subset=selected_obj)
return colg.aggregate(how)

def _agg(arg, func):
"""
run the aggregations over the arg with func
return a dict
"""
result = {}
for fname, agg_how in arg.items():
result[fname] = func(fname, agg_how)
return result

# set the final keys
keys = list(arg.keys())

if obj._selection is not None:

sl = set(obj._selection_list)

# we are a Series like object,
# but may have multiple aggregations
if len(sl) == 1:

result = _agg(
arg, lambda fname, agg_how: _agg_1dim(obj._selection, agg_how)
)

# we are selecting the same set as we are aggregating
elif not len(sl - set(keys)):

result = _agg(arg, _agg_1dim)

# we are a DataFrame, with possibly multiple aggregations
else:

result = _agg(arg, _agg_2dim)

# no selection
else:

try:
result = _agg(arg, _agg_1dim)
except SpecificationError:

# we are aggregating expecting all 1d-returns
# but we have 2d
result = _agg(arg, _agg_2dim)

# combine results

def is_any_series() -> bool:
# return a boolean if we have *any* nested series
return any(isinstance(r, ABCSeries) for r in result.values())

def is_any_frame() -> bool:
# return a boolean if we have *any* nested series
return any(isinstance(r, ABCDataFrame) for r in result.values())

if isinstance(result, list):
return concat(result, keys=keys, axis=1, sort=True), True

elif is_any_frame():
# we have a dict of DataFrames
# return a MI DataFrame

keys_to_use = [k for k in keys if not result[k].empty]
# Have to check, if at least one DataFrame is not empty.
keys_to_use = keys_to_use if keys_to_use != [] else keys
return (
concat([result[k] for k in keys_to_use], keys=keys_to_use, axis=1),
True,
)

elif isinstance(obj, ABCSeries) and is_any_series():

# we have a dict of Series
# return a MI Series
try:
result = concat(result)
except TypeError as err:
# we want to give a nice error here if
# we have non-same sized objects, so
# we don't automatically broadcast

raise ValueError(
"cannot perform both aggregation "
"and transformation operations "
"simultaneously"
) from err

return result, True

# fall thru
from pandas import DataFrame, Series

try:
result = DataFrame(result)
except ValueError:
# we have a dict of scalars

# GH 36212 use name only if obj is a series
if obj.ndim == 1:
obj = cast("Series", obj)
name = obj.name
else:
name = None

result = Series(result, name=name)

return result, True
elif is_list_like(arg):
# we require a list, but not an 'str'
return aggregate_multiple_funcs(obj, arg, _axis=_axis), None
else:
result = None

if callable(arg):
f = obj._get_cython_func(arg)
if f and not args and not kwargs:
return getattr(obj, f)(), None

# caller can react
return result, True


def aggregate_multiple_funcs(obj, arg, _axis):
from pandas.core.reshape.concat import concat

if _axis != 0:
raise NotImplementedError("axis other than 0 is not supported")

if obj._selected_obj.ndim == 1:
selected_obj = obj._selected_obj
else:
selected_obj = obj._obj_with_exclusions

results = []
keys = []

# degenerate case
if selected_obj.ndim == 1:
for a in arg:
colg = obj._gotitem(selected_obj.name, ndim=1, subset=selected_obj)
try:
new_res = colg.aggregate(a)

except TypeError:
pass
else:
results.append(new_res)

# make sure we find a good name
name = com.get_callable_name(a) or a
keys.append(name)

# multiples
else:
for index, col in enumerate(selected_obj):
colg = obj._gotitem(col, ndim=1, subset=selected_obj.iloc[:, index])
try:
new_res = colg.aggregate(arg)
except (TypeError, DataError):
pass
except ValueError as err:
# cannot aggregate
if "Must produce aggregated value" in str(err):
# raised directly in _aggregate_named
pass
elif "no results" in str(err):
# raised directly in _aggregate_multiple_funcs
pass
else:
raise
else:
results.append(new_res)
keys.append(col)

# if we are empty
if not len(results):
raise ValueError("no results")

try:
return concat(results, keys=keys, axis=1, sort=False)
except TypeError as err:

# we are concatting non-NDFrame objects,
# e.g. a list of scalars

from pandas import Series

result = Series(results, index=keys, name=obj.name)
if is_nested_object(result):
raise ValueError(
"cannot combine transform and aggregation operations"
) from err
return result
Loading