From 30d609e2d9be1e7ac14297224486b5b76a9b74fb Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 20 Sep 2020 10:32:23 -0700 Subject: [PATCH 01/13] Fix rolling test result, removed fixed algorithms, some rolling apply with center tests still failing --- pandas/_libs/window/aggregations.pyx | 432 ++------------------------- pandas/core/window/rolling.py | 45 +-- pandas/tests/window/test_rolling.py | 2 +- 3 files changed, 46 insertions(+), 433 deletions(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 5f60b884c6ada..427cfff72eccd 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -193,8 +193,8 @@ cdef inline void remove_sum(float64_t val, int64_t *nobs, float64_t *sum_x, sum_x[0] = t -def roll_sum_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): +def roll_sum(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): cdef: float64_t sum_x = 0, compensation_add = 0, compensation_remove = 0 int64_t s, e @@ -237,36 +237,6 @@ def roll_sum_variable(ndarray[float64_t] values, ndarray[int64_t] start, return output -def roll_sum_fixed(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int64_t win): - cdef: - float64_t val, prev_x, sum_x = 0, compensation_add = 0, compensation_remove = 0 - int64_t range_endpoint - int64_t nobs = 0, i, N = len(values) - ndarray[float64_t] output - - output = np.empty(N, dtype=float) - - range_endpoint = int_max(minp, 1) - 1 - - with nogil: - - for i in range(0, range_endpoint): - add_sum(values[i], &nobs, &sum_x, &compensation_add) - output[i] = NaN - - for i in range(range_endpoint, N): - val = values[i] - add_sum(val, &nobs, &sum_x, &compensation_add) - - if i > win - 1: - prev_x = values[i - win] - remove_sum(prev_x, &nobs, &sum_x, &compensation_remove) - - output[i] = calc_sum(minp, nobs, sum_x) - - return output - # ---------------------------------------------------------------------- # Rolling mean @@ -324,36 +294,8 @@ cdef inline void remove_mean(float64_t val, Py_ssize_t *nobs, float64_t *sum_x, neg_ct[0] = neg_ct[0] - 1 -def roll_mean_fixed(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int64_t win): - cdef: - float64_t val, prev_x, sum_x = 0, compensation_add = 0, compensation_remove = 0 - Py_ssize_t nobs = 0, i, neg_ct = 0, N = len(values) - ndarray[float64_t] output - - output = np.empty(N, dtype=float) - - with nogil: - for i in range(minp - 1): - val = values[i] - add_mean(val, &nobs, &sum_x, &neg_ct, &compensation_add) - output[i] = NaN - - for i in range(minp - 1, N): - val = values[i] - add_mean(val, &nobs, &sum_x, &neg_ct, &compensation_add) - - if i > win - 1: - prev_x = values[i - win] - remove_mean(prev_x, &nobs, &sum_x, &neg_ct, &compensation_remove) - - output[i] = calc_mean(minp, nobs, neg_ct, sum_x) - - return output - - -def roll_mean_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): +def roll_mean(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): cdef: float64_t val, compensation_add = 0, compensation_remove = 0, sum_x = 0 int64_t s, e @@ -459,64 +401,8 @@ cdef inline void remove_var(float64_t val, float64_t *nobs, float64_t *mean_x, ssqdm_x[0] = 0 -def roll_var_fixed(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int64_t win, int ddof=1): - """ - Numerically stable implementation using Welford's method. - """ - cdef: - float64_t mean_x = 0, ssqdm_x = 0, nobs = 0, - float64_t val, prev, delta, mean_x_old - int64_t s, e - Py_ssize_t i, j, N = len(values) - ndarray[float64_t] output - - output = np.empty(N, dtype=float) - - # Check for windows larger than array, addresses #7297 - win = min(win, N) - - with nogil: - - # Over the first window, observations can only be added, never - # removed - for i in range(win): - add_var(values[i], &nobs, &mean_x, &ssqdm_x) - output[i] = calc_var(minp, ddof, nobs, ssqdm_x) - - # a part of Welford's method for the online variance-calculation - # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance - - # After the first window, observations can both be added and - # removed - for i in range(win, N): - val = values[i] - prev = values[i - win] - - if notnan(val): - if prev == prev: - - # Adding one observation and removing another one - delta = val - prev - mean_x_old = mean_x - - mean_x += delta / nobs - ssqdm_x += ((nobs - 1) * val - + (nobs + 1) * prev - - 2 * nobs * mean_x_old) * delta / nobs - - else: - add_var(val, &nobs, &mean_x, &ssqdm_x) - elif prev == prev: - remove_var(prev, &nobs, &mean_x, &ssqdm_x) - - output[i] = calc_var(minp, ddof, nobs, ssqdm_x) - - return output - - -def roll_var_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int ddof=1): +def roll_var(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp, int ddof=1): """ Numerically stable implementation using Welford's method. """ @@ -634,38 +520,8 @@ cdef inline void remove_skew(float64_t val, int64_t *nobs, xxx[0] = xxx[0] - val * val * val -def roll_skew_fixed(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int64_t win): - cdef: - float64_t val, prev - float64_t x = 0, xx = 0, xxx = 0 - int64_t nobs = 0, i, j, N = len(values) - int64_t s, e - ndarray[float64_t] output - - output = np.empty(N, dtype=float) - - with nogil: - for i in range(minp - 1): - val = values[i] - add_skew(val, &nobs, &x, &xx, &xxx) - output[i] = NaN - - for i in range(minp - 1, N): - val = values[i] - add_skew(val, &nobs, &x, &xx, &xxx) - - if i > win - 1: - prev = values[i - win] - remove_skew(prev, &nobs, &x, &xx, &xxx) - - output[i] = calc_skew(minp, nobs, x, xx, xxx) - - return output - - -def roll_skew_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): +def roll_skew(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): cdef: float64_t val, prev float64_t x = 0, xx = 0, xxx = 0 @@ -789,37 +645,8 @@ cdef inline void remove_kurt(float64_t val, int64_t *nobs, xxxx[0] = xxxx[0] - val * val * val * val -def roll_kurt_fixed(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int64_t win): - cdef: - float64_t val, prev - float64_t x = 0, xx = 0, xxx = 0, xxxx = 0 - int64_t nobs = 0, i, j, N = len(values) - int64_t s, e - ndarray[float64_t] output - - output = np.empty(N, dtype=float) - - with nogil: - - for i in range(minp - 1): - add_kurt(values[i], &nobs, &x, &xx, &xxx, &xxxx) - output[i] = NaN - - for i in range(minp - 1, N): - add_kurt(values[i], &nobs, &x, &xx, &xxx, &xxxx) - - if i > win - 1: - prev = values[i - win] - remove_kurt(prev, &nobs, &x, &xx, &xxx, &xxxx) - - output[i] = calc_kurt(minp, nobs, x, xx, xxx, xxxx) - - return output - - -def roll_kurt_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): +def roll_kurt(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): cdef: float64_t val, prev float64_t x = 0, xx = 0, xxx = 0, xxxx = 0 @@ -999,28 +826,8 @@ cdef inline numeric calc_mm(int64_t minp, Py_ssize_t nobs, return result -def roll_max_fixed(float64_t[:] values, int64_t[:] start, - int64_t[:] end, int64_t minp, int64_t win): - """ - Moving max of 1d array of any numeric type along axis=0 ignoring NaNs. - - Parameters - ---------- - values : np.ndarray[np.float64] - window : int, size of rolling window - minp : if number of observations in window - is below this, output a NaN - index : ndarray, optional - index for window computation - closed : 'right', 'left', 'both', 'neither' - make the interval closed on the right, left, - both or neither endpoints - """ - return _roll_min_max_fixed(values, minp, win, is_max=1) - - -def roll_max_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): +def roll_max(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): """ Moving max of 1d array of any numeric type along axis=0 ignoring NaNs. @@ -1036,11 +843,11 @@ def roll_max_variable(ndarray[float64_t] values, ndarray[int64_t] start, make the interval closed on the right, left, both or neither endpoints """ - return _roll_min_max_variable(values, start, end, minp, is_max=1) + return _roll_min_max(values, start, end, minp, is_max=1) -def roll_min_fixed(float64_t[:] values, int64_t[:] start, - int64_t[:] end, int64_t minp, int64_t win): +def roll_min(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): """ Moving min of 1d array of any numeric type along axis=0 ignoring NaNs. @@ -1053,31 +860,14 @@ def roll_min_fixed(float64_t[:] values, int64_t[:] start, index : ndarray, optional index for window computation """ - return _roll_min_max_fixed(values, minp, win, is_max=0) - + return _roll_min_max(values, start, end, minp, is_max=0) -def roll_min_variable(ndarray[float64_t] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp): - """ - Moving min of 1d array of any numeric type along axis=0 ignoring NaNs. - Parameters - ---------- - values : np.ndarray[np.float64] - window : int, size of rolling window - minp : if number of observations in window - is below this, output a NaN - index : ndarray, optional - index for window computation - """ - return _roll_min_max_variable(values, start, end, minp, is_max=0) - - -cdef _roll_min_max_variable(ndarray[numeric] values, - ndarray[int64_t] starti, - ndarray[int64_t] endi, - int64_t minp, - bint is_max): +cdef _roll_min_max(ndarray[numeric] values, + ndarray[int64_t] starti, + ndarray[int64_t] endi, + int64_t minp, + bint is_max): cdef: numeric ai int64_t i, k, curr_win_size, start @@ -1140,93 +930,6 @@ cdef _roll_min_max_variable(ndarray[numeric] values, return output -cdef _roll_min_max_fixed(numeric[:] values, - int64_t minp, - int64_t win, - bint is_max): - cdef: - numeric ai - bint should_replace - int64_t i, removed, window_i, - Py_ssize_t nobs = 0, N = len(values) - int64_t* death - numeric* ring - numeric* minvalue - numeric* end - numeric* last - ndarray[float64_t, ndim=1] output - - output = np.empty(N, dtype=float) - # setup the rings of death! - ring = malloc(win * sizeof(numeric)) - death = malloc(win * sizeof(int64_t)) - - end = ring + win - last = ring - minvalue = ring - ai = values[0] - minvalue[0] = init_mm(values[0], &nobs, is_max) - death[0] = win - nobs = 0 - - with nogil: - - for i in range(N): - ai = init_mm(values[i], &nobs, is_max) - - if i >= win: - remove_mm(values[i - win], &nobs) - - if death[minvalue - ring] == i: - minvalue = minvalue + 1 - if minvalue >= end: - minvalue = ring - - if is_max: - should_replace = ai >= minvalue[0] - else: - should_replace = ai <= minvalue[0] - if should_replace: - - minvalue[0] = ai - death[minvalue - ring] = i + win - last = minvalue - - else: - - if is_max: - should_replace = last[0] <= ai - else: - should_replace = last[0] >= ai - while should_replace: - if last == ring: - last = end - last -= 1 - if is_max: - should_replace = last[0] <= ai - else: - should_replace = last[0] >= ai - - last += 1 - if last == end: - last = ring - last[0] = ai - death[last - ring] = i + win - - output[i] = calc_mm(minp, nobs, minvalue[0]) - - for i in range(minp - 1): - if numeric in cython.floating: - output[i] = NaN - else: - output[i] = 0 - - free(ring) - free(death) - - return output - - cdef enum InterpolationType: LINEAR, LOWER, @@ -1356,19 +1059,16 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, return output -def roll_generic_fixed(object obj, - ndarray[int64_t] start, ndarray[int64_t] end, - int64_t minp, int64_t win, - int offset, object func, bint raw, - object args, object kwargs): +def roll_generic(object obj, + ndarray[int64_t] start, ndarray[int64_t] end, + int64_t minp, + int offset, object func, bint raw, + object args, object kwargs): cdef: - ndarray[float64_t] output, counts, bufarr + ndarray[float64_t] output, counts ndarray[float64_t, cast=True] arr - float64_t *buf - float64_t *oldbuf - int64_t nobs = 0, i, j, s, e, N = len(start) + Py_ssize_t i, s, e, N = len(start), n = len(obj) - n = len(obj) if n == 0: return obj @@ -1379,83 +1079,15 @@ def roll_generic_fixed(object obj, if not arr.flags.c_contiguous: arr = arr.copy('C') - counts = roll_sum_fixed(np.concatenate([np.isfinite(arr).astype(float), - np.array([0.] * offset)]), - start, end, minp, win)[offset:] + counts = roll_sum(np.isfinite(arr).astype(float), start, end, minp) output = np.empty(N, dtype=float) - if not raw: - # series - for i in range(N): - if counts[i] >= minp: - sl = slice(int_max(i + offset - win + 1, 0), - int_min(i + offset + 1, N)) - output[i] = func(obj.iloc[sl], *args, **kwargs) - else: - output[i] = NaN - - else: - - # truncated windows at the beginning, through first full-length window - for i in range((int_min(win, N) - offset)): - if counts[i] >= minp: - output[i] = func(arr[0: (i + offset + 1)], *args, **kwargs) - else: - output[i] = NaN - - # remaining full-length windows - for j, i in enumerate(range((win - offset), (N - offset)), 1): - if counts[i] >= minp: - output[i] = func(arr[j:j + win], *args, **kwargs) - else: - output[i] = NaN - - # truncated windows at the end - for i in range(int_max(N - offset, 0), N): - if counts[i] >= minp: - output[i] = func(arr[int_max(i + offset - win + 1, 0): N], - *args, - **kwargs) - else: - output[i] = NaN - - return output + #if offset != 0: + # raise ValueError("unable to roll_generic with a non-zero offset") + for i in range(N): -def roll_generic_variable(object obj, - ndarray[int64_t] start, ndarray[int64_t] end, - int64_t minp, - int offset, object func, bint raw, - object args, object kwargs): - cdef: - ndarray[float64_t] output, counts, bufarr - ndarray[float64_t, cast=True] arr - float64_t *buf - float64_t *oldbuf - int64_t nobs = 0, i, j, s, e, N = len(start) - - n = len(obj) - if n == 0: - return obj - - arr = np.asarray(obj) - - # ndarray input - if raw: - if not arr.flags.c_contiguous: - arr = arr.copy('C') - - counts = roll_sum_variable(np.concatenate([np.isfinite(arr).astype(float), - np.array([0.] * offset)]), - start, end, minp)[offset:] - - output = np.empty(N, dtype=float) - - if offset != 0: - raise ValueError("unable to roll_generic with a non-zero offset") - - for i in range(0, N): s = start[i] e = end[i] diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 21a7164411fb7..885185281ad6c 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -445,16 +445,6 @@ def _get_roll_func(self, func_name: str) -> Callable: ) return window_func - def _get_cython_func_type(self, func: str) -> Callable: - """ - Return a variable or fixed cython function type. - - Variable algorithms do not use window while fixed do. - """ - if self.is_freq_type or isinstance(self.window, BaseIndexer): - return self._get_roll_func(f"{func}_variable") - return partial(self._get_roll_func(f"{func}_fixed"), win=self._get_window()) - def _get_window_indexer(self, window: int) -> BaseIndexer: """ Return an indexer class that will compute the window start and end bounds @@ -1386,7 +1376,8 @@ def apply( apply_func = self._generate_cython_apply_func( args, kwargs, raw, offset, func ) - center = False + center = self.center + # center = False else: raise ValueError("engine must be either 'numba' or 'cython'") @@ -1407,7 +1398,7 @@ def _generate_cython_apply_func(self, args, kwargs, raw, offset, func): from pandas import Series window_func = partial( - self._get_cython_func_type("roll_generic"), + self._get_roll_func("roll_generic"), args=args, kwargs=kwargs, raw=raw, @@ -1424,7 +1415,7 @@ def apply_func(values, begin, end, min_periods, raw=raw): def sum(self, *args, **kwargs): nv.validate_window_func("sum", args, kwargs) - window_func = self._get_cython_func_type("roll_sum") + window_func = self._get_roll_func("roll_sum") kwargs.pop("floor", None) return self._apply( window_func, center=self.center, floor=0, name="sum", **kwargs @@ -1443,7 +1434,7 @@ def sum(self, *args, **kwargs): def max(self, *args, **kwargs): nv.validate_window_func("max", args, kwargs) - window_func = self._get_cython_func_type("roll_max") + window_func = self._get_roll_func("roll_max") return self._apply(window_func, center=self.center, name="max", **kwargs) _shared_docs["min"] = dedent( @@ -1485,12 +1476,12 @@ def max(self, *args, **kwargs): def min(self, *args, **kwargs): nv.validate_window_func("min", args, kwargs) - window_func = self._get_cython_func_type("roll_min") + window_func = self._get_roll_func("roll_min") return self._apply(window_func, center=self.center, name="min", **kwargs) def mean(self, *args, **kwargs): nv.validate_window_func("mean", args, kwargs) - window_func = self._get_cython_func_type("roll_mean") + window_func = self._get_roll_func("roll_mean") return self._apply(window_func, center=self.center, name="mean", **kwargs) _shared_docs["median"] = dedent( @@ -1539,7 +1530,7 @@ def median(self, **kwargs): def std(self, ddof=1, *args, **kwargs): nv.validate_window_func("std", args, kwargs) kwargs.pop("require_min_periods", None) - window_func = self._get_cython_func_type("roll_var") + window_func = self._get_roll_func("roll_var") def zsqrt_func(values, begin, end, min_periods): return zsqrt(window_func(values, begin, end, min_periods, ddof=ddof)) @@ -1557,7 +1548,7 @@ def zsqrt_func(values, begin, end, min_periods): def var(self, ddof=1, *args, **kwargs): nv.validate_window_func("var", args, kwargs) kwargs.pop("require_min_periods", None) - window_func = partial(self._get_cython_func_type("roll_var"), ddof=ddof) + window_func = partial(self._get_roll_func("roll_var"), ddof=ddof) # ddof passed again for compat with groupby.rolling return self._apply( window_func, @@ -1580,7 +1571,7 @@ def var(self, ddof=1, *args, **kwargs): """ def skew(self, **kwargs): - window_func = self._get_cython_func_type("roll_skew") + window_func = self._get_roll_func("roll_skew") kwargs.pop("require_min_periods", None) return self._apply( window_func, @@ -1623,7 +1614,7 @@ def skew(self, **kwargs): ) def kurt(self, **kwargs): - window_func = self._get_cython_func_type("roll_kurt") + window_func = self._get_roll_func("roll_kurt") kwargs.pop("require_min_periods", None) return self._apply( window_func, @@ -1689,9 +1680,9 @@ def kurt(self, **kwargs): def quantile(self, quantile, interpolation="linear", **kwargs): if quantile == 1.0: - window_func = self._get_cython_func_type("roll_max") + window_func = self._get_roll_func("roll_max") elif quantile == 0.0: - window_func = self._get_cython_func_type("roll_min") + window_func = self._get_roll_func("roll_min") else: window_func = partial( self._get_roll_func("roll_quantile"), @@ -2237,16 +2228,6 @@ def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries: obj = obj.take(groupby_order) return super()._create_data(obj) - def _get_cython_func_type(self, func: str) -> Callable: - """ - Return the cython function type. - - RollingGroupby needs to always use "variable" algorithms since processing - the data in group order may not be monotonic with the data which - "fixed" algorithms assume - """ - return self._get_roll_func(f"{func}_variable") - def _get_window_indexer(self, window: int) -> GroupbyRollingIndexer: """ Return an indexer class that will compute the window start and end bounds diff --git a/pandas/tests/window/test_rolling.py b/pandas/tests/window/test_rolling.py index 88afcec0f7bf4..1df9a291e1b2e 100644 --- a/pandas/tests/window/test_rolling.py +++ b/pandas/tests/window/test_rolling.py @@ -767,7 +767,7 @@ def test_rolling_numerical_too_large_numbers(): ds[2] = -9e33 result = ds.rolling(5).mean() expected = pd.Series( - [np.nan, np.nan, np.nan, np.nan, -1.8e33, -1.8e33, -1.8e33, 0.0, 6.0, 7.0], + [np.nan, np.nan, np.nan, np.nan, -1.8e33, -1.8e33, -1.8e33, 5.0, 6.0, 7.0], index=dates, ) tm.assert_series_equal(result, expected) From 57d86074d31edea5b774595d7aed56b85b4d5788 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Tue, 22 Sep 2020 17:49:45 -0700 Subject: [PATCH 02/13] Rename function and move offset to correct location --- pandas/_libs/window/aggregations.pyx | 10 +++++----- pandas/core/window/rolling.py | 5 ++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 427cfff72eccd..3e9f5765ad88e 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -1059,11 +1059,11 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, return output -def roll_generic(object obj, - ndarray[int64_t] start, ndarray[int64_t] end, - int64_t minp, - int offset, object func, bint raw, - object args, object kwargs): +def roll_apply(object obj, + ndarray[int64_t] start, ndarray[int64_t] end, + int64_t minp, + int offset, object func, bint raw, + tuple args, dict kwargs): cdef: ndarray[float64_t] output, counts ndarray[float64_t, cast=True] arr diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index b674be6adbaf5..380bde9954bc9 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -1383,7 +1383,6 @@ def apply( args, kwargs, raw, offset, func ) center = self.center - # center = False else: raise ValueError("engine must be either 'numba' or 'cython'") @@ -1404,11 +1403,11 @@ def _generate_cython_apply_func(self, args, kwargs, raw, offset, func): from pandas import Series window_func = partial( - self._get_roll_func("roll_generic"), + self._get_roll_func("roll_apply"), + offset=offset, args=args, kwargs=kwargs, raw=raw, - offset=offset, func=func, ) From f057ac6e84e756f42e65248f4346b7bd32d53885 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Thu, 24 Sep 2020 00:36:10 -0700 Subject: [PATCH 03/13] Impliment center in terms of indexers --- pandas/_libs/window/aggregations.pyx | 5 +---- pandas/core/window/indexers.py | 11 +++++++++++ pandas/core/window/rolling.py | 21 ++++++--------------- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 3e9f5765ad88e..8ae5c23b5e828 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -1062,7 +1062,7 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, def roll_apply(object obj, ndarray[int64_t] start, ndarray[int64_t] end, int64_t minp, - int offset, object func, bint raw, + object func, bint raw, tuple args, dict kwargs): cdef: ndarray[float64_t] output, counts @@ -1083,9 +1083,6 @@ def roll_apply(object obj, output = np.empty(N, dtype=float) - #if offset != 0: - # raise ValueError("unable to roll_generic with a non-zero offset") - for i in range(N): s = start[i] diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index a21521f4ce8bb..2e4a353132931 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -89,6 +89,17 @@ def get_window_bounds( end_s = np.arange(self.window_size, dtype="int64") + 1 end_e = start_e + self.window_size end = np.concatenate([end_s, end_e])[:num_values] + + if center and self.window_size > 2: + offset = (self.window_size - 1) // 2 + start_s_buffer = np.roll(start, -offset)[:-offset] + start_e_buffer = np.arange(start[-1] + 1, start[-1] + 1 + offset) + start = np.concatenate([start_s_buffer, start_e_buffer]) + + end_s_buffer = np.roll(end, -offset)[:-offset] + end_e_buffer = np.array([end[-1]] * offset) + end = np.concatenate([end_s_buffer, end_e_buffer]) + return start, end diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index a06f57dfc74a8..eebc50aa3fbe8 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -550,13 +550,9 @@ def homogeneous_func(values: np.ndarray): if values.size == 0: return values.copy() - offset = calculate_center_offset(window) if center else 0 - additional_nans = np.array([np.nan] * offset) - if not is_weighted: def calc(x): - x = np.concatenate((x, additional_nans)) if not isinstance(self.window, BaseIndexer): min_periods = calculate_min_periods( window, self.min_periods, len(x), require_min_periods, floor @@ -580,6 +576,8 @@ def calc(x): else: def calc(x): + offset = calculate_center_offset(window) if center else 0 + additional_nans = np.array([np.nan] * offset) x = np.concatenate((x, additional_nans)) return func(x, window, self.min_periods) @@ -593,7 +591,7 @@ def calc(x): if use_numba_cache: NUMBA_FUNC_CACHE[(kwargs["original_func"], "rolling_apply")] = func - if center: + if center and is_weighted: result = self._center_window(result, window) return result @@ -1371,25 +1369,19 @@ def apply( if raw is False: raise ValueError("raw must be `True` when using the numba engine") apply_func = generate_numba_apply_func(args, kwargs, func, engine_kwargs) - center = self.center elif engine in ("cython", None): if engine_kwargs is not None: raise ValueError("cython engine does not accept engine_kwargs") - # Cython apply functions handle center, so don't need to use - # _apply's center handling - window = self._get_window() - offset = calculate_center_offset(window) if self.center else 0 apply_func = self._generate_cython_apply_func( - args, kwargs, raw, offset, func + args, kwargs, raw, func ) - center = self.center else: raise ValueError("engine must be either 'numba' or 'cython'") # name=func & raw=raw for WindowGroupByMixin._apply return self._apply( apply_func, - center=center, + center=self.center, floor=0, name=func, use_numba_cache=maybe_use_numba(engine), @@ -1399,12 +1391,11 @@ def apply( kwargs=kwargs, ) - def _generate_cython_apply_func(self, args, kwargs, raw, offset, func): + def _generate_cython_apply_func(self, args, kwargs, raw, func): from pandas import Series window_func = partial( self._get_roll_func("roll_apply"), - offset=offset, args=args, kwargs=kwargs, raw=raw, From f00f16ed5c006c3dd3575d1b563ed92684e6a5e0 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Thu, 24 Sep 2020 20:30:27 -0700 Subject: [PATCH 04/13] Get all the tests to pass --- pandas/core/window/indexers.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index 2e4a353132931..767de01e83739 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -91,13 +91,16 @@ def get_window_bounds( end = np.concatenate([end_s, end_e])[:num_values] if center and self.window_size > 2: - offset = (self.window_size - 1) // 2 - start_s_buffer = np.roll(start, -offset)[:-offset] - start_e_buffer = np.arange(start[-1] + 1, start[-1] + 1 + offset) - start = np.concatenate([start_s_buffer, start_e_buffer]) + offset = min((self.window_size - 1) // 2, num_values - 1) + start_s_buffer = np.roll(start, -offset)[: num_values - offset] + end_s_buffer = np.roll(end, -offset)[: num_values - offset] + + start_e_buffer = np.arange( + start[-1] + 1, start[-1] + 1 + offset, dtype="int64" + ) + end_e_buffer = np.array([end[-1]] * offset, dtype="int64") - end_s_buffer = np.roll(end, -offset)[:-offset] - end_e_buffer = np.array([end[-1]] * offset) + start = np.concatenate([start_s_buffer, start_e_buffer]) end = np.concatenate([end_s_buffer, end_e_buffer]) return start, end From 3ba5e8ab9a2131b7f7aaf919b34097dd34ce9012 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Thu, 24 Sep 2020 20:50:17 -0700 Subject: [PATCH 05/13] Remove center from _apply as no longer needed --- pandas/core/window/common.py | 1 - pandas/core/window/rolling.py | 45 ++++++++++------------------------- 2 files changed, 13 insertions(+), 33 deletions(-) diff --git a/pandas/core/window/common.py b/pandas/core/window/common.py index 6452eb8c6b3a9..d459962c14431 100644 --- a/pandas/core/window/common.py +++ b/pandas/core/window/common.py @@ -65,7 +65,6 @@ def __init__(self, obj, *args, **kwargs): def _apply( self, func: Callable, - center: bool, require_min_periods: int = 0, floor: int = 1, is_weighted: bool = False, diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index eebc50aa3fbe8..14786f4435235 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -508,7 +508,6 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike: def _apply( self, func: Callable, - center: bool, require_min_periods: int = 0, floor: int = 1, is_weighted: bool = False, @@ -524,7 +523,6 @@ def _apply( Parameters ---------- func : callable function to apply - center : bool require_min_periods : int floor : int is_weighted : bool @@ -576,7 +574,7 @@ def calc(x): else: def calc(x): - offset = calculate_center_offset(window) if center else 0 + offset = calculate_center_offset(window) if self.center else 0 additional_nans = np.array([np.nan] * offset) x = np.concatenate((x, additional_nans)) return func(x, window, self.min_periods) @@ -591,7 +589,7 @@ def calc(x): if use_numba_cache: NUMBA_FUNC_CACHE[(kwargs["original_func"], "rolling_apply")] = func - if center and is_weighted: + if self.center and is_weighted: result = self._center_window(result, window) return result @@ -1176,9 +1174,7 @@ def sum(self, *args, **kwargs): nv.validate_window_func("sum", args, kwargs) window_func = self._get_roll_func("roll_weighted_sum") window_func = get_weighted_roll_func(window_func) - return self._apply( - window_func, center=self.center, is_weighted=True, name="sum", **kwargs - ) + return self._apply(window_func, is_weighted=True, name="sum", **kwargs) @Substitution(name="window") @Appender(_shared_docs["mean"]) @@ -1186,9 +1182,7 @@ def mean(self, *args, **kwargs): nv.validate_window_func("mean", args, kwargs) window_func = self._get_roll_func("roll_weighted_mean") window_func = get_weighted_roll_func(window_func) - return self._apply( - window_func, center=self.center, is_weighted=True, name="mean", **kwargs - ) + return self._apply(window_func, is_weighted=True, name="mean", **kwargs) @Substitution(name="window", versionadded="\n.. versionadded:: 1.0.0\n") @Appender(_shared_docs["var"]) @@ -1197,9 +1191,7 @@ def var(self, ddof=1, *args, **kwargs): window_func = partial(self._get_roll_func("roll_weighted_var"), ddof=ddof) window_func = get_weighted_roll_func(window_func) kwargs.pop("name", None) - return self._apply( - window_func, center=self.center, is_weighted=True, name="var", **kwargs - ) + return self._apply(window_func, is_weighted=True, name="var", **kwargs) @Substitution(name="window", versionadded="\n.. versionadded:: 1.0.0\n") @Appender(_shared_docs["std"]) @@ -1372,16 +1364,13 @@ def apply( elif engine in ("cython", None): if engine_kwargs is not None: raise ValueError("cython engine does not accept engine_kwargs") - apply_func = self._generate_cython_apply_func( - args, kwargs, raw, func - ) + apply_func = self._generate_cython_apply_func(args, kwargs, raw, func) else: raise ValueError("engine must be either 'numba' or 'cython'") # name=func & raw=raw for WindowGroupByMixin._apply return self._apply( apply_func, - center=self.center, floor=0, name=func, use_numba_cache=maybe_use_numba(engine), @@ -1413,9 +1402,7 @@ def sum(self, *args, **kwargs): nv.validate_window_func("sum", args, kwargs) window_func = self._get_roll_func("roll_sum") kwargs.pop("floor", None) - return self._apply( - window_func, center=self.center, floor=0, name="sum", **kwargs - ) + return self._apply(window_func, floor=0, name="sum", **kwargs) _shared_docs["max"] = dedent( """ @@ -1431,7 +1418,7 @@ def sum(self, *args, **kwargs): def max(self, *args, **kwargs): nv.validate_window_func("max", args, kwargs) window_func = self._get_roll_func("roll_max") - return self._apply(window_func, center=self.center, name="max", **kwargs) + return self._apply(window_func, name="max", **kwargs) _shared_docs["min"] = dedent( """ @@ -1473,12 +1460,12 @@ def max(self, *args, **kwargs): def min(self, *args, **kwargs): nv.validate_window_func("min", args, kwargs) window_func = self._get_roll_func("roll_min") - return self._apply(window_func, center=self.center, name="min", **kwargs) + return self._apply(window_func, name="min", **kwargs) def mean(self, *args, **kwargs): nv.validate_window_func("mean", args, kwargs) window_func = self._get_roll_func("roll_mean") - return self._apply(window_func, center=self.center, name="mean", **kwargs) + return self._apply(window_func, name="mean", **kwargs) _shared_docs["median"] = dedent( """ @@ -1521,7 +1508,7 @@ def median(self, **kwargs): window_func = self._get_roll_func("roll_median_c") # GH 32865. Move max window size calculation to # the median function implementation - return self._apply(window_func, center=self.center, name="median", **kwargs) + return self._apply(window_func, name="median", **kwargs) def std(self, ddof=1, *args, **kwargs): nv.validate_window_func("std", args, kwargs) @@ -1534,7 +1521,6 @@ def zsqrt_func(values, begin, end, min_periods): # ddof passed again for compat with groupby.rolling return self._apply( zsqrt_func, - center=self.center, require_min_periods=1, name="std", ddof=ddof, @@ -1548,7 +1534,6 @@ def var(self, ddof=1, *args, **kwargs): # ddof passed again for compat with groupby.rolling return self._apply( window_func, - center=self.center, require_min_periods=1, name="var", ddof=ddof, @@ -1571,7 +1556,6 @@ def skew(self, **kwargs): kwargs.pop("require_min_periods", None) return self._apply( window_func, - center=self.center, require_min_periods=3, name="skew", **kwargs, @@ -1614,7 +1598,6 @@ def kurt(self, **kwargs): kwargs.pop("require_min_periods", None) return self._apply( window_func, - center=self.center, require_min_periods=4, name="kurt", **kwargs, @@ -1690,7 +1673,7 @@ def quantile(self, quantile, interpolation="linear", **kwargs): # Pass through for groupby.rolling kwargs["quantile"] = quantile kwargs["interpolation"] = interpolation - return self._apply(window_func, center=self.center, name="quantile", **kwargs) + return self._apply(window_func, name="quantile", **kwargs) _shared_docs[ "cov" @@ -2037,7 +2020,7 @@ def count(self): # when using a BaseIndexer subclass as a window if self.is_freq_type or isinstance(self.window, BaseIndexer): window_func = self._get_roll_func("roll_count") - return self._apply(window_func, center=self.center, name="count") + return self._apply(window_func, name="count") return super().count() @@ -2164,7 +2147,6 @@ class RollingGroupby(WindowGroupByMixin, Rolling): def _apply( self, func: Callable, - center: bool, require_min_periods: int = 0, floor: int = 1, is_weighted: bool = False, @@ -2175,7 +2157,6 @@ def _apply( result = Rolling._apply( self, func, - center, require_min_periods, floor, is_weighted, From 980a8efc293e9e11699738a3d43c78223f7d6838 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Thu, 24 Sep 2020 21:10:40 -0700 Subject: [PATCH 06/13] Add better typing --- pandas/core/window/rolling.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 14786f4435235..d928eae95de89 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -72,22 +72,21 @@ from pandas.core.internals import Block # noqa:F401 -def calculate_center_offset(window) -> int: +def calculate_center_offset(window: np.ndarray) -> int: """ - Calculate an offset necessary to have the window label to be centered. + Calculate an offset necessary to have the window label to be centered + for weighted windows. Parameters ---------- - window: ndarray or int - window weights or window + window: ndarray + window weights Returns ------- int """ - if not is_integer(window): - window = len(window) - return int((window - 1) / 2.0) + return (len(window) - 1) // 2 def calculate_min_periods( @@ -417,9 +416,9 @@ def _insert_on_column(self, result: "DataFrame", obj: "DataFrame"): # insert at the end result[name] = extra_col - def _center_window(self, result: np.ndarray, window) -> np.ndarray: + def _center_window(self, result: np.ndarray, window: np.ndarray) -> np.ndarray: """ - Center the result in the window. + Center the result in the window for weighted rolling aggregations. """ if self.axis > result.ndim - 1: raise ValueError("Requested axis is larger then no. of argument dimensions") From 21443f735f62a07cf1886bfa50302b9feb396b08 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Thu, 24 Sep 2020 23:06:09 -0700 Subject: [PATCH 07/13] Deal with numeric precision --- pandas/_libs/window/aggregations.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 8ae5c23b5e828..33bad4f56423d 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -356,7 +356,7 @@ cdef inline float64_t calc_var(int64_t minp, int ddof, float64_t nobs, result = 0 else: result = ssqdm_x / (nobs - ddof) - if result < 0: + if result < 1e-15: result = 0 else: result = NaN From b9ef1a028bfc0a02c24896470158963f9939c04e Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 27 Sep 2020 12:22:51 -0700 Subject: [PATCH 08/13] Remove code related to center now being implimented in the fixed indexer --- pandas/core/window/indexers.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pandas/core/window/indexers.py b/pandas/core/window/indexers.py index 767de01e83739..023f598f606f3 100644 --- a/pandas/core/window/indexers.py +++ b/pandas/core/window/indexers.py @@ -341,10 +341,4 @@ def get_window_bounds( end_arrays.append(window_indicies.take(ensure_platform_int(end))) start = np.concatenate(start_arrays) end = np.concatenate(end_arrays) - # GH 35552: Need to adjust start and end based on the nans appended to values - # when center=True - if num_values > len(start): - offset = num_values - len(start) - start = np.concatenate([start, np.array([end[-1]] * offset)]) - end = np.concatenate([end, np.array([end[-1]] * offset)]) return start, end From 7670551862b154de91d3d174f8c160661f6369f8 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 27 Sep 2020 12:44:30 -0700 Subject: [PATCH 09/13] Add note regarding precision issues --- pandas/_libs/window/aggregations.pyx | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 33bad4f56423d..dadcb5d59225d 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -356,6 +356,8 @@ cdef inline float64_t calc_var(int64_t minp, int ddof, float64_t nobs, result = 0 else: result = ssqdm_x / (nobs - ddof) + # Fix for numerical imprecision. + # Can be result < 0 once Kahan Summation is implemented if result < 1e-15: result = 0 else: From e6cf7726c99cd9e7bdab5612ac877153e34af1e4 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Fri, 2 Oct 2020 13:25:59 -0700 Subject: [PATCH 10/13] Note performance hit --- doc/source/whatsnew/v1.2.0.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 016e8d90e7d21..2ffe99f598262 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -280,6 +280,7 @@ Performance improvements - Performance improvement in :meth:`GroupBy.transform` with the ``numba`` engine (:issue:`36240`) - ``Styler`` uuid method altered to compress data transmission over web whilst maintaining reasonably low table collision probability (:issue:`36345`) - Performance improvement in :meth:`pd.to_datetime` with non-ns time unit for ``float`` ``dtype`` columns (:issue:`20445`) +- Small performance decrease to :meth:`~Rolling.min` and :meth:`~Rolling.max` for fixed windows (:issue:`36567`) .. --------------------------------------------------------------------------- From 10d517747e02fe75e4b248b0bc57186f71801995 Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Fri, 2 Oct 2020 13:28:39 -0700 Subject: [PATCH 11/13] Remove tilde --- doc/source/whatsnew/v1.2.0.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 2ffe99f598262..34ed062d26e60 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -280,7 +280,7 @@ Performance improvements - Performance improvement in :meth:`GroupBy.transform` with the ``numba`` engine (:issue:`36240`) - ``Styler`` uuid method altered to compress data transmission over web whilst maintaining reasonably low table collision probability (:issue:`36345`) - Performance improvement in :meth:`pd.to_datetime` with non-ns time unit for ``float`` ``dtype`` columns (:issue:`20445`) -- Small performance decrease to :meth:`~Rolling.min` and :meth:`~Rolling.max` for fixed windows (:issue:`36567`) +- Small performance decrease to :meth:`Rolling.min` and :meth:`Rolling.max` for fixed windows (:issue:`36567`) .. --------------------------------------------------------------------------- From 4913c845e84b5999146b3f8261409175bd33579a Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Fri, 2 Oct 2020 14:27:54 -0700 Subject: [PATCH 12/13] Change useage of get_cython_func_type --- pandas/core/window/rolling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 35e0183d0a7d0..7427d3fd1fdc9 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -1253,7 +1253,7 @@ class RollingAndExpandingMixin(BaseWindow): ) def count(self): - window_func = self._get_cython_func_type("roll_sum") + window_func = self._get_roll_func("roll_sum") return self._apply(window_func, center=self.center, name="count") _shared_docs["apply"] = dedent( From cbce3b0ba5175abd24c99e1e87d23094b99eb8fd Mon Sep 17 00:00:00 2001 From: Matt Roeschke Date: Sun, 4 Oct 2020 18:31:09 -0700 Subject: [PATCH 13/13] Remove self.center from count's _apply --- pandas/core/window/rolling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index a6655a6b4a8ce..bb15e60a0859c 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -1254,7 +1254,7 @@ class RollingAndExpandingMixin(BaseWindow): def count(self): window_func = self._get_roll_func("roll_sum") - return self._apply(window_func, center=self.center, name="count") + return self._apply(window_func, name="count") _shared_docs["apply"] = dedent( r"""