Skip to content

COMPAT: Ensure rolling indexers return intp #35228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
[author] wants to merge 3 commits into [base branch] from [head branch]
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 38 additions & 34 deletions pandas/_libs/window/aggregations.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ from pandas._libs.algos import is_monotonic

from pandas._libs.util cimport numeric

ctypedef fused indexer:
int64_t
long

cdef extern from "../src/skiplist.h":
ctypedef struct node_t:
node_t **next
Expand Down Expand Up @@ -93,8 +97,8 @@ cdef bint is_monotonic_start_end_bounds(

def roll_count(
ndarray[float64_t] values,
ndarray[int64_t] start,
ndarray[int64_t] end,
ndarray[indexer] start,
ndarray[indexer] end,
int64_t minp,
):
cdef:
Expand Down Expand Up @@ -175,8 +179,8 @@ cdef inline void remove_sum(float64_t val, int64_t *nobs, float64_t *sum_x) nogi
sum_x[0] = sum_x[0] - val


def roll_sum_variable(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp):
def roll_sum_variable(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp):
cdef:
float64_t sum_x = 0
int64_t s, e
Expand Down Expand Up @@ -219,8 +223,8 @@ def roll_sum_variable(ndarray[float64_t] values, ndarray[int64_t] start,
return output


def roll_sum_fixed(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp, int64_t win):
def roll_sum_fixed(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp, int64_t win):
cdef:
float64_t val, prev_x, sum_x = 0
int64_t range_endpoint
Expand Down Expand Up @@ -296,8 +300,8 @@ cdef inline void remove_mean(float64_t val, Py_ssize_t *nobs, float64_t *sum_x,
neg_ct[0] = neg_ct[0] - 1


def roll_mean_fixed(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp, int64_t win):
def roll_mean_fixed(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp, int64_t win):
cdef:
float64_t val, prev_x, sum_x = 0
Py_ssize_t nobs = 0, i, neg_ct = 0, N = len(values)
Expand All @@ -324,8 +328,8 @@ def roll_mean_fixed(ndarray[float64_t] values, ndarray[int64_t] start,
return output


def roll_mean_variable(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp):
def roll_mean_variable(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp):
cdef:
float64_t val, sum_x = 0
int64_t s, e
Expand Down Expand Up @@ -431,8 +435,8 @@ cdef inline void remove_var(float64_t val, float64_t *nobs, float64_t *mean_x,
ssqdm_x[0] = 0


def roll_var_fixed(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp, int64_t win, int ddof=1):
def roll_var_fixed(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp, int64_t win, int ddof=1):
"""
Numerically stable implementation using Welford's method.
"""
Expand Down Expand Up @@ -487,8 +491,8 @@ def roll_var_fixed(ndarray[float64_t] values, ndarray[int64_t] start,
return output


def roll_var_variable(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp, int ddof=1):
def roll_var_variable(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp, int ddof=1):
"""
Numerically stable implementation using Welford's method.
"""
Expand Down Expand Up @@ -606,8 +610,8 @@ cdef inline void remove_skew(float64_t val, int64_t *nobs,
xxx[0] = xxx[0] - val * val * val


def roll_skew_fixed(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp, int64_t win):
def roll_skew_fixed(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp, int64_t win):
cdef:
float64_t val, prev
float64_t x = 0, xx = 0, xxx = 0
Expand Down Expand Up @@ -636,8 +640,8 @@ def roll_skew_fixed(ndarray[float64_t] values, ndarray[int64_t] start,
return output


def roll_skew_variable(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp):
def roll_skew_variable(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp):
cdef:
float64_t val, prev
float64_t x = 0, xx = 0, xxx = 0
Expand Down Expand Up @@ -761,8 +765,8 @@ cdef inline void remove_kurt(float64_t val, int64_t *nobs,
xxxx[0] = xxxx[0] - val * val * val * val


def roll_kurt_fixed(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp, int64_t win):
def roll_kurt_fixed(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp, int64_t win):
cdef:
float64_t val, prev
float64_t x = 0, xx = 0, xxx = 0, xxxx = 0
Expand Down Expand Up @@ -790,8 +794,8 @@ def roll_kurt_fixed(ndarray[float64_t] values, ndarray[int64_t] start,
return output


def roll_kurt_variable(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp):
def roll_kurt_variable(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp):
cdef:
float64_t val, prev
float64_t x = 0, xx = 0, xxx = 0, xxxx = 0
Expand Down Expand Up @@ -842,8 +846,8 @@ def roll_kurt_variable(ndarray[float64_t] values, ndarray[int64_t] start,
# Rolling median, min, max


def roll_median_c(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp, int64_t win=0):
def roll_median_c(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp, int64_t win=0):
# GH 32865. win argument kept for compatibility
cdef:
float64_t val, res, prev
Expand Down Expand Up @@ -991,8 +995,8 @@ def roll_max_fixed(float64_t[:] values, int64_t[:] start,
return _roll_min_max_fixed(values, minp, win, is_max=1)


def roll_max_variable(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp):
def roll_max_variable(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp):
"""
Moving max of 1d array of any numeric type along axis=0 ignoring NaNs.

Expand Down Expand Up @@ -1028,8 +1032,8 @@ def roll_min_fixed(float64_t[:] values, int64_t[:] start,
return _roll_min_max_fixed(values, minp, win, is_max=0)


def roll_min_variable(ndarray[float64_t] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp):
def roll_min_variable(ndarray[float64_t] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp):
"""
Moving min of 1d array of any numeric type along axis=0 ignoring NaNs.

Expand All @@ -1046,8 +1050,8 @@ def roll_min_variable(ndarray[float64_t] values, ndarray[int64_t] start,


cdef _roll_min_max_variable(ndarray[numeric] values,
ndarray[int64_t] starti,
ndarray[int64_t] endi,
ndarray[indexer] starti,
ndarray[indexer] endi,
int64_t minp,
bint is_max):
cdef:
Expand Down Expand Up @@ -1216,8 +1220,8 @@ interpolation_types = {
}


def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start,
ndarray[int64_t] end, int64_t minp, int64_t win,
def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[indexer] start,
ndarray[indexer] end, int64_t minp, int64_t win,
float64_t quantile, str interpolation):
"""
O(N log(window)) implementation using skip list
Expand Down Expand Up @@ -1329,7 +1333,7 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start,


def roll_generic_fixed(object obj,
ndarray[int64_t] start, ndarray[int64_t] end,
ndarray[indexer] start, ndarray[indexer] end,
int64_t minp, int64_t win,
int offset, object func, bint raw,
object args, object kwargs):
Expand Down Expand Up @@ -1396,7 +1400,7 @@ def roll_generic_fixed(object obj,


def roll_generic_variable(object obj,
ndarray[int64_t] start, ndarray[int64_t] end,
ndarray[indexer] start, ndarray[indexer] end,
int64_t minp,
int offset, object func, bint raw,
object args, object kwargs):
Expand Down
6 changes: 3 additions & 3 deletions pandas/_libs/window/indexers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def calculate_variable_window_bounds(

Returns
-------
(ndarray[int64], ndarray[int64])
(ndarray[intp], ndarray[intp])
"""
cdef:
bint left_closed = False
Expand All @@ -62,9 +62,9 @@ def calculate_variable_window_bounds(
if index[num_values - 1] < index[0]:
index_growth_sign = -1

start = np.empty(num_values, dtype='int64')
start = np.empty(num_values, dtype=np.intp)
start.fill(-1)
end = np.empty(num_values, dtype='int64')
end = np.empty(num_values, dtype=np.intp)
end.fill(-1)

start[0] = 0
Expand Down
26 changes: 11 additions & 15 deletions pandas/core/window/indexers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

Returns
-------
A tuple of ndarray[int64]s, indicating the boundaries of each
A tuple of ndarray[intp]s, indicating the boundaries of each
window
"""

Expand Down Expand Up @@ -76,15 +76,15 @@ def get_window_bounds(
closed: Optional[str] = None,
) -> Tuple[np.ndarray, np.ndarray]:

start_s = np.zeros(self.window_size, dtype="int64")
start_s = np.zeros(self.window_size, dtype=np.intp)
start_e = (
np.arange(self.window_size, num_values, dtype="int64")
np.arange(self.window_size, num_values, dtype=np.intp)
- self.window_size
+ 1
)
start = np.concatenate([start_s, start_e])[:num_values]

end_s = np.arange(self.window_size, dtype="int64") + 1
end_s = np.arange(self.window_size, dtype=np.intp) + 1
end_e = start_e + self.window_size
end = np.concatenate([end_s, end_e])[:num_values]
return start, end
Expand Down Expand Up @@ -143,9 +143,9 @@ def get_window_bounds(
else:
index_growth_sign = 1

start = np.empty(num_values, dtype="int64")
start = np.empty(num_values, dtype=np.intp)
start.fill(-1)
end = np.empty(num_values, dtype="int64")
end = np.empty(num_values, dtype=np.intp)
end.fill(-1)

start[0] = 0
Expand Down Expand Up @@ -202,8 +202,8 @@ def get_window_bounds(
) -> Tuple[np.ndarray, np.ndarray]:

return (
np.zeros(num_values, dtype=np.int64),
np.arange(1, num_values + 1, dtype=np.int64),
np.zeros(num_values, dtype=np.intp),
np.arange(1, num_values + 1, dtype=np.intp),
)


Expand Down Expand Up @@ -249,9 +249,9 @@ def get_window_bounds(
"Forward-looking windows don't support setting the closed argument"
)

start = np.arange(num_values, dtype="int64")
start = np.arange(num_values, dtype=np.intp)
end_s = start[: -self.window_size] + self.window_size
end_e = np.full(self.window_size, num_values, dtype="int64")
end_e = np.full(self.window_size, num_values, dtype=np.intp)
end = np.concatenate([end_s, end_e])

return start, end
Expand Down Expand Up @@ -303,18 +303,14 @@ def get_window_bounds(
start, end = indexer.get_window_bounds(
len(indicies), min_periods, center, closed
)
start = start.astype(np.int64)
end = end.astype(np.int64)
# Cannot use groupby_indicies as they might not be monotonic with the object
# we're rolling over
window_indicies = np.arange(
window_indicies_start, window_indicies_start + len(indicies),
)
window_indicies_start += len(indicies)
# Extend as we'll be slicing window like [start, end)
window_indicies = np.append(
window_indicies, [window_indicies[-1] + 1]
).astype(np.int64)
window_indicies = np.append(window_indicies, [window_indicies[-1] + 1])
start_arrays.append(window_indicies.take(start))
end_arrays.append(window_indicies.take(end))
start = np.concatenate(start_arrays)
Expand Down
4 changes: 1 addition & 3 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -2231,9 +2231,7 @@ def _create_blocks(self, obj: FrameOrSeries):
"""
# Ensure the object we're rolling over is monotonically sorted relative
# to the groups
groupby_order = np.concatenate(
list(self._groupby.grouper.indices.values())
).astype(np.int64)
groupby_order = np.concatenate(list(self._groupby.grouper.indices.values()))
obj = obj.take(groupby_order)
return super()._create_blocks(obj)

Expand Down