diff --git a/doc/source/whatsnew/v3.0.0.rst b/doc/source/whatsnew/v3.0.0.rst index 25c26253e687b..7800e7a5a4241 100644 --- a/doc/source/whatsnew/v3.0.0.rst +++ b/doc/source/whatsnew/v3.0.0.rst @@ -66,6 +66,7 @@ Other enhancements - :class:`Rolling` and :class:`Expanding` now support aggregations ``first`` and ``last`` (:issue:`33155`) - :func:`read_parquet` accepts ``to_pandas_kwargs`` which are forwarded to :meth:`pyarrow.Table.to_pandas` which enables passing additional keywords to customize the conversion to pandas, such as ``maps_as_pydicts`` to read the Parquet map data type as python dictionaries (:issue:`56842`) - :meth:`.DataFrameGroupBy.transform`, :meth:`.SeriesGroupBy.transform`, :meth:`.DataFrameGroupBy.agg`, :meth:`.SeriesGroupBy.agg`, :meth:`.SeriesGroupBy.apply`, :meth:`.DataFrameGroupBy.apply` now support ``kurt`` (:issue:`40139`) +- :meth:`DataFrame.apply` supports using third-party execution engines like the Bodo.ai JIT compiler (:issue:`60668`) - :meth:`DataFrameGroupBy.transform`, :meth:`SeriesGroupBy.transform`, :meth:`DataFrameGroupBy.agg`, :meth:`SeriesGroupBy.agg`, :meth:`RollingGroupby.apply`, :meth:`ExpandingGroupby.apply`, :meth:`Rolling.apply`, :meth:`Expanding.apply`, :meth:`DataFrame.apply` with ``engine="numba"`` now supports positional arguments passed as kwargs (:issue:`58995`) - :meth:`Rolling.agg`, :meth:`Expanding.agg` and :meth:`ExponentialMovingWindow.agg` now accept :class:`NamedAgg` aggregations through ``**kwargs`` (:issue:`28333`) - :meth:`Series.map` can now accept kwargs to pass on to func (:issue:`59814`) diff --git a/pandas/api/__init__.py b/pandas/api/__init__.py index 8f659e3cd14c8..a016e67a41360 100644 --- a/pandas/api/__init__.py +++ b/pandas/api/__init__.py @@ -1,6 +1,7 @@ """public toolkit API""" from pandas.api import ( + executors, extensions, indexers, interchange, @@ -9,6 +10,7 @@ ) __all__ = [ + "executors", "extensions", "indexers", "interchange", diff --git a/pandas/api/executors/__init__.py 
b/pandas/api/executors/__init__.py new file mode 100644 index 0000000000000..04c94ee688332 --- /dev/null +++ b/pandas/api/executors/__init__.py @@ -0,0 +1,7 @@ +""" +Public API for function executor engines to be used with ``map`` and ``apply``. +""" + +from pandas.core.apply import BaseExecutionEngine + +__all__ = ["BaseExecutionEngine"] diff --git a/pandas/core/apply.py b/pandas/core/apply.py index f36fc82fb1a11..da6124307e3f1 100644 --- a/pandas/core/apply.py +++ b/pandas/core/apply.py @@ -74,6 +74,110 @@ ResType = dict[int, Any] +class BaseExecutionEngine(abc.ABC): + """ + Base class for execution engines for map and apply methods. + + An execution engine receives all the parameters of a call to + ``apply`` or ``map``, such as the data container, the function, + etc. and takes care of running the execution. + + Supporting different engines allows functions to be JIT compiled, + run in parallel, and others. Besides the default executor which + simply runs the code with the Python interpreter and pandas. + """ + + @staticmethod + @abc.abstractmethod + def map( + data: Series | DataFrame | np.ndarray, + func: AggFuncType, + args: tuple, + kwargs: dict[str, Any], + decorator: Callable | None, + skip_na: bool, + ): + """ + Executor method to run functions elementwise. + + In general, pandas uses ``map`` for running functions elementwise, + but ``Series.apply`` with the default ``by_row='compat'`` will also + call this executor function. + + Parameters + ---------- + data : Series, DataFrame or NumPy ndarray + The object to use for the data. Some methods implement a ``raw`` + parameter which will convert the original pandas object to a + NumPy array, which will then be passed here to the executor. + func : function or NumPy ufunc + The function to execute. + args : tuple + Positional arguments to be passed to ``func``. + kwargs : dict + Keyword arguments to be passed to ``func``. 
+ decorator : function, optional + For JIT compilers and other engines that need to decorate the + function ``func``, this is the decorator to use. While the + executor may already know which is the decorator to use, this + is useful as for a single executor the user can specify for + example ``numba.jit`` or ``numba.njit(nogil=True)``, and this + decorator parameter will contain the exact decorator from the + executor the user wants to use. + skip_na : bool + Whether the function should be called for missing values or not. + This is specified by the pandas user as ``map(na_action=None)`` + or ``map(na_action='ignore')``. + """ + + @staticmethod + @abc.abstractmethod + def apply( + data: Series | DataFrame | np.ndarray, + func: AggFuncType, + args: tuple, + kwargs: dict[str, Any], + decorator: Callable, + axis: Axis, + ): + """ + Executor method to run functions by an axis. + + While we can see ``map`` as executing the function for each cell + in a ``DataFrame`` (or ``Series``), ``apply`` will execute the + function for each column (or row). + + Parameters + ---------- + data : Series, DataFrame or NumPy ndarray + The object to use for the data. Some methods implement a ``raw`` + parameter which will convert the original pandas object to a + NumPy array, which will then be passed here to the executor. + func : function or NumPy ufunc + The function to execute. + args : tuple + Positional arguments to be passed to ``func``. + kwargs : dict + Keyword arguments to be passed to ``func``. + decorator : function, optional + For JIT compilers and other engines that need to decorate the + function ``func``, this is the decorator to use. While the + executor may already know which is the decorator to use, this + is useful as for a single executor the user can specify for + example ``numba.jit`` or ``numba.njit(nogil=True)``, and this + decorator parameter will contain the exact decorator from the + executor the user wants to use. 
+ axis : {0 or 'index', 1 or 'columns'} + 0 or 'index' should execute the function passing each column as + parameter. 1 or 'columns' should execute the function passing + each row as parameter. The default executor engine passes rows + as pandas ``Series``. Other executor engines should probably + expect functions to be implemented this way for compatibility. + But passing rows as other data structures is technically possible + as far as the function ``func`` is implemented accordingly. + """ + + def frame_apply( obj: DataFrame, func: AggFuncType, diff --git a/pandas/core/frame.py b/pandas/core/frame.py index c862b7dbaf973..8f65277f660f7 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -10275,7 +10275,7 @@ def apply( result_type: Literal["expand", "reduce", "broadcast"] | None = None, args=(), by_row: Literal[False, "compat"] = "compat", - engine: Literal["python", "numba"] = "python", + engine: Callable | None | Literal["python", "numba"] = None, engine_kwargs: dict[str, bool] | None = None, **kwargs, ): @@ -10339,28 +10339,24 @@ def apply( .. versionadded:: 2.1.0 - engine : {'python', 'numba'}, default 'python' - Choose between the python (default) engine or the numba engine in apply. + engine : decorator or {'python', 'numba'}, optional + Choose the execution engine to use. If not provided the function + will be executed by the regular Python interpreter. - The numba engine will attempt to JIT compile the passed function, - which may result in speedups for large DataFrames. - It also supports the following engine_kwargs : + Other options include JIT compilers such as Numba and Bodo, which in some + cases can speed up the execution. To use an executor you can provide + the decorators ``numba.jit``, ``numba.njit`` or ``bodo.jit``. You can + also provide the decorator with parameters, like ``numba.jit(nogil=True)``. 
- - nopython (compile the function in nopython mode) - - nogil (release the GIL inside the JIT compiled function) - - parallel (try to apply the function in parallel over the DataFrame) + Not all functions can be executed with all execution engines. In general, + JIT compilers will require type stability in the function (no variable + should change data type during the execution). And not all pandas and + NumPy APIs are supported. Check the engine documentation [1]_ and [2]_ + for limitations. - Note: Due to limitations within numba/how pandas interfaces with numba, - you should only use this if raw=True - - Note: The numba compiler only supports a subset of - valid Python/numpy operations. + .. warning:: - Please read more about the `supported python features - `_ - and `supported numpy features - `_ - in numba to learn what you can or cannot use in the passed function. + String parameters will stop being supported in a future pandas version. .. versionadded:: 2.2.0 @@ -10368,6 +10364,7 @@ def apply( Pass keyword arguments to the engine. This is currently only used by the numba engine, see the documentation for the engine argument for more information. + **kwargs Additional keyword arguments to pass as keywords arguments to `func`. @@ -10390,6 +10387,13 @@ def apply( behavior or errors and are not supported. See :ref:`gotchas.udf-mutation` for more details. + References + ---------- + .. [1] `Numba documentation + `_ + .. [2] `Bodo documentation + `_ + Examples -------- >>> df = pd.DataFrame([[4, 9]] * 3, columns=["A", "B"]) @@ -10458,22 +10462,99 @@ def apply( 0 1 2 1 1 2 2 1 2 + + Advanced users can speed up their code by using a Just-in-time (JIT) compiler + with ``apply``. The main JIT compilers available for pandas are Numba and Bodo. + In general, JIT compilation is only possible when the function passed to + ``apply`` has type stability (variables in the function do not change their + type during the execution). 
+ + >>> import bodo + >>> df.apply(lambda x: x.A + x.B, axis=1, engine=bodo.jit) + + Note that JIT compilation is only recommended for functions that take a + significant amount of time to run. Fast functions are unlikely to run faster + with JIT compilation. """ - from pandas.core.apply import frame_apply + if engine is None or isinstance(engine, str): + from pandas.core.apply import frame_apply - op = frame_apply( - self, - func=func, - axis=axis, - raw=raw, - result_type=result_type, - by_row=by_row, - engine=engine, - engine_kwargs=engine_kwargs, - args=args, - kwargs=kwargs, - ) - return op.apply().__finalize__(self, method="apply") + if engine is None: + engine = "python" + + if engine not in ["python", "numba"]: + raise ValueError(f"Unknown engine '{engine}'") + + op = frame_apply( + self, + func=func, + axis=axis, + raw=raw, + result_type=result_type, + by_row=by_row, + engine=engine, + engine_kwargs=engine_kwargs, + args=args, + kwargs=kwargs, + ) + return op.apply().__finalize__(self, method="apply") + elif hasattr(engine, "__pandas_udf__"): + if result_type is not None: + raise NotImplementedError( + f"{result_type=} only implemented for the default engine" + ) + + agg_axis = self._get_agg_axis(self._get_axis_number(axis)) + + # one axis is empty + if not all(self.shape): + func = cast(Callable, func) + try: + if axis == 0: + r = func(Series([], dtype=np.float64), *args, **kwargs) + else: + r = func( + Series(index=self.columns, dtype=np.float64), + *args, + **kwargs, + ) + except Exception: + pass + else: + if not isinstance(r, Series): + if len(agg_axis): + r = func(Series([], dtype=np.float64), *args, **kwargs) + else: + r = np.nan + + return self._constructor_sliced(r, index=agg_axis) + return self.copy() + + data: DataFrame | np.ndarray = self + if raw: + # This will upcast the whole DataFrame to the same type, + # and likely result in an object 2D array. 
+ # We should probably pass a list of 1D arrays instead, at + least for ``axis=0`` + data = self.values + result = engine.__pandas_udf__.apply( + data=data, + func=func, + args=args, + kwargs=kwargs, + decorator=engine, + axis=axis, + ) + if raw: + if result.ndim == 2: + return self._constructor( + result, index=self.index, columns=self.columns + ) + else: + return self._constructor_sliced(result, index=agg_axis) + return result + else: + raise ValueError(f"Unknown engine {engine}") def map( self, func: PythonFuncType, na_action: Literal["ignore"] | None = None, **kwargs @@ -10590,9 +10671,11 @@ def _append( index = Index( [other.name], - name=self.index.names - if isinstance(self.index, MultiIndex) - else self.index.name, + name=( + self.index.names + if isinstance(self.index, MultiIndex) + else self.index.name + ), ) row_df = other.to_frame().T # infer_objects is needed for diff --git a/pandas/tests/api/test_api.py b/pandas/tests/api/test_api.py index 4a05259a98087..2ba90948be399 100644 --- a/pandas/tests/api/test_api.py +++ b/pandas/tests/api/test_api.py @@ -6,6 +6,7 @@ from pandas import api import pandas._testing as tm from pandas.api import ( + executors as api_executors, extensions as api_extensions, indexers as api_indexers, interchange as api_interchange, @@ -243,6 +244,7 @@ def test_depr(self): class TestApi(Base): allowed_api_dirs = [ + "executors", "types", "extensions", "indexers", @@ -338,6 +340,7 @@ class TestApi(Base): "ExtensionArray", "ExtensionScalarOpsMixin", ] + allowed_api_executors = ["BaseExecutionEngine"] def test_api(self): self.check(api, self.allowed_api_dirs) @@ -357,6 +360,9 @@ def test_api_indexers(self): def test_api_extensions(self): self.check(api_extensions, self.allowed_api_extensions) + def test_api_executors(self): + self.check(api_executors, self.allowed_api_executors) + class TestErrors(Base): def test_errors(self): diff --git a/pandas/tests/apply/test_frame_apply.py b/pandas/tests/apply/test_frame_apply.py index 
b9e407adc3051..2d47cd851ad10 100644 --- a/pandas/tests/apply/test_frame_apply.py +++ b/pandas/tests/apply/test_frame_apply.py @@ -17,10 +17,63 @@ date_range, ) import pandas._testing as tm +from pandas.api.executors import BaseExecutionEngine from pandas.tests.frame.common import zip_frames from pandas.util.version import Version +class MockExecutionEngine(BaseExecutionEngine): + """ + Execution Engine to test if the execution engine interface receives and + uses all parameters provided by the user. + + Making this engine work as the default Python engine by calling it, no extra + functionality is implemented here. + + When testing, this will be called when this engine is provided, and then the + same pandas.map and pandas.apply function will be called, but without engine, + executing the default behavior from the python engine. + """ + + def map(data, func, args, kwargs, decorator, skip_na): + kwargs_to_pass = kwargs if isinstance(data, DataFrame) else {} + return data.map( + func, na_action="ignore" if skip_na else None, **kwargs_to_pass + ) + + def apply(data, func, args, kwargs, decorator, axis): + if isinstance(data, Series): + return data.apply(func, convert_dtype=True, args=args, by_row=False) + elif isinstance(data, DataFrame): + return data.apply( + func, + axis=axis, + raw=False, + result_type=None, + args=args, + by_row="compat", + **kwargs, + ) + else: + assert isinstance(data, np.ndarray) + + def wrap_function(func): + # https://github.com/numpy/numpy/issues/8352 + def wrapper(*args, **kwargs): + result = func(*args, **kwargs) + if isinstance(result, str): + result = np.array(result, dtype=object) + return result + + return wrapper + + return np.apply_along_axis(wrap_function(func), axis, data, *args, **kwargs) + + +class MockEngineDecorator: + __pandas_udf__ = MockExecutionEngine + + @pytest.fixture def int_frame_const_col(): """ @@ -35,7 +88,13 @@ def int_frame_const_col(): return df -@pytest.fixture(params=["python", pytest.param("numba", 
marks=pytest.mark.single_cpu)]) +@pytest.fixture( + params=[ + "python", + pytest.param("numba", marks=pytest.mark.single_cpu), + MockEngineDecorator, + ] +) def engine(request): if request.param == "numba": pytest.importorskip("numba") @@ -1079,12 +1138,21 @@ def test_result_type_broadcast(int_frame_const_col, request, engine): mark = pytest.mark.xfail(reason="numba engine doesn't support list return") request.node.add_marker(mark) df = int_frame_const_col - # broadcast result - result = df.apply( - lambda x: [1, 2, 3], axis=1, result_type="broadcast", engine=engine - ) - expected = df.copy() - tm.assert_frame_equal(result, expected) + if engine is MockEngineDecorator: + with pytest.raises( + NotImplementedError, + match="result_type='broadcast' only implemented for the default engine", + ): + df.apply( + lambda x: [1, 2, 3], axis=1, result_type="broadcast", engine=engine + ) + else: + # broadcast result + result = df.apply( + lambda x: [1, 2, 3], axis=1, result_type="broadcast", engine=engine + ) + expected = df.copy() + tm.assert_frame_equal(result, expected) def test_result_type_broadcast_series_func(int_frame_const_col, engine, request): @@ -1097,14 +1165,27 @@ def test_result_type_broadcast_series_func(int_frame_const_col, engine, request) request.node.add_marker(mark) df = int_frame_const_col columns = ["other", "col", "names"] - result = df.apply( - lambda x: Series([1, 2, 3], index=columns), - axis=1, - result_type="broadcast", - engine=engine, - ) - expected = df.copy() - tm.assert_frame_equal(result, expected) + + if engine is MockEngineDecorator: + with pytest.raises( + NotImplementedError, + match="result_type='broadcast' only implemented for the default engine", + ): + df.apply( + lambda x: Series([1, 2, 3], index=columns), + axis=1, + result_type="broadcast", + engine=engine, + ) + else: + result = df.apply( + lambda x: Series([1, 2, 3], index=columns), + axis=1, + result_type="broadcast", + engine=engine, + ) + expected = df.copy() + 
tm.assert_frame_equal(result, expected) def test_result_type_series_result(int_frame_const_col, engine, request): @@ -1791,3 +1872,9 @@ def test_agg_dist_like_and_nonunique_columns(): result = df.agg({"A": "count"}) expected = df["A"].count() tm.assert_series_equal(result, expected) + + +@pytest.mark.parametrize("engine_name", ["unknown", 25]) +def test_wrong_engine(engine_name): + with pytest.raises(ValueError, match="Unknown engine "): + DataFrame().apply(lambda x: x, engine=engine_name)