From 194bf7f6fc426dc66a253e396cada42a8861ebec Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 5 Apr 2020 21:59:51 -0700 Subject: [PATCH 01/16] Add engine keywords to aggregate signature --- pandas/core/groupby/generic.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 88580f6ebb3ed..01b5fc0c0e9a9 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -236,7 +236,9 @@ def apply(self, func, *args, **kwargs): axis="", ) @Appender(_shared_docs["aggregate"]) - def aggregate(self, func=None, *args, **kwargs): + def aggregate( + self, func=None, engine="cython", engine_kwargs=None, *args, **kwargs + ): relabeling = func is None columns = None @@ -909,7 +911,9 @@ class DataFrameGroupBy(GroupBy[DataFrame]): axis="", ) @Appender(_shared_docs["aggregate"]) - def aggregate(self, func=None, *args, **kwargs): + def aggregate( + self, func=None, engine="cython", engine_kwargs=None, *args, **kwargs + ): relabeling = func is None and is_multi_agg_with_relabel(**kwargs) if relabeling: From 7d4237939eed205fa4830f3a2132a3aeeb1d7805 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 5 Apr 2020 23:04:40 -0700 Subject: [PATCH 02/16] ENH: Add numba engine to groupby.transform --- pandas/core/groupby/generic.py | 13 +++++++++++-- pandas/core/groupby/groupby.py | 24 ++++++++++++++++++------ pandas/core/groupby/ops.py | 12 ++++++++++-- 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 01b5fc0c0e9a9..ff2d199f90f4c 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -266,10 +266,14 @@ def aggregate( return getattr(self, cyfunc)() if self.grouper.nkeys > 1: - return self._python_agg_general(func, *args, **kwargs) + return self._python_agg_general( + func, *args, engine=engine, engine_kwargs=engine_kwargs, **kwargs + ) try: - return self._python_agg_general(func, *args, **kwargs) + return self._python_agg_general( + func, *args, engine=engine, engine_kwargs=engine_kwargs, **kwargs + ) except (ValueError, KeyError): # TODO: KeyError is raised in _python_agg_general, # see see test_groupby.test_basic @@ -934,6 +938,11 @@ def aggregate( func = maybe_mangle_lambdas(func) + if engine == "numba": + return self._python_agg_general( + func, *args, engine=engine, engine_kwargs=engine_kwargs, **kwargs + ) + result, how = self._aggregate(func, *args, **kwargs) if how is None: return result diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index 1474e173b4f8c..a0f15d01b9f46 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -879,7 +879,9 @@ def _cython_agg_general( return self._wrap_aggregated_output(output) - def _python_agg_general(self, func, *args, **kwargs): + def _python_agg_general( + self, func, *args, engine="cython", engine_kwargs=None, **kwargs + ): func = self._is_builtin_func(func) f = lambda x: func(x, *args, **kwargs) @@ -892,11 +894,21 @@ def _python_agg_general(self, func, *args, **kwargs): # agg_series below assumes ngroups > 0 continue - try: - # if this function is invalid for this dtype, we will ignore it. 
- result, counts = self.grouper.agg_series(obj, f) - except TypeError: - continue + if engine == "numba": + result, counts = self.grouper.agg_series( + obj, + func, + *args, + engine=engine, + engine_kwargs=engine_kwargs, + **kwargs, + ) + else: + try: + # if this function is invalid for this dtype, we will ignore it. + result, counts = self.grouper.agg_series(obj, f) + except TypeError: + continue assert result is not None key = base.OutputKey(label=name, position=idx) diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py index 8d535374a083f..8e60e8e3852dc 100644 --- a/pandas/core/groupby/ops.py +++ b/pandas/core/groupby/ops.py @@ -608,10 +608,16 @@ def _transform( return result - def agg_series(self, obj: Series, func): + def agg_series( + self, obj: Series, func, *args, engine="cython", engine_kwargs=None, **kwargs + ): # Caller is responsible for checking ngroups != 0 assert self.ngroups != 0 + if engine == "numba": + return self._aggregate_series_pure_python( + obj, func, *args, engine=engine, engine_kwargs=engine_kwargs, **kwargs + ) if len(obj) == 0: # SeriesGrouper would raise if we were to call _aggregate_series_fast return self._aggregate_series_pure_python(obj, func) @@ -656,7 +662,9 @@ def _aggregate_series_fast(self, obj: Series, func): result, counts = grouper.get_result() return result, counts - def _aggregate_series_pure_python(self, obj: Series, func): + def _aggregate_series_pure_python( + self, obj: Series, func, *args, engine="cython", engine_kwargs=None, **kwargs + ): group_index, _, ngroups = self.group_info From 2124b8174ee1ec0a28cf5b62a041c6cf410b36b1 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Tue, 7 Apr 2020 22:40:31 -0700 Subject: [PATCH 03/16] include numba jitted func in agg routine --- pandas/core/groupby/generic.py | 1 + pandas/core/groupby/ops.py | 24 +++++++++++++++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index ff2d199f90f4c..95670c00774bf 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -275,6 +275,7 @@ def aggregate( func, *args, engine=engine, engine_kwargs=engine_kwargs, **kwargs ) except (ValueError, KeyError): + # Do not catch Numba errors here, we want to raise and not fall back. 
# TODO: KeyError is raised in _python_agg_general, # see see test_groupby.test_basic result = self._aggregate_named(func, *args, **kwargs) diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py index 8e60e8e3852dc..aa4133deb5b0c 100644 --- a/pandas/core/groupby/ops.py +++ b/pandas/core/groupby/ops.py @@ -54,6 +54,13 @@ get_group_index_sorter, get_indexer_dict, ) +from pandas.core.util.numba_ import ( + check_kwargs_and_nopython, + get_jit_arguments, + jit_user_function, + split_for_numba, + validate_udf, +) class BaseGrouper: @@ -666,6 +673,14 @@ def _aggregate_series_pure_python( self, obj: Series, func, *args, engine="cython", engine_kwargs=None, **kwargs ): + if engine == "numba": + nopython, nogil, parallel = get_jit_arguments(engine_kwargs) + check_kwargs_and_nopython(kwargs, nopython) + validate_udf(func) + numba_func = self._numba_func_cache.get( + func, jit_user_function(func, nopython, nogil, parallel) + ) + group_index, _, ngroups = self.group_info counts = np.zeros(ngroups, dtype=int) @@ -674,7 +689,14 @@ def _aggregate_series_pure_python( splitter = get_splitter(obj, group_index, ngroups, axis=0) for label, group in splitter: - res = func(group) + if engine == "numba": + values, index, _ = split_for_numba(group) + res = numba_func(values, index, *args) + if func not in self._numba_func_cache: + self._numba_func_cache[func] = numba_func + else: + res = func(group, *args, **kwargs) + if result is None: if isinstance(res, (Series, Index, np.ndarray)): if len(res) == 1: From 0f8a692cb24d69f30effc2d920919d13e53cdeb7 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 12 Apr 2020 18:56:37 -0700 Subject: [PATCH 04/16] Add util functions --- pandas/core/util/numba_.py | 105 +++++++++++++++++++++++++++++++++++-- 1 file changed, 101 insertions(+), 4 deletions(-) diff --git a/pandas/core/util/numba_.py b/pandas/core/util/numba_.py index e4debab2c22ee..a7f891ddd63ec 100644 --- a/pandas/core/util/numba_.py +++ b/pandas/core/util/numba_.py @@ -1,15 +1,33 @@ """Common utilities for Numba operations""" +import inspect import types -from typing import Callable, Dict, Optional +from typing import Callable, Dict, Optional, Tuple import numpy as np +from pandas._typing import FrameOrSeries from pandas.compat._optional import import_optional_dependency def check_kwargs_and_nopython( kwargs: Optional[Dict] = None, nopython: Optional[bool] = None -): +) -> None: + """ + Validate that **kwargs and nopython=True was passed + https://github.com/numba/numba/issues/2916 + Parameters + ---------- + kwargs : dict, default None + user passed keyword arguments to pass into the JITed function + nopython : bool, default None + nopython parameter + Returns + ------- + None + Raises + ------ + ValueError + """ if kwargs and nopython: raise ValueError( "numba does not support kwargs with nopython=True: " @@ -17,9 +35,19 @@ def check_kwargs_and_nopython( ) -def get_jit_arguments(engine_kwargs: Optional[Dict[str, bool]] = None): +def get_jit_arguments( + engine_kwargs: Optional[Dict[str, bool]] = None +) -> Tuple[bool, bool, bool]: """ Return arguments to pass to numba.JIT, falling back on pandas default JIT settings. 
+ Parameters + ---------- + engine_kwargs : dict, default None + user passed keyword arguments for numba.JIT + Returns + ------- + (bool, bool, bool) + nopython, nogil, parallel """ if engine_kwargs is None: engine_kwargs = {} @@ -30,9 +58,25 @@ def get_jit_arguments(engine_kwargs: Optional[Dict[str, bool]] = None): return nopython, nogil, parallel -def jit_user_function(func: Callable, nopython: bool, nogil: bool, parallel: bool): +def jit_user_function( + func: Callable, nopython: bool, nogil: bool, parallel: bool +) -> Callable: """ JIT the user's function given the configurable arguments. + Parameters + ---------- + func : function + user defined function + nopython : bool + nopython parameter for numba.JIT + nogil : bool + nogil parameter for numba.JIT + parallel : bool + parallel parameter for numba.JIT + Returns + ------- + function + Numba JITed function """ numba = import_optional_dependency("numba") @@ -56,3 +100,56 @@ def impl(data, *_args): return impl return numba_func + + +def split_for_numba(arg: FrameOrSeries) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: + """ + Split pandas object into its components as numpy arrays for numba functions. + Parameters + ---------- + arg : Series or DataFrame + Returns + ------- + (ndarray, ndarray, ndarray) + values, index, columns + """ + if getattr(arg, "columns", None) is not None: + columns_as_array = arg.columns.to_numpy() + else: + columns_as_array = None + return arg.to_numpy(), arg.index.to_numpy(), columns_as_array + + +def validate_udf(func: Callable, include_columns: bool = False) -> None: + """ + Validate user defined function for ops when using Numba. + For routines that pass Series objects, the first signature arguments should include: + def f(values, index, ...): + ... + For routines that pass DataFrame objects, the first signature arguments should + include: + def f(values, index, columns, ...): + ... 
+ Parameters + ---------- + func : function, default False + user defined function + include_columns : bool + whether 'columns' should be in the signature + Returns + ------- + None + """ + udf_signature = list(inspect.signature(func).parameters.keys()) + expected_args = ["values", "index"] + if include_columns: + expected_args.append("columns") + min_number_args = len(expected_args) + if ( + len(udf_signature) < min_number_args + or udf_signature[:min_number_args] != expected_args + ): + raise ValueError( + f"The first {min_number_args} arguments to {func.__name__} must be " + f"{expected_args}" + ) \ No newline at end of file From 1d09ce120865766b042dd74c166fdec593827e74 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 12 Apr 2020 20:58:58 -0700 Subject: [PATCH 05/16] Add cache and more routines --- pandas/core/groupby/generic.py | 4 ++++ pandas/core/groupby/groupby.py | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 05e6982fa8856..f80d6c1973fa1 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -154,6 +154,8 @@ def pinner(cls): class SeriesGroupBy(GroupBy[Series]): _apply_whitelist = base.series_apply_whitelist + _numba_func_cache: Dict[Callable, Callable] = {} + def _iterate_slices(self) -> Iterable[Series]: yield self._selected_obj @@ -826,6 +828,8 @@ class DataFrameGroupBy(GroupBy[DataFrame]): _apply_whitelist = base.dataframe_apply_whitelist + _numba_func_cache: Dict[Callable, Callable] = {} + _agg_see_also_doc = dedent( """ See Also diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index 4049b7a21c9cc..fb3c7433ad64f 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -885,7 +885,8 @@ def _python_agg_general( self, func, *args, engine="cython", engine_kwargs=None, **kwargs ): func = self._is_builtin_func(func) - f = lambda x: func(x, *args, **kwargs) + if engine != 'numba': + f = lambda x: func(x, *args, **kwargs) # iterate through "columns" ex exclusions to populate output dict output: Dict[base.OutputKey, np.ndarray] = {} From b43f18332d9905567a6617145a62559a19cd29e6 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Fri, 17 Apr 2020 15:55:47 -0700 Subject: [PATCH 06/16] minimize whitespace diff --- pandas/core/util/numba_.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pandas/core/util/numba_.py b/pandas/core/util/numba_.py index 4522d89a117f6..77a9a34c414a1 100644 --- a/pandas/core/util/numba_.py +++ b/pandas/core/util/numba_.py @@ -15,15 +15,18 @@ def check_kwargs_and_nopython( """ Validate that **kwargs and nopython=True was passed https://github.com/numba/numba/issues/2916 + Parameters ---------- kwargs : dict, default None user passed keyword arguments to pass into the JITed function nopython : bool, default None nopython parameter + Returns ------- None + Raises ------ ValueError @@ -40,10 +43,12 @@ def get_jit_arguments( ) -> Tuple[bool, bool, bool]: """ Return arguments to pass to numba.JIT, falling back on pandas default JIT settings. + Parameters ---------- engine_kwargs : dict, default None user passed keyword arguments for numba.JIT + Returns ------- (bool, bool, bool) @@ -63,6 +68,7 @@ def jit_user_function( ) -> Callable: """ JIT the user's function given the configurable arguments. 
+ Parameters ---------- func : function @@ -73,6 +79,7 @@ def jit_user_function( nogil parameter for numba.JIT parallel : bool parallel parameter for numba.JIT + Returns ------- function @@ -105,9 +112,11 @@ def impl(data, *_args): def split_for_numba(arg: FrameOrSeries) -> Tuple[np.ndarray, np.ndarray]: """ Split pandas object into its components as numpy arrays for numba functions. + Parameters ---------- arg : Series or DataFrame + Returns ------- (ndarray, ndarray) @@ -119,13 +128,17 @@ def split_for_numba(arg: FrameOrSeries) -> Tuple[np.ndarray, np.ndarray]: def validate_udf(func: Callable) -> None: """ Validate user defined function for ops when using Numba. + The first signature arguments should include: + def f(values, index, ...): ... + Parameters ---------- func : function, default False user defined function + Returns ------- None From f30ba2b9d0e263bcb3df734a775cfd946e89c08f Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Fri, 17 Apr 2020 16:10:23 -0700 Subject: [PATCH 07/16] fix split by numba call --- pandas/core/groupby/ops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py index aa4133deb5b0c..2054388e81f2c 100644 --- a/pandas/core/groupby/ops.py +++ b/pandas/core/groupby/ops.py @@ -690,7 +690,7 @@ def _aggregate_series_pure_python( for label, group in splitter: if engine == "numba": - values, index, _ = split_for_numba(group) + values, index = split_for_numba(group) res = numba_func(values, index, *args) if func not in self._numba_func_cache: self._numba_func_cache[func] = numba_func From 6e4cdd15d771cb404716a2f8aff1278abfcd0138 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Mon, 20 Apr 2020 22:11:56 -0700 Subject: [PATCH 08/16] Use global cache correctly --- pandas/core/groupby/ops.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py index 2054388e81f2c..b2d7bf85437be 100644 --- a/pandas/core/groupby/ops.py +++ b/pandas/core/groupby/ops.py @@ -55,6 +55,7 @@ get_indexer_dict, ) from pandas.core.util.numba_ import ( + NUMBA_FUNC_CACHE, check_kwargs_and_nopython, get_jit_arguments, jit_user_function, @@ -677,8 +678,9 @@ def _aggregate_series_pure_python( nopython, nogil, parallel = get_jit_arguments(engine_kwargs) check_kwargs_and_nopython(kwargs, nopython) validate_udf(func) - numba_func = self._numba_func_cache.get( - func, jit_user_function(func, nopython, nogil, parallel) + cache_key = (func, "groupby_agg") + numba_func = NUMBA_FUNC_CACHE.get( + cache_key, jit_user_function(func, nopython, nogil, parallel) ) group_index, _, ngroups = self.group_info @@ -692,8 +694,8 @@ def _aggregate_series_pure_python( if engine == "numba": values, index = split_for_numba(group) res = numba_func(values, index, *args) - if func not in self._numba_func_cache: - self._numba_func_cache[func] = numba_func + if cache_key not in NUMBA_FUNC_CACHE: + NUMBA_FUNC_CACHE[cache_key] = numba_func else: res = func(group, *args, **kwargs) From 7ffe304b877c56bb2f0bdaa4e6d47a6cbf87be7b Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Mon, 20 Apr 2020 23:16:42 -0700 Subject: [PATCH 09/16] Raise for numba specific errors, add tests --- pandas/core/groupby/generic.py | 5 +- pandas/core/util/numba_.py | 19 ++++ pandas/tests/groupby/aggregate/test_numba.py | 114 +++++++++++++++++++ 3 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 pandas/tests/groupby/aggregate/test_numba.py diff --git a/pandas/core/groupby/generic.py 
b/pandas/core/groupby/generic.py index 7b77ee54c23b1..d6af62b0fe295 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -79,6 +79,7 @@ NUMBA_FUNC_CACHE, check_kwargs_and_nopython, get_jit_arguments, + is_numba_util_related_error, jit_user_function, split_for_numba, validate_udf, @@ -282,8 +283,10 @@ def aggregate( return self._python_agg_general( func, *args, engine=engine, engine_kwargs=engine_kwargs, **kwargs ) - except (ValueError, KeyError): + except (ValueError, KeyError) as err: # Do not catch Numba errors here, we want to raise and not fall back. + if is_numba_util_related_error(str(err)): + raise err # TODO: KeyError is raised in _python_agg_general, # see see test_groupby.test_basic result = self._aggregate_named(func, *args, **kwargs) diff --git a/pandas/core/util/numba_.py b/pandas/core/util/numba_.py index 8836857b14156..215248f5a43c2 100644 --- a/pandas/core/util/numba_.py +++ b/pandas/core/util/numba_.py @@ -12,6 +12,25 @@ NUMBA_FUNC_CACHE: Dict[Tuple[Callable, str], Callable] = dict() +def is_numba_util_related_error(err_message: str) -> bool: + """ + Check if an error was raised from one of the numba utility functions + + For cases where a try/except block has mistakenly caught the error + and we want to re-raise + + Parameters + ---------- + err_message : str, + exception error message + + Returns + ------- + bool + """ + return "The first" in err_message or "numba does not" in err_message + + def check_kwargs_and_nopython( kwargs: Optional[Dict] = None, nopython: Optional[bool] = None ) -> None: diff --git a/pandas/tests/groupby/aggregate/test_numba.py b/pandas/tests/groupby/aggregate/test_numba.py new file mode 100644 index 0000000000000..70b0a027f1bd1 --- /dev/null +++ b/pandas/tests/groupby/aggregate/test_numba.py @@ -0,0 +1,114 @@ +import numpy as np +import pytest + +import pandas.util._test_decorators as td + +from pandas import DataFrame +import pandas._testing as tm +from pandas.core.util.numba_ import NUMBA_FUNC_CACHE + + +@td.skip_if_no("numba", "0.46.0") +def test_correct_function_signature(): + def incorrect_function(x): + return sum(x) * 2.7 + + data = DataFrame( + {"key": ["a", "a", "b", "b", "a"], "data": [1.0, 2.0, 3.0, 4.0, 5.0]}, + columns=["key", "data"], + ) + with pytest.raises(ValueError, match=f"The first 2"): + data.groupby("key").agg(incorrect_function, engine="numba") + + with pytest.raises(ValueError, match=f"The first 2"): + data.groupby("key")["data"].agg(incorrect_function, engine="numba") + + +@td.skip_if_no("numba", "0.46.0") +def test_check_nopython_kwargs(): + def incorrect_function(x, **kwargs): + return sum(x) * 2.7 + + data = DataFrame( + {"key": ["a", "a", "b", "b", "a"], "data": [1.0, 2.0, 3.0, 4.0, 5.0]}, + columns=["key", "data"], + ) + with pytest.raises(ValueError, match="numba does not support"): + data.groupby("key").agg(incorrect_function, engine="numba", a=1) + + with pytest.raises(ValueError, match="numba does not support"): + data.groupby("key")["data"].agg(incorrect_function, engine="numba", a=1) + + +@td.skip_if_no("numba", "0.46.0") +@pytest.mark.filterwarnings("ignore:\\nThe keyword argument") +# Filter warnings when parallel=True and the function can't be parallelized by Numba +@pytest.mark.parametrize("jit", [True, False]) +@pytest.mark.parametrize("pandas_obj", ["Series", "DataFrame"]) +def test_numba_vs_cython(jit, pandas_obj, nogil, parallel, nopython): + def func_numba(values, index): + return np.mean(values) * 2.7 + + if jit: + # Test accepted jitted functions + import numba + + 
func_numba = numba.jit(func_numba) + + data = DataFrame( + {0: ["a", "a", "b", "b", "a"], 1: [1.0, 2.0, 3.0, 4.0, 5.0]}, columns=[0, 1], + ) + engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} + grouped = data.groupby(0) + if pandas_obj == "Series": + grouped = grouped[1] + + result = grouped.agg(func_numba, engine="numba", engine_kwargs=engine_kwargs) + expected = grouped.agg(lambda x: np.mean(x) * 2.7, engine="cython") + + tm.assert_equal(result, expected) + + +@td.skip_if_no("numba", "0.46.0") +@pytest.mark.filterwarnings("ignore:\\nThe keyword argument") +# Filter warnings when parallel=True and the function can't be parallelized by Numba +@pytest.mark.parametrize("jit", [True, False]) +@pytest.mark.parametrize("pandas_obj", ["Series", "DataFrame"]) +def test_cache(jit, pandas_obj, nogil, parallel, nopython): + # Test that the functions are cached correctly if we switch functions + def func_1(values, index): + return np.mean(values) - 3.4 + + def func_2(values, index): + return np.mean(values) * 2.7 + + if jit: + import numba + + func_1 = numba.jit(func_1) + func_2 = numba.jit(func_2) + + data = DataFrame( + {0: ["a", "a", "b", "b", "a"], 1: [1.0, 2.0, 3.0, 4.0, 5.0]}, columns=[0, 1], + ) + engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} + grouped = data.groupby(0) + if pandas_obj == "Series": + grouped = grouped[1] + + result = grouped.agg(func_1, engine="numba", engine_kwargs=engine_kwargs) + expected = grouped.agg(lambda x: np.mean(x) - 3.4, engine="cython") + tm.assert_equal(result, expected) + # func_1 should be in the cache now + assert (func_1, "groupby_agg") in NUMBA_FUNC_CACHE + + # Add func_2 to the cache + result = grouped.agg(func_2, engine="numba", engine_kwargs=engine_kwargs) + expected = grouped.agg(lambda x: np.mean(x) * 2.7, engine="cython") + tm.assert_equal(result, expected) + assert (func_2, "groupby_agg") in NUMBA_FUNC_CACHE + + # Retest func_1 which should use the cache + result = grouped.agg(func_1, engine="numba", engine_kwargs=engine_kwargs) + expected = grouped.agg(lambda x: np.mean(x) - 3.4, engine="cython") + tm.assert_equal(result, expected) From 9fc1068283688d472db82a830ca4e31e94fd2b84 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Tue, 21 Apr 2020 09:22:30 -0700 Subject: [PATCH 10/16] Add benchmarks for new engine --- asv_bench/benchmarks/groupby.py | 58 +++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index eb637c78806c0..d3f7d053fecc1 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -660,4 +660,62 @@ def function(values): self.grouper.transform(function, engine="cython") +class AggEngine: + def setup(self): + N = 10 ** 3 + data = DataFrame( + {0: [str(i) for i in range(100)] * N, 1: list(range(100)) * N}, + columns=[0, 1], + ) + self.grouper = data.groupby(0) + + def time_series_numba(self): + def function(values, index): + total = 0 + for i, value in enumerate(values): + if i % 2: + total += value + 5 + else: + total += value * 2 + return total + + self.grouper[1].transform(function, engine="numba") + + def time_series_cython(self): + def function(values): + total = 0 + for i, value in enumerate(values): + if i % 2: + total += value + 5 + else: + total += value * 2 + return total + + self.grouper[1].transform(function, engine="cython") + + def time_dataframe_numba(self): + def function(values, index): + total = 0 + for i, value in enumerate(values): + if i % 2: 
+ total += value + 5 + else: + total += value * 2 + return total + + self.grouper.transform(function, engine="numba") + + def time_dataframe_cython(self): + def function(values): + total = 0 + for i, value in enumerate(values): + if i % 2: + total += value + 5 + else: + total += value * 2 + return total + + self.grouper.transform(function, engine="cython") + + from .pandas_vb_common import setup # noqa: F401 isort:skip From 4d1cbd5c4d9a2d68e3a46611acbb27c4988310aa Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Tue, 21 Apr 2020 09:31:21 -0700 Subject: [PATCH 11/16] Add whatsnew entry --- doc/source/whatsnew/v1.1.0.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/whatsnew/v1.1.0.rst b/doc/source/whatsnew/v1.1.0.rst index 07849702c646d..38a7a1919890b 100644 --- a/doc/source/whatsnew/v1.1.0.rst +++ b/doc/source/whatsnew/v1.1.0.rst @@ -98,7 +98,7 @@ Other enhancements This can be used to set a custom compression level, e.g., ``df.to_csv(path, compression={'method': 'gzip', 'compresslevel': 1}`` (:issue:`33196`) -- :meth:`~pandas.core.groupby.GroupBy.transform` has gained ``engine`` and ``engine_kwargs`` arguments that supports executing functions with ``Numba`` (:issue:`32854`) +- :meth:`~pandas.core.groupby.GroupBy.transform` and :meth:`~pandas.core.groupby.GroupBy.aggregate` has gained ``engine`` and ``engine_kwargs`` arguments that supports executing functions with ``Numba`` (:issue:`32854`, :issue:`33388`) - .. --------------------------------------------------------------------------- From 75541908afaac105f99d4f487be233f16cf64aa5 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Tue, 21 Apr 2020 11:07:01 -0700 Subject: [PATCH 12/16] Fix benchmarks and lint --- asv_bench/benchmarks/groupby.py | 8 ++++---- pandas/core/groupby/groupby.py | 2 +- pandas/core/groupby/ops.py | 4 +++- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index d3f7d053fecc1..c9ac275cc4ea7 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -679,7 +679,7 @@ def function(values, index): total += value * 2 return total - self.grouper[1].transform(function, engine="numba") + self.grouper[1].agg(function, engine="numba") def time_series_cython(self): def function(values): @@ -691,7 +691,7 @@ def function(values): total += value * 2 return total - self.grouper[1].transform(function, engine="cython") + self.grouper[1].agg(function, engine="cython") def time_dataframe_numba(self): def function(values, index): @@ -703,7 +703,7 @@ def function(values, index): total += value * 2 return total - self.grouper.transform(function, engine="numba") + self.grouper.agg(function, engine="numba") def time_dataframe_cython(self): def function(values): @@ -715,7 +715,7 @@ def function(values): total += value * 2 return total - self.grouper.transform(function, engine="cython") + self.grouper.agg(function, engine="cython") from .pandas_vb_common import setup # noqa: F401 isort:skip diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index d62bebc0b0f24..6924c7d320bc4 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -914,7 +914,7 @@ def _python_agg_general( self, func, *args, engine="cython", engine_kwargs=None, **kwargs ): func = self._is_builtin_func(func) - if engine != 'numba': + if engine != "numba": f = lambda x: func(x, *args, **kwargs) # iterate through "columns" ex exclusions to populate output dict diff --git 
a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py index b2d7bf85437be..3c7794fa52d86 100644 --- a/pandas/core/groupby/ops.py +++ b/pandas/core/groupby/ops.py @@ -874,7 +874,9 @@ def groupings(self) -> "List[grouper.Grouping]": for lvl, name in zip(self.levels, self.names) ] - def agg_series(self, obj: Series, func): + def agg_series( + self, obj: Series, func, *args, engine="cython", engine_kwargs=None, **kwargs + ): # Caller is responsible for checking ngroups != 0 assert self.ngroups != 0 assert len(self.bins) > 0 # otherwise we'd get IndexError in get_result From 07292305f2f0a1f8ae0fa0d0512e3160188434af Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Wed, 22 Apr 2020 00:05:21 -0700 Subject: [PATCH 13/16] Reorder function arguments --- pandas/core/groupby/generic.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index d6af62b0fe295..18752cdc1642e 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -246,7 +246,7 @@ def apply(self, func, *args, **kwargs): ) @Appender(_shared_docs["aggregate"]) def aggregate( - self, func=None, engine="cython", engine_kwargs=None, *args, **kwargs + self, func=None, *args, engine="cython", engine_kwargs=None, **kwargs ): relabeling = func is None @@ -952,7 +952,7 @@ class DataFrameGroupBy(GroupBy[DataFrame]): ) @Appender(_shared_docs["aggregate"]) def aggregate( - self, func=None, engine="cython", engine_kwargs=None, *args, **kwargs + self, func=None, *args, engine="cython", engine_kwargs=None, **kwargs ): relabeling = func is None and is_multi_agg_with_relabel(**kwargs) From 7a9055c89038c7f0b8d3b084a74b2f4b9c66bc69 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Thu, 23 Apr 2020 21:20:51 -0700 Subject: [PATCH 14/16] Add documentation about groupby functions with numba access --- doc/source/user_guide/computation.rst | 4 +- doc/source/user_guide/groupby.rst | 61 +++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/doc/source/user_guide/computation.rst b/doc/source/user_guide/computation.rst index d7d025981f2f4..37ec7ca9c98d6 100644 --- a/doc/source/user_guide/computation.rst +++ b/doc/source/user_guide/computation.rst @@ -380,8 +380,8 @@ and their default values are set to ``False``, ``True`` and ``False`` respective .. note:: In terms of performance, **the first time a function is run using the Numba engine will be slow** - as Numba will have some function compilation overhead. However, ``rolling`` objects will cache - the function and subsequent calls will be fast. In general, the Numba engine is performant with + as Numba will have some function compilation overhead. However, the compiled functions are cached, + and subsequent calls will be fast. In general, the Numba engine is performant with a larger amount of data points (e.g. 1+ million). .. code-block:: ipython diff --git a/doc/source/user_guide/groupby.rst b/doc/source/user_guide/groupby.rst index 5927f1a4175ee..18d611549550f 100644 --- a/doc/source/user_guide/groupby.rst +++ b/doc/source/user_guide/groupby.rst @@ -1021,6 +1021,67 @@ that is itself a series, and possibly upcast the result to a DataFrame: the output as well as set the indices. +Numba Accelerated Routines +-------------------------- + +.. versionadded:: 1.1 + +If `Numba `__ is installed as an optional dependency, the ``transform`` and +``aggregate`` methods support ``engine='numba'`` and ``engine_kwargs`` arguments. 
The ``engine_kwargs`` +argument is a dictionary of keyword arguments that will be passed into the +`numba.jit decorator `__. +These keyword arguments will be applied to the passed function. Currently only ``nogil``, ``nopython``, +and ``parallel`` are supported, and their default values are set to ``False``, ``True`` and ``False`` respectively. + +The function signature must start with ``values, index`` **exactly** as the data belonging to each group +will be passed into ``values``, and the group index will be passed into ``index``. + +.. note:: + + In terms of performance, **the first time a function is run using the Numba engine will be slow** + as Numba will have some function compilation overhead. However, the compiled functions are cached, + and subsequent calls will be fast. In general, the Numba engine is performant with + a larger amount of data points (e.g. 1+ million). + +.. code-block:: ipython + + In [1]: N = 10 ** 3 + + In [2]: data = {0: [str(i) for i in range(100)] * N, 1: list(range(100)) * N} + + In [3]: df = pd.DataFrame(data, columns=[0, 1]) + + In [4]: def f_numba(values, index): + ...: total = 0 + ...: for i, value in enumerate(values): + ...: if i % 2: + ...: total += value + 5 + ...: else: + ...: total += value * 2 + ...: return total + ...: + + In [5]: def f_cython(values): + ...: total = 0 + ...: for i, value in enumerate(values): + ...: if i % 2: + ...: total += value + 5 + ...: else: + ...: total += value * 2 + ...: return total + ...: + + In [6]: groupby = df.groupby(0) + # Run the first time, compilation time will affect performance + In [7]: %timeit -r 1 -n 1 groupby.aggregate(f_numba, engine='numba') + 2.14 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each) + # Function is cached and performance will improve + In [8]: %timeit groupby.aggregate(f_numba, engine='numba') + 4.93 ms ± 32.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) + + In [9]: %timeit groupby.aggregate(f_cython, engine='cython') + 18.6 ms ± 84.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) + Other useful features --------------------- From 3004046b49cfe36594124665ee493c6ef2fb931b Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Thu, 23 Apr 2020 21:28:44 -0700 Subject: [PATCH 15/16] Add warning about no fall back behavior --- doc/source/user_guide/groupby.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/doc/source/user_guide/groupby.rst b/doc/source/user_guide/groupby.rst index 18d611549550f..14240b3735940 100644 --- a/doc/source/user_guide/groupby.rst +++ b/doc/source/user_guide/groupby.rst @@ -1036,6 +1036,12 @@ and ``parallel`` are supported, and their default values are set to ``False``, ` The function signature must start with ``values, index`` **exactly** as the data belonging to each group will be passed into ``values``, and the group index will be passed into ``index``. +.. warning:: + + When using ``engine='numba'``, there will be no "fall back" behavior internally. The group + data and group index will be passed as numpy arrays to the JITed user defined function, and no + alternative execution attempts will be tried. + .. 
note:: In terms of performance, **the first time a function is run using the Numba engine will be slow** From 123e53a0fba00bb86cdbf8da4fc9b99e4e85bb8c Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Thu, 23 Apr 2020 22:09:01 -0700 Subject: [PATCH 16/16] Add noqa to timeit --- doc/source/user_guide/groupby.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/user_guide/groupby.rst b/doc/source/user_guide/groupby.rst index 14240b3735940..c5f58425139ee 100644 --- a/doc/source/user_guide/groupby.rst +++ b/doc/source/user_guide/groupby.rst @@ -1079,7 +1079,7 @@ will be passed into ``values``, and the group index will be passed into ``index` In [6]: groupby = df.groupby(0) # Run the first time, compilation time will affect performance - In [7]: %timeit -r 1 -n 1 groupby.aggregate(f_numba, engine='numba') + In [7]: %timeit -r 1 -n 1 groupby.aggregate(f_numba, engine='numba') # noqa: E225 2.14 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each) # Function is cached and performance will improve In [8]: %timeit groupby.aggregate(f_numba, engine='numba')
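
For reference, a minimal usage sketch of the engine="numba" aggregation path added in this series, mirroring the new tests and the groupby.rst example added above. It is illustrative only: it assumes numba is installed and a pandas build that includes these patches, and the frame/column names are taken from the new test file rather than prescribed by the API.

    import numpy as np
    import pandas as pd

    # Toy frame; the "key"/"data" names come from the new tests and are illustrative.
    df = pd.DataFrame(
        {"key": ["a", "a", "b", "b", "a"], "data": [1.0, 2.0, 3.0, 4.0, 5.0]}
    )

    # With engine="numba", the UDF signature must start with (values, index);
    # each group's values and index are passed to the JITed function as NumPy arrays.
    def f(values, index):
        return np.mean(values) * 2.7

    result = df.groupby("key")["data"].agg(
        f,
        engine="numba",
        engine_kwargs={"nopython": True, "nogil": False, "parallel": False},
    )

    # The cython engine stays the default and gives the same result here.
    expected = df.groupby("key")["data"].agg(lambda x: np.mean(x) * 2.7)
    print(result)

Because engine="numba" bypasses the fallback path entirely, a UDF whose signature does not start with (values, index), or keyword arguments combined with nopython=True, raise ValueError instead of silently falling back to the cython path.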