diff --git a/doc/source/whatsnew/v1.3.0.rst b/doc/source/whatsnew/v1.3.0.rst index 38a1802340c69..065390820d7e6 100644 --- a/doc/source/whatsnew/v1.3.0.rst +++ b/doc/source/whatsnew/v1.3.0.rst @@ -787,6 +787,9 @@ Groupby/resample/rolling - Bug in :class:`core.window.ewm.ExponentialMovingWindow` when calling ``__getitem__`` would incorrectly raise a ``ValueError`` when providing ``times`` (:issue:`40164`) - Bug in :class:`core.window.ewm.ExponentialMovingWindow` when calling ``__getitem__`` would not retain ``com``, ``span``, ``alpha`` or ``halflife`` attributes (:issue:`40164`) - :class:`core.window.ewm.ExponentialMovingWindow` now raises a ``NotImplementedError`` when specifying ``times`` with ``adjust=False`` due to an incorrect calculation (:issue:`40098`) +- Bug in :meth:`core.window.ewm.ExponentialMovingWindowGroupby.mean` where the ``times`` argument was ignored when ``engine='numba'`` (:issue:`40951`) +- Bug in :meth:`core.window.ewm.ExponentialMovingWindowGroupby.mean` where the wrong times were used in case of multiple groups (:issue:`40951`) +- Bug in :class:`core.window.ewm.ExponentialMovingWindowGroupby` where the ``times`` vector and values became out of sync for non-trivial groups (:issue:`40951`) - Bug in :meth:`Series.asfreq` and :meth:`DataFrame.asfreq` dropping rows when the index is not sorted (:issue:`39805`) - Bug in aggregation functions for :class:`DataFrame` not respecting ``numeric_only`` argument when ``level`` keyword was given (:issue:`40660`) - Bug in :class:`core.window.RollingGroupby` where ``as_index=False`` argument in ``groupby`` was ignored (:issue:`39433`) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 46041b6a37a17..8c8629ad6f032 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -1485,8 +1485,7 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end, com : float64 adjust : bool ignore_na : bool - times : ndarray 
(float64 type) - halflife : float64 + deltas : ndarray (float64 type) Returns ------- @@ -1495,7 +1494,7 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end, cdef: Py_ssize_t i, j, s, e, nobs, win_size, N = len(vals), M = len(start) - const float64_t[:] sub_vals + const float64_t[:] sub_deltas, sub_vals ndarray[float64_t] sub_output, output = np.empty(N, dtype=float) float64_t alpha, old_wt_factor, new_wt, weighted_avg, old_wt, cur bint is_observation @@ -1511,6 +1510,9 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end, s = start[j] e = end[j] sub_vals = vals[s:e] + # note that len(deltas) = len(vals) - 1 and deltas[i] is to be used in + # conjunction with vals[i+1] + sub_deltas = deltas[s:e - 1] win_size = len(sub_vals) sub_output = np.empty(win_size, dtype=float) @@ -1528,7 +1530,7 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end, if weighted_avg == weighted_avg: if is_observation or not ignore_na: - old_wt *= old_wt_factor ** deltas[i - 1] + old_wt *= old_wt_factor ** sub_deltas[i - 1] if is_observation: # avoid numerical errors on constant series diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 67bcdb0a387dd..eee9cb3976e39 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -78,6 +78,38 @@ def get_center_of_mass( return float(comass) +def _calculate_deltas( + times: str | np.ndarray | FrameOrSeries | None, + halflife: float | TimedeltaConvertibleTypes | None, +) -> np.ndarray: + """ + Return the diff of the times divided by the half-life. These values are used in + the calculation of the ewm mean. + + Parameters + ---------- + times : str, np.ndarray, Series, default None + Times corresponding to the observations. Must be monotonically increasing + and ``datetime64[ns]`` dtype. 
+ halflife : float, str, timedelta, optional + Half-life specifying the decay + + Returns + ------- + np.ndarray + Diff of the times divided by the half-life + """ + # error: Item "str" of "Union[str, ndarray, FrameOrSeries, None]" has no + # attribute "view" + # error: Item "None" of "Union[str, ndarray, FrameOrSeries, None]" has no + # attribute "view" + _times = np.asarray( + times.view(np.int64), dtype=np.float64 # type: ignore[union-attr] + ) + _halflife = float(Timedelta(halflife).value) + return np.diff(_times) / _halflife + + class ExponentialMovingWindow(BaseWindow): r""" Provide exponential weighted (EW) functions. @@ -268,15 +300,7 @@ def __init__( ) if isna(self.times).any(): raise ValueError("Cannot convert NaT values to integer") - # error: Item "str" of "Union[str, ndarray, FrameOrSeries, None]" has no - # attribute "view" - # error: Item "None" of "Union[str, ndarray, FrameOrSeries, None]" has no - # attribute "view" - _times = np.asarray( - self.times.view(np.int64), dtype=np.float64 # type: ignore[union-attr] - ) - _halflife = float(Timedelta(self.halflife).value) - self._deltas = np.diff(_times) / _halflife + self._deltas = _calculate_deltas(self.times, self.halflife) # Halflife is no longer applicable when calculating COM # But allow COM to still be calculated if the user passes other decay args if common.count_not_none(self.com, self.span, self.alpha) > 0: @@ -585,6 +609,17 @@ class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow) _attributes = ExponentialMovingWindow._attributes + BaseWindowGroupby._attributes + def __init__(self, obj, *args, _grouper=None, **kwargs): + super().__init__(obj, *args, _grouper=_grouper, **kwargs) + + if not obj.empty and self.times is not None: + # sort the times and recalculate the deltas according to the groups + groupby_order = np.concatenate(list(self._grouper.indices.values())) + self._deltas = _calculate_deltas( + self.times.take(groupby_order), # type: ignore[union-attr] + 
self.halflife, + ) + def _get_window_indexer(self) -> GroupbyIndexer: """ Return an indexer class that will compute the window start and end bounds @@ -628,10 +663,7 @@ def mean(self, engine=None, engine_kwargs=None): """ if maybe_use_numba(engine): groupby_ewma_func = generate_numba_groupby_ewma_func( - engine_kwargs, - self._com, - self.adjust, - self.ignore_na, + engine_kwargs, self._com, self.adjust, self.ignore_na, self._deltas ) return self._apply( groupby_ewma_func, diff --git a/pandas/core/window/numba_.py b/pandas/core/window/numba_.py index c9107c8ed0aa7..d84dea7ee622c 100644 --- a/pandas/core/window/numba_.py +++ b/pandas/core/window/numba_.py @@ -85,6 +85,7 @@ def generate_numba_groupby_ewma_func( com: float, adjust: bool, ignore_na: bool, + deltas: np.ndarray, ): """ Generate a numba jitted groupby ewma function specified by values @@ -97,6 +98,7 @@ def generate_numba_groupby_ewma_func( com : float adjust : bool ignore_na : bool + deltas : numpy.ndarray Returns ------- @@ -141,7 +143,9 @@ def groupby_ewma( if is_observation or not ignore_na: - old_wt *= old_wt_factor + # note that len(deltas) = len(vals) - 1 and deltas[i] is to be + # used in conjunction with vals[i+1] + old_wt *= old_wt_factor ** deltas[start + j - 1] if is_observation: # avoid numerical errors on constant series diff --git a/pandas/tests/window/conftest.py b/pandas/tests/window/conftest.py index d394a4b2be548..b1f1bb7086149 100644 --- a/pandas/tests/window/conftest.py +++ b/pandas/tests/window/conftest.py @@ -13,6 +13,7 @@ Series, bdate_range, notna, + to_datetime, ) @@ -302,6 +303,31 @@ def frame(): ) +@pytest.fixture +def times_frame(): + """Frame for testing times argument in EWM groupby.""" + return DataFrame( + { + "A": ["a", "b", "c", "a", "b", "c", "a", "b", "c", "a"], + "B": [0, 0, 0, 1, 1, 1, 2, 2, 2, 3], + "C": to_datetime( + [ + "2020-01-01", + "2020-01-01", + "2020-01-01", + "2020-01-02", + "2020-01-10", + "2020-01-22", + "2020-01-03", + "2020-01-23", + "2020-01-23", + 
"2020-01-04", + ] + ), + } + ) + + @pytest.fixture def series(): """Make mocked series as fixture.""" diff --git a/pandas/tests/window/test_groupby.py b/pandas/tests/window/test_groupby.py index 51a6288598c32..5d7fc50620ef8 100644 --- a/pandas/tests/window/test_groupby.py +++ b/pandas/tests/window/test_groupby.py @@ -926,3 +926,63 @@ def test_pairwise_methods(self, method, expected_data): expected = df.groupby("A").apply(lambda x: getattr(x.ewm(com=1.0), method)()) tm.assert_frame_equal(result, expected) + + def test_times(self, times_frame): + # GH 40951 + halflife = "23 days" + result = times_frame.groupby("A").ewm(halflife=halflife, times="C").mean() + expected = DataFrame( + { + "B": [ + 0.0, + 0.507534, + 1.020088, + 1.537661, + 0.0, + 0.567395, + 1.221209, + 0.0, + 0.653141, + 1.195003, + ] + }, + index=MultiIndex.from_tuples( + [ + ("a", 0), + ("a", 3), + ("a", 6), + ("a", 9), + ("b", 1), + ("b", 4), + ("b", 7), + ("c", 2), + ("c", 5), + ("c", 8), + ], + names=["A", None], + ), + ) + tm.assert_frame_equal(result, expected) + + def test_times_vs_apply(self, times_frame): + # GH 40951 + halflife = "23 days" + result = times_frame.groupby("A").ewm(halflife=halflife, times="C").mean() + expected = ( + times_frame.groupby("A") + .apply(lambda x: x.ewm(halflife=halflife, times="C").mean()) + .iloc[[0, 3, 6, 9, 1, 4, 7, 2, 5, 8]] + .reset_index(drop=True) + ) + tm.assert_frame_equal(result.reset_index(drop=True), expected) + + def test_times_array(self, times_frame): + # GH 40951 + halflife = "23 days" + result = times_frame.groupby("A").ewm(halflife=halflife, times="C").mean() + expected = ( + times_frame.groupby("A") + .ewm(halflife=halflife, times=times_frame["C"].values) + .mean() + ) + tm.assert_frame_equal(result, expected) diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index f64d242a4e820..06b34201e0dba 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -8,6 +8,7 @@ DataFrame, Series, 
option_context, + to_datetime, ) import pandas._testing as tm from pandas.core.util.numba_ import NUMBA_FUNC_CACHE @@ -145,6 +146,30 @@ def test_cython_vs_numba(self, nogil, parallel, nopython, ignore_na, adjust): tm.assert_frame_equal(result, expected) + def test_cython_vs_numba_times(self, nogil, parallel, nopython, ignore_na): + # GH 40951 + halflife = "23 days" + times = to_datetime( + [ + "2020-01-01", + "2020-01-01", + "2020-01-02", + "2020-01-10", + "2020-02-23", + "2020-01-03", + ] + ) + df = DataFrame({"A": ["a", "b", "a", "b", "b", "a"], "B": [0, 0, 1, 1, 2, 2]}) + gb_ewm = df.groupby("A").ewm( + halflife=halflife, adjust=True, ignore_na=ignore_na, times=times + ) + + engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} + result = gb_ewm.mean(engine="numba", engine_kwargs=engine_kwargs) + expected = gb_ewm.mean(engine="cython") + + tm.assert_frame_equal(result, expected) + @td.skip_if_no("numba", "0.46.0") def test_use_global_config():