diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 3b20b854b344e..d71714084de1b 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -373,6 +373,61 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) agg = aggregate + def _agg_for_resample( + self, func=None, *args, engine=None, engine_kwargs=None, **kwargs + ): + relabeling = func is None + columns = None + if relabeling: + columns, func = validate_func_kwargs(kwargs) + kwargs = {} + + if isinstance(func, str): + if maybe_use_numba(engine) and engine is not None: + # Not all agg functions support numba, only propagate numba kwargs + # if user asks for numba, and engine is not None + # (if engine is None, the called function will handle the case where + # numba is requested via the global option) + kwargs["engine"] = engine + if engine_kwargs is not None: + kwargs["engine_kwargs"] = engine_kwargs + return getattr(self, func)(*args, **kwargs) + + elif isinstance(func, abc.Iterable): + # Catch instances of lists / tuples + # but not the class list / tuple itself. + func = maybe_mangle_lambdas(func) + kwargs["engine"] = engine + kwargs["engine_kwargs"] = engine_kwargs + ret = self._aggregate_multiple_funcs(func, *args, **kwargs) + if relabeling: + # columns is not narrowed by mypy from relabeling flag + assert columns is not None # for mypy + ret.columns = columns + if not self.as_index: + ret = ret.reset_index() + return ret + + else: + if maybe_use_numba(engine): + return self._aggregate_with_numba( + func, *args, engine_kwargs=engine_kwargs, **kwargs + ) + + if self.ngroups == 0: + # e.g. test_evaluate_with_empty_groups without any groups to + # iterate over, we have no output on which to do dtype + # inference. We default to using the existing dtype. + # xref GH#51445 + obj = self._obj_with_exclusions + return self.obj._constructor( + [], + name=self.obj.name, + index=self._grouper.result_index, + dtype=obj.dtype, + ) + return self._python_agg_general(func, *args, **kwargs) + def _python_agg_general(self, func, *args, **kwargs): f = lambda x: func(x, *args, **kwargs) @@ -1501,6 +1556,61 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) relabeling, func, columns, order = reconstruct_func(func, **kwargs) func = maybe_mangle_lambdas(func) + if maybe_use_numba(engine): + # Not all agg functions support numba, only propagate numba kwargs + # if user asks for numba + kwargs["engine"] = engine + kwargs["engine_kwargs"] = engine_kwargs + + op = GroupByApply(self, func, args=args, kwargs=kwargs) + result = op.agg() + if not is_dict_like(func) and result is not None: + # GH #52849 + if not self.as_index and is_list_like(func): + return result.reset_index() + else: + return result + elif relabeling: + # this should be the only (non-raising) case with relabeling + # used reordered index of columns + result = cast(DataFrame, result) + result = result.iloc[:, order] + result = cast(DataFrame, result) + # error: Incompatible types in assignment (expression has type + # "Optional[List[str]]", variable has type + # "Union[Union[Union[ExtensionArray, ndarray[Any, Any]], + # Index, Series], Sequence[Any]]") + result.columns = columns # type: ignore[assignment] + + if result is None: + # Remove the kwargs we inserted + # (already stored in engine, engine_kwargs arguments) + if "engine" in kwargs: + del kwargs["engine"] + del kwargs["engine_kwargs"] + # at this point func is not a str, list-like, dict-like, + # or a known callable(e.g. sum) + if maybe_use_numba(engine): + return self._aggregate_with_numba( + func, *args, engine_kwargs=engine_kwargs, **kwargs + ) + # grouper specific aggregations + result = self._python_agg_general(func, *args, **kwargs) + + if not self.as_index: + result = self._insert_inaxis_grouper(result) + result.index = default_index(len(result)) + + return result + + agg = aggregate + + def _agg_for_resample( + self, func=None, *args, engine=None, engine_kwargs=None, **kwargs + ): + relabeling, func, columns, order = reconstruct_func(func, **kwargs) + func = maybe_mangle_lambdas(func) + if maybe_use_numba(engine): # Not all agg functions support numba, only propagate numba kwargs # if user asks for numba @@ -1577,8 +1687,6 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) return result - agg = aggregate - def _python_agg_general(self, func, *args, **kwargs): f = lambda x: func(x, *args, **kwargs) @@ -1589,9 +1697,11 @@ def _python_agg_general(self, func, *args, **kwargs): obj = self._obj_with_exclusions - if not len(obj.columns): + if self._grouper._is_resample and not len(obj.columns): # e.g. test_margins_no_values_no_cols - return self._python_apply_general(f, self._selected_obj) + return obj._constructor( + index=self._grouper.result_index, columns=obj.columns + ) output: dict[int, ArrayLike] = {} for idx, (name, ser) in enumerate(obj.items()): diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py index acf4c7bebf52d..3fb40f5022b33 100644 --- a/pandas/core/groupby/ops.py +++ b/pandas/core/groupby/ops.py @@ -589,6 +589,7 @@ def __init__( self._groupings: list[grouper.Grouping] = list(groupings) self._sort = sort self.dropna = dropna + self._is_resample = False @property def groupings(self) -> list[grouper.Grouping]: @@ -940,12 +941,14 @@ def _aggregate_series_pure_python( for i, group in enumerate(splitter): res = func(group) - res = extract_result(res) - if not initialized: - # We only do this validation on the first iteration - check_result_array(res, group.dtype) - initialized = True + if self._is_resample: + res = extract_result(res) + + if not initialized: + # We only do this validation on the first iteration + check_result_array(res, group.dtype) + initialized = True result[i] = res diff --git a/pandas/core/resample.py b/pandas/core/resample.py index 43077e7aeecb4..3efb85d3c68d8 100644 --- a/pandas/core/resample.py +++ b/pandas/core/resample.py @@ -249,6 +249,7 @@ def _get_binner(self): binner, bins, binlabels = self._get_binner_for_time() assert len(bins) == len(binlabels) bin_grouper = BinGrouper(bins, binlabels, indexer=self._indexer) + bin_grouper._is_resample = True return binner, bin_grouper @overload @@ -370,7 +371,21 @@ def aggregate(self, func=None, *args, **kwargs): return result agg = aggregate - apply = aggregate + + @final + @doc( + _shared_docs["aggregate"], + see_also=_agg_see_also_doc, + examples=_agg_examples_doc, + klass="DataFrame", + axis="", + ) + def apply(self, func=None, *args, **kwargs): + result = ResamplerWindowApply(self, func, args=args, kwargs=kwargs).agg() + if result is None: + how = func + result = self._groupby_and_apply(how, *args, **kwargs) + return result @final def transform(self, arg, *args, **kwargs): @@ -487,6 +502,53 @@ def _groupby_and_aggregate(self, how, *args, **kwargs): return self._wrap_result(result) + def _groupby_and_apply(self, how, *args, **kwargs): + """ + Re-evaluate the obj with a groupby aggregation. + """ + grouper = self._grouper + + # Excludes `on` column when provided + obj = self._obj_with_exclusions + + grouped = get_groupby(obj, by=None, grouper=grouper, group_keys=self.group_keys) + + try: + if callable(how): + # TODO: test_resample_apply_with_additional_args fails if we go + # through the non-lambda path, not clear that it should. + func = lambda x: how(x, *args, **kwargs) + result = grouped._agg_for_resample(func) + else: + result = grouped._agg_for_resample(how, *args, **kwargs) + except (AttributeError, KeyError): + # we have a non-reducing function; try to evaluate + # alternatively we want to evaluate only a column of the input + + # test_apply_to_one_column_of_df the function being applied references + # a DataFrame column, but aggregate_item_by_item operates column-wise + # on Series, raising AttributeError or KeyError + # (depending on whether the column lookup uses getattr/__getitem__) + result = _apply( + grouped, how, *args, include_groups=self.include_groups, **kwargs + ) + + except ValueError as err: + if "Must produce aggregated value" in str(err): + # raised in _aggregate_named + # see test_apply_without_aggregation, test_apply_with_mutated_index + pass + else: + raise + + # we have a non-reducing function + # try to evaluate + result = _apply( + grouped, how, *args, include_groups=self.include_groups, **kwargs + ) + + return self._wrap_result(result) + @final def _get_resampler_for_grouping( self, groupby: GroupBy, key, include_groups: bool = True @@ -1530,6 +1592,7 @@ def func(x): _upsample = _apply _downsample = _apply _groupby_and_aggregate = _apply + _groupby_and_apply = _apply @final def _gotitem(self, key, ndim, subset=None): diff --git a/pandas/core/reshape/pivot.py b/pandas/core/reshape/pivot.py index 7b2fbb54f7d35..0a60c2a03e446 100644 --- a/pandas/core/reshape/pivot.py +++ b/pandas/core/reshape/pivot.py @@ -442,7 +442,7 @@ def _generate_marginal_results_without_values( margins_name: Hashable = "All", ): margin_keys: list | Index - if len(cols) > 0: + if len(table.columns) > 0: # need to "interleave" the margins margin_keys = [] diff --git a/pandas/tests/groupby/aggregate/test_aggregate.py b/pandas/tests/groupby/aggregate/test_aggregate.py index 2b9df1b7079da..5178ae107774f 100644 --- a/pandas/tests/groupby/aggregate/test_aggregate.py +++ b/pandas/tests/groupby/aggregate/test_aggregate.py @@ -48,12 +48,27 @@ def test_agg_regression1(tsframe): def test_agg_must_agg(df): grouped = df.groupby("A")["C"] + expected = Series( + { + "bar": df[df.A == "bar"]["C"].describe(), + "foo": df[df.A == "foo"]["C"].describe(), + }, + index=Index(["bar", "foo"], name="A"), + name="C", + ) + result = grouped.agg(lambda x: x.describe()) + tm.assert_series_equal(result, expected) - msg = "Must produce aggregated value" - with pytest.raises(Exception, match=msg): - grouped.agg(lambda x: x.describe()) - with pytest.raises(Exception, match=msg): - grouped.agg(lambda x: x.index[:2]) + expected = Series( + { + "bar": df[df.A == "bar"]["C"].index[:2], + "foo": df[df.A == "foo"]["C"].index[:2], + }, + index=Index(["bar", "foo"], name="A"), + name="C", + ) + result = grouped.agg(lambda x: x.index[:2]) + tm.assert_series_equal(result, expected) def test_agg_ser_multi_key(df): @@ -1156,6 +1171,22 @@ def test_with_kwargs(self): expected = DataFrame({"": [13], "": [30]}) tm.assert_frame_equal(result, expected) + def test_unused_kwargs(self): + # GH#39169 - Passing kwargs used to have agg pass the entire frame rather + # than column-by-column + + # UDF that works on both the entire frame and column-by-column + func = lambda data, **kwargs: np.sum(np.sum(data)) + + df = DataFrame([[1, 2], [3, 4]]) + expected = DataFrame({0: [1, 3], 1: [2, 4]}) + + result = df.groupby(level=0).agg(func) + tm.assert_frame_equal(result, expected) + + result = df.groupby(level=0).agg(func, foo=42) + tm.assert_frame_equal(result, expected) + def test_agg_with_one_lambda(self): # GH 25719, write tests for DataFrameGroupby.agg with only one lambda df = DataFrame( @@ -1242,6 +1273,40 @@ def test_agg_multiple_lambda(self): ) tm.assert_frame_equal(result2, expected) + def test_multiple_udf_same_name(self): + # GH#28570 + quant50 = partial(np.percentile, q=50) + quant70 = partial(np.percentile, q=70) + + df = DataFrame({"col1": ["a", "a", "b", "b", "b"], "col2": [1, 2, 3, 4, 5]}) + expected = DataFrame( + [[1.5, 1.7], [4.0, 4.4]], + index=Index(["a", "b"], name="col1"), + columns=MultiIndex( + levels=[["col2"], ["percentile"]], + codes=[[0, 0], [0, 0]], + ), + ) + gb = df.groupby("col1") + result = gb.agg({"col2": [quant50, quant70]}) + tm.assert_frame_equal(result, expected) + + @pytest.mark.parametrize("use_kwargs", [True, False]) + def test_multiple_udf_with_args(self, use_kwargs): + # GH#26611 + def func(x, y): + return x.sum() + y + + df = DataFrame({"A": [1, 2]}) + expected = DataFrame({"A": [13]}) + gb = df.groupby([0, 0]) + if use_kwargs: + args, kwargs = (), {"y": 10} + else: + args, kwargs = (10,), {} + result = gb.agg(func, *args, **kwargs) + tm.assert_frame_equal(result, expected) + def test_pass_args_kwargs_duplicate_columns(tsframe, as_index): # go through _aggregate_frame with self.axis == 0 and duplicate columns @@ -1435,11 +1500,12 @@ def test_groupby_agg_precision(any_real_numeric_dtype): "key3": pd.array([max_value], dtype=any_real_numeric_dtype), } ) - arrays = [["a"], ["b"]] - index = MultiIndex.from_arrays(arrays, names=("key1", "key2")) expected = DataFrame( - {"key3": pd.array([max_value], dtype=any_real_numeric_dtype)}, index=index + {"key3": [df["key3"]]}, + index=MultiIndex( + levels=[["a"], ["b"]], codes=[[0], [0]], names=["key1", "key2"] + ), ) result = df.groupby(["key1", "key2"]).agg(lambda x: x) tm.assert_frame_equal(result, expected) @@ -1520,26 +1586,25 @@ def test_groupby_complex_raises(func): @pytest.mark.parametrize( - "test, constant", + "test, values", [ - ([[20, "A"], [20, "B"], [10, "C"]], {0: [10, 20], 1: ["C", ["A", "B"]]}), - ([[20, "A"], [20, "B"], [30, "C"]], {0: [20, 30], 1: [["A", "B"], "C"]}), - ([["a", 1], ["a", 1], ["b", 2], ["b", 3]], {0: ["a", "b"], 1: [1, [2, 3]]}), - pytest.param( - [["a", 1], ["a", 2], ["b", 3], ["b", 3]], - {0: ["a", "b"], 1: [[1, 2], 3]}, - marks=pytest.mark.xfail, - ), + ([[20, "A"], [20, "B"], [10, "C"]], [10, 20]), + ([[20, "A"], [20, "B"], [30, "C"]], [20, 30]), + ([["a", 1], ["a", 1], ["b", 2], ["b", 3]], ["a", "b"]), + ([["a", 1], ["a", 2], ["b", 3], ["b", 3]], ["a", "b"]), ], ) -def test_agg_of_mode_list(test, constant): +def test_agg_of_mode_list(test, values): # GH#25581 df1 = DataFrame(test) result = df1.groupby(0).agg(Series.mode) # Mode usually only returns 1 value, but can return a list in the case of a tie. - expected = DataFrame(constant) - expected = expected.set_index(0) + expected = DataFrame( + [[df1[df1[0] == value][1].mode()] for value in values], + index=Index(values, name=0), + columns=[1], + ) tm.assert_frame_equal(result, expected) diff --git a/pandas/tests/groupby/aggregate/test_other.py b/pandas/tests/groupby/aggregate/test_other.py index 12f99e3cf7a63..7363e79cc5303 100644 --- a/pandas/tests/groupby/aggregate/test_other.py +++ b/pandas/tests/groupby/aggregate/test_other.py @@ -619,7 +619,7 @@ def test_agg_lambda_with_timezone(): ) result = df.groupby("tag").agg({"date": lambda e: e.head(1)}) expected = DataFrame( - [pd.Timestamp("2018-01-01", tz="UTC")], + [[df["date"].head(1)]], index=Index([1], name="tag"), columns=["date"], ) diff --git a/pandas/tests/groupby/test_categorical.py b/pandas/tests/groupby/test_categorical.py index 5a43a42aa936f..13a99e1bc39bb 100644 --- a/pandas/tests/groupby/test_categorical.py +++ b/pandas/tests/groupby/test_categorical.py @@ -1318,9 +1318,7 @@ def test_groupby_cat_preserves_structure(observed, ordered): expected = df.copy() result = ( - df.groupby("Name", observed=observed) - .agg(DataFrame.sum, skipna=True) - .reset_index() + df.groupby("Name", observed=observed).agg(Series.sum, skipna=True).reset_index() ) tm.assert_frame_equal(result, expected) diff --git a/pandas/tests/groupby/test_groupby.py b/pandas/tests/groupby/test_groupby.py index 00e781e6a7f07..4119e7a6feaed 100644 --- a/pandas/tests/groupby/test_groupby.py +++ b/pandas/tests/groupby/test_groupby.py @@ -2467,7 +2467,7 @@ def test_by_column_values_with_same_starting_value(dtype): result = df.groupby(["Name"]).agg(aggregate_details) expected_result = DataFrame( { - "Mood": [["happy", "sad"], "happy"], + "Mood": [Series(["happy", "sad"]), Series(["happy"])], "Credit": [2500, 900], "Name": ["Thomas", "Thomas John"], } @@ -2935,6 +2935,7 @@ def test_groupby_dropna_with_nunique_unique(): # GH#42016 df = [[1, 1, 1, "A"], [1, None, 1, "A"], [1, None, 2, "A"], [1, None, 3, "A"]] df_dropna = DataFrame(df, columns=["a", "b", "c", "partner"]) + result = df_dropna.groupby(["a", "b", "c"], dropna=False).agg( {"partner": ["nunique", "unique"]} ) diff --git a/pandas/tests/groupby/test_reductions.py b/pandas/tests/groupby/test_reductions.py index edc94b2beeec1..4ac7790cd2675 100644 --- a/pandas/tests/groupby/test_reductions.py +++ b/pandas/tests/groupby/test_reductions.py @@ -66,10 +66,15 @@ def test_basic_aggregations(dtype): grouped.aggregate({"one": np.mean, "two": np.std}) # corner cases - msg = "Must produce aggregated value" - # exception raised is type Exception - with pytest.raises(Exception, match=msg): - grouped.aggregate(lambda x: x * 2) + result = grouped.aggregate(lambda x: x * 2) + expected = Series( + { + 0: data[data.index // 3 == 0] * 2, + 1: data[data.index // 3 == 1] * 2, + 2: data[data.index // 3 == 2] * 2, + }, + ) + tm.assert_series_equal(result, expected) @pytest.mark.parametrize( diff --git a/pandas/tests/resample/test_resample_api.py b/pandas/tests/resample/test_resample_api.py index f3b9c909290a8..6268a9e90e47d 100644 --- a/pandas/tests/resample/test_resample_api.py +++ b/pandas/tests/resample/test_resample_api.py @@ -643,10 +643,10 @@ def test_agg_list_like_func_with_args(): ) def foo1(x, a=1, c=0): - return x + a + c + return x.sum() + a + c def foo2(x, b=2, c=0): - return x + b + c + return x.sum() + b + c msg = r"foo1\(\) got an unexpected keyword argument 'b'" with pytest.raises(TypeError, match=msg): diff --git a/pandas/tests/reshape/test_pivot.py b/pandas/tests/reshape/test_pivot.py index 99250dc929997..e7fcab4241f60 100644 --- a/pandas/tests/reshape/test_pivot.py +++ b/pandas/tests/reshape/test_pivot.py @@ -1198,8 +1198,15 @@ def test_margins_no_values_no_cols(self, data): result = data[["A", "B"]].pivot_table( index=["A", "B"], aggfunc=len, margins=True ) - result_list = result.tolist() - assert sum(result_list[:-1]) == result_list[-1] + expected = DataFrame( + index=MultiIndex( + levels=[["bar", "foo", "All"], ["one", "two", ""]], + codes=[[0, 0, 1, 1, 2], [0, 1, 0, 1, 2]], + names=["A", "B"], + ), + columns=Index([]), + ) + tm.assert_frame_equal(result, expected) def test_margins_no_values_two_rows(self, data): # Regression test on pivot table: no values passed but rows are a @@ -1207,7 +1214,19 @@ def test_margins_no_values_two_rows(self, data): result = data[["A", "B", "C"]].pivot_table( index=["A", "B"], columns="C", aggfunc=len, margins=True ) - assert result.All.tolist() == [3.0, 1.0, 4.0, 3.0, 11.0] + expected = DataFrame( + index=MultiIndex( + levels=[["bar", "foo", "All"], ["one", "two", ""]], + codes=[[0, 0, 1, 1, 2], [0, 1, 0, 1, 2]], + names=["A", "B"], + ), + columns=MultiIndex( + levels=[[], ["dull", "shiny"]], + codes=[[], []], + names=[None, "C"], + ), + ) + tm.assert_frame_equal(result, expected) def test_margins_no_values_one_row_one_col(self, data): # Regression test on pivot table: no values passed but row and col @@ -1215,7 +1234,15 @@ def test_margins_no_values_one_row_one_col(self, data): result = data[["A", "B"]].pivot_table( index="A", columns="B", aggfunc=len, margins=True ) - assert result.All.tolist() == [4.0, 7.0, 11.0] + expected = DataFrame( + index=Index(["bar", "foo", "All"], name="A"), + columns=MultiIndex( + levels=[[], ["dull", "shiny"]], + codes=[[], []], + names=[None, "B"], + ), + ) + tm.assert_frame_equal(result, expected) def test_margins_no_values_two_row_two_cols(self, data): # Regression test on pivot table: no values passed but rows and cols @@ -1224,7 +1251,19 @@ def test_margins_no_values_two_row_two_cols(self, data): result = data[["A", "B", "C", "D"]].pivot_table( index=["A", "B"], columns=["C", "D"], aggfunc=len, margins=True ) - assert result.All.tolist() == [3.0, 1.0, 4.0, 3.0, 11.0] + expected = DataFrame( + index=MultiIndex( + levels=[["bar", "foo", "All"], ["one", "two", ""]], + codes=[[0, 0, 1, 1, 2], [0, 1, 0, 1, 2]], + names=["A", "B"], + ), + columns=MultiIndex( + levels=[[], ["dull", "shiny"], list("abcdefghijk")], + codes=[[], [], []], + names=[None, "C", "D"], + ), + ) + tm.assert_frame_equal(result, expected) @pytest.mark.parametrize("margin_name", ["foo", "one", 666, None, ["a", "b"]]) def test_pivot_table_with_margins_set_margin_name(self, margin_name, data):