diff --git a/asv_bench/benchmarks/rolling.py b/asv_bench/benchmarks/rolling.py index 5a36cff7908f0..ab9c46fd2bf0b 100644 --- a/asv_bench/benchmarks/rolling.py +++ b/asv_bench/benchmarks/rolling.py @@ -252,4 +252,22 @@ def time_groupby_mean(self, engine): self.gb_ewm.mean(engine=engine) +def table_method_func(x): + return np.sum(x, axis=0) + 1 + + +class TableMethod: + + params = ["single", "table"] + param_names = ["method"] + + def setup(self, method): + self.df = pd.DataFrame(np.random.randn(10, 1000)) + + def time_apply(self, method): + self.df.rolling(2, method=method).apply( + table_method_func, raw=True, engine="numba" + ) + + from .pandas_vb_common import setup # noqa: F401 isort:skip diff --git a/doc/source/user_guide/window.rst b/doc/source/user_guide/window.rst index 05f8be091fa25..08641bc5b17ae 100644 --- a/doc/source/user_guide/window.rst +++ b/doc/source/user_guide/window.rst @@ -37,14 +37,14 @@ pandas supports 4 types of windowing operations: #. Expanding window: Accumulating window over the values. #. Exponentially Weighted window: Accumulating and exponentially weighted window over the values. 
-============================= ================= =========================== =========================== ======================== -Concept Method Returned Object Supports time-based windows Supports chained groupby -============================= ================= =========================== =========================== ======================== -Rolling window ``rolling`` ``Rolling`` Yes Yes -Weighted window ``rolling`` ``Window`` No No -Expanding window ``expanding`` ``Expanding`` No Yes -Exponentially Weighted window ``ewm`` ``ExponentialMovingWindow`` No Yes (as of version 1.2) -============================= ================= =========================== =========================== ======================== +============================= ================= =========================== =========================== ======================== =================================== +Concept Method Returned Object Supports time-based windows Supports chained groupby Supports table method +============================= ================= =========================== =========================== ======================== =================================== +Rolling window ``rolling`` ``Rolling`` Yes Yes Yes (as of version 1.3) +Weighted window ``rolling`` ``Window`` No No No +Expanding window ``expanding`` ``Expanding`` No Yes Yes (as of version 1.3) +Exponentially Weighted window ``ewm`` ``ExponentialMovingWindow`` No Yes (as of version 1.2) No +============================= ================= =========================== =========================== ======================== =================================== As noted above, some operations support specifying a window based on a time offset: @@ -76,6 +76,29 @@ which will first group the data by the specified keys and then perform a windowi to compute the rolling sums to preserve accuracy as much as possible. +.. 
versionadded:: 1.3 + +Some windowing operations also support the ``method='table'`` option in the constructor which +performs the windowing operation over an entire :class:`DataFrame` instead of a single column or row at a time. +This can provide a useful performance benefit for a :class:`DataFrame` with many columns or rows +(with the corresponding ``axis`` argument) or the ability to utilize other columns during the windowing +operation. The ``method='table'`` option can only be used if ``engine='numba'`` is specified +in the corresponding method call. + +For example, a `weighted mean <https://en.wikipedia.org/wiki/Weighted_arithmetic_mean>`__ calculation can +be calculated with :meth:`~Rolling.apply` by specifying a separate column of weights. + +.. ipython:: python + + def weighted_mean(x): + arr = np.ones((1, x.shape[1])) + arr[:, :2] = (x[:, :2] * x[:, 2]).sum(axis=0) / x[:, 2].sum() + return arr + + df = pd.DataFrame([[1, 2, 0.6], [2, 3, 0.4], [3, 4, 0.2], [4, 5, 0.7]]) + df.rolling(2, method="table", min_periods=0).apply(weighted_mean, raw=True, engine="numba") # noqa:E501 + + All windowing operations support a ``min_periods`` argument that dictates the minimum amount of non-``np.nan`` values a window must have; otherwise, the resulting value is ``np.nan``. ``min_peridos`` defaults to 1 for time-based windows and ``window`` for fixed windows diff --git a/doc/source/whatsnew/v1.3.0.rst b/doc/source/whatsnew/v1.3.0.rst index c7573ee860744..b41931a803053 100644 --- a/doc/source/whatsnew/v1.3.0.rst +++ b/doc/source/whatsnew/v1.3.0.rst @@ -33,6 +33,11 @@ For example: storage_options=headers ) +.. _whatsnew_130.window_method_table: + +:class:`Rolling` and :class:`Expanding` now support a ``method`` argument with a +``'table'`` option that performs the windowing operation over an entire :class:`DataFrame`. +See :ref:`window.overview` for performance and functional benefits. (:issue:`15095`) .. 
_whatsnew_130.enhancements.other: diff --git a/pandas/core/generic.py b/pandas/core/generic.py index 4f588075bc830..bdb28c10a0ad2 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -10996,6 +10996,7 @@ def rolling( on: Optional[str] = None, axis: Axis = 0, closed: Optional[str] = None, + method: str = "single", ): axis = self._get_axis_number(axis) @@ -11009,6 +11010,7 @@ def rolling( on=on, axis=axis, closed=closed, + method=method, ) return Rolling( @@ -11020,12 +11022,17 @@ def rolling( on=on, axis=axis, closed=closed, + method=method, ) @final @doc(Expanding) def expanding( - self, min_periods: int = 1, center: Optional[bool_t] = None, axis: Axis = 0 + self, + min_periods: int = 1, + center: Optional[bool_t] = None, + axis: Axis = 0, + method: str = "single", ) -> Expanding: axis = self._get_axis_number(axis) if center is not None: @@ -11037,7 +11044,9 @@ def expanding( else: center = False - return Expanding(self, min_periods=min_periods, center=center, axis=axis) + return Expanding( + self, min_periods=min_periods, center=center, axis=axis, method=method + ) @final @doc(ExponentialMovingWindow) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index bc0124f83c70f..85118549300ca 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -242,6 +242,7 @@ def __init__( self.on = None self.center = False self.closed = None + self.method = "single" if times is not None: if isinstance(times, str): times = self._selected_obj[times] diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index f5f4dba59965c..81aa6699c3c61 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -24,6 +24,14 @@ class Expanding(RollingAndExpandingMixin): center : bool, default False Set the labels at the center of the window. 
axis : int or str, default 0 + method : str {'single', 'table'}, default 'single' + Execute the rolling operation per single column or row (``'single'``) + or over the entire object (``'table'``). + + This argument is only implemented when specifying ``engine='numba'`` + in the method call. + + .. versionadded:: 1.3.0 Returns ------- @@ -59,10 +67,14 @@ class Expanding(RollingAndExpandingMixin): 4 7.0 """ - _attributes = ["min_periods", "center", "axis"] + _attributes = ["min_periods", "center", "axis", "method"] - def __init__(self, obj, min_periods=1, center=None, axis=0, **kwargs): - super().__init__(obj=obj, min_periods=min_periods, center=center, axis=axis) + def __init__( + self, obj, min_periods=1, center=None, axis=0, method="single", **kwargs + ): + super().__init__( + obj=obj, min_periods=min_periods, center=center, axis=axis, method=method + ) def _get_window_indexer(self) -> BaseIndexer: """ diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index 2bc701a16f452..46b47b7e988c4 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -17,6 +17,7 @@ def generate_numba_apply_func( kwargs: Dict[str, Any], func: Callable[..., Scalar], engine_kwargs: Optional[Dict[str, bool]], + name: str, ): """ Generate a numba jitted apply function specified by values from engine_kwargs. 
@@ -37,6 +38,8 @@ def generate_numba_apply_func( function to be applied to each window and will be JITed engine_kwargs : dict dictionary of arguments to be passed into numba.jit + name: str + name of the caller (Rolling/Expanding) Returns ------- @@ -44,7 +47,7 @@ def generate_numba_apply_func( """ nopython, nogil, parallel = get_jit_arguments(engine_kwargs, kwargs) - cache_key = (func, "rolling_apply") + cache_key = (func, f"{name}_apply_single") if cache_key in NUMBA_FUNC_CACHE: return NUMBA_FUNC_CACHE[cache_key] @@ -153,3 +156,67 @@ def groupby_ewma( return result return groupby_ewma + + +def generate_numba_table_func( + args: Tuple, + kwargs: Dict[str, Any], + func: Callable[..., np.ndarray], + engine_kwargs: Optional[Dict[str, bool]], + name: str, +): + """ + Generate a numba jitted function to apply window calculations table-wise. + + Func will be passed a M window size x N number of columns array, and + must return a 1 x N number of columns array. Func is intended to operate + row-wise, but the result will be transposed for axis=1. + + 1. jit the user's function + 2. 
Return a rolling apply function with the jitted function inline + + Parameters + ---------- + args : tuple + *args to be passed into the function + kwargs : dict + **kwargs to be passed into the function + func : function + function to be applied to each window and will be JITed + engine_kwargs : dict + dictionary of arguments to be passed into numba.jit + name : str + caller (Rolling/Expanding) and original method name for numba cache key + + Returns + ------- + Numba function + """ + nopython, nogil, parallel = get_jit_arguments(engine_kwargs, kwargs) + + cache_key = (func, f"{name}_table") + if cache_key in NUMBA_FUNC_CACHE: + return NUMBA_FUNC_CACHE[cache_key] + + numba_func = jit_user_function(func, nopython, nogil, parallel) + numba = import_optional_dependency("numba") + + @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) + def roll_table( + values: np.ndarray, begin: np.ndarray, end: np.ndarray, minimum_periods: int + ): + result = np.empty(values.shape) + min_periods_mask = np.empty(values.shape) + for i in numba.prange(len(result)): + start = begin[i] + stop = end[i] + window = values[start:stop] + count_nan = np.sum(np.isnan(window), axis=0) + sub_result = numba_func(window, *args) + nan_mask = len(window) - count_nan >= minimum_periods + min_periods_mask[i, :] = nan_mask + result[i, :] = sub_result + result = np.where(min_periods_mask, result, np.nan) + return result + + return roll_table diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 5e5b810b1ff4a..053c960cc5cbd 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -64,7 +64,10 @@ GroupbyIndexer, VariableWindowIndexer, ) -from pandas.core.window.numba_ import generate_numba_apply_func +from pandas.core.window.numba_ import ( + generate_numba_apply_func, + generate_numba_table_func, +) if TYPE_CHECKING: from pandas import DataFrame, Series @@ -82,6 +85,7 @@ class BaseWindow(ShallowMixin, SelectionMixin): "axis", "on", "closed", + 
"method", ] exclusions: Set[str] = set() @@ -95,6 +99,7 @@ def __init__( axis: Axis = 0, on: Optional[Union[str, Index]] = None, closed: Optional[str] = None, + method: str = "single", **kwargs, ): @@ -107,6 +112,7 @@ def __init__( self.center = center self.win_type = win_type self.axis = obj._get_axis_number(axis) if axis is not None else None + self.method = method self._win_freq_i8 = None if self.on is None: if self.axis == 0: @@ -160,6 +166,8 @@ def validate(self) -> None: f"{type(self.window).__name__} does not implement " f"the correct signature for get_window_bounds" ) + if self.method not in ["table", "single"]: + raise ValueError("method must be 'table' or 'single") def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries: """ @@ -384,6 +392,26 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike: self._insert_on_column(out, obj) return out + def _apply_tablewise( + self, homogeneous_func: Callable[..., ArrayLike], name: Optional[str] = None + ) -> FrameOrSeriesUnion: + if self._selected_obj.ndim == 1: + raise ValueError("method='table' not applicable for Series objects.") + obj = self._create_data(self._selected_obj) + values = self._prep_values(obj.to_numpy()) + values = values.T if self.axis == 1 else values + result = homogeneous_func(values) + result = result.T if self.axis == 1 else result + out = obj._constructor(result, index=obj.index, columns=obj.columns) + + if out.shape[1] == 0 and obj.shape[1] > 0: + raise DataError("No numeric types to aggregate") + elif out.shape[1] == 0: + return obj.astype("float64") + + self._insert_on_column(out, obj) + return out + def _apply( self, func: Callable[..., Any], @@ -432,7 +460,7 @@ def calc(x): return func(x, start, end, min_periods) with np.errstate(all="ignore"): - if values.ndim > 1: + if values.ndim > 1 and self.method == "single": result = np.apply_along_axis(calc, self.axis, values) else: result = calc(values) @@ -443,7 +471,10 @@ def calc(x): return result - return self._apply_blockwise(homogeneous_func, 
name) + if self.method == "single": + return self._apply_blockwise(homogeneous_func, name) + else: + return self._apply_tablewise(homogeneous_func, name) def aggregate(self, func, *args, **kwargs): result, how = aggregate(self, func, *args, **kwargs) @@ -863,6 +894,14 @@ class Window(BaseWindow): .. versionchanged:: 1.2.0 The closed parameter with fixed windows is now supported. + method : str {'single', 'table'}, default 'single' + Execute the rolling operation per single column or row (``'single'``) + or over the entire object (``'table'``). + + This argument is only implemented when specifying ``engine='numba'`` + in the method call. + + .. versionadded:: 1.3.0 Returns ------- @@ -1004,6 +1043,9 @@ def validate(self): elif not is_integer(self.window) or self.window < 0: raise ValueError("window must be an integer 0 or greater") + if self.method != "single": + raise NotImplementedError("'single' is the only supported method type.") + def _center_window(self, result: np.ndarray, offset: int) -> np.ndarray: """ Center the result in the window for weighted rolling aggregations. 
@@ -1274,8 +1316,17 @@ def apply( if maybe_use_numba(engine): if raw is False: raise ValueError("raw must be `True` when using the numba engine") - apply_func = generate_numba_apply_func(args, kwargs, func, engine_kwargs) - numba_cache_key = (func, "rolling_apply") + caller_name = type(self).__name__ + if self.method == "single": + apply_func = generate_numba_apply_func( + args, kwargs, func, engine_kwargs, caller_name + ) + numba_cache_key = (func, f"{caller_name}_apply_single") + else: + apply_func = generate_numba_table_func( + args, kwargs, func, engine_kwargs, f"{caller_name}_apply" + ) + numba_cache_key = (func, f"{caller_name}_apply_table") elif engine in ("cython", None): if engine_kwargs is not None: raise ValueError("cython engine does not accept engine_kwargs") diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index e890108b22c3e..4d22495e6c69a 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -63,7 +63,7 @@ def func_2(x): tm.assert_series_equal(result, expected) # func_1 should be in the cache now - assert (func_1, "rolling_apply") in NUMBA_FUNC_CACHE + assert (func_1, "Rolling_apply_single") in NUMBA_FUNC_CACHE result = roll.apply( func_2, engine="numba", engine_kwargs=engine_kwargs, raw=True @@ -121,3 +121,69 @@ def test_invalid_kwargs_nopython(): Series(range(1)).rolling(1).apply( lambda x: x, kwargs={"a": 1}, engine="numba", raw=True ) + + +@td.skip_if_no("numba", "0.46.0") +@pytest.mark.filterwarnings("ignore:\\nThe keyword argument") +# Filter warnings when parallel=True and the function can't be parallelized by Numba +class TestTableMethod: + def test_table_series_valueerror(self): + def f(x): + return np.sum(x, axis=0) + 1 + + with pytest.raises( + ValueError, match="method='table' not applicable for Series objects." 
+ ): + Series(range(1)).rolling(1, method="table").apply( + f, engine="numba", raw=True + ) + + def test_table_method_rolling(self, axis, nogil, parallel, nopython): + engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} + + def f(x): + return np.sum(x, axis=0) + 1 + + df = DataFrame(np.eye(3)) + result = df.rolling(2, method="table", axis=axis, min_periods=0).apply( + f, raw=True, engine_kwargs=engine_kwargs, engine="numba" + ) + expected = df.rolling(2, method="single", axis=axis, min_periods=0).apply( + f, raw=True, engine_kwargs=engine_kwargs, engine="numba" + ) + tm.assert_frame_equal(result, expected) + + def test_table_method_rolling_weighted_mean(self): + def weighted_mean(x): + arr = np.ones((1, x.shape[1])) + arr[:, :2] = (x[:, :2] * x[:, 2]).sum(axis=0) / x[:, 2].sum() + return arr + + df = DataFrame([[1, 2, 0.6], [2, 3, 0.4], [3, 4, 0.2], [4, 5, 0.7]]) + result = df.rolling(2, method="table", min_periods=0).apply( + weighted_mean, raw=True, engine="numba" + ) + expected = DataFrame( + [ + [1.0, 2.0, 1.0], + [1.8, 2.0, 1.0], + [3.333333, 2.333333, 1.0], + [1.555556, 7, 1.0], + ] + ) + tm.assert_frame_equal(result, expected) + + def test_table_method_expanding(self, axis, nogil, parallel, nopython): + engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} + + def f(x): + return np.sum(x, axis=0) + 1 + + df = DataFrame(np.eye(3)) + result = df.expanding(method="table", axis=axis).apply( + f, raw=True, engine_kwargs=engine_kwargs, engine="numba" + ) + expected = df.expanding(method="single", axis=axis).apply( + f, raw=True, engine_kwargs=engine_kwargs, engine="numba" + ) + tm.assert_frame_equal(result, expected) diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index f75700a48c795..84056299093cf 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -1123,3 +1123,8 @@ def test_rolling_skew_kurt_large_value_range(method, values): result = 
getattr(s.rolling(4), method)() expected = Series([np.nan] * 3 + values) tm.assert_series_equal(result, expected) + + +def test_invalid_method(): + with pytest.raises(ValueError, match="method must be 'table' or 'single"): + Series(range(1)).rolling(1, method="foo") diff --git a/pandas/tests/window/test_win_type.py b/pandas/tests/window/test_win_type.py index f09feef54fa16..4b1028e165c80 100644 --- a/pandas/tests/window/test_win_type.py +++ b/pandas/tests/window/test_win_type.py @@ -120,6 +120,14 @@ def b(x): tm.assert_frame_equal(result, expected) +@td.skip_if_no_scipy +def test_win_type_with_method_invalid(): + with pytest.raises( + NotImplementedError, match="'single' is the only supported method type." + ): + Series(range(1)).rolling(1, win_type="triang", method="table") + + @td.skip_if_no_scipy @pytest.mark.parametrize("arg", [2000000000, "2s", Timedelta("2s")]) def test_consistent_win_type_freq(arg):