Skip to content

Commit 0534b00

Browse files
ENH: Implement Keyword Aggregation for DataFrame.agg and Series.agg (#29116)
1 parent 280efbf commit 0534b00

File tree

10 files changed

+317
-23
lines changed

10 files changed

+317
-23
lines changed

pandas/core/aggregation.py

+161-1
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,99 @@
55

66
from collections import defaultdict
77
from functools import partial
8-
from typing import Any, Callable, DefaultDict, List, Sequence, Tuple, Union
8+
from typing import (
9+
Any,
10+
Callable,
11+
DefaultDict,
12+
Dict,
13+
List,
14+
Optional,
15+
Sequence,
16+
Tuple,
17+
Union,
18+
)
19+
20+
from pandas._typing import Label
921

1022
from pandas.core.dtypes.common import is_dict_like, is_list_like
1123

24+
from pandas.core.base import SpecificationError
1225
import pandas.core.common as com
1326
from pandas.core.indexes.api import Index
27+
from pandas.core.series import FrameOrSeriesUnion, Series
28+
29+
# types of `func` kwarg for DataFrame.aggregate and Series.aggregate
30+
AggFuncTypeBase = Union[Callable, str]
31+
AggFuncType = Union[
32+
AggFuncTypeBase,
33+
List[AggFuncTypeBase],
34+
Dict[Label, Union[AggFuncTypeBase, List[AggFuncTypeBase]]],
35+
]
36+
37+
38+
def reconstruct_func(
    func: Optional[AggFuncType], **kwargs,
) -> Tuple[
    bool, Optional[AggFuncType], Optional[List[str]], Optional[List[int]],
]:
    """
    Normalize the ``func`` / ``**kwargs`` pair handed to an ``aggregate`` call.

    Two calling conventions are recognised:

    * Named aggregation: ``func`` is None and ``kwargs`` carries the
      ``new_name=(column, aggfunc)`` pairs. In that case the kwargs are parsed
      by ``normalize_keyword_aggregation`` into a func specification, the new
      column names and the order in which they have to appear.
    * Plain aggregation: ``func`` is a string (e.g. 'min'), a Callable, a list
      of them (e.g. ['min', np.max]), or a dictionary of column name to
      str/Callable/list of them (e.g. {'A': 'min'} or
      {'A': [np.min, lambda x: x]}). Here ``columns`` and ``order`` stay None.

    In both cases the resulting func is passed through ``maybe_mangle_lambdas``
    before it is returned.

    Parameters
    ----------
    func : agg function (e.g. 'min' or Callable) or list of agg functions
        (e.g. ['min', np.max]) or dictionary (e.g. {'A': ['min', np.max]}).
    **kwargs : dict, kwargs used in is_multi_agg_with_relabel and
        normalize_keyword_aggregation function for relabelling

    Returns
    -------
    relabelling : bool, if there is relabelling or not
    func : normalized and mangled func
    columns : list of column names
    order : list of columns indices

    Raises
    ------
    SpecificationError
        If ``func`` is a list containing duplicated entries and no relabelling
        is requested (GH 28426).
    TypeError
        If ``func`` is None and the kwargs do not describe a named aggregation.

    Examples
    --------
    >>> reconstruct_func(None, **{"foo": ("col", "min")})
    (True, defaultdict(None, {'col': ['min']}), ('foo',), array([0]))

    >>> reconstruct_func("min")
    (False, 'min', None, None)
    """
    columns: Optional[List[str]] = None
    order: Optional[List[int]] = None
    relabeling = func is None and is_multi_agg_with_relabel(**kwargs)

    if relabeling:
        # named aggregation: everything is derived from the keyword arguments
        func, columns, order = normalize_keyword_aggregation(kwargs)
    elif func is None:
        # nicer error message
        raise TypeError("Must provide 'func' or tuples of '(column, aggfunc).")
    elif isinstance(func, list) and len(func) > len(set(func)):
        # GH 28426 will raise error if duplicated function names are used and
        # there is no reassigned name
        raise SpecificationError(
            "Function names must be unique if there is no new column names "
            "assigned"
        )

    mangled = maybe_mangle_lambdas(func)
    return relabeling, mangled, columns, order
14101

15102

16103
def is_multi_agg_with_relabel(**kwargs) -> bool:
@@ -198,6 +285,79 @@ def maybe_mangle_lambdas(agg_spec: Any) -> Any:
198285
return mangled_aggspec
199286

200287

288+
def relabel_result(
    result: FrameOrSeriesUnion,
    func: Dict[str, List[Union[Callable, str]]],
    columns: Tuple,
    order: List[int],
) -> Dict[Label, Series]:
    """
    Internal function to reorder result if relabelling is True for
    dataframe.agg, and return the reordered result in dict.

    Parameters
    ----------
    result : Result from aggregation
    func : Dict of (column name, funcs)
    columns : New columns name for relabelling
    order : New order for relabelling

    Returns
    -------
    dict
        Maps every key of ``func`` to a Series that is indexed by ``columns``;
        the labels that do not belong to that key are filled by ``reindex``.

    Examples
    --------
    >>> result = DataFrame(
    ...     {
    ...         "A": [2, np.nan, np.nan],
    ...         "C": [6, np.nan, np.nan],
    ...         "B": [np.nan, 4, 2.5],
    ...     },
    ...     index=["max", "mean", "min"],
    ... )  # doctest: +SKIP
    >>> funcs = {"A": ["max"], "C": ["max"], "B": ["mean", "min"]}
    >>> columns = ("foo", "aab", "bar", "dat")
    >>> order = [0, 1, 2, 3]
    >>> relabel_result(result, funcs, columns, order)  # doctest: +SKIP
    dict(A=Series([2.0, NaN, NaN, NaN], index=["foo", "aab", "bar", "dat"]),
         C=Series([NaN, 6.0, NaN, NaN], index=["foo", "aab", "bar", "dat"]),
         B=Series([NaN, NaN, 4.0, 2.5], index=["foo", "aab", "bar", "dat"]))
    """
    # user-provided labels, sorted by the position given in `order`
    reordered_indexes = [
        pair[0] for pair in sorted(zip(columns, order), key=lambda t: t[1])
    ]
    reordered_result_in_dict: Dict[Label, Series] = {}
    idx = 0

    reorder_mask = not isinstance(result, Series) and len(result.columns) > 1
    for col, fun in func.items():
        # NOTE(review): dropna() also removes an aggregation whose value is
        # genuinely NaN, in which case the label slice assigned below no longer
        # matches the length of `s` -- confirm whether callers can hit this.
        s = result[col].dropna()

        # In the `_aggregate`, the callable names are obtained and used in `result`, and
        # these names are ordered alphabetically. e.g.
        #           C2   C1
        # <lambda>   1  NaN
        # amax     NaN  4.0
        # max      NaN  4.0
        # sum     18.0  6.0
        # Therefore, the order of functions for each column could be shuffled
        # accordingly so need to get the callable name if it is not parsed names, and
        # reorder the aggregated result for each column.
        # e.g. if df.agg(c1=("C2", sum), c2=("C2", lambda x: min(x))), correct order is
        # [sum, <lambda>], but in `result`, it will be [<lambda>, sum], and we need to
        # reorder so that aggregated values map to their functions regarding the order.

        # However if there is only one column being used for aggregation, there is
        # no need to reorder since the index is not sorted, and it is kept as is in
        # `funcs`, e.g.
        #         A
        # min   1.0
        # mean  1.5
        # mean  1.5
        if reorder_mask:
            fun = [
                com.get_callable_name(f) if not isinstance(f, str) else f for f in fun
            ]
            col_idx_order = Index(s.index).get_indexer(fun)
            # get_indexer returns integer positions, so select positionally and
            # explicitly instead of relying on the `[]` integer-key fallback
            s = s.iloc[col_idx_order]

        # assign the new user-provided "named aggregation" as index names, and reindex
        # it based on the whole user-provided names.
        s.index = reordered_indexes[idx : idx + len(fun)]
        reordered_result_in_dict[col] = s.reindex(columns, copy=False)
        idx += len(fun)
    return reordered_result_in_dict
359+
360+
201361
def validate_func_kwargs(
202362
kwargs: dict,
203363
) -> Tuple[List[str], List[Union[str, Callable[..., Any]]]]:

pandas/core/frame.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114

115115
from pandas.core import algorithms, common as com, nanops, ops
116116
from pandas.core.accessor import CachedAccessor
117+
from pandas.core.aggregation import reconstruct_func, relabel_result
117118
from pandas.core.arrays import Categorical, ExtensionArray
118119
from pandas.core.arrays.datetimelike import DatetimeLikeArrayMixin as DatetimeLikeArray
119120
from pandas.core.arrays.sparse import SparseFrameAccessor
@@ -7301,9 +7302,11 @@ def _gotitem(
73017302
examples=_agg_examples_doc,
73027303
versionadded="\n.. versionadded:: 0.20.0\n",
73037304
)
7304-
def aggregate(self, func, axis=0, *args, **kwargs):
7305+
def aggregate(self, func=None, axis=0, *args, **kwargs):
73057306
axis = self._get_axis_number(axis)
73067307

7308+
relabeling, func, columns, order = reconstruct_func(func, **kwargs)
7309+
73077310
result = None
73087311
try:
73097312
result, how = self._aggregate(func, axis=axis, *args, **kwargs)
@@ -7315,6 +7318,13 @@ def aggregate(self, func, axis=0, *args, **kwargs):
73157318
raise exc from err
73167319
if result is None:
73177320
return self.apply(func, axis=axis, args=args, **kwargs)
7321+
7322+
if relabeling:
7323+
# This is to keep the order to columns occurrence unchanged, and also
7324+
# keep the order of new columns occurrence unchanged
7325+
result_in_dict = relabel_result(result, func, columns, order)
7326+
result = DataFrame(result_in_dict, index=columns)
7327+
73187328
return result
73197329

73207330
def _aggregate(self, arg, axis=0, *args, **kwargs):

pandas/core/groupby/generic.py

+2-20
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,8 @@
5555
from pandas.core.dtypes.missing import isna, notna
5656

5757
from pandas.core.aggregation import (
58-
is_multi_agg_with_relabel,
5958
maybe_mangle_lambdas,
60-
normalize_keyword_aggregation,
59+
reconstruct_func,
6160
validate_func_kwargs,
6261
)
6362
import pandas.core.algorithms as algorithms
@@ -937,24 +936,7 @@ def aggregate(
937936
self, func=None, *args, engine="cython", engine_kwargs=None, **kwargs
938937
):
939938

940-
relabeling = func is None and is_multi_agg_with_relabel(**kwargs)
941-
if relabeling:
942-
func, columns, order = normalize_keyword_aggregation(kwargs)
943-
944-
kwargs = {}
945-
elif isinstance(func, list) and len(func) > len(set(func)):
946-
947-
# GH 28426 will raise error if duplicated function names are used and
948-
# there is no reassigned name
949-
raise SpecificationError(
950-
"Function names must be unique if there is no new column "
951-
"names assigned"
952-
)
953-
elif func is None:
954-
# nicer error message
955-
raise TypeError("Must provide 'func' or tuples of '(column, aggfunc).")
956-
957-
func = maybe_mangle_lambdas(func)
939+
relabeling, func, columns, order = reconstruct_func(func, **kwargs)
958940

959941
if engine == "numba":
960942
return self._python_agg_general(

pandas/core/series.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -4016,9 +4016,14 @@ def _gotitem(self, key, ndim, subset=None) -> "Series":
40164016
examples=_agg_examples_doc,
40174017
versionadded="\n.. versionadded:: 0.20.0\n",
40184018
)
4019-
def aggregate(self, func, axis=0, *args, **kwargs):
4019+
def aggregate(self, func=None, axis=0, *args, **kwargs):
40204020
# Validate the axis parameter
40214021
self._get_axis_number(axis)
4022+
4023+
# if func is None, will switch to user-provided "named aggregation" kwargs
4024+
if func is None:
4025+
func = dict(kwargs.items())
4026+
40224027
result, how = self._aggregate(func, *args, **kwargs)
40234028
if result is None:
40244029

pandas/tests/frame/apply/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import numpy as np
2+
import pytest
3+
4+
import pandas as pd
5+
import pandas._testing as tm
6+
7+
8+
class TestDataFrameNamedAggregate:
    """Checks for named (keyword) aggregation through ``DataFrame.agg``."""

    def test_agg_relabel(self):
        """A single source column, relabelled with one and with two functions."""
        # GH 26513
        frame = pd.DataFrame(
            {"A": [1, 2, 1, 2], "B": [1, 2, 3, 4], "C": [3, 4, 5, 6]}
        )

        # simplest case with one column, one func
        outcome = frame.agg(foo=("B", "sum"))
        wanted = pd.DataFrame({"B": [10]}, index=pd.Index(["foo"]))
        tm.assert_frame_equal(outcome, wanted)

        # test on same column with different methods
        outcome = frame.agg(foo=("B", "sum"), bar=("B", "min"))
        wanted = pd.DataFrame({"B": [10, 1]}, index=pd.Index(["foo", "bar"]))

        tm.assert_frame_equal(outcome, wanted)

    def test_agg_relabel_multi_columns_multi_methods(self):
        """Several source columns, each relabelled with several functions."""
        # GH 26513, test on multiple columns with multiple methods
        frame = pd.DataFrame(
            {"A": [1, 2, 1, 2], "B": [1, 2, 3, 4], "C": [3, 4, 5, 6]}
        )
        spec = {
            "foo": ("A", "sum"),
            "bar": ("B", "mean"),
            "cat": ("A", "min"),
            "dat": ("B", "max"),
            "f": ("A", "max"),
            "g": ("C", "min"),
        }
        outcome = frame.agg(**spec)

        labels = pd.Index(["foo", "bar", "cat", "dat", "f", "g"])
        wanted = pd.DataFrame(
            {
                "A": [6.0, np.nan, 1.0, np.nan, 2.0, np.nan],
                "B": [np.nan, 2.5, np.nan, 4.0, np.nan, np.nan],
                "C": [np.nan, np.nan, np.nan, np.nan, np.nan, 3.0],
            },
            index=labels,
        )
        tm.assert_frame_equal(outcome, wanted)

    def test_agg_relabel_partial_functions(self):
        """Mix of numpy functions, builtins, strings and a lambda as aggfunc."""
        # GH 26513, test on partial, functools or more complex cases
        frame = pd.DataFrame(
            {"A": [1, 2, 1, 2], "B": [1, 2, 3, 4], "C": [3, 4, 5, 6]}
        )
        outcome = frame.agg(foo=("A", np.mean), bar=("A", "mean"), cat=("A", min))
        wanted = pd.DataFrame(
            {"A": [1.5, 1.5, 1.0]}, index=pd.Index(["foo", "bar", "cat"])
        )
        tm.assert_frame_equal(outcome, wanted)

        spec = {
            "foo": ("A", min),
            "bar": ("A", np.min),
            "cat": ("B", max),
            "dat": ("C", "min"),
            "f": ("B", np.sum),
            "kk": ("B", lambda x: min(x)),
        }
        outcome = frame.agg(**spec)

        labels = pd.Index(["foo", "bar", "cat", "dat", "f", "kk"])
        wanted = pd.DataFrame(
            {
                "A": [1.0, 1.0, np.nan, np.nan, np.nan, np.nan],
                "B": [np.nan, np.nan, 4.0, np.nan, 10.0, 1.0],
                "C": [np.nan, np.nan, np.nan, 3.0, np.nan, np.nan],
            },
            index=labels,
        )
        tm.assert_frame_equal(outcome, wanted)

    def test_agg_namedtuple(self):
        """``pd.NamedAgg`` built positionally and by keyword is accepted."""
        # GH 26513
        frame = pd.DataFrame({"A": [0, 1], "B": [1, 2]})
        spec = {
            "foo": pd.NamedAgg("B", "sum"),
            "bar": pd.NamedAgg("B", min),
            "cat": pd.NamedAgg(column="B", aggfunc="count"),
            "fft": pd.NamedAgg("B", aggfunc="max"),
        }
        outcome = frame.agg(**spec)

        wanted = pd.DataFrame(
            {"B": [3, 1, 2, 2]}, index=pd.Index(["foo", "bar", "cat", "fft"])
        )
        tm.assert_frame_equal(outcome, wanted)

        spec = {
            "foo": pd.NamedAgg("A", "min"),
            "bar": pd.NamedAgg(column="B", aggfunc="max"),
            "cat": pd.NamedAgg(column="A", aggfunc="max"),
        }
        outcome = frame.agg(**spec)
        wanted = pd.DataFrame(
            {"A": [0.0, np.nan, 1.0], "B": [np.nan, 2.0, np.nan]},
            index=pd.Index(["foo", "bar", "cat"]),
        )
        tm.assert_frame_equal(outcome, wanted)

    def test_agg_raises(self):
        """Calling ``agg`` without func and without kwargs raises TypeError."""
        # GH 26513
        frame = pd.DataFrame({"A": [0, 1], "B": [1, 2]})
        msg = "Must provide"

        with pytest.raises(TypeError, match=msg):
            frame.agg()

pandas/tests/series/apply/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)