diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 0ab95dd260a9c..57e3c9dd66afb 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -310,6 +310,7 @@ Performance improvements - The internal index method :meth:`~Index._shallow_copy` now makes the new index and original index share cached attributes, avoiding creating these again, if created on either. This can speed up operations that depend on creating copies of existing indexes (:issue:`36840`) - Performance improvement in :meth:`RollingGroupby.count` (:issue:`35625`) +- Small performance decrease to :meth:`Rolling.min` and :meth:`Rolling.max` for fixed windows (:issue:`36567`) .. --------------------------------------------------------------------------- diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index c6fd569247b90..937f7d8df7728 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -137,8 +137,8 @@ cdef inline void remove_sum(float64_t val, int64_t *nobs, float64_t *sum_x, sum_x[0] = t -def roll_sum_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): +def roll_sum(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): cdef: float64_t sum_x = 0, compensation_add = 0, compensation_remove = 0 int64_t s, e @@ -181,36 +181,6 @@ def roll_sum_variable(ndarray[float64_t] values, ndarray[int64_t] start, return output -def roll_sum_fixed(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int64_t win): - cdef: - float64_t val, prev_x, sum_x = 0, compensation_add = 0, compensation_remove = 0 - int64_t range_endpoint - int64_t nobs = 0, i, N = len(values) - ndarray[float64_t] output - - output = np.empty(N, dtype=float) - - range_endpoint = int_max(minp, 1) - 1 - - with nogil: - - for i in range(0, range_endpoint): - add_sum(values[i], &nobs, &sum_x, &compensation_add) - output[i] = NaN - - for i in range(range_endpoint, N): - val = values[i] - add_sum(val, &nobs, &sum_x, &compensation_add) - - if i > win - 1: - prev_x = values[i - win] - remove_sum(prev_x, &nobs, &sum_x, &compensation_remove) - - output[i] = calc_sum(minp, nobs, sum_x) - - return output - # ---------------------------------------------------------------------- # Rolling mean @@ -268,36 +238,8 @@ cdef inline void remove_mean(float64_t val, Py_ssize_t *nobs, float64_t *sum_x, neg_ct[0] = neg_ct[0] - 1 -def roll_mean_fixed(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int64_t win): - cdef: - float64_t val, prev_x, sum_x = 0, compensation_add = 0, compensation_remove = 0 - Py_ssize_t nobs = 0, i, neg_ct = 0, N = len(values) - ndarray[float64_t] output - - output = np.empty(N, dtype=float) - - with nogil: - for i in range(minp - 1): - val = values[i] - add_mean(val, &nobs, &sum_x, &neg_ct, &compensation_add) - output[i] = NaN - - for i in range(minp - 1, N): - val = values[i] - add_mean(val, &nobs, &sum_x, &neg_ct, &compensation_add) - - if i > win - 1: - prev_x = values[i - win] - remove_mean(prev_x, &nobs, &sum_x, &neg_ct, &compensation_remove) - - output[i] = calc_mean(minp, nobs, neg_ct, sum_x) - - return output - - -def roll_mean_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): +def roll_mean(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): cdef: float64_t val, compensation_add = 0, compensation_remove = 0, sum_x = 0 int64_t s, e @@ -358,7 +300,9 @@ cdef inline float64_t calc_var(int64_t minp, int ddof, float64_t nobs, result = 0 else: result = ssqdm_x / (nobs - ddof) - if result < 0: + # Fix for numerical imprecision. + # Can be result < 0 once Kahan Summation is implemented + if result < 1e-15: result = 0 else: result = NaN @@ -403,64 +347,8 @@ cdef inline void remove_var(float64_t val, float64_t *nobs, float64_t *mean_x, ssqdm_x[0] = 0 -def roll_var_fixed(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int64_t win, int ddof=1): - """ - Numerically stable implementation using Welford's method. - """ - cdef: - float64_t mean_x = 0, ssqdm_x = 0, nobs = 0, - float64_t val, prev, delta, mean_x_old - int64_t s, e - Py_ssize_t i, j, N = len(values) - ndarray[float64_t] output - - output = np.empty(N, dtype=float) - - # Check for windows larger than array, addresses #7297 - win = min(win, N) - - with nogil: - - # Over the first window, observations can only be added, never - # removed - for i in range(win): - add_var(values[i], &nobs, &mean_x, &ssqdm_x) - output[i] = calc_var(minp, ddof, nobs, ssqdm_x) - - # a part of Welford's method for the online variance-calculation - # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance - - # After the first window, observations can both be added and - # removed - for i in range(win, N): - val = values[i] - prev = values[i - win] - - if notnan(val): - if prev == prev: - - # Adding one observation and removing another one - delta = val - prev - mean_x_old = mean_x - - mean_x += delta / nobs - ssqdm_x += ((nobs - 1) * val - + (nobs + 1) * prev - - 2 * nobs * mean_x_old) * delta / nobs - - else: - add_var(val, &nobs, &mean_x, &ssqdm_x) - elif prev == prev: - remove_var(prev, &nobs, &mean_x, &ssqdm_x) - - output[i] = calc_var(minp, ddof, nobs, ssqdm_x) - - return output - - -def roll_var_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int ddof=1): +def roll_var(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp, int ddof=1): """ Numerically stable implementation using Welford's method. """ @@ -578,38 +466,8 @@ cdef inline void remove_skew(float64_t val, int64_t *nobs, xxx[0] = xxx[0] - val * val * val -def roll_skew_fixed(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int64_t win): - cdef: - float64_t val, prev - float64_t x = 0, xx = 0, xxx = 0 - int64_t nobs = 0, i, j, N = len(values) - int64_t s, e - ndarray[float64_t] output - - output = np.empty(N, dtype=float) - - with nogil: - for i in range(minp - 1): - val = values[i] - add_skew(val, &nobs, &x, &xx, &xxx) - output[i] = NaN - - for i in range(minp - 1, N): - val = values[i] - add_skew(val, &nobs, &x, &xx, &xxx) - - if i > win - 1: - prev = values[i - win] - remove_skew(prev, &nobs, &x, &xx, &xxx) - - output[i] = calc_skew(minp, nobs, x, xx, xxx) - - return output - - -def roll_skew_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): +def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): cdef: float64_t val, prev float64_t x = 0, xx = 0, xxx = 0 @@ -733,37 +591,8 @@ cdef inline void remove_kurt(float64_t val, int64_t *nobs, xxxx[0] = xxxx[0] - val * val * val * val -def roll_kurt_fixed(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int64_t win): - cdef: - float64_t val, prev - float64_t x = 0, xx = 0, xxx = 0, xxxx = 0 - int64_t nobs = 0, i, j, N = len(values) - int64_t s, e - ndarray[float64_t] output - - output = np.empty(N, dtype=float) - - with nogil: - - for i in range(minp - 1): - add_kurt(values[i], &nobs, &x, &xx, &xxx, &xxxx) - output[i] = NaN - - for i in range(minp - 1, N): - add_kurt(values[i], &nobs, &x, &xx, &xxx, &xxxx) - - if i > win - 1: - prev = values[i - win] - remove_kurt(prev, &nobs, &x, &xx, &xxx, &xxxx) - - output[i] = calc_kurt(minp, nobs, x, xx, xxx, xxxx) - - return output - - -def roll_kurt_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): +def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): cdef: float64_t val, prev float64_t x = 0, xx = 0, xxx = 0, xxxx = 0 @@ -943,28 +772,8 @@ cdef inline numeric calc_mm(int64_t minp, Py_ssize_t nobs, return result -def roll_max_fixed(float64_t[:] values, int64_t[:] start, - int64_t[:] end, int64_t minp, int64_t win): - """ - Moving max of 1d array of any numeric type along axis=0 ignoring NaNs. - - Parameters - ---------- - values : np.ndarray[np.float64] - window : int, size of rolling window - minp : if number of observations in window - is below this, output a NaN - index : ndarray, optional - index for window computation - closed : 'right', 'left', 'both', 'neither' - make the interval closed on the right, left, - both or neither endpoints - """ - return _roll_min_max_fixed(values, minp, win, is_max=1) - - -def roll_max_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): +def roll_max(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): """ Moving max of 1d array of any numeric type along axis=0 ignoring NaNs. @@ -980,11 +789,11 @@ def roll_max_variable(ndarray[float64_t] values, ndarray[int64_t] start, make the interval closed on the right, left, both or neither endpoints """ - return _roll_min_max_variable(values, start, end, minp, is_max=1) + return _roll_min_max(values, start, end, minp, is_max=1) -def roll_min_fixed(float64_t[:] values, int64_t[:] start, - int64_t[:] end, int64_t minp, int64_t win): +def roll_min(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): """ Moving min of 1d array of any numeric type along axis=0 ignoring NaNs. @@ -997,31 +806,14 @@ def roll_min_fixed(float64_t[:] values, int64_t[:] start, index : ndarray, optional index for window computation """ - return _roll_min_max_fixed(values, minp, win, is_max=0) - + return _roll_min_max(values, start, end, minp, is_max=0) -def roll_min_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): - """ - Moving min of 1d array of any numeric type along axis=0 ignoring NaNs. - Parameters - ---------- - values : np.ndarray[np.float64] - window : int, size of rolling window - minp : if number of observations in window - is below this, output a NaN - index : ndarray, optional - index for window computation - """ - return _roll_min_max_variable(values, start, end, minp, is_max=0) - - -cdef _roll_min_max_variable(ndarray[numeric] values, - ndarray[int64_t] starti, - ndarray[int64_t] endi, - int64_t minp, - bint is_max): +cdef _roll_min_max(ndarray[numeric] values, + ndarray[int64_t] starti, + ndarray[int64_t] endi, + int64_t minp, + bint is_max): cdef: numeric ai int64_t i, k, curr_win_size, start @@ -1084,93 +876,6 @@ cdef _roll_min_max_variable(ndarray[numeric] values, return output -cdef _roll_min_max_fixed(numeric[:] values, - int64_t minp, - int64_t win, - bint is_max): - cdef: - numeric ai - bint should_replace - int64_t i, removed, window_i, - Py_ssize_t nobs = 0, N = len(values) - int64_t* death - numeric* ring - numeric* minvalue - numeric* end - numeric* last - ndarray[float64_t, ndim=1] output - - output = np.empty(N, dtype=float) - # setup the rings of death! - ring = malloc(win * sizeof(numeric)) - death = malloc(win * sizeof(int64_t)) - - end = ring + win - last = ring - minvalue = ring - ai = values[0] - minvalue[0] = init_mm(values[0], &nobs, is_max) - death[0] = win - nobs = 0 - - with nogil: - - for i in range(N): - ai = init_mm(values[i], &nobs, is_max) - - if i >= win: - remove_mm(values[i - win], &nobs) - - if death[minvalue - ring] == i: - minvalue = minvalue + 1 - if minvalue >= end: - minvalue = ring - - if is_max: - should_replace = ai >= minvalue[0] - else: - should_replace = ai <= minvalue[0] - if should_replace: - - minvalue[0] = ai - death[minvalue - ring] = i + win - last = minvalue - - else: - - if is_max: - should_replace = last[0] <= ai - else: - should_replace = last[0] >= ai - while should_replace: - if last == ring: - last = end - last -= 1 - if is_max: - should_replace = last[0] <= ai - else: - should_replace = last[0] >= ai - - last += 1 - if last == end: - last = ring - last[0] = ai - death[last - ring] = i + win - - output[i] = calc_mm(minp, nobs, minvalue[0]) - - for i in range(minp - 1): - if numeric in cython.floating: - output[i] = NaN - else: - output[i] = 0 - - free(ring) - free(death) - - return output - - cdef enum InterpolationType: LINEAR, LOWER, @@ -1300,19 +1005,16 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, return output -def roll_generic_fixed(object obj, - ndarray[int64_t] start, ndarray[int64_t] end, - int64_t minp, int64_t win, - int offset, object func, bint raw, - object args, object kwargs): +def roll_apply(object obj, + ndarray[int64_t] start, ndarray[int64_t] end, + int64_t minp, + object func, bint raw, + tuple args, dict kwargs): cdef: - ndarray[float64_t] output, counts, bufarr + ndarray[float64_t] output, counts ndarray[float64_t, cast=True] arr - float64_t *buf - float64_t *oldbuf - int64_t nobs = 0, i, j, s, e, N = len(start) + Py_ssize_t i, s, e, N = len(start), n = len(obj) - n = len(obj) if n == 0: return obj @@ -1323,83 +1025,12 @@ def roll_generic_fixed(object obj, if not arr.flags.c_contiguous: arr = arr.copy('C') - counts = roll_sum_fixed(np.concatenate([np.isfinite(arr).astype(float), - np.array([0.] * offset)]), - start, end, minp, win)[offset:] + counts = roll_sum(np.isfinite(arr).astype(float), start, end, minp) output = np.empty(N, dtype=float) - if not raw: - # series - for i in range(N): - if counts[i] >= minp: - sl = slice(int_max(i + offset - win + 1, 0), - int_min(i + offset + 1, N)) - output[i] = func(obj.iloc[sl], *args, **kwargs) - else: - output[i] = NaN - - else: - - # truncated windows at the beginning, through first full-length window - for i in range((int_min(win, N) - offset)): - if counts[i] >= minp: - output[i] = func(arr[0: (i + offset + 1)], *args, **kwargs) - else: - output[i] = NaN - - # remaining full-length windows - for j, i in enumerate(range((win - offset), (N - offset)), 1): - if counts[i] >= minp: - output[i] = func(arr[j:j + win], *args, **kwargs) - else: - output[i] = NaN - - # truncated windows at the end - for i in range(int_max(N - offset, 0), N): - if counts[i] >= minp: - output[i] = func(arr[int_max(i + offset - win + 1, 0): N], - *args, - **kwargs) - else: - output[i] = NaN - - return output - - -def roll_generic_variable(object obj, - ndarray[int64_t] start, ndarray[int64_t] end, - int64_t minp, - int offset, object func, bint raw, - object args, object kwargs): - cdef: - ndarray[float64_t] output, counts, bufarr - ndarray[float64_t, cast=True] arr - float64_t *buf - float64_t *oldbuf - int64_t nobs = 0, i, j, s, e, N = len(start) - - n = len(obj) - if n == 0: - return obj - - arr = np.asarray(obj) - - # ndarray input - if raw: - if not arr.flags.c_contiguous: - arr = arr.copy('C') - - counts = roll_sum_variable(np.concatenate([np.isfinite(arr).astype(float), - np.array([0.] * offset)]), - start, end, minp)[offset:] - - output = np.empty(N, dtype=float) - - if offset != 0: - raise ValueError("unable to roll_generic with a non-zero offset") + for i in range(N): - for i in range(0, N): s = start[i] e = end[i] diff --git a/pandas/core/window/common.py b/pandas/core/window/common.py index 2e7e7cd47c336..aa71c44f75ead 100644 --- a/pandas/core/window/common.py +++ b/pandas/core/window/common.py @@ -64,7 +64,6 @@ def __init__(self, obj, *args, **kwargs): def _apply( self, func: Callable, - center: bool, require_min_periods: int = 0, floor: int = 1, is_weighted: bool = False, diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index a21521f4ce8bb..023f598f606f3 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -89,6 +89,20 @@ def get_window_bounds( end_s = np.arange(self.window_size, dtype="int64") + 1 end_e = start_e + self.window_size end = np.concatenate([end_s, end_e])[:num_values] + + if center and self.window_size > 2: + offset = min((self.window_size - 1) // 2, num_values - 1) + start_s_buffer = np.roll(start, -offset)[: num_values - offset] + end_s_buffer = np.roll(end, -offset)[: num_values - offset] + + start_e_buffer = np.arange( + start[-1] + 1, start[-1] + 1 + offset, dtype="int64" + ) + end_e_buffer = np.array([end[-1]] * offset, dtype="int64") + + start = np.concatenate([start_s_buffer, start_e_buffer]) + end = np.concatenate([end_s_buffer, end_e_buffer]) + return start, end @@ -327,10 +341,4 @@ def get_window_bounds( end_arrays.append(window_indicies.take(ensure_platform_int(end))) start = np.concatenate(start_arrays) end = np.concatenate(end_arrays) - # GH 35552: Need to adjust start and end based on the nans appended to values - # when center=True - if num_values > len(start): - offset = num_values - len(start) - start = np.concatenate([start, np.array([end[-1]] * offset)]) - end = np.concatenate([end, np.array([end[-1]] * offset)]) return start, end diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 466b320f1771f..9e829ef774d42 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -74,22 +74,21 @@ from pandas.core.internals import Block # noqa:F401 -def calculate_center_offset(window) -> int: +def calculate_center_offset(window: np.ndarray) -> int: """ - Calculate an offset necessary to have the window label to be centered. + Calculate an offset necessary to have the window label to be centered + for weighted windows. Parameters ---------- - window: ndarray or int - window weights or window + window: ndarray + window weights Returns ------- int """ - if not is_integer(window): - window = len(window) - return int((window - 1) / 2.0) + return (len(window) - 1) // 2 def calculate_min_periods( @@ -417,9 +416,9 @@ def _insert_on_column(self, result: "DataFrame", obj: "DataFrame"): # insert at the end result[name] = extra_col - def _center_window(self, result: np.ndarray, window) -> np.ndarray: + def _center_window(self, result: np.ndarray, window: np.ndarray) -> np.ndarray: """ - Center the result in the window. + Center the result in the window for weighted rolling aggregations. """ if self.axis > result.ndim - 1: raise ValueError("Requested axis is larger then no. of argument dimensions") @@ -451,16 +450,6 @@ def _get_roll_func(self, func_name: str) -> Callable: ) return window_func - def _get_cython_func_type(self, func: str) -> Callable: - """ - Return a variable or fixed cython function type. - - Variable algorithms do not use window while fixed do. - """ - if self.is_freq_type or isinstance(self.window, BaseIndexer): - return self._get_roll_func(f"{func}_variable") - return partial(self._get_roll_func(f"{func}_fixed"), win=self._get_window()) - def _get_window_indexer(self, window: int) -> BaseIndexer: """ Return an indexer class that will compute the window start and end bounds @@ -526,7 +515,6 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike: def _apply( self, func: Callable, - center: bool, require_min_periods: int = 0, floor: int = 1, is_weighted: bool = False, @@ -542,7 +530,6 @@ def _apply( Parameters ---------- func : callable function to apply - center : bool require_min_periods : int floor : int is_weighted : bool @@ -568,13 +555,9 @@ def homogeneous_func(values: np.ndarray): if values.size == 0: return values.copy() - offset = calculate_center_offset(window) if center else 0 - additional_nans = np.array([np.nan] * offset) - if not is_weighted: def calc(x): - x = np.concatenate((x, additional_nans)) if not isinstance(self.window, BaseIndexer): min_periods = calculate_min_periods( window, self.min_periods, len(x), require_min_periods, floor @@ -598,6 +581,8 @@ def calc(x): else: def calc(x): + offset = calculate_center_offset(window) if self.center else 0 + additional_nans = np.array([np.nan] * offset) x = np.concatenate((x, additional_nans)) return func(x, window, self.min_periods) @@ -611,7 +596,7 @@ def calc(x): if use_numba_cache: NUMBA_FUNC_CACHE[(kwargs["original_func"], "rolling_apply")] = func - if center: + if self.center and is_weighted: result = self._center_window(result, window) return result @@ -1200,9 +1185,7 @@ def sum(self, *args, **kwargs): nv.validate_window_func("sum", args, kwargs) window_func = self._get_roll_func("roll_weighted_sum") window_func = get_weighted_roll_func(window_func) - return self._apply( - window_func, center=self.center, is_weighted=True, name="sum", **kwargs - ) + return self._apply(window_func, is_weighted=True, name="sum", **kwargs) @Substitution(name="window") @Appender(_shared_docs["mean"]) @@ -1210,9 +1193,7 @@ def mean(self, *args, **kwargs): nv.validate_window_func("mean", args, kwargs) window_func = self._get_roll_func("roll_weighted_mean") window_func = get_weighted_roll_func(window_func) - return self._apply( - window_func, center=self.center, is_weighted=True, name="mean", **kwargs - ) + return self._apply(window_func, is_weighted=True, name="mean", **kwargs) @Substitution(name="window", versionadded="\n.. versionadded:: 1.0.0\n") @Appender(_shared_docs["var"]) @@ -1221,9 +1202,7 @@ def var(self, ddof=1, *args, **kwargs): window_func = partial(self._get_roll_func("roll_weighted_var"), ddof=ddof) window_func = get_weighted_roll_func(window_func) kwargs.pop("name", None) - return self._apply( - window_func, center=self.center, is_weighted=True, name="var", **kwargs - ) + return self._apply(window_func, is_weighted=True, name="var", **kwargs) @Substitution(name="window", versionadded="\n.. versionadded:: 1.0.0\n") @Appender(_shared_docs["std"]) @@ -1275,8 +1254,8 @@ class RollingAndExpandingMixin(BaseWindow): ) def count(self): - window_func = self._get_cython_func_type("roll_sum") - return self._apply(window_func, center=self.center, name="count") + window_func = self._get_roll_func("roll_sum") + return self._apply(window_func, name="count") _shared_docs["apply"] = dedent( r""" @@ -1362,25 +1341,16 @@ def apply( if raw is False: raise ValueError("raw must be `True` when using the numba engine") apply_func = generate_numba_apply_func(args, kwargs, func, engine_kwargs) - center = self.center elif engine in ("cython", None): if engine_kwargs is not None: raise ValueError("cython engine does not accept engine_kwargs") - # Cython apply functions handle center, so don't need to use - # _apply's center handling - window = self._get_window() - offset = calculate_center_offset(window) if self.center else 0 - apply_func = self._generate_cython_apply_func( - args, kwargs, raw, offset, func - ) - center = False + apply_func = self._generate_cython_apply_func(args, kwargs, raw, func) else: raise ValueError("engine must be either 'numba' or 'cython'") # name=func & raw=raw for WindowGroupByMixin._apply return self._apply( apply_func, - center=center, floor=0, name=func, use_numba_cache=maybe_use_numba(engine), @@ -1390,15 +1360,14 @@ def apply( kwargs=kwargs, ) - def _generate_cython_apply_func(self, args, kwargs, raw, offset, func): + def _generate_cython_apply_func(self, args, kwargs, raw, func): from pandas import Series window_func = partial( - self._get_cython_func_type("roll_generic"), + self._get_roll_func("roll_apply"), args=args, kwargs=kwargs, raw=raw, - offset=offset, func=func, ) @@ -1411,11 +1380,9 @@ def apply_func(values, begin, end, min_periods, raw=raw): def sum(self, *args, **kwargs): nv.validate_window_func("sum", args, kwargs) - window_func = self._get_cython_func_type("roll_sum") + window_func = self._get_roll_func("roll_sum") kwargs.pop("floor", None) - return self._apply( - window_func, center=self.center, floor=0, name="sum", **kwargs - ) + return self._apply(window_func, floor=0, name="sum", **kwargs) _shared_docs["max"] = dedent( """ @@ -1430,8 +1397,8 @@ def sum(self, *args, **kwargs): def max(self, *args, **kwargs): nv.validate_window_func("max", args, kwargs) - window_func = self._get_cython_func_type("roll_max") - return self._apply(window_func, center=self.center, name="max", **kwargs) + window_func = self._get_roll_func("roll_max") + return self._apply(window_func, name="max", **kwargs) _shared_docs["min"] = dedent( """ @@ -1472,13 +1439,13 @@ def max(self, *args, **kwargs): def min(self, *args, **kwargs): nv.validate_window_func("min", args, kwargs) - window_func = self._get_cython_func_type("roll_min") - return self._apply(window_func, center=self.center, name="min", **kwargs) + window_func = self._get_roll_func("roll_min") + return self._apply(window_func, name="min", **kwargs) def mean(self, *args, **kwargs): nv.validate_window_func("mean", args, kwargs) - window_func = self._get_cython_func_type("roll_mean") - return self._apply(window_func, center=self.center, name="mean", **kwargs) + window_func = self._get_roll_func("roll_mean") + return self._apply(window_func, name="mean", **kwargs) _shared_docs["median"] = dedent( """ @@ -1521,12 +1488,12 @@ def median(self, **kwargs): window_func = self._get_roll_func("roll_median_c") # GH 32865. Move max window size calculation to # the median function implementation - return self._apply(window_func, center=self.center, name="median", **kwargs) + return self._apply(window_func, name="median", **kwargs) def std(self, ddof=1, *args, **kwargs): nv.validate_window_func("std", args, kwargs) kwargs.pop("require_min_periods", None) - window_func = self._get_cython_func_type("roll_var") + window_func = self._get_roll_func("roll_var") def zsqrt_func(values, begin, end, min_periods): return zsqrt(window_func(values, begin, end, min_periods, ddof=ddof)) @@ -1534,7 +1501,6 @@ def zsqrt_func(values, begin, end, min_periods): # ddof passed again for compat with groupby.rolling return self._apply( zsqrt_func, - center=self.center, require_min_periods=1, name="std", ddof=ddof, @@ -1544,11 +1510,10 @@ def zsqrt_func(values, begin, end, min_periods): def var(self, ddof=1, *args, **kwargs): nv.validate_window_func("var", args, kwargs) kwargs.pop("require_min_periods", None) - window_func = partial(self._get_cython_func_type("roll_var"), ddof=ddof) + window_func = partial(self._get_roll_func("roll_var"), ddof=ddof) # ddof passed again for compat with groupby.rolling return self._apply( window_func, - center=self.center, require_min_periods=1, name="var", ddof=ddof, @@ -1567,11 +1532,10 @@ def var(self, ddof=1, *args, **kwargs): """ def skew(self, **kwargs): - window_func = self._get_cython_func_type("roll_skew") + window_func = self._get_roll_func("roll_skew") kwargs.pop("require_min_periods", None) return self._apply( window_func, - center=self.center, require_min_periods=3, name="skew", **kwargs, @@ -1610,11 +1574,10 @@ def skew(self, **kwargs): ) def kurt(self, **kwargs): - window_func = self._get_cython_func_type("roll_kurt") + window_func = self._get_roll_func("roll_kurt") kwargs.pop("require_min_periods", None) return self._apply( window_func, - center=self.center, require_min_periods=4, name="kurt", **kwargs, @@ -1676,9 +1639,9 @@ def kurt(self, **kwargs): def quantile(self, quantile, interpolation="linear", **kwargs): if quantile == 1.0: - window_func = self._get_cython_func_type("roll_max") + window_func = self._get_roll_func("roll_max") elif quantile == 0.0: - window_func = self._get_cython_func_type("roll_min") + window_func = self._get_roll_func("roll_min") else: window_func = partial( self._get_roll_func("roll_quantile"), @@ -1690,7 +1653,7 @@ def quantile(self, quantile, interpolation="linear", **kwargs): # Pass through for groupby.rolling kwargs["quantile"] = quantile kwargs["interpolation"] = interpolation - return self._apply(window_func, center=self.center, name="quantile", **kwargs) + return self._apply(window_func, name="quantile", **kwargs) _shared_docs[ "cov" @@ -2179,7 +2142,6 @@ class RollingGroupby(WindowGroupByMixin, Rolling): def _apply( self, func: Callable, - center: bool, require_min_periods: int = 0, floor: int = 1, is_weighted: bool = False, @@ -2190,7 +2152,6 @@ def _apply( result = Rolling._apply( self, func, - center, require_min_periods, floor, is_weighted, @@ -2239,16 +2200,6 @@ def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries: obj = obj.take(groupby_order) return super()._create_data(obj) - def _get_cython_func_type(self, func: str) -> Callable: - """ - Return the cython function type. - - RollingGroupby needs to always use "variable" algorithms since processing - the data in group order may not be monotonic with the data which - "fixed" algorithms assume - """ - return self._get_roll_func(f"{func}_variable") - def _get_window_indexer(self, window: int) -> GroupbyRollingIndexer: """ Return an indexer class that will compute the window start and end bounds diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index eaee276c7a388..73831d518032d 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -774,7 +774,7 @@ def test_rolling_numerical_too_large_numbers(): ds[2] = -9e33 result = ds.rolling(5).mean() expected = pd.Series( - [np.nan, np.nan, np.nan, np.nan, -1.8e33, -1.8e33, -1.8e33, 0.0, 6.0, 7.0], + [np.nan, np.nan, np.nan, np.nan, -1.8e33, -1.8e33, -1.8e33, 5.0, 6.0, 7.0], index=dates, ) tm.assert_series_equal(result, expected)