Skip to content

BUG: various groupby ewm times issues #40952

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,7 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end,

cdef:
Py_ssize_t i, j, s, e, nobs, win_size, N = len(vals), M = len(start)
const float64_t[:] sub_vals
const float64_t[:] sub_deltas, sub_vals
ndarray[float64_t] sub_output, output = np.empty(N, dtype=float)
float64_t alpha, old_wt_factor, new_wt, weighted_avg, old_wt, cur
bint is_observation
Expand All @@ -1511,6 +1511,7 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end,
s = start[j]
e = end[j]
sub_vals = vals[s:e]
sub_deltas = deltas[s:e-1]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: e - 1

win_size = len(sub_vals)
sub_output = np.empty(win_size, dtype=float)

Expand All @@ -1528,7 +1529,7 @@ def ewma(const float64_t[:] vals, const int64_t[:] start, const int64_t[:] end,
if weighted_avg == weighted_avg:

if is_observation or not ignore_na:
old_wt *= old_wt_factor ** deltas[i - 1]
old_wt *= old_wt_factor ** sub_deltas[i - 1]
if is_observation:

# avoid numerical errors on constant series
Expand Down
39 changes: 26 additions & 13 deletions pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,7 @@ def __init__(
)
if isna(self.times).any():
raise ValueError("Cannot convert NaT values to integer")
# error: Item "str" of "Union[str, ndarray, FrameOrSeries, None]" has no
# attribute "view"
# error: Item "None" of "Union[str, ndarray, FrameOrSeries, None]" has no
# attribute "view"
_times = np.asarray(
self.times.view(np.int64), dtype=np.float64 # type: ignore[union-attr]
)
_halflife = float(Timedelta(self.halflife).value)
self._deltas = np.diff(_times) / _halflife
self._calculate_deltas()
# Halflife is no longer applicable when calculating COM
# But allow COM to still be calculated if the user passes other decay args
if common.count_not_none(self.com, self.span, self.alpha) > 0:
Expand All @@ -303,6 +295,17 @@ def __init__(
self.alpha,
)

def _calculate_deltas(self) -> None:
# error: Item "str" of "Union[str, ndarray, FrameOrSeries, None]" has no
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you give a doc-string, describe the params and what this is doing

# attribute "view"
# error: Item "None" of "Union[str, ndarray, FrameOrSeries, None]" has no
# attribute "view"
_times = np.asarray(
self.times.view(np.int64), dtype=np.float64 # type: ignore[union-attr]
)
_halflife = float(Timedelta(self.halflife).value)
self._deltas = np.diff(_times) / _halflife
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you return _deltas and assign in the caller (change the return type as well)


def _get_window_indexer(self) -> BaseIndexer:
"""
Return an indexer class that will compute the window start and end bounds
Expand Down Expand Up @@ -585,6 +588,19 @@ class ExponentialMovingWindowGroupby(BaseWindowGroupby, ExponentialMovingWindow)

_attributes = ExponentialMovingWindow._attributes + BaseWindowGroupby._attributes

def __init__(self, obj, *args, _grouper=None, **kwargs):
super().__init__(obj, *args, _grouper=_grouper, **kwargs)

if not obj.empty and self.times is not None:
# sort the times and recalculate the deltas according to the groups
groupby_order = np.concatenate(list(self._grouper.indices.values())).astype(
np.int64
)
# error: Item "str" of "Union[str, ndarray, FrameOrSeries]" has no
# attribute "take"
self.times = self.times.take(groupby_order)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we don't want to change this attribute too much since its still public to the user.

Instead could you define _calculate_deltas(self.times.take(groupby_order), self.halflife)

self._calculate_deltas()

def _get_window_indexer(self) -> GroupbyIndexer:
"""
Return an indexer class that will compute the window start and end bounds
Expand Down Expand Up @@ -628,10 +644,7 @@ def mean(self, engine=None, engine_kwargs=None):
"""
if maybe_use_numba(engine):
groupby_ewma_func = generate_numba_groupby_ewma_func(
engine_kwargs,
self._com,
self.adjust,
self.ignore_na,
engine_kwargs, self._com, self.adjust, self.ignore_na, self._deltas
)
return self._apply(
groupby_ewma_func,
Expand Down
3 changes: 2 additions & 1 deletion pandas/core/window/numba_.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def generate_numba_groupby_ewma_func(
com: float,
adjust: bool,
ignore_na: bool,
deltas: np.ndarray,
):
"""
Generate a numba jitted groupby ewma function specified by values
Expand Down Expand Up @@ -141,7 +142,7 @@ def groupby_ewma(

if is_observation or not ignore_na:

old_wt *= old_wt_factor
old_wt *= old_wt_factor ** deltas[start + j - 1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why -1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to use delta[i-1] with values[i]. cur = window[j] = values[start + j] (and j starts at 1).

I could introduce a new variable delta_window = deltas[start:stop-1] as in the cython implementation if you think that would make it clearer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no its fine, if you'd just add some documentation to this effect for future readers

if is_observation:

# avoid numerical errors on constant series
Expand Down
68 changes: 68 additions & 0 deletions pandas/tests/window/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,3 +926,71 @@ def test_pairwise_methods(self, method, expected_data):

expected = df.groupby("A").apply(lambda x: getattr(x.ewm(com=1.0), method)())
tm.assert_frame_equal(result, expected)

def test_times(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you split this into 3 different tests? Some duplication is okay but nice to have each test case isolated.

# GH 40951
halflife = "23 days"
df = DataFrame(
{
"A": ["a", "b", "c", "a", "b", "c", "a", "b", "c", "a"],
"B": [0, 0, 0, 1, 1, 1, 2, 2, 2, 3],
"C": to_datetime(
[
"2020-01-01",
"2020-01-01",
"2020-01-01",
"2020-01-02",
"2020-01-10",
"2020-01-22",
"2020-01-03",
"2020-01-23",
"2020-01-23",
"2020-01-04",
]
),
}
)
result = df.groupby("A").ewm(halflife=halflife, times="C").mean()
expected = DataFrame(
{
"B": [
0.0,
0.507534,
1.020088,
1.537661,
0.0,
0.567395,
1.221209,
0.0,
0.653141,
1.195003,
]
},
index=MultiIndex.from_tuples(
[
("a", 0),
("a", 3),
("a", 6),
("a", 9),
("b", 1),
("b", 4),
("b", 7),
("c", 2),
("c", 5),
("c", 8),
],
names=["A", None],
),
)
tm.assert_frame_equal(result, expected)

expected = (
df.groupby("A")
.apply(lambda x: x.ewm(halflife=halflife, times="C").mean())
.iloc[[0, 3, 6, 9, 1, 4, 7, 2, 5, 8]]
.reset_index(drop=True)
)
tm.assert_frame_equal(result.reset_index(drop=True), expected)

expected = df.groupby("A").ewm(halflife=halflife, times=df["C"].values).mean()
tm.assert_frame_equal(result, expected)
25 changes: 25 additions & 0 deletions pandas/tests/window/test_numba.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
DataFrame,
Series,
option_context,
to_datetime,
)
import pandas._testing as tm
from pandas.core.util.numba_ import NUMBA_FUNC_CACHE
Expand Down Expand Up @@ -145,6 +146,30 @@ def test_cython_vs_numba(self, nogil, parallel, nopython, ignore_na, adjust):

tm.assert_frame_equal(result, expected)

def test_cython_vs_numba_times(self, nogil, parallel, nopython, ignore_na):
# GH 40951
halflife = "23 days"
times = to_datetime(
[
"2020-01-01",
"2020-01-01",
"2020-01-02",
"2020-01-10",
"2020-02-23",
"2020-01-03",
]
)
df = DataFrame({"A": ["a", "b", "a", "b", "b", "a"], "B": [0, 0, 1, 1, 2, 2]})
gb_ewm = df.groupby("A").ewm(
halflife=halflife, adjust=True, ignore_na=ignore_na, times=times
)

engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
result = gb_ewm.mean(engine="numba", engine_kwargs=engine_kwargs)
expected = gb_ewm.mean(engine="cython")

tm.assert_frame_equal(result, expected)


@td.skip_if_no("numba", "0.46.0")
def test_use_global_config():
Expand Down