Skip to content

BUG: various groupby ewm times issues #40952

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/source/whatsnew/v1.3.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,9 @@ Groupby/resample/rolling
- Bug in :class:`core.window.RollingGroupby` where ``as_index=False`` argument in ``groupby`` was ignored (:issue:`39433`)
- Bug in :meth:`.GroupBy.any` and :meth:`.GroupBy.all` raising ``ValueError`` when using with nullable type columns holding ``NA`` even with ``skipna=True`` (:issue:`40585`)
- Bug in :meth:`GroupBy.cummin` and :meth:`GroupBy.cummax` incorrectly rounding integer values near the ``int64`` implementations bounds (:issue:`40767`)
- Bug in :meth:`core.window.ewm.ExponentialMovingWindowGroupby.mean` where the times argument was ignored when ``engine='numba'`` (:issue:`40951`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the numba implementation -> when engine='numba'

- Bug in :meth:`core.window.ewm.ExponentialMovingWindowGroupby.mean` where the wrong times were used in case of multiple groups (:issue:`40951`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can remove in the cython implementation

- Bug in :class:`core.window.ewm.ExponentialMovingWindowGroupby` where the times vector and values became out of sync for non-trivial groups (:issue:`40951`)

Reshaping
^^^^^^^^^
Expand Down
5 changes: 3 additions & 2 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,7 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end,

cdef:
Py_ssize_t i, j, s, e, nobs, win_size, N = len(vals), M = len(start)
const float64_t[:] sub_vals
const float64_t[:] sub_deltas, sub_vals
ndarray[float64_t] sub_output, output = np.empty(N, dtype=float)
float64_t alpha, old_wt_factor, new_wt, weighted_avg, old_wt, cur
bint is_observation
Expand All @@ -1511,6 +1511,7 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end,
s = start[j]
e = end[j]
sub_vals = vals[s:e]
sub_deltas = deltas[s:e - 1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this not s:e? (like sub_vals)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deltas are the (scaled) np.diff of the times vector. so len(deltas) = len(times) - 1 = len(vals) - 1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kk can you add this comment on the shape of sub_vals vs sub_deltas for future readers

win_size = len(sub_vals)
sub_output = np.empty(win_size, dtype=float)

Expand All @@ -1528,7 +1529,7 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end,
if weighted_avg == weighted_avg:

if is_observation or not ignore_na:
old_wt *= old_wt_factor ** deltas[i - 1]
old_wt *= old_wt_factor ** sub_deltas[i - 1]
if is_observation:

# avoid numerical errors on constant series
Expand Down
41 changes: 28 additions & 13 deletions pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,7 @@ def __init__(
)
if isna(self.times).any():
raise ValueError("Cannot convert NaT values to integer")
# error: Item "str" of "Union[str, ndarray, FrameOrSeries, None]" has no
# attribute "view"
# error: Item "None" of "Union[str, ndarray, FrameOrSeries, None]" has no
# attribute "view"
_times = np.asarray(
self.times.view(np.int64), dtype=np.float64 # type: ignore[union-attr]
)
_halflife = float(Timedelta(self.halflife).value)
self._deltas = np.diff(_times) / _halflife
self._calculate_deltas(self.times, self.halflife)
# Halflife is no longer applicable when calculating COM
# But allow COM to still be calculated if the user passes other decay args
if common.count_not_none(self.com, self.span, self.alpha) > 0:
Expand All @@ -303,6 +295,21 @@ def __init__(
self.alpha,
)

def _calculate_deltas(
    self,
    times: str | np.ndarray | FrameOrSeries | None,
    halflife: float | TimedeltaConvertibleTypes | None,
) -> None:
    """
    Calculate the diffs of ``times`` divided by ``halflife``, storing the
    result in ``self._deltas`` for use as per-observation decay exponents.

    Parameters
    ----------
    times : str, np.ndarray, or Series
        Times corresponding to the observations; must be ``datetime64[ns]``
        dtype (string column names are resolved by the caller before this
        method runs).
    halflife : float, str, or timedelta
        Half-life specifying the exponential decay; anything accepted by
        :class:`Timedelta`.

    Notes
    -----
    ``self._deltas`` is the scaled ``np.diff`` of the times vector, so
    ``len(self._deltas) == len(times) - 1`` — one fewer entry than the
    values it scales.
    """
    # error: Item "str" of "Union[str, ndarray, FrameOrSeries, None]" has no
    # attribute "view"
    # error: Item "None" of "Union[str, ndarray, FrameOrSeries, None]" has no
    # attribute "view"
    _times = np.asarray(
        times.view(np.int64), dtype=np.float64  # type: ignore[union-attr]
    )
    # Timedelta accepts both strings ("23 days") and timedelta objects;
    # its nanosecond value normalizes the diffs into halflife units.
    _halflife = float(Timedelta(halflife).value)
    self._deltas = np.diff(_times) / _halflife

def _get_window_indexer(self) -> BaseIndexer:
"""
Return an indexer class that will compute the window start and end bounds
Expand Down Expand Up @@ -585,6 +592,17 @@ class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow)

_attributes = ExponentialMovingWindow._attributes + BaseWindowGroupby._attributes

def __init__(self, obj, *args, _grouper=None, **kwargs):
    """Set up the groupby EWM window and realign ``times`` with the groups."""
    super().__init__(obj, *args, _grouper=_grouper, **kwargs)

    if self.times is None or obj.empty:
        return
    # The grouped values are laid out group-by-group, so the times vector
    # must be permuted into the same order before the deltas are computed.
    group_order = np.concatenate(list(self._grouper.indices.values()))
    reordered_times = self.times.take(group_order)  # type: ignore[union-attr]
    self._calculate_deltas(reordered_times, self.halflife)

def _get_window_indexer(self) -> GroupbyIndexer:
"""
Return an indexer class that will compute the window start and end bounds
Expand Down Expand Up @@ -628,10 +646,7 @@ def mean(self, engine=None, engine_kwargs=None):
"""
if maybe_use_numba(engine):
groupby_ewma_func = generate_numba_groupby_ewma_func(
engine_kwargs,
self._com,
self.adjust,
self.ignore_na,
engine_kwargs, self._com, self.adjust, self.ignore_na, self._deltas
)
return self._apply(
groupby_ewma_func,
Expand Down
3 changes: 2 additions & 1 deletion pandas/core/window/numba_.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def generate_numba_groupby_ewma_func(
com: float,
adjust: bool,
ignore_na: bool,
deltas: np.ndarray,
):
"""
Generate a numba jitted groupby ewma function specified by values
Expand Down Expand Up @@ -141,7 +142,7 @@ def groupby_ewma(

if is_observation or not ignore_na:

old_wt *= old_wt_factor
old_wt *= old_wt_factor ** deltas[start + j - 1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why -1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to use delta[i-1] with values[i]. cur = window[j] = values[start + j] (and j starts at 1).

I could introduce a new variable delta_window = deltas[start:stop-1] as in the cython implementation if you think that would make it clearer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no its fine, if you'd just add some documentation to this effect for future readers

if is_observation:

# avoid numerical errors on constant series
Expand Down
26 changes: 26 additions & 0 deletions pandas/tests/window/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Series,
bdate_range,
notna,
to_datetime,
)


Expand Down Expand Up @@ -302,6 +303,31 @@ def frame():
)


@pytest.fixture
def times_frame():
    """Frame for testing times argument in EWM groupby."""
    dates = [
        "2020-01-01",
        "2020-01-01",
        "2020-01-01",
        "2020-01-02",
        "2020-01-10",
        "2020-01-22",
        "2020-01-03",
        "2020-01-23",
        "2020-01-23",
        "2020-01-04",
    ]
    return DataFrame(
        {
            "A": list("abcabcabca"),
            "B": [0, 0, 0, 1, 1, 1, 2, 2, 2, 3],
            "C": to_datetime(dates),
        }
    )


@pytest.fixture
def series():
"""Make mocked series as fixture."""
Expand Down
60 changes: 60 additions & 0 deletions pandas/tests/window/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,3 +926,63 @@ def test_pairwise_methods(self, method, expected_data):

expected = df.groupby("A").apply(lambda x: getattr(x.ewm(com=1.0), method)())
tm.assert_frame_equal(result, expected)

def test_times(self, times_frame):
    # GH 40951: groupby ewm with a times column must weight each group's
    # observations by that group's own time deltas.
    halflife = "23 days"
    result = times_frame.groupby("A").ewm(halflife=halflife, times="C").mean()
    expected_values = [
        0.0,
        0.507534,
        1.020088,
        1.537661,
        0.0,
        0.567395,
        1.221209,
        0.0,
        0.653141,
        1.195003,
    ]
    index_tuples = [
        ("a", 0),
        ("a", 3),
        ("a", 6),
        ("a", 9),
        ("b", 1),
        ("b", 4),
        ("b", 7),
        ("c", 2),
        ("c", 5),
        ("c", 8),
    ]
    expected = DataFrame(
        {"B": expected_values},
        index=MultiIndex.from_tuples(index_tuples, names=["A", None]),
    )
    tm.assert_frame_equal(result, expected)

def test_times_vs_apply(self, times_frame):
    # GH 40951: the grouped ewm must agree with applying ewm per group.
    halflife = "23 days"
    gb = times_frame.groupby("A")
    result = gb.ewm(halflife=halflife, times="C").mean().reset_index(drop=True)
    # groupby.apply keeps the original row order within each group, so
    # permute its rows into the grouped-output order before comparing.
    reorder = [0, 3, 6, 9, 1, 4, 7, 2, 5, 8]
    expected = (
        gb.apply(lambda x: x.ewm(halflife=halflife, times="C").mean())
        .iloc[reorder]
        .reset_index(drop=True)
    )
    tm.assert_frame_equal(result, expected)

def test_times_array(self, times_frame):
    # GH 40951: passing the times column by name and passing its underlying
    # array must give identical results.
    halflife = "23 days"
    gb = times_frame.groupby("A")
    result = gb.ewm(halflife=halflife, times="C").mean()
    expected = gb.ewm(halflife=halflife, times=times_frame["C"].values).mean()
    tm.assert_frame_equal(result, expected)
25 changes: 25 additions & 0 deletions pandas/tests/window/test_numba.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
DataFrame,
Series,
option_context,
to_datetime,
)
import pandas._testing as tm
from pandas.core.util.numba_ import NUMBA_FUNC_CACHE
Expand Down Expand Up @@ -145,6 +146,30 @@ def test_cython_vs_numba(self, nogil, parallel, nopython, ignore_na, adjust):

tm.assert_frame_equal(result, expected)

def test_cython_vs_numba_times(self, nogil, parallel, nopython, ignore_na):
    # GH 40951: the numba engine must honour the times argument the same
    # way the cython engine does.
    halflife = "23 days"
    dates = [
        "2020-01-01",
        "2020-01-01",
        "2020-01-02",
        "2020-01-10",
        "2020-02-23",
        "2020-01-03",
    ]
    df = DataFrame({"A": ["a", "b", "a", "b", "b", "a"], "B": [0, 0, 1, 1, 2, 2]})
    gb_ewm = df.groupby("A").ewm(
        halflife=halflife,
        adjust=True,
        ignore_na=ignore_na,
        times=to_datetime(dates),
    )

    engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
    result = gb_ewm.mean(engine="numba", engine_kwargs=engine_kwargs)
    expected = gb_ewm.mean(engine="cython")

    tm.assert_frame_equal(result, expected)


@td.skip_if_no("numba", "0.46.0")
def test_use_global_config():
Expand Down