diff --git a/asv_bench/benchmarks/rolling.py b/asv_bench/benchmarks/rolling.py index d35770b720f7a..97294fc02834b 100644 --- a/asv_bench/benchmarks/rolling.py +++ b/asv_bench/benchmarks/rolling.py @@ -296,5 +296,8 @@ def time_apply(self, method): table_method_func, raw=True, engine="numba" ) + def time_ewm_mean(self, method): + self.df.ewm(1, method=method).mean(engine="numba") + from .pandas_vb_common import setup # noqa: F401 isort:skip diff --git a/doc/source/whatsnew/v1.4.0.rst b/doc/source/whatsnew/v1.4.0.rst index 764a50e13586a..e76bb6013c3e7 100644 --- a/doc/source/whatsnew/v1.4.0.rst +++ b/doc/source/whatsnew/v1.4.0.rst @@ -30,6 +30,7 @@ enhancement2 Other enhancements ^^^^^^^^^^^^^^^^^^ - :meth:`Series.sample`, :meth:`DataFrame.sample`, and :meth:`.GroupBy.sample` now accept a ``np.random.Generator`` as input to ``random_state``. A generator will be more performant, especially with ``replace=False`` (:issue:`38100`) +- :meth:`Series.ewm`, :meth:`DataFrame.ewm`, now support a ``method`` argument with a ``'table'`` option that performs the windowing operation over an entire :class:`DataFrame`. See :ref:`Window Overview ` for performance and functional benefits (:issue:`42273`) - .. --------------------------------------------------------------------------- diff --git a/pandas/core/generic.py b/pandas/core/generic.py index 4019ef8537294..da3f041e61a2b 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -10846,6 +10846,7 @@ def ewm( ignore_na: bool_t = False, axis: Axis = 0, times: str | np.ndarray | FrameOrSeries | None = None, + method: str = "single", ) -> ExponentialMovingWindow: axis = self._get_axis_number(axis) # error: Value of type variable "FrameOrSeries" of "ExponentialMovingWindow" @@ -10861,6 +10862,7 @@ def ewm( ignore_na=ignore_na, axis=axis, times=times, + method=method, ) # ---------------------------------------------------------------------- diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 664089fa37b83..ee99692b85432 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -44,7 +44,10 @@ ExponentialMovingWindowIndexer, GroupbyIndexer, ) -from pandas.core.window.numba_ import generate_numba_ewma_func +from pandas.core.window.numba_ import ( + generate_ewma_numba_table_func, + generate_numba_ewma_func, +) from pandas.core.window.online import ( EWMMeanState, generate_online_numba_ewma_func, @@ -204,6 +207,16 @@ class ExponentialMovingWindow(BaseWindow): If 1-D array like, a sequence with the same shape as the observations. Only applicable to ``mean()``. + method : str {'single', 'table'}, default 'single' + Execute the rolling operation per single column or row (``'single'``) + or over the entire object (``'table'``). + + This argument is only implemented when specifying ``engine='numba'`` + in the method call. + + Only applicable to ``mean()`` + + .. versionadded:: 1.4.0 Returns ------- @@ -262,6 +275,7 @@ class ExponentialMovingWindow(BaseWindow): "ignore_na", "axis", "times", + "method", ] def __init__( @@ -276,6 +290,7 @@ def __init__( ignore_na: bool = False, axis: Axis = 0, times: str | np.ndarray | FrameOrSeries | None = None, + method: str = "single", *, selection=None, ): @@ -285,7 +300,7 @@ def __init__( on=None, center=False, closed=None, - method="single", + method=method, axis=axis, selection=selection, ) @@ -441,12 +456,19 @@ def aggregate(self, func, *args, **kwargs): ) def mean(self, *args, engine=None, engine_kwargs=None, **kwargs): if maybe_use_numba(engine): - ewma_func = generate_numba_ewma_func( - engine_kwargs, self._com, self.adjust, self.ignore_na, self._deltas - ) + if self.method == "single": + ewma_func = generate_numba_ewma_func( + engine_kwargs, self._com, self.adjust, self.ignore_na, self._deltas + ) + numba_cache_key = (lambda x: x, "ewma") + else: + ewma_func = generate_ewma_numba_table_func( + engine_kwargs, self._com, self.adjust, self.ignore_na, self._deltas + ) + numba_cache_key = (lambda x: x, "ewma_table") return self._apply( ewma_func, - numba_cache_key=(lambda x: x, "ewma"), + numba_cache_key=numba_cache_key, ) elif engine in ("cython", None): if engine_kwargs is not None: diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index 9d9376e8ba43d..ab1eb9d3a2688 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -250,3 +250,82 @@ def nan_agg_with_axis(table): return result return nan_agg_with_axis + + +def generate_ewma_numba_table_func( + engine_kwargs: dict[str, bool] | None, + com: float, + adjust: bool, + ignore_na: bool, + deltas: np.ndarray, +): + """ + Generate a numba jitted ewma function applied table wise specified + by values from engine_kwargs. + + Parameters + ---------- + engine_kwargs : dict + dictionary of arguments to be passed into numba.jit + com : float + adjust : bool + ignore_na : bool + deltas : numpy.ndarray + + Returns + ------- + Numba function + """ + nopython, nogil, parallel = get_jit_arguments(engine_kwargs) + + cache_key = (lambda x: x, "ewma_table") + if cache_key in NUMBA_FUNC_CACHE: + return NUMBA_FUNC_CACHE[cache_key] + + numba = import_optional_dependency("numba") + + @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) + def ewma_table( + values: np.ndarray, + begin: np.ndarray, + end: np.ndarray, + minimum_periods: int, + ) -> np.ndarray: + alpha = 1.0 / (1.0 + com) + old_wt_factor = 1.0 - alpha + new_wt = 1.0 if adjust else alpha + old_wt = np.ones(values.shape[1]) + + result = np.empty(values.shape) + weighted_avg = values[0].copy() + nobs = (~np.isnan(weighted_avg)).astype(np.int64) + result[0] = np.where(nobs >= minimum_periods, weighted_avg, np.nan) + for i in range(1, len(values)): + cur = values[i] + is_observations = ~np.isnan(cur) + nobs += is_observations.astype(np.int64) + for j in numba.prange(len(cur)): + if not np.isnan(weighted_avg[j]): + if is_observations[j] or not ignore_na: + + # note that len(deltas) = len(vals) - 1 and deltas[i] is to be + # used in conjunction with vals[i+1] + old_wt[j] *= old_wt_factor ** deltas[i - 1] + if is_observations[j]: + # avoid numerical errors on constant series + if weighted_avg[j] != cur[j]: + weighted_avg[j] = ( + (old_wt[j] * weighted_avg[j]) + (new_wt * cur[j]) + ) / (old_wt[j] + new_wt) + if adjust: + old_wt[j] += new_wt + else: + old_wt[j] = 1.0 + elif is_observations[j]: + weighted_avg[j] = cur[j] + + result[i] = np.where(nobs >= minimum_periods, weighted_avg, np.nan) + + return result + + return ewma_table diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 5bc27436fd1d7..a8ec9086e6b02 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -332,3 +332,17 @@ def test_table_method_expanding_methods( engine_kwargs=engine_kwargs, engine="numba" ) tm.assert_frame_equal(result, expected) + + @pytest.mark.parametrize("data", [np.eye(3), np.ones((2, 3)), np.ones((3, 2))]) + def test_table_method_ewm(self, data, axis, nogil, parallel, nopython): + engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} + + df = DataFrame(data) + + result = df.ewm(com=1, method="table", axis=axis).mean( + engine_kwargs=engine_kwargs, engine="numba" + ) + expected = df.ewm(com=1, method="single", axis=axis).mean( + engine_kwargs=engine_kwargs, engine="numba" + ) + tm.assert_frame_equal(result, expected)