From c66dcb1e6ddb39fdd1259299b26e8714a6faed4f Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Tue, 28 Jul 2020 21:35:24 -0700 Subject: [PATCH 1/3] Add numba engine for groupby apply --- pandas/core/groupby/groupby.py | 34 +++++++++++++++-- pandas/core/groupby/numba_.py | 68 ++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 pandas/core/groupby/numba_.py diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index ac45222625569..3f91694bca87e 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -34,7 +34,7 @@ class providing the base-class of operations. from pandas._config.config import option_context -from pandas._libs import Timestamp +from pandas._libs import Timestamp, lib import pandas._libs.groupby as libgroupby from pandas._typing import F, FrameOrSeries, FrameOrSeriesUnion, Scalar from pandas.compat.numpy import function as nv @@ -61,11 +61,11 @@ class providing the base-class of operations. import pandas.core.common as com from pandas.core.frame import DataFrame from pandas.core.generic import NDFrame -from pandas.core.groupby import base, ops +from pandas.core.groupby import base, numba_, ops from pandas.core.indexes.api import CategoricalIndex, Index, MultiIndex from pandas.core.series import Series from pandas.core.sorting import get_group_index_sorter -from pandas.core.util.numba_ import maybe_use_numba +from pandas.core.util.numba_ import NUMBA_FUNC_CACHE, maybe_use_numba _common_see_also = """ See Also @@ -827,7 +827,11 @@ def __iter__(self): input="dataframe", examples=_apply_docs["dataframe_examples"] ) ) - def apply(self, func, *args, **kwargs): + def apply(self, func, *args, engine=None, engine_kwargs=None, **kwargs): + + if maybe_use_numba(engine): + result = self._apply_with_numba(func, func, *args, engine_kwargs=engine_kwargs, **kwargs) + return self.obj._constructor(result) func = self._is_builtin_func(func) @@ -871,6 +875,28 @@ def f(g): return result + def _apply_with_numba(self, func, *args, engine_kwargs=None, **kwargs): + group_keys = self.grouper._get_group_keys() + + with _group_selection_context(self): + # We always drop the column with the groupby key + data = self._selected_obj + labels, _, n_groups = self.grouper.group_info + sorted_index = get_group_index_sorter(labels, n_groups) + sorted_labels = algorithms.take_nd(labels, sorted_index, allow_fill=False) + sorted_data = data.take(sorted_index, axis=self.axis) + starts, ends = lib.generate_slices(sorted_labels, n_groups) + cache_key = (func, "groupby_apply") + if cache_key in NUMBA_FUNC_CACHE: + # Return an already compiled version of roll_apply if available + apply_func = NUMBA_FUNC_CACHE[cache_key] + else: + apply_func = numba_.generate_numba_apply_func(args, kwargs, func, engine_kwargs) + result = apply_func(sorted_data.to_numpy(), starts, ends, len(group_keys), len(data.columns)) + + return result + + def _python_apply_general( self, f: F, data: FrameOrSeriesUnion ) -> FrameOrSeriesUnion: diff --git a/pandas/core/groupby/numba_.py b/pandas/core/groupby/numba_.py new file mode 100644 index 0000000000000..cb64a58df5180 --- /dev/null +++ b/pandas/core/groupby/numba_.py @@ -0,0 +1,68 @@ +from typing import Any, Callable, Dict, Optional, Tuple + +import numpy as np + +from pandas._typing import Scalar +from pandas.compat._optional import import_optional_dependency + +from pandas.core.util.numba_ import ( + check_kwargs_and_nopython, + get_jit_arguments, + jit_user_function, +) + + +def generate_numba_apply_func( + args: Tuple, + kwargs: Dict[str, Any], + func: Callable[..., Scalar], + engine_kwargs: Optional[Dict[str, bool]], +): + """ + Generate a numba jitted apply function specified by values from engine_kwargs. + + 1. jit the user's function + 2. Return a rolling apply function with the jitted function inline + + Configurations specified in engine_kwargs apply to both the user's + function _AND_ the rolling apply function. + + Parameters + ---------- + args : tuple + *args to be passed into the function + kwargs : dict + **kwargs to be passed into the function + func : function + function to be applied to each window and will be JITed + engine_kwargs : dict + dictionary of arguments to be passed into numba.jit + + Returns + ------- + Numba function + """ + nopython, nogil, parallel = get_jit_arguments(engine_kwargs) + + check_kwargs_and_nopython(kwargs, nopython) + + numba_func = jit_user_function(func, nopython, nogil, parallel) + + numba = import_optional_dependency("numba") + + if parallel: + loop_range = numba.prange + else: + loop_range = range + + @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) + def roll_apply( + values: np.ndarray, begin: np.ndarray, end: np.ndarray, num_groups: int, num_columns: int + ) -> np.ndarray: + result = np.empty((num_groups, num_columns)) + for i in loop_range(num_groups): + for j in loop_range(num_columns): + result[i, j] = numba_func(values[begin[i]:end[i], j]) + return result + + return roll_apply From 0963cf0cc106f1d1b93a7b9db2461b6626fff473 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 2 Aug 2020 00:23:22 -0700 Subject: [PATCH 2/3] Black and wrap result --- pandas/core/groupby/groupby.py | 20 ++++++++++++++------ pandas/core/groupby/numba_.py | 8 ++++++-- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index 3f91694bca87e..ef90509597876 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -830,8 +830,9 @@ def __iter__(self): def apply(self, func, *args, engine=None, engine_kwargs=None, **kwargs): if maybe_use_numba(engine): - result = self._apply_with_numba(func, func, *args, engine_kwargs=engine_kwargs, **kwargs) - return self.obj._constructor(result) + return self._apply_with_numba( + func, func, *args, engine_kwargs=engine_kwargs, **kwargs + ) func = self._is_builtin_func(func) @@ -891,11 +892,18 @@ def _apply_with_numba(self, func, *args, engine_kwargs=None, **kwargs): # Return an already compiled version of roll_apply if available apply_func = NUMBA_FUNC_CACHE[cache_key] else: - apply_func = numba_.generate_numba_apply_func(args, kwargs, func, engine_kwargs) - result = apply_func(sorted_data.to_numpy(), starts, ends, len(group_keys), len(data.columns)) - - return result + apply_func = numba_.generate_numba_apply_func( + args, kwargs, func, engine_kwargs + ) + result = apply_func( + sorted_data.to_numpy(), starts, ends, len(group_keys), len(data.columns) + ) + if self.grouper.nkeys > 1: + index = MultiIndex.from_tuples(group_keys, names=self.grouper.names) + else: + index = Index(group_keys, name=self.grouper.names[0]) + return self.obj._constructor(result, index=index, columns=data.columns) def _python_apply_general( self, f: F, data: FrameOrSeriesUnion diff --git a/pandas/core/groupby/numba_.py b/pandas/core/groupby/numba_.py index cb64a58df5180..40a5c491fd01a 100644 --- a/pandas/core/groupby/numba_.py +++ b/pandas/core/groupby/numba_.py @@ -57,12 +57,16 @@ def generate_numba_apply_func( @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) def roll_apply( - values: np.ndarray, begin: np.ndarray, end: np.ndarray, num_groups: int, num_columns: int + values: np.ndarray, + begin: np.ndarray, + end: np.ndarray, + num_groups: int, + num_columns: int, ) -> np.ndarray: result = np.empty((num_groups, num_columns)) for i in loop_range(num_groups): for j in loop_range(num_columns): - result[i, j] = numba_func(values[begin[i]:end[i], j]) + result[i, j] = numba_func(values[begin[i] : end[i], j]) return result return roll_apply From 376ee8f60aaaabf5a3fc6a087bfef110fe67dc0e Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Tue, 4 Aug 2020 00:22:20 -0700 Subject: [PATCH 3/3] get *args to work --- pandas/core/groupby/groupby.py | 4 ++-- pandas/core/groupby/numba_.py | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index ef90509597876..e8010f1216dd4 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -831,7 +831,7 @@ def apply(self, func, *args, engine=None, engine_kwargs=None, **kwargs): if maybe_use_numba(engine): return self._apply_with_numba( - func, func, *args, engine_kwargs=engine_kwargs, **kwargs + func, *args, engine_kwargs=engine_kwargs, **kwargs ) func = self._is_builtin_func(func) @@ -893,7 +893,7 @@ def _apply_with_numba(self, func, *args, engine_kwargs=None, **kwargs): apply_func = NUMBA_FUNC_CACHE[cache_key] else: apply_func = numba_.generate_numba_apply_func( - args, kwargs, func, engine_kwargs + tuple(args), kwargs, func, engine_kwargs ) result = apply_func( sorted_data.to_numpy(), starts, ends, len(group_keys), len(data.columns) diff --git a/pandas/core/groupby/numba_.py b/pandas/core/groupby/numba_.py index 40a5c491fd01a..6ba3659985f6b 100644 --- a/pandas/core/groupby/numba_.py +++ b/pandas/core/groupby/numba_.py @@ -56,7 +56,7 @@ def generate_numba_apply_func( loop_range = range @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) - def roll_apply( + def group_apply( values: np.ndarray, begin: np.ndarray, end: np.ndarray, @@ -66,7 +66,8 @@ def roll_apply( result = np.empty((num_groups, num_columns)) for i in loop_range(num_groups): for j in loop_range(num_columns): - result[i, j] = numba_func(values[begin[i] : end[i], j]) + group = values[begin[i] : end[i], j] + result[i, j] = numba_func(group, *args) return result - return roll_apply + return group_apply