From 868a6d7e8fb7fc4b1cf7bfc00447323d0c49075c Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Wed, 31 May 2023 17:08:51 -0700 Subject: [PATCH 1/3] ENH: Groupby agg support multiple funcs numba --- doc/source/whatsnew/v2.1.0.rst | 1 + pandas/core/apply.py | 14 ++- pandas/core/groupby/generic.py | 39 +++++-- pandas/tests/groupby/aggregate/test_numba.py | 116 +++++++++++++++++-- 4 files changed, 146 insertions(+), 24 deletions(-) diff --git a/doc/source/whatsnew/v2.1.0.rst b/doc/source/whatsnew/v2.1.0.rst index 1a5cc122897a5..95c4e3d8411e4 100644 --- a/doc/source/whatsnew/v2.1.0.rst +++ b/doc/source/whatsnew/v2.1.0.rst @@ -98,6 +98,7 @@ Other enhancements - Performance improvement in :func:`read_csv` (:issue:`52632`) with ``engine="c"`` - :meth:`Categorical.from_codes` has gotten a ``validate`` parameter (:issue:`50975`) - :meth:`DataFrame.stack` gained the ``sort`` keyword to dictate whether the resulting :class:`MultiIndex` levels are sorted (:issue:`15105`) +- :meth:`SeriesGroupby.agg` and :meth:`DataFrameGroupby.agg` now support passing in multiple functions for ``engine="numba"`` (:issue:`53486`) - Added ``engine_kwargs`` parameter to :meth:`DataFrame.to_excel` (:issue:`53220`) - Performance improvement in :func:`concat` with homogeneous ``np.float64`` or ``np.float32`` dtypes (:issue:`52685`) - Performance improvement in :meth:`DataFrame.filter` when ``items`` is given (:issue:`52941`) diff --git a/pandas/core/apply.py b/pandas/core/apply.py index 5cb7b1c8279ab..78c078da3df21 100644 --- a/pandas/core/apply.py +++ b/pandas/core/apply.py @@ -403,11 +403,18 @@ def agg_dict_like(self) -> DataFrame | Series: and selected_obj.columns.nunique() < len(selected_obj.columns) ) + # Numba Groupby engine/engine-kwargs passthrough + kwargs = {} + if is_groupby: + engine = self.kwargs.get("engine", None) + engine_kwargs = self.kwargs.get("engine_kwargs", None) + kwargs = {"engine": engine, "engine_kwargs": engine_kwargs} + with context_manager: if selected_obj.ndim == 1: # key only used for output colg = obj._gotitem(selection, ndim=1) - result_data = [colg.agg(how) for _, how in func.items()] + result_data = [colg.agg(how, **kwargs) for _, how in func.items()] result_index = list(func.keys()) elif is_non_unique_col: # key used for column selection and output @@ -422,7 +429,7 @@ def agg_dict_like(self) -> DataFrame | Series: label_to_indices[label].append(index) key_data = [ - selected_obj._ixs(indice, axis=1).agg(how) + selected_obj._ixs(indice, axis=1).agg(how, **kwargs) for label, indices in label_to_indices.items() for indice in indices ] @@ -432,7 +439,8 @@ def agg_dict_like(self) -> DataFrame | Series: else: # key used for column selection and output result_data = [ - obj._gotitem(key, ndim=1).agg(how) for key, how in func.items() + obj._gotitem(key, ndim=1).agg(how, **kwargs) + for key, how in func.items() ] result_index = list(func.keys()) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 37ef04f17a2e5..80e7be0fd3c91 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -223,11 +223,6 @@ def apply(self, func, *args, **kwargs) -> Series: @doc(_agg_template_series, examples=_agg_examples_doc, klass="Series") def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): - if maybe_use_numba(engine): - return self._aggregate_with_numba( - func, *args, engine_kwargs=engine_kwargs, **kwargs - ) - relabeling = func is None columns = None if relabeling: @@ -235,12 +230,19 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) kwargs = {} if isinstance(func, str): + if maybe_use_numba(engine): + # Not all agg functions support numba, only propagate numba kwargs + # if user asks for numba + kwargs["engine"] = engine + kwargs["engine_kwargs"] = engine_kwargs return getattr(self, func)(*args, **kwargs) elif isinstance(func, abc.Iterable): # Catch instances of lists / tuples # but not the class list / tuple itself. func = maybe_mangle_lambdas(func) + kwargs["engine"] = engine + kwargs["engine_kwargs"] = engine_kwargs ret = self._aggregate_multiple_funcs(func, *args, **kwargs) if relabeling: # columns is not narrowed by mypy from relabeling flag @@ -255,6 +257,11 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) if cyfunc and not args and not kwargs: return getattr(self, cyfunc)() + if maybe_use_numba(engine): + return self._aggregate_with_numba( + func, *args, engine_kwargs=engine_kwargs, **kwargs + ) + if self.ngroups == 0: # e.g. test_evaluate_with_empty_groups without any groups to # iterate over, we have no output on which to do dtype @@ -1387,14 +1394,15 @@ class DataFrameGroupBy(GroupBy[DataFrame]): @doc(_agg_template_frame, examples=_agg_examples_doc, klass="DataFrame") def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): - if maybe_use_numba(engine): - return self._aggregate_with_numba( - func, *args, engine_kwargs=engine_kwargs, **kwargs - ) - relabeling, func, columns, order = reconstruct_func(func, **kwargs) func = maybe_mangle_lambdas(func) + if maybe_use_numba(engine): + # Not all agg functions support numba, only propagate numba kwargs + # if user asks for numba + kwargs["engine"] = engine + kwargs["engine_kwargs"] = engine_kwargs + op = GroupByApply(self, func, args=args, kwargs=kwargs) result = op.agg() if not is_dict_like(func) and result is not None: @@ -1416,6 +1424,17 @@ def aggregate(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs) result.columns = columns # type: ignore[assignment] if result is None: + # Remove the kwargs we inserted + # (already stored in engine, engine_kwargs arguments) + if "engine" in kwargs: + del kwargs["engine"] + del kwargs["engine_kwargs"] + # at this point func is not a str, list-like, dict-like, + # or a known callable(e.g. sum) + if maybe_use_numba(engine): + return self._aggregate_with_numba( + func, *args, engine_kwargs=engine_kwargs, **kwargs + ) # grouper specific aggregations if self.grouper.nkeys > 1: # test_groupby_as_index_series_scalar gets here with 'not self.as_index' diff --git a/pandas/tests/groupby/aggregate/test_numba.py b/pandas/tests/groupby/aggregate/test_numba.py index 9dd3d1d45abf0..28c8ee8b1553e 100644 --- a/pandas/tests/groupby/aggregate/test_numba.py +++ b/pandas/tests/groupby/aggregate/test_numba.py @@ -134,25 +134,119 @@ def func_1(values, index): @td.skip_if_no("numba") +# TODO: Add test to check that UDF is still jitted by numba @pytest.mark.parametrize( - "agg_func", + "agg_kwargs", [ - ["min", "max"], - "min", - {"B": ["min", "max"], "C": "sum"}, - NamedAgg(column="B", aggfunc="min"), + {"func": ["min", "max"]}, + {"func": "min"}, + {"func": {1: ["min", "max"], 2: "sum"}}, + {"bmin": NamedAgg(column=1, aggfunc="min")}, ], ) -def test_multifunc_notimplimented(agg_func): +def test_multifunc_numba_vs_cython_frame(agg_kwargs): data = DataFrame( - {0: ["a", "a", "b", "b", "a"], 1: [1.0, 2.0, 3.0, 4.0, 5.0]}, columns=[0, 1] + { + 0: ["a", "a", "b", "b", "a"], + 1: [1.0, 2.0, 3.0, 4.0, 5.0], + 2: [1, 2, 3, 4, 5], + }, + columns=[0, 1, 2], + ) + grouped = data.groupby(0) + result = grouped.agg(**agg_kwargs, engine="numba") + expected = grouped.agg(**agg_kwargs, engine="cython") + # check_dtype can be removed if GH 44952 is addressed + tm.assert_frame_equal(result, expected, check_dtype=False) + + +@td.skip_if_no("numba") +@pytest.mark.parametrize( + "agg_kwargs,expected_func", + [ + ({"func": lambda values, index: values.sum()}, "sum"), + # TODO: This does not work yet (fails in nopython pipeline)! + # ({'func': [lambda values, index: values.sum(), + # lambda values, index: values.min()]}, ['sum', 'min']) + ], +) +def test_multifunc_numba_udf_frame(agg_kwargs, expected_func): + data = DataFrame( + { + 0: ["a", "a", "b", "b", "a"], + 1: [1.0, 2.0, 3.0, 4.0, 5.0], + 2: [1, 2, 3, 4, 5], + }, + columns=[0, 1, 2], ) grouped = data.groupby(0) - with pytest.raises(NotImplementedError, match="Numba engine can"): - grouped.agg(agg_func, engine="numba") + result = grouped.agg(**agg_kwargs, engine="numba") + expected = grouped.agg(expected_func, engine="cython") + # check_dtype can be removed if GH 44952 is addressed + tm.assert_frame_equal(result, expected, check_dtype=False) - with pytest.raises(NotImplementedError, match="Numba engine can"): - grouped[1].agg(agg_func, engine="numba") + +@td.skip_if_no("numba") +@pytest.mark.parametrize( + "agg_kwargs", + [{"func": ["min", "max"]}, {"func": "min"}, {"min_val": "min", "max_val": "max"}], +) +def test_multifunc_numba_vs_cython_series(agg_kwargs): + labels = ["a", "a", "b", "b", "a"] + data = Series([1.0, 2.0, 3.0, 4.0, 5.0]) + grouped = data.groupby(labels) + agg_kwargs["engine"] = "numba" + result = grouped.agg(**agg_kwargs) + agg_kwargs["engine"] = "cython" + expected = grouped.agg(**agg_kwargs) + if isinstance(expected, DataFrame): + tm.assert_frame_equal(result, expected) + else: + tm.assert_series_equal(result, expected) + + +@td.skip_if_no("numba") +@pytest.mark.single_cpu +@pytest.mark.parametrize( + "data,agg_kwargs", + [ + (Series([1.0, 2.0, 3.0, 4.0, 5.0]), {"func": ["min", "max"]}), + (Series([1.0, 2.0, 3.0, 4.0, 5.0]), {"func": "min"}), + ( + DataFrame( + {1: [1.0, 2.0, 3.0, 4.0, 5.0], 2: [1, 2, 3, 4, 5]}, columns=[1, 2] + ), + {"func": ["min", "max"]}, + ), + ( + DataFrame( + {1: [1.0, 2.0, 3.0, 4.0, 5.0], 2: [1, 2, 3, 4, 5]}, columns=[1, 2] + ), + {"func": "min"}, + ), + ( + DataFrame( + {1: [1.0, 2.0, 3.0, 4.0, 5.0], 2: [1, 2, 3, 4, 5]}, columns=[1, 2] + ), + {"func": {1: ["min", "max"], 2: "sum"}}, + ), + ( + DataFrame( + {1: [1.0, 2.0, 3.0, 4.0, 5.0], 2: [1, 2, 3, 4, 5]}, columns=[1, 2] + ), + {"min_col": NamedAgg(column=1, aggfunc="min")}, + ), + ], +) +def test_multifunc_numba_kwarg_propagation(data, agg_kwargs): + labels = ["a", "a", "b", "b", "a"] + grouped = data.groupby(labels) + result = grouped.agg(**agg_kwargs, engine="numba", engine_kwargs={"parallel": True}) + expected = grouped.agg(**agg_kwargs, engine="numba") + if isinstance(expected, DataFrame): + tm.assert_frame_equal(result, expected) + else: + tm.assert_series_equal(result, expected) @td.skip_if_no("numba") From 7b12eb9156382c3c700fb61b98a5c0225fff4e50 Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Thu, 1 Jun 2023 11:50:25 -0700 Subject: [PATCH 2/3] address code review --- pandas/tests/groupby/aggregate/test_numba.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pandas/tests/groupby/aggregate/test_numba.py b/pandas/tests/groupby/aggregate/test_numba.py index 28c8ee8b1553e..07ff6067800d3 100644 --- a/pandas/tests/groupby/aggregate/test_numba.py +++ b/pandas/tests/groupby/aggregate/test_numba.py @@ -165,9 +165,19 @@ def test_multifunc_numba_vs_cython_frame(agg_kwargs): "agg_kwargs,expected_func", [ ({"func": lambda values, index: values.sum()}, "sum"), - # TODO: This does not work yet (fails in nopython pipeline)! - # ({'func': [lambda values, index: values.sum(), - # lambda values, index: values.min()]}, ['sum', 'min']) + # FIXME + pytest.param( + { + "func": [ + lambda values, index: values.sum(), + lambda values, index: values.min(), + ] + }, + ["sum", "min"], + marks=pytest.mark.xfail( + reason="This doesn't work yet! Fails in nopython pipeline!" + ), + ), ], ) def test_multifunc_numba_udf_frame(agg_kwargs, expected_func): From c080c4ffac6b4144bf3af5ed09d2194edee0d173 Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Thu, 1 Jun 2023 14:18:55 -0700 Subject: [PATCH 3/3] remove old TODO --- pandas/tests/groupby/aggregate/test_numba.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pandas/tests/groupby/aggregate/test_numba.py b/pandas/tests/groupby/aggregate/test_numba.py index 07ff6067800d3..a82c4d0d8ffbc 100644 --- a/pandas/tests/groupby/aggregate/test_numba.py +++ b/pandas/tests/groupby/aggregate/test_numba.py @@ -134,7 +134,6 @@ def func_1(values, index): @td.skip_if_no("numba") -# TODO: Add test to check that UDF is still jitted by numba @pytest.mark.parametrize( "agg_kwargs", [