From e20bb0696994c2663edf4eafa511f933a0eb1bca Mon Sep 17 00:00:00 2001
From: Matt Roeschke
Date: Tue, 11 Feb 2020 23:12:53 -0800
Subject: [PATCH 1/7] Pass through engine keywords

---
 pandas/core/groupby/generic.py |  6 ++++--
 pandas/core/groupby/groupby.py | 10 ++++++----
 pandas/core/groupby/ops.py     | 10 +++++++---
 3 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py
index f194c774cf329..190da4ebb9254 100644
--- a/pandas/core/groupby/generic.py
+++ b/pandas/core/groupby/generic.py
@@ -223,8 +223,10 @@ def _selection_name(self):
             input="series", examples=_apply_docs["series_examples"]
         )
     )
-    def apply(self, func, *args, **kwargs):
-        return super().apply(func, *args, **kwargs)
+    def apply(self, func, engine="cython", engine_kwargs=None, *args, **kwargs):
+        return super().apply(
+            func, engine=engine, engine_kwargs=engine_kwargs, *args, **kwargs
+        )
 
     @Substitution(
         see_also=_agg_see_also_doc,
diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py
index 153bf386d4f33..92b03121d70fd 100644
--- a/pandas/core/groupby/groupby.py
+++ b/pandas/core/groupby/groupby.py
@@ -703,7 +703,7 @@ def __iter__(self):
             input="dataframe", examples=_apply_docs["dataframe_examples"]
         )
     )
-    def apply(self, func, *args, **kwargs):
+    def apply(self, func, engine="cython", engine_kwargs=None, *args, **kwargs):
 
         func = self._is_builtin_func(func)
 
@@ -732,7 +732,7 @@ def f(g):
         # ignore SettingWithCopy here in case the user mutates
         with option_context("mode.chained_assignment", None):
             try:
-                result = self._python_apply_general(f)
+                result = self._python_apply_general(f, engine, engine_kwargs)
             except TypeError:
                 # gh-20949
                 # try again, with .apply acting as a filtering
@@ -743,12 +743,14 @@ def f(g):
                 # on a string grouper column
 
                 with _group_selection_context(self):
-                    return self._python_apply_general(f)
+                    return self._python_apply_general(f, engine, engine_kwargs)
 
         return result
 
     def _python_apply_general(self, f):
-        keys, values, mutated = self.grouper.apply(f, self._selected_obj, self.axis)
+        keys, values, mutated = self.grouper.apply(
+            f, self._selected_obj, self.axis, engine="cython", engine_kwargs=None
+        )
 
         return self._wrap_applied_output(
             keys, values, not_indexed_same=mutated or self.mutated
diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py
index 4e593ce543ea6..0179be7923f51 100644
--- a/pandas/core/groupby/ops.py
+++ b/pandas/core/groupby/ops.py
@@ -148,7 +148,9 @@ def _get_group_keys(self):
         # provide "flattened" iterator for multi-group setting
         return get_flattened_iterator(comp_ids, ngroups, self.levels, self.codes)
 
-    def apply(self, f, data: FrameOrSeries, axis: int = 0):
+    def apply(
+        self, f, data: FrameOrSeries, axis: int = 0, engine="cython", engine_kwargs=None
+    ):
         mutated = self.mutated
         splitter = self._get_splitter(data, axis=axis)
         group_keys = self._get_group_keys()
@@ -169,7 +171,9 @@ def apply(self, f, data: FrameOrSeries, axis: int = 0):
             and not sdata.index._has_complex_internals
         ):
             try:
-                result_values, mutated = splitter.fast_apply(f, group_keys)
+                result_values, mutated = splitter.fast_apply(
+                    f, group_keys, engine=engine, engine_kwargs=engine_kwargs
+                )
 
             except libreduction.InvalidApply as err:
                 # This Exception is raised if `f` triggers an exception
@@ -927,7 +931,7 @@ def _chop(self, sdata: Series, slice_obj: slice) -> Series:
 
 
 class FrameSplitter(DataSplitter):
-    def fast_apply(self, f, names):
+    def fast_apply(self, f, names, engine="cython", engine_kwargs=None):
        # must return keys::list, values::list, mutated::bool
        starts, ends = lib.generate_slices(self.slabels, self.ngroups)

From 44bd334e3523110de0c4e61ecb69aac2ee633559 Mon Sep 17 00:00:00 2001
From: Matt Roeschke
Date: Sat, 15 Feb 2020 22:34:21 -0800
Subject: [PATCH 2/7] Add initial paths to support numba engine

---
 pandas/core/groupby/groupby.py |  58 ++++++++++-------
 pandas/core/groupby/numba_.py  | 110 +++++++++++++++++++++++++++++++++
 pandas/core/groupby/ops.py     |  12 ++--
 3 files changed, 153 insertions(+), 27 deletions(-)
 create mode 100644 pandas/core/groupby/numba_.py

diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py
index 29b7235f4180c..5e418aa9f6bae 100644
--- a/pandas/core/groupby/groupby.py
+++ b/pandas/core/groupby/groupby.py
@@ -58,7 +58,7 @@ class providing the base-class of operations.
 import pandas.core.common as com
 from pandas.core.frame import DataFrame
 from pandas.core.generic import NDFrame
-from pandas.core.groupby import base, ops
+from pandas.core.groupby import base, ops, numba_
 from pandas.core.indexes.api import CategoricalIndex, Index, MultiIndex
 from pandas.core.series import Series
 from pandas.core.sorting import get_group_index_sorter
@@ -707,32 +707,46 @@ def apply(self, func, engine="cython", engine_kwargs=None, *args, **kwargs):
 
         func = self._is_builtin_func(func)
 
-        # this is needed so we don't try and wrap strings. If we could
-        # resolve functions to their callable functions prior, this
-        # wouldn't be needed
-        if args or kwargs:
-            if callable(func):
-
-                @wraps(func)
-                def f(g):
-                    with np.errstate(all="ignore"):
-                        return func(g, *args, **kwargs)
+        if engine == "cython":
+            # this is needed so we don't try and wrap strings. If we could
+            # resolve functions to their callable functions prior, this
+            # wouldn't be needed
+            if args or kwargs:
+                if callable(func):
+
+                    @wraps(func)
+                    def f(g):
+                        with np.errstate(all="ignore"):
+                            return func(g, *args, **kwargs)
+
+                elif hasattr(nanops, "nan" + func):
+                    # TODO: should we wrap this in to e.g. _is_builtin_func?
+                    f = getattr(nanops, "nan" + func)
+
+                else:
+                    raise ValueError(
+                        "func must be a callable if args or kwargs are supplied"
+                    )
+            else:
+                f = func
+        elif engine == "numba":
 
-        elif hasattr(nanops, "nan" + func):
-            # TODO: should we wrap this in to e.g. _is_builtin_func?
-            f = getattr(nanops, "nan" + func)
+            numba_.validate_apply_function_signature(func)
 
+            if func in self.grouper._numba_func_cache:
+                # Return an already compiled version of the function if available
+                # TODO: this cache needs to be populated
+                f = self.grouper._numba_func_cache[func]
             else:
-                raise ValueError(
-                    "func must be a callable if args or kwargs are supplied"
-                )
+                # TODO: support args
+                f = numba_.generate_numba_apply_func(args, kwargs, func, engine_kwargs)
         else:
-            f = func
+            raise ValueError("engine must be either 'numba' or 'cython'")
 
         # ignore SettingWithCopy here in case the user mutates
         with option_context("mode.chained_assignment", None):
             try:
-                result = self._python_apply_general(f, engine, engine_kwargs)
+                result = self._python_apply_general(f, engine)
             except TypeError:
                 # gh-20949
                 # try again, with .apply acting as a filtering
@@ -743,13 +757,13 @@ def f(g):
                 # on a string grouper column
 
                 with _group_selection_context(self):
-                    return self._python_apply_general(f, engine, engine_kwargs)
+                    return self._python_apply_general(f, engine)
 
         return result
 
-    def _python_apply_general(self, f):
+    def _python_apply_general(self, f, engine="cython"):
         keys, values, mutated = self.grouper.apply(
-            f, self._selected_obj, self.axis, engine="cython", engine_kwargs=None
+            f, self._selected_obj, self.axis, engine=engine
         )
 
         return self._wrap_applied_output(
diff --git a/pandas/core/groupby/numba_.py b/pandas/core/groupby/numba_.py
new file mode 100644
index 0000000000000..ee50f7bcc7c85
--- /dev/null
+++ b/pandas/core/groupby/numba_.py
@@ -0,0 +1,110 @@
+import inspect
+import types
+
+import numpy as np
+
+from pandas.compat._optional import import_optional_dependency
+
+
+def validate_apply_function_signature(func):
+    """
+    Validate that the apply function's first 2 arguments are 'values' and 'index'.
+
+    func : function
+        function to be applied to each group and will be JITed
+    """
+    apply_function_signature = list(inspect.signature(func).parameters.keys())[:2]
+    if apply_function_signature != ["values", "index"]:
+        raise ValueError(
+            "The apply function's first 2 arguments must be 'values' and 'index'"
+        )
+
+
+def make_groupby_apply(
+    func, args, nogil, parallel, nopython,
+):
+    """
+    Creates a JITted groupby apply function with a JITted version of
+    the user's function.
+
+    Parameters
+    ----------
+    func : function
+        function to be applied to each group and will be JITed
+    args : tuple
+        *args to be passed into the function
+    nogil : bool
+        nogil parameter from engine_kwargs for numba.jit
+    parallel : bool
+        parallel parameter from engine_kwargs for numba.jit
+    nopython : bool
+        nopython parameter from engine_kwargs for numba.jit
+
+    Returns
+    -------
+    Numba function
+    """
+    numba = import_optional_dependency("numba")
+
+    if isinstance(func, numba.targets.registry.CPUDispatcher):
+        # Don't jit a user passed jitted function
+        numba_func = func
+    else:
+
+        @numba.generated_jit(nopython=nopython, nogil=nogil, parallel=parallel)
+        def numba_func(group, *_args):
+            if getattr(np, func.__name__, False) is func or isinstance(
+                func, types.BuiltinFunctionType
+            ):
+                jf = func
+            else:
+                jf = numba.jit(func, nopython=nopython, nogil=nogil)
+
+            def impl(group, *_args):
+                return jf(group, *_args)
+
+            return impl
+
+    return numba_func
+
+
+def generate_numba_apply_func(
+    args, kwargs, func, engine_kwargs,
+):
+    """
+    Generate a numba jitted apply function specified by values from engine_kwargs.
+
+    1. jit the user's function
+
+    Configurations specified in engine_kwargs apply to both the user's
+    function _AND_ the groupby apply function.
+
+    Parameters
+    ----------
+    args : tuple
+        *args to be passed into the function
+    kwargs : dict
+        **kwargs to be passed into the function
+    func : function
+        function to be applied to each group and will be JITed
+    engine_kwargs : dict
+        dictionary of arguments to be passed into numba.jit
+
+    Returns
+    -------
+    Numba function
+    """
+    if engine_kwargs is None:
+        engine_kwargs = {}
+
+    nopython = engine_kwargs.get("nopython", True)
+    nogil = engine_kwargs.get("nogil", False)
+    parallel = engine_kwargs.get("parallel", False)
+
+    if kwargs and nopython:
+        raise ValueError(
+            "numba does not support kwargs with nopython=True: "
+            "https://github.com/numba/numba/issues/2916"
+        )
+
+    return make_groupby_apply(func, args, nogil, parallel, nopython)
diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py
index 95e2c2d9551e8..406acf033335d 100644
--- a/pandas/core/groupby/ops.py
+++ b/pandas/core/groupby/ops.py
@@ -96,6 +96,7 @@ def __init__(
         self.group_keys = group_keys
         self.mutated = mutated
         self.indexer = indexer
+        self._numba_apply_cache = dict()
 
     @property
     def groupings(self) -> List["grouper.Grouping"]:
@@ -148,9 +149,7 @@ def _get_group_keys(self):
         # provide "flattened" iterator for multi-group setting
         return get_flattened_iterator(comp_ids, ngroups, self.levels, self.codes)
 
-    def apply(
-        self, f, data: FrameOrSeries, axis: int = 0, engine="cython", engine_kwargs=None
-    ):
+    def apply(self, f, data: FrameOrSeries, axis: int = 0, engine="cython"):
         mutated = self.mutated
         splitter = self._get_splitter(data, axis=axis)
         group_keys = self._get_group_keys()
@@ -172,7 +171,7 @@ def apply(
         ):
             try:
                 result_values, mutated = splitter.fast_apply(
-                    f, group_keys, engine=engine, engine_kwargs=engine_kwargs
+                    f, group_keys, engine=engine
                 )
 
             except libreduction.InvalidApply as err:
@@ -929,11 +928,14 @@ def _chop(self, sdata: Series, slice_obj: slice) -> Series:
 
 
 class FrameSplitter(DataSplitter):
-    def fast_apply(self, f, names, engine="cython", engine_kwargs=None):
+    def fast_apply(self, f, names, engine="cython"):
         # must return keys::list, values::list, mutated::bool
         starts, ends = lib.generate_slices(self.slabels, self.ngroups)
 
         sdata = self._get_sorted_data()
+        if engine == "numba":
+            # TODO: Raise if we don't have a series here (or dataframe with >1 columns?)
+            pass
         return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
 
     def _chop(self, sdata: DataFrame, slice_obj: slice) -> DataFrame:

From 2be4e2735a870a59b6f4743bda2980edb7352523 Mon Sep 17 00:00:00 2001
From: Matt Roeschke
Date: Sun, 16 Feb 2020 15:01:29 -0800
Subject: [PATCH 3/7] Start constructing algorithm

---
 pandas/_libs/reduction.pyx    |  2 +-
 pandas/core/groupby/numba_.py | 30 ++++++++++++++++++++++++++++++
 pandas/core/groupby/ops.py    |  9 ++++-----
 3 files changed, 35 insertions(+), 6 deletions(-)

diff --git a/pandas/_libs/reduction.pyx b/pandas/_libs/reduction.pyx
index b27072aa66708..77d0f0f21dc7f 100644
--- a/pandas/_libs/reduction.pyx
+++ b/pandas/_libs/reduction.pyx
@@ -483,7 +483,7 @@ def apply_frame_axis0(object frame, object f, object names,
         item_cache.clear()  # ugh
         chunk = slider.dummy
         object.__setattr__(chunk, 'name', names[i])
-
+        print(chunk)
         try:
             piece = f(chunk)
         except Exception:
diff --git a/pandas/core/groupby/numba_.py b/pandas/core/groupby/numba_.py
index ee50f7bcc7c85..8b24c71bf96d9 100644
--- a/pandas/core/groupby/numba_.py
+++ b/pandas/core/groupby/numba_.py
@@ -6,6 +6,36 @@
 from pandas.compat._optional import import_optional_dependency
 
 
+class InvalidApply(Exception):
+    pass
+
+
+def execute_groupby_function(sdata, f, names, starts, ends):
+    """Mimics apply_frame_axis0 which is the Cython equivalent of this function."""
+    # Run function
+    # Check if its mutated?
+    # copy data (maybe not necessary)
+    # TODO: Raise if we don't have a series here (or dataframe with >1 columns?)
+    results = []
+    for start, end in zip(starts, ends):
+        group = sdata.iloc[start, end]
+        group_ndarray = group.values.to_numpy()
+        group_index = group.index.to_numpy()
+        try:
+            # TODO: support *args, **kwargs here
+            group_result = f(group_ndarray, group_index)
+        except Exception:
+            # We can't be more specific without knowing something about `f`
+            # Like we do in Cython
+            raise InvalidApply('Let this error raise above us')
+        # Reconstruct the pandas object (expected downstream)
+        group_result = group._constructor(group_result, index=group.index, columns = group.columns)
+        results.append(group_result)
+
+    # TODO: How do we check mutated?
+    return results, False
+
+
 def validate_apply_function_signature(func):
     """
     Validate that the apply function's first 2 arguments are 'values' and 'index'.
diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py
index 406acf033335d..b5518a2d3f622 100644
--- a/pandas/core/groupby/ops.py
+++ b/pandas/core/groupby/ops.py
@@ -43,7 +43,7 @@
 import pandas.core.common as com
 from pandas.core.frame import DataFrame
 from pandas.core.generic import NDFrame
-from pandas.core.groupby import base, grouper
+from pandas.core.groupby import base, grouper, numba_
 from pandas.core.indexes.api import Index, MultiIndex, ensure_index
 from pandas.core.series import Series
 from pandas.core.sorting import (
@@ -187,7 +187,7 @@ def apply(self, f, data: FrameOrSeries, axis: int = 0, engine="cython"):
                # Otherwise we need to fall back to the slow implementation.
                if len(result_values) == len(group_keys):
                    return group_keys, result_values, mutated
-
+        import pdb; pdb.set_trace()
         for key, (i, group) in zip(group_keys, splitter):
             object.__setattr__(group, "name", key)
 
@@ -931,11 +931,10 @@ class FrameSplitter(DataSplitter):
     def fast_apply(self, f, names, engine="cython"):
         # must return keys::list, values::list, mutated::bool
         starts, ends = lib.generate_slices(self.slabels, self.ngroups)
-
+        import pdb; pdb.set_trace()
         sdata = self._get_sorted_data()
         if engine == "numba":
-            # TODO: Raise if we don't have a series here (or dataframe with >1 columns?)
-            pass
+            return numba_.execute_groupby_function(sdata, f, names, starts, ends)
         return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
 
     def _chop(self, sdata: DataFrame, slice_obj: slice) -> DataFrame:

From 5cec57be292ec9bad7575ff26f2f52f97a277d9f Mon Sep 17 00:00:00 2001
From: Matt Roeschke
Date: Tue, 3 Mar 2020 22:58:14 -0800
Subject: [PATCH 4/7] Remove frame apply for now, print cleanup

---
 pandas/_libs/reduction.pyx | 1 -
 pandas/core/groupby/ops.py | 8 +++-----
 2 files changed, 3 insertions(+), 6 deletions(-)

diff --git a/pandas/_libs/reduction.pyx b/pandas/_libs/reduction.pyx
index 77d0f0f21dc7f..04107f04ffb67 100644
--- a/pandas/_libs/reduction.pyx
+++ b/pandas/_libs/reduction.pyx
@@ -483,7 +483,6 @@ def apply_frame_axis0(object frame, object f, object names,
         item_cache.clear()  # ugh
         chunk = slider.dummy
         object.__setattr__(chunk, 'name', names[i])
-        print(chunk)
         try:
             piece = f(chunk)
         except Exception:
diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py
index 5946efd88eb7b..d088c21067585 100644
--- a/pandas/core/groupby/ops.py
+++ b/pandas/core/groupby/ops.py
@@ -171,7 +171,7 @@ def apply(self, f, data: FrameOrSeries, axis: int = 0, engine="cython"):
         ):
             try:
                 result_values, mutated = splitter.fast_apply(
-                    f, sdata, group_keys, engine=engine
+                    f, sdata, group_keys
                 )
 
             except libreduction.InvalidApply as err:
@@ -187,7 +187,7 @@ def apply(self, f, data: FrameOrSeries, axis: int = 0, engine="cython"):
                # Otherwise we need to fall back to the slow implementation.
                if len(result_values) == len(group_keys):
                    return group_keys, result_values, mutated
-        import pdb; pdb.set_trace()
+
         for key, (i, group) in zip(group_keys, splitter):
             object.__setattr__(group, "name", key)
 
@@ -928,11 +928,9 @@ def _chop(self, sdata: Series, slice_obj: slice) -> Series:
 
 
 class FrameSplitter(DataSplitter):
-    def fast_apply(self, f, sdata: FrameOrSeries, names, engine="cython"):
+    def fast_apply(self, f, sdata: FrameOrSeries, names):
         # must return keys::list, values::list, mutated::bool
         starts, ends = lib.generate_slices(self.slabels, self.ngroups)
-        if engine == "numba":
-            return numba_.execute_groupby_function(sdata, f, names, starts, ends)
         return libreduction.apply_frame_axis0(sdata, f, names, starts, ends)
 
     def _chop(self, sdata: DataFrame, slice_obj: slice) -> DataFrame:

From e2bd9481827a742b8898009e32d26a90c54ecdc9 Mon Sep 17 00:00:00 2001
From: Matt Roeschke
Date: Wed, 4 Mar 2020 00:36:17 -0800
Subject: [PATCH 5/7] Working for series :)

---
 pandas/core/groupby/groupby.py |  4 ++--
 pandas/core/groupby/numba_.py  | 25 +++++++++++--------------
 pandas/core/groupby/ops.py     | 14 +++++++++++---
 3 files changed, 24 insertions(+), 19 deletions(-)

diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py
index 309b28cb1464a..f17bb03f4cffa 100644
--- a/pandas/core/groupby/groupby.py
+++ b/pandas/core/groupby/groupby.py
@@ -733,10 +733,10 @@ def f(g):
 
             numba_.validate_apply_function_signature(func)
 
-            if func in self.grouper._numba_func_cache:
+            if func in self.grouper._numba_apply_cache:
                 # Return an already compiled version of the function if available
                 # TODO: this cache needs to be populated
-                f = self.grouper._numba_func_cache[func]
+                f = self.grouper._numba_apply_cache[func]
             else:
                 # TODO: support args
                 f = numba_.generate_numba_apply_func(args, kwargs, func, engine_kwargs)
diff --git a/pandas/core/groupby/numba_.py b/pandas/core/groupby/numba_.py
index 8b24c71bf96d9..19ffd49abe979 100644
--- a/pandas/core/groupby/numba_.py
+++ b/pandas/core/groupby/numba_.py
@@ -10,30 +10,27 @@ class InvalidApply(Exception):
     pass
 
 
-def execute_groupby_function(sdata, f, names, starts, ends):
+def execute_groupby_function(splitter, f):
     """Mimics apply_frame_axis0 which is the Cython equivalent of this function."""
-    # Run function
-    # Check if its mutated?
-    # copy data (maybe not necessary)
-    # TODO: Raise if we don't have a series here (or dataframe with >1 columns?)
     results = []
-    for start, end in zip(starts, ends):
-        group = sdata.iloc[start, end]
-        group_ndarray = group.values.to_numpy()
-        group_index = group.index.to_numpy()
+    for _, group in splitter:
+        # TODO: what about series names/dataframe columns
+        index = group.index
+        values_as_array = group.to_numpy()
+        index_as_array = index.to_numpy()
         try:
             # TODO: support *args, **kwargs here
-            group_result = f(group_ndarray, group_index)
+            group_result = f(values_as_array, index_as_array)
         except Exception:
             # We can't be more specific without knowing something about `f`
             # Like we do in Cython
-            raise InvalidApply('Let this error raise above us')
+            raise InvalidApply("Let this error raise above us")
         # Reconstruct the pandas object (expected downstream)
-        group_result = group._constructor(group_result, index=group.index, columns = group.columns)
+        # This construction will fail if there is mutation, but we're banning it with numba?
+        group_result = group._constructor(group_result, index=index)
         results.append(group_result)
 
-    # TODO: How do we check mutated?
-    return results, False
+    return results
 
 
 def validate_apply_function_signature(func):
diff --git a/pandas/core/groupby/ops.py b/pandas/core/groupby/ops.py
index d088c21067585..2d5755c4470d8 100644
--- a/pandas/core/groupby/ops.py
+++ b/pandas/core/groupby/ops.py
@@ -156,6 +156,16 @@ def apply(self, f, data: FrameOrSeries, axis: int = 0, engine="cython"):
 
         result_values = None
         sdata: FrameOrSeries = splitter._get_sorted_data()
+
+        if engine == "numba":
+            result_values = numba_.execute_groupby_function(splitter, f)
+
+            # mutation is determined based on index alignment
+            # numba functions always return numpy arrays w/o indexes
+            # therefore, mutated=False?
+            # or just ban mutation so mutated=False always
+            return group_keys, result_values, False
+
         if sdata.ndim == 2 and np.any(sdata.dtypes.apply(is_extension_array_dtype)):
             # calling splitter.fast_apply will raise TypeError via apply_frame_axis0
             # if we pass EA instead of ndarray
@@ -170,9 +180,7 @@ def apply(self, f, data: FrameOrSeries, axis: int = 0, engine="cython"):
             and not sdata.index._has_complex_internals
         ):
             try:
-                result_values, mutated = splitter.fast_apply(
-                    f, sdata, group_keys
-                )
+                result_values, mutated = splitter.fast_apply(f, sdata, group_keys)
 
             except libreduction.InvalidApply as err:
                 # This Exception is raised if `f` triggers an exception

From af35cb0deaf920bf525d70e646ad37bbe98a6843 Mon Sep 17 00:00:00 2001
From: Matt Roeschke
Date: Wed, 4 Mar 2020 00:37:12 -0800
Subject: [PATCH 6/7] Add back whitespace

---
 pandas/_libs/reduction.pyx | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pandas/_libs/reduction.pyx b/pandas/_libs/reduction.pyx
index 04107f04ffb67..b27072aa66708 100644
--- a/pandas/_libs/reduction.pyx
+++ b/pandas/_libs/reduction.pyx
@@ -483,6 +483,7 @@ def apply_frame_axis0(object frame, object f, object names,
         item_cache.clear()  # ugh
         chunk = slider.dummy
         object.__setattr__(chunk, 'name', names[i])
+
         try:
             piece = f(chunk)
         except Exception:

From 4c6e464d31b456a36ead62445d17eba04b7208b2 Mon Sep 17 00:00:00 2001
From: Matt Roeschke
Date: Wed, 4 Mar 2020 00:39:55 -0800
Subject: [PATCH 7/7] Line length

---
 pandas/core/groupby/numba_.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pandas/core/groupby/numba_.py b/pandas/core/groupby/numba_.py
index 19ffd49abe979..2f71a46775c94 100644
--- a/pandas/core/groupby/numba_.py
+++ b/pandas/core/groupby/numba_.py
@@ -26,7 +26,8 @@ def execute_groupby_function(splitter, f):
             # Like we do in Cython
             raise InvalidApply("Let this error raise above us")
         # Reconstruct the pandas object (expected downstream)
-        # This construction will fail if there is mutation, but we're banning it with numba?
+        # This construction will fail if there is mutation,
+        # but we're banning it with numba?
         group_result = group._constructor(group_result, index=index)
         results.append(group_result)
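
Usage sketch (not part of the patch series; the API may still change before this lands,
and numba must be installed): based on the keywords these commits add, a Series groupby
apply with the numba engine is expected to look roughly like the snippet below. The
applied function must take the group's values and index as its first two arguments,
with those exact parameter names (enforced by validate_apply_function_signature), and
engine_kwargs maps onto numba.jit options (nopython/nogil/parallel). The DataFrame
"key"/"value" columns and the group_mean helper are illustrative only.

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({"key": ["a", "a", "b", "b"], "value": [1.0, 2.0, 3.0, 4.0]})

    # First two parameters must be named 'values' and 'index'; the function
    # receives each group as numpy arrays and is JIT-compiled by numba.
    def group_mean(values, index):
        return np.mean(values)

    result = df.groupby("key")["value"].apply(
        group_mean, engine="numba", engine_kwargs={"nopython": True}
    )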