Skip to content

BUG: support min/max functions for rolling windows with custom BaseIndexer #33180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Apr 3, 2020
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions doc/source/whatsnew/v1.1.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ Other API changes
- Added :meth:`DataFrame.value_counts` (:issue:`5377`)
- :meth:`Groupby.groups` now returns an abbreviated representation when called on large dataframes (:issue:`1135`)
- ``loc`` lookups with an object-dtype :class:`Index` and an integer key will now raise ``KeyError`` instead of ``TypeError`` when key is missing (:issue:`31905`)
- Using a :func:`pandas.api.indexers.BaseIndexer` with ``min``, ``max``, ``std``, ``var``, ``count``, ``skew``, ``cov``, ``corr`` will now raise a ``NotImplementedError`` (:issue:`32865`)
-
- Using a :func:`pandas.api.indexers.BaseIndexer` with ``std``, ``var``, ``count``, ``skew``, ``cov``, ``corr`` will now raise a ``NotImplementedError`` (:issue:`32865`)
- Using a :func:`pandas.api.indexers.BaseIndexer` with ``min``, ``max`` will now return correct results for any monotonic :func:`pandas.api.indexers.BaseIndexer` descendant (:issue:`32865`)

Backwards incompatible API changes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
77 changes: 31 additions & 46 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,7 @@ cdef _roll_min_max_variable(ndarray[numeric] values,
bint is_max):
cdef:
numeric ai
int64_t i, close_offset, curr_win_size
int64_t i, k, curr_win_size, start
Py_ssize_t nobs = 0, N = len(values)
deque Q[int64_t] # min/max always the front
deque W[int64_t] # track the whole window for nobs compute
Expand All @@ -1068,60 +1068,45 @@ cdef _roll_min_max_variable(ndarray[numeric] values,
# The original impl didn't deal with variable window sizes
# So the code was optimized for that

for i in range(starti[0], endi[0]):
ai = init_mm(values[i], &nobs, is_max)

# Discard previous entries if we find new min or max
if is_max:
while not Q.empty() and ((ai >= values[Q.back()]) or
values[Q.back()] != values[Q.back()]):
Q.pop_back()
else:
while not Q.empty() and ((ai <= values[Q.back()]) or
values[Q.back()] != values[Q.back()]):
Q.pop_back()
Q.push_back(i)
W.push_back(i)

# if right is open then the first window is empty
close_offset = 0 if endi[0] > starti[0] else 1
# first window's size
curr_win_size = endi[0] - starti[0]
# GH 32865
# Anchor output index to values index to provide custom
# BaseIndexer support
for i in range(N):

for i in range(endi[0], endi[N-1]):
if not Q.empty() and curr_win_size > 0:
output[i-1+close_offset] = calc_mm(
minp, nobs, values[Q.front()])
else:
output[i-1+close_offset] = NaN

ai = init_mm(values[i], &nobs, is_max)

# Discard previous entries if we find new min or max
if is_max:
while not Q.empty() and ((ai >= values[Q.back()]) or
values[Q.back()] != values[Q.back()]):
Q.pop_back()
curr_win_size = endi[i] - starti[i]
if i == 0:
start = starti[i]
else:
while not Q.empty() and ((ai <= values[Q.back()]) or
values[Q.back()] != values[Q.back()]):
Q.pop_back()
start = endi[i - 1]

# Maintain window/nobs retention
curr_win_size = endi[i + close_offset] - starti[i + close_offset]
while not Q.empty() and Q.front() <= i - curr_win_size:
for k in range(start, endi[i]):
ai = init_mm(values[k], &nobs, is_max)
# Discard previous entries if we find new min or max
if is_max:
while not Q.empty() and ((ai >= values[Q.back()]) or
values[Q.back()] != values[Q.back()]):
Q.pop_back()
else:
while not Q.empty() and ((ai <= values[Q.back()]) or
values[Q.back()] != values[Q.back()]):
Q.pop_back()
Q.push_back(k)
W.push_back(k)

# Discard entries outside and left of current window
while not Q.empty() and Q.front() <= starti[i] - 1:
Q.pop_front()
while not W.empty() and W.front() <= i - curr_win_size:
while not W.empty() and W.front() <= starti[i] - 1:
remove_mm(values[W.front()], &nobs)
W.pop_front()

Q.push_back(i)
W.push_back(i)

if not Q.empty() and curr_win_size > 0:
output[N-1] = calc_mm(minp, nobs, values[Q.front()])
else:
output[N-1] = NaN
# Save output based on index in input value array
if not Q.empty() and curr_win_size > 0:
output[i] = calc_mm(minp, nobs, values[Q.front()])
else:
output[i] = NaN

return output

Expand Down
2 changes: 1 addition & 1 deletion pandas/core/window/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def func(arg, window, min_periods=None):

def validate_baseindexer_support(func_name: Optional[str]) -> None:
# GH 32865: These functions work correctly with a BaseIndexer subclass
BASEINDEXER_WHITELIST = {"mean", "sum", "median", "kurt", "quantile"}
BASEINDEXER_WHITELIST = {"min", "max", "mean", "sum", "median", "kurt", "quantile"}
if isinstance(func_name, str) and func_name not in BASEINDEXER_WHITELIST:
raise NotImplementedError(
f"{func_name} is not supported with using a BaseIndexer "
Expand Down
39 changes: 36 additions & 3 deletions pandas/tests/window/test_base_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ def get_window_bounds(self, num_values, min_periods, center, closed):
df.rolling(indexer, win_type="boxcar")


@pytest.mark.parametrize(
"func", ["min", "max", "std", "var", "count", "skew", "cov", "corr"]
)
@pytest.mark.parametrize("func", ["std", "var", "count", "skew", "cov", "corr"])
def test_notimplemented_functions(func):
# GH 32865
class CustomIndexer(BaseIndexer):
Expand All @@ -95,3 +93,38 @@ def get_window_bounds(self, num_values, min_periods, center, closed):
indexer = CustomIndexer()
with pytest.raises(NotImplementedError, match=f"{func} is not supported"):
getattr(df.rolling(indexer), func)()


@pytest.mark.parametrize("constructor", [Series, DataFrame])
@pytest.mark.parametrize(
"func,alt_func,expected",
[
("min", np.min, [0.0, 1.0, 2.0, 3.0, 4.0, 6.0, 6.0, 7.0, 8.0, np.nan]),
("max", np.max, [2.0, 3.0, 4.0, 100.0, 100.0, 100.0, 8.0, 9.0, 9.0, np.nan]),
],
)
def test_rolling_forward_window(constructor, func, alt_func, expected):
# GH 32865
class ForwardIndexer(BaseIndexer):
def get_window_bounds(self, num_values, min_periods, center, closed):
start = np.empty(num_values, dtype=np.int64)
end = np.empty(num_values, dtype=np.int64)
for i in range(num_values):
if i + min_periods <= num_values:
start[i] = i
end[i] = min(i + self.window_size, num_values)
else:
start[i] = i
end[i] = i + 1
return start, end

values = np.arange(10)
values[5] = 100.0

indexer = ForwardIndexer(window_size=3)
rolling = constructor(values).rolling(window=indexer, min_periods=2)
result = getattr(rolling, func)()
expected = constructor(expected)
tm.assert_equal(result, expected)
expected2 = constructor(rolling.apply(lambda x: alt_func(x)))
tm.assert_equal(result, expected2)