Skip to content

ENH: Add method='table' for EWM.mean #42339

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 6, 2021
3 changes: 3 additions & 0 deletions asv_bench/benchmarks/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,5 +296,8 @@ def time_apply(self, method):
table_method_func, raw=True, engine="numba"
)

def time_ewm_mean(self, method):
self.df.ewm(1, method=method).mean(engine="numba")


from .pandas_vb_common import setup # noqa: F401 isort:skip
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.4.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ enhancement2
Other enhancements
^^^^^^^^^^^^^^^^^^
- :meth:`Series.sample`, :meth:`DataFrame.sample`, and :meth:`.GroupBy.sample` now accept a ``np.random.Generator`` as input to ``random_state``. A generator will be more performant, especially with ``replace=False`` (:issue:`38100`)
- :meth:`Series.ewm`, :meth:`DataFrame.ewm`, now support a ``method`` argument with a ``'table'`` option that performs the windowing operation over an entire :class:`DataFrame`. See :ref:`Window Overview <window.overview>` for performance and functional benefits (:issue:`42273`)
-

.. ---------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions pandas/core/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10846,6 +10846,7 @@ def ewm(
ignore_na: bool_t = False,
axis: Axis = 0,
times: str | np.ndarray | FrameOrSeries | None = None,
method: str = "single",
) -> ExponentialMovingWindow:
axis = self._get_axis_number(axis)
# error: Value of type variable "FrameOrSeries" of "ExponentialMovingWindow"
Expand All @@ -10861,6 +10862,7 @@ def ewm(
ignore_na=ignore_na,
axis=axis,
times=times,
method=method,
)

# ----------------------------------------------------------------------
Expand Down
34 changes: 28 additions & 6 deletions pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@
ExponentialMovingWindowIndexer,
GroupbyIndexer,
)
from pandas.core.window.numba_ import generate_numba_ewma_func
from pandas.core.window.numba_ import (
generate_ewma_numba_table_func,
generate_numba_ewma_func,
)
from pandas.core.window.online import (
EWMMeanState,
generate_online_numba_ewma_func,
Expand Down Expand Up @@ -204,6 +207,16 @@ class ExponentialMovingWindow(BaseWindow):
If 1-D array like, a sequence with the same shape as the observations.

Only applicable to ``mean()``.
method : str {'single', 'table'}, default 'single'
Execute the rolling operation per single column or row (``'single'``)
or over the entire object (``'table'``).

This argument is only implemented when specifying ``engine='numba'``
in the method call.

Only applicable to ``mean()``

.. versionadded:: 1.4.0

Returns
-------
Expand Down Expand Up @@ -262,6 +275,7 @@ class ExponentialMovingWindow(BaseWindow):
"ignore_na",
"axis",
"times",
"method",
]

def __init__(
Expand All @@ -276,6 +290,7 @@ def __init__(
ignore_na: bool = False,
axis: Axis = 0,
times: str | np.ndarray | FrameOrSeries | None = None,
method: str = "single",
*,
selection=None,
):
Expand All @@ -285,7 +300,7 @@ def __init__(
on=None,
center=False,
closed=None,
method="single",
method=method,
axis=axis,
selection=selection,
)
Expand Down Expand Up @@ -441,12 +456,19 @@ def aggregate(self, func, *args, **kwargs):
)
def mean(self, *args, engine=None, engine_kwargs=None, **kwargs):
if maybe_use_numba(engine):
ewma_func = generate_numba_ewma_func(
engine_kwargs, self._com, self.adjust, self.ignore_na, self._deltas
)
if self.method == "single":
ewma_func = generate_numba_ewma_func(
engine_kwargs, self._com, self.adjust, self.ignore_na, self._deltas
)
numba_cache_key = (lambda x: x, "ewma")
else:
ewma_func = generate_ewma_numba_table_func(
engine_kwargs, self._com, self.adjust, self.ignore_na, self._deltas
)
numba_cache_key = (lambda x: x, "ewma_table")
return self._apply(
ewma_func,
numba_cache_key=(lambda x: x, "ewma"),
numba_cache_key=numba_cache_key,
)
elif engine in ("cython", None):
if engine_kwargs is not None:
Expand Down
79 changes: 79 additions & 0 deletions pandas/core/window/numba_.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,82 @@ def nan_agg_with_axis(table):
return result

return nan_agg_with_axis


def generate_ewma_numba_table_func(
engine_kwargs: dict[str, bool] | None,
com: float,
adjust: bool,
ignore_na: bool,
deltas: np.ndarray,
):
"""
Generate a numba jitted ewma function applied table wise specified
by values from engine_kwargs.

Parameters
----------
engine_kwargs : dict
dictionary of arguments to be passed into numba.jit
com : float
adjust : bool
ignore_na : bool
deltas : numpy.ndarray

Returns
-------
Numba function
"""
nopython, nogil, parallel = get_jit_arguments(engine_kwargs)

cache_key = (lambda x: x, "ewma_table")
if cache_key in NUMBA_FUNC_CACHE:
return NUMBA_FUNC_CACHE[cache_key]

numba = import_optional_dependency("numba")

@numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
def ewma_table(
values: np.ndarray,
begin: np.ndarray,
end: np.ndarray,
minimum_periods: int,
) -> np.ndarray:
alpha = 1.0 / (1.0 + com)
old_wt_factor = 1.0 - alpha
new_wt = 1.0 if adjust else alpha
old_wt = np.ones(values.shape[1])

result = np.empty(values.shape)
weighted_avg = values[0].copy()
nobs = (~np.isnan(weighted_avg)).astype(np.int64)
result[0] = np.where(nobs >= minimum_periods, weighted_avg, np.nan)
for i in range(1, len(values)):
cur = values[i]
is_observations = ~np.isnan(cur)
nobs += is_observations.astype(np.int64)
for j in numba.prange(len(cur)):
if not np.isnan(weighted_avg[j]):
if is_observations[j] or not ignore_na:

# note that len(deltas) = len(vals) - 1 and deltas[i] is to be
# used in conjunction with vals[i+1]
old_wt[j] *= old_wt_factor ** deltas[i - 1]
if is_observations[j]:
# avoid numerical errors on constant series
if weighted_avg[j] != cur[j]:
weighted_avg[j] = (
(old_wt[j] * weighted_avg[j]) + (new_wt * cur[j])
) / (old_wt[j] + new_wt)
if adjust:
old_wt[j] += new_wt
else:
old_wt[j] = 1.0
elif is_observations[j]:
weighted_avg[j] = cur[j]

result[i] = np.where(nobs >= minimum_periods, weighted_avg, np.nan)

return result

return ewma_table
14 changes: 14 additions & 0 deletions pandas/tests/window/test_numba.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,17 @@ def test_table_method_expanding_methods(
engine_kwargs=engine_kwargs, engine="numba"
)
tm.assert_frame_equal(result, expected)

@pytest.mark.parametrize("data", [np.eye(3), np.ones((2, 3)), np.ones((3, 2))])
def test_table_method_ewm(self, data, axis, nogil, parallel, nopython):
engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}

df = DataFrame(data)

result = df.ewm(com=1, method="table", axis=axis).mean(
engine_kwargs=engine_kwargs, engine="numba"
)
expected = df.ewm(com=1, method="single", axis=axis).mean(
engine_kwargs=engine_kwargs, engine="numba"
)
tm.assert_frame_equal(result, expected)