-
-
Notifications
You must be signed in to change notification settings - Fork 18.4k
/
Copy pathaggregation.py
395 lines (331 loc) · 12.5 KB
/
aggregation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
"""
aggregation.py contains utility functions to handle multiple named and lambda
kwarg aggregations in groupby and DataFrame/Series aggregation
"""
from __future__ import annotations
from collections import defaultdict
from functools import partial
from typing import (
TYPE_CHECKING,
Any,
Callable,
DefaultDict,
Hashable,
Iterable,
Sequence,
)
from pandas._typing import (
AggFuncType,
FrameOrSeries,
)
from pandas.core.dtypes.common import (
is_dict_like,
is_list_like,
)
from pandas.core.dtypes.generic import ABCSeries
from pandas.core.base import SpecificationError
import pandas.core.common as com
from pandas.core.indexes.api import Index
if TYPE_CHECKING:
from pandas.core.series import Series
def reconstruct_func(
    func: AggFuncType | None, **kwargs
) -> tuple[bool, AggFuncType | None, list[str] | None, list[int] | None]:
    """
    Decide whether an aggregation call uses relabeling and normalize it.

    Relabeling ("named aggregation") applies only when ``func`` is None and
    ``kwargs`` satisfies ``is_multi_agg_with_relabel``. In that case the
    kwargs are handed to ``normalize_keyword_aggregation`` and its three
    results are returned together with ``True``.

    Without relabeling, ``func`` is returned untouched (a string such as
    ``'min'``, a callable, a list of them, or a dict mapping column names to
    them) and both ``columns`` and ``order`` are None.

    Parameters
    ----------
    func : str, callable, list, dict or None
        The aggregation specification; None when named aggregation is used.
    **kwargs
        Named-aggregation keywords, e.g. ``foo=("col", "min")``.

    Returns
    -------
    relabeling : bool
        Whether relabeling is in effect.
    func : same kinds as the input, or the normalized aggregation spec
        ``func`` itself when not relabeling, otherwise the first result of
        ``normalize_keyword_aggregation``.
    columns : sequence of str or None
        The user-provided output names when relabeling, else None.
    order : sequence of int or None
        The column index order when relabeling, else None.

    Raises
    ------
    SpecificationError
        If ``func`` is a list with duplicated entries and no relabeling
        is requested (GH 28426).
    TypeError
        If ``func`` is None and the kwargs do not describe a named
        aggregation.

    Examples
    --------
    >>> reconstruct_func(None, **{"foo": ("col", "min")})
    (True, defaultdict(<class 'list'>, {'col': ['min']}), ('foo',), array([0]))

    >>> reconstruct_func("min")
    (False, 'min', None, None)
    """
    relabeling = func is None and is_multi_agg_with_relabel(**kwargs)

    if relabeling:
        normalized, columns, order = normalize_keyword_aggregation(kwargs)
        return relabeling, normalized, columns, order

    if func is None:
        # nicer error message than whatever would fail further downstream
        raise TypeError("Must provide 'func' or tuples of '(column, aggfunc).")

    if isinstance(func, list) and len(set(func)) < len(func):
        # GH 28426: duplicated functions would produce duplicated result
        # names, and without relabeling there is no way to tell them apart.
        raise SpecificationError(
            "Function names must be unique if there is no new column names "
            "assigned"
        )

    return relabeling, func, None, None
def is_multi_agg_with_relabel(**kwargs) -> bool:
    """
    Tell whether the kwargs given to .agg look like named aggregation.

    That is the case when at least one keyword is given and every keyword
    value is a tuple of exactly two elements.

    Parameters
    ----------
    **kwargs : dict

    Returns
    -------
    bool

    Examples
    --------
    >>> is_multi_agg_with_relabel(a="max")
    False
    >>> is_multi_agg_with_relabel(a_max=("a", "max"), a_min=("a", "min"))
    True
    >>> is_multi_agg_with_relabel()
    False
    """
    if not kwargs:
        # no keywords at all is never a named aggregation
        return False

    for spec in kwargs.values():
        if not isinstance(spec, tuple) or len(spec) != 2:
            return False
    return True
def normalize_keyword_aggregation(kwargs: dict) -> tuple[dict, list[str], list[int]]:
    """
    Normalize user-provided "named aggregation" kwargs.

    Converts the ``{output_name: (column, aggfunc)}`` style kwargs into the
    older ``{column: [aggfunc, ...]}`` style, and records how the
    user-requested order maps onto that column-grouped layout.

    Parameters
    ----------
    kwargs : dict
        Maps each output name to a ``(column, aggfunc)`` pair. Must contain
        at least one item; unzipping an empty mapping raises ValueError.

    Returns
    -------
    aggspec : dict
        The transformed kwargs (a defaultdict of lists keyed by column).
    columns : List[str]
        The user-provided keys.
    col_idx_order : List[int]
        List of columns indices, as produced by ``Index.get_indexer``.

    Examples
    --------
    >>> normalize_keyword_aggregation({"output": ("input", "sum")})
    (defaultdict(<class 'list'>, {'input': ['sum']}), ('output',), array([0]))
    """

    def _label(aggfunc):
        # Prefer the callable's name; fall back to the object itself
        # (e.g. a string such as "sum") when no name is available.
        return com.get_callable_name(aggfunc) or aggfunc

    # Split {name: (column, aggfunc)} into the names and the pairs.
    columns, pairs = zip(*kwargs.items())

    # TODO: aggspec type: typing.Dict[str, List[AggScalar]]
    # May be hitting mypy issue #5958
    # saying it doesn't have an attribute __name__
    aggspec: DefaultDict = defaultdict(list)
    requested_order = []
    for column, aggfunc in pairs:
        aggspec[column].append(aggfunc)
        requested_order.append((column, _label(aggfunc)))

    # uniquify aggfunc name if duplicated in order list
    uniquified_order = _make_unique_kwarg_list(requested_order)

    # GH 25719: grouping by column in aggspec changes the order in which the
    # assigned columns come out of the aggregation. Rebuild the pair list in
    # aggspec order so it can be compared with the requested order by index.
    grouped_order = []
    for column, aggfuncs in aggspec.items():
        for aggfunc in aggfuncs:
            grouped_order.append((column, _label(aggfunc)))
    uniquified_aggspec = _make_unique_kwarg_list(grouped_order)

    # get the new index of columns by comparison
    col_idx_order = Index(uniquified_aggspec).get_indexer(uniquified_order)

    # error: Incompatible return value type (got "Tuple[defaultdict[Any, Any],
    # Any, ndarray]", expected "Tuple[Dict[Any, Any], List[str], List[int]]")
    return aggspec, columns, col_idx_order  # type: ignore[return-value]
def _make_unique_kwarg_list(
    seq: Sequence[tuple[Any, Any]]
) -> Sequence[tuple[Any, Any]]:
    """
    Uniquify the aggfunc name of repeated pairs in the order list.

    A pair that occurs more than once in ``seq`` gets ``_<k>`` appended to
    its second element, where ``k`` is the number of identical pairs that
    precede it. Pairs that occur only once are kept unchanged.

    Examples
    --------
    >>> kwarg_list = [('a', '<lambda>'), ('a', '<lambda>'), ('b', '<lambda>')]
    >>> _make_unique_kwarg_list(kwarg_list)
    [('a', '<lambda>_0'), ('a', '<lambda>_1'), ('b', '<lambda>')]
    """
    uniquified = []
    for position, pair in enumerate(seq):
        if seq.count(pair) <= 1:
            # already unique, nothing to rename
            uniquified.append(pair)
            continue

        column, name = pair[0], pair[1]
        # number of identical pairs seen before this one
        earlier = seq[:position].count(pair)
        uniquified.append((column, "_".join([name, str(earlier)])))
    return uniquified
# TODO: Can't use, because mypy doesn't like us setting __name__
# error: "partial[Any]" has no attribute "__name__"
# the type is:
# typing.Sequence[Callable[..., ScalarResult]]
# -> typing.Sequence[Callable[..., ScalarResult]]:
def _managle_lambda_list(aggfuncs: Sequence[Any]) -> Sequence[Any]:
    """
    Possibly mangle a list of aggfuncs.

    Every entry whose callable name is ``"<lambda>"`` is wrapped in a
    ``functools.partial`` whose ``__name__`` is set to ``<lambda_N>``, with
    ``N`` counting the lambdas met so far. Other entries are kept as they are.

    Parameters
    ----------
    aggfuncs : Sequence

    Returns
    -------
    mangled: list-like
        A new AggSpec sequence, where lambdas have been converted
        to have unique names.

    Notes
    -----
    If just one aggfunc is passed, the name will not be mangled and the
    input object itself is returned.
    """
    if len(aggfuncs) <= 1:
        # don't mangle for .agg([lambda x: .])
        return aggfuncs

    lambda_count = 0
    result = []
    for candidate in aggfuncs:
        if com.get_callable_name(candidate) != "<lambda>":
            result.append(candidate)
            continue

        # partial() gives a fresh object, so the caller's lambda keeps its
        # own name while the wrapper carries the unique one.
        wrapped = partial(candidate)
        wrapped.__name__ = f"<lambda_{lambda_count}>"
        lambda_count += 1
        result.append(wrapped)
    return result
def maybe_mangle_lambdas(agg_spec: Any) -> Any:
    """
    Make new lambdas with unique names.

    Parameters
    ----------
    agg_spec : Any
        An argument to GroupBy.agg.
        An `agg_spec` that is neither dict-like nor list-like is passed
        through as is.
        For a list-like `agg_spec` the result of mangling that list is
        returned.
        For a dict-like `agg_spec` a new spec of the same type is returned
        in which every list-like (and not dict-like) value has been mangled;
        other values are copied over unchanged.

    Returns
    -------
    mangled : Any
        The input itself, a mangled sequence, or a new mapping of the same
        type as the input.

    Examples
    --------
    >>> maybe_mangle_lambdas('sum')
    'sum'
    >>> mangled = maybe_mangle_lambdas([lambda: 1, lambda: 2])
    >>> [f.__name__ for f in mangled]
    ['<lambda_0>', '<lambda_1>']
    """
    is_dict = is_dict_like(agg_spec)
    if not (is_dict or is_list_like(agg_spec)):
        return agg_spec

    if not is_dict:
        # Plain list-like: mangle it directly. No empty container of the
        # input type is needed here, so do not try to build one; some
        # list-likes cannot be constructed without arguments.
        return _managle_lambda_list(agg_spec)

    mangled_aggspec = type(agg_spec)()  # dict or OrderedDict
    for key, aggfuncs in agg_spec.items():
        if is_list_like(aggfuncs) and not is_dict_like(aggfuncs):
            mangled_aggfuncs = _managle_lambda_list(aggfuncs)
        else:
            mangled_aggfuncs = aggfuncs
        mangled_aggspec[key] = mangled_aggfuncs
    return mangled_aggspec
def relabel_result(
    result: FrameOrSeries,
    func: dict[str, list[Callable | str]],
    columns: Iterable[Hashable],
    order: Iterable[int],
) -> dict[Hashable, Series]:
    """
    Internal function to reorder result if relabelling is True for
    dataframe.agg, and return the reordered result in dict.

    Parameters
    ----------
    result : DataFrame or Series
        Result from aggregation.
    func : dict
        Dict of (column name, funcs).
    columns : iterable of hashable
        New columns name for relabelling.
    order : iterable of int
        New order for relabelling.

    Returns
    -------
    dict
        Maps each key of ``func`` to a Series indexed by ``columns``.

    Examples
    --------
    >>> result = DataFrame(
    ...     {"A": [2, np.nan, np.nan], "C": [6, np.nan, np.nan],
    ...      "B": [np.nan, 4, 2.5]},
    ...     index=["max", "mean", "min"])  # doctest: +SKIP
    >>> funcs = {"A": ["max"], "C": ["max"], "B": ["mean", "min"]}
    >>> columns = ("foo", "aab", "bar", "dat")
    >>> order = [0, 1, 2, 3]
    >>> relabel_result(result, funcs, columns, order)  # doctest: +SKIP
    dict(A=Series([2.0, NaN, NaN, NaN], index=["foo", "aab", "bar", "dat"]),
         C=Series([NaN, 6.0, NaN, NaN], index=["foo", "aab", "bar", "dat"]),
         B=Series([NaN, NaN, 4.0, 2.5], index=["foo", "aab", "bar", "dat"]))
    """
    # The user-provided names, arranged by their target position.
    reordered_indexes = [
        pair[0] for pair in sorted(zip(columns, order), key=lambda t: t[1])
    ]
    reordered_result_in_dict: dict[Hashable, Series] = {}
    idx = 0

    reorder_mask = not isinstance(result, ABCSeries) and len(result.columns) > 1
    for col, fun in func.items():
        s = result[col].dropna()

        # In the `_aggregate`, the callable names are obtained and used in `result`,
        # and these names are ordered alphabetically. e.g.
        #           C2   C1
        # <lambda>   1  NaN
        # amax     NaN  4.0
        # max      NaN  4.0
        # sum     18.0  6.0
        # Therefore, the order of functions for each column could be shuffled
        # accordingly, so we need the callable name of anything that is not
        # already a string, and reorder the aggregated result for each column.
        # e.g. if df.agg(c1=("C2", sum), c2=("C2", lambda x: min(x))), the correct
        # order is [sum, <lambda>], but in `result` it will be [<lambda>, sum].
        #
        # When only one column is aggregated there is no need to reorder: the
        # index is not sorted and is kept as given in `func`, e.g.
        #         A
        # min   1.0
        # mean  1.5
        # mean  1.5
        if reorder_mask:
            fun = [
                com.get_callable_name(f) if not isinstance(f, str) else f for f in fun
            ]
            col_idx_order = Index(s.index).get_indexer(fun)
            # get_indexer returns positions, so select positionally. Plain
            # `s[...]` with integers on a label index relies on an implicit
            # positional fallback.
            s = s.iloc[col_idx_order]

        # assign the new user-provided "named aggregation" as index names, and reindex
        # it based on the whole user-provided names.
        s.index = reordered_indexes[idx : idx + len(fun)]
        reordered_result_in_dict[col] = s.reindex(columns, copy=False)
        idx = idx + len(fun)
    return reordered_result_in_dict
def validate_func_kwargs(
    kwargs: dict,
) -> tuple[list[str], list[str | Callable[..., Any]]]:
    """
    Validate the types of user-provided "named aggregation" kwargs.

    Parameters
    ----------
    kwargs : dict
        Maps each output name to an aggfunc.

    Returns
    -------
    columns : List[str]
        List of user-provided keys.
    func : List[Union[str, callable[...,Any]]]
        List of user-provided aggfuncs.

    Raises
    ------
    TypeError
        If ``kwargs`` is empty, or if any aggfunc is neither a ``str`` nor
        callable.

    Examples
    --------
    >>> validate_func_kwargs({'one': 'min', 'two': 'max'})
    (['one', 'two'], ['min', 'max'])
    """
    columns = list(kwargs)
    if not columns:
        no_arg_message = "Must provide 'func' or named aggregation **kwargs."
        raise TypeError(no_arg_message)

    tuple_given_message = "func is expected but received {} in **kwargs."
    func = []
    for candidate in kwargs.values():
        if isinstance(candidate, str) or callable(candidate):
            func.append(candidate)
            continue
        # report the offending type by name, e.g. "tuple"
        raise TypeError(tuple_given_message.format(type(candidate).__name__))
    return columns, func