diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index 131e0154ce8fc..fe97c048d4472 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -3,7 +3,6 @@ import cython from cython import Py_ssize_t -from libc.stdlib cimport free, malloc from libcpp.deque cimport deque import numpy as np @@ -906,7 +905,7 @@ interpolation_types = { def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, - ndarray[int64_t] end, int64_t minp, int64_t win, + ndarray[int64_t] end, int64_t minp, float64_t quantile, str interpolation): """ O(N log(window)) implementation using skip list @@ -933,7 +932,7 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, # actual skiplist ops outweigh any window computation costs output = np.empty(N, dtype=float) - if win == 0 or (end - start).max() == 0: + if (end - start).max() == 0: output[:] = NaN return output win = (end - start).max() @@ -1020,7 +1019,7 @@ def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, def roll_apply(object obj, ndarray[int64_t] start, ndarray[int64_t] end, int64_t minp, - object func, bint raw, + object function, bint raw, tuple args, dict kwargs): cdef: ndarray[float64_t] output, counts @@ -1048,9 +1047,9 @@ def roll_apply(object obj, if counts[i] >= minp: if raw: - output[i] = func(arr[s:e], *args, **kwargs) + output[i] = function(arr[s:e], *args, **kwargs) else: - output[i] = func(obj.iloc[s:e], *args, **kwargs) + output[i] = function(obj.iloc[s:e], *args, **kwargs) else: output[i] = NaN diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 25938b57d9720..9f7040943d9a3 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -1,7 +1,7 @@ import datetime from functools import partial from textwrap import dedent -from typing import Optional, Union +from typing import TYPE_CHECKING, Optional, Union import numpy as np @@ -17,6 +17,10 @@ from pandas.core.window.common import _doc_template, _shared_docs, zsqrt from pandas.core.window.rolling import BaseWindow, flex_binary_moment +if TYPE_CHECKING: + from pandas import Series + + _bias_template = """ Parameters ---------- @@ -60,6 +64,15 @@ def get_center_of_mass( return float(comass) +def wrap_result(obj: "Series", result: np.ndarray) -> "Series": + """ + Wrap a single 1D result. + """ + obj = obj._selected_obj + + return obj._constructor(result, obj.index, name=obj.name) + + class ExponentialMovingWindow(BaseWindow): r""" Provide exponential weighted (EW) functions. @@ -413,7 +426,7 @@ def _get_cov(X, Y): self.min_periods, bias, ) - return X._wrap_result(cov) + return wrap_result(X, cov) return flex_binary_moment( self._selected_obj, other._selected_obj, _get_cov, pairwise=bool(pairwise) @@ -467,7 +480,7 @@ def _cov(x, y): x_var = _cov(x_values, x_values) y_var = _cov(y_values, y_values) corr = cov / zsqrt(x_var * y_var) - return X._wrap_result(corr) + return wrap_result(X, corr) return flex_binary_moment( self._selected_obj, other._selected_obj, _get_corr, pairwise=bool(pairwise) diff --git a/pandas/core/window/expanding.py b/pandas/core/window/expanding.py index 44e0e36b61d46..aa1dfe8567c15 100644 --- a/pandas/core/window/expanding.py +++ b/pandas/core/window/expanding.py @@ -1,6 +1,9 @@ from textwrap import dedent -from typing import Dict, Optional +from typing import Any, Callable, Dict, Optional, Tuple, Union +import numpy as np + +from pandas._typing import FrameOrSeries from pandas.compat.numpy import function as nv from pandas.util._decorators import Appender, Substitution, doc @@ -65,7 +68,9 @@ def __init__(self, obj, min_periods=1, center=None, axis=0, **kwargs): def _constructor(self): return Expanding - def _get_window(self, other=None, **kwargs): + def _get_window( + self, other: Optional[Union[np.ndarray, FrameOrSeries]] = None, **kwargs + ) -> int: """ Get the window length over which to perform some operation. @@ -135,12 +140,12 @@ def count(self, **kwargs): @Appender(_shared_docs["apply"]) def apply( self, - func, + func: Callable[..., Any], raw: bool = False, engine: Optional[str] = None, engine_kwargs: Optional[Dict[str, bool]] = None, - args=None, - kwargs=None, + args: Optional[Tuple[Any, ...]] = None, + kwargs: Optional[Dict[str, Any]] = None, ): return super().apply( func, @@ -183,19 +188,19 @@ def median(self, **kwargs): @Substitution(name="expanding", versionadded="") @Appender(_shared_docs["std"]) - def std(self, ddof=1, *args, **kwargs): + def std(self, ddof: int = 1, *args, **kwargs): nv.validate_expanding_func("std", args, kwargs) return super().std(ddof=ddof, **kwargs) @Substitution(name="expanding", versionadded="") @Appender(_shared_docs["var"]) - def var(self, ddof=1, *args, **kwargs): + def var(self, ddof: int = 1, *args, **kwargs): nv.validate_expanding_func("var", args, kwargs) return super().var(ddof=ddof, **kwargs) @Substitution(name="expanding") @Appender(_shared_docs["sem"]) - def sem(self, ddof=1, *args, **kwargs): + def sem(self, ddof: int = 1, *args, **kwargs): return super().sem(ddof=ddof, **kwargs) @Substitution(name="expanding", func_name="skew") @@ -245,12 +250,23 @@ def quantile(self, quantile, interpolation="linear", **kwargs): @Substitution(name="expanding", func_name="cov") @Appender(_doc_template) @Appender(_shared_docs["cov"]) - def cov(self, other=None, pairwise=None, ddof=1, **kwargs): + def cov( + self, + other: Optional[Union[np.ndarray, FrameOrSeries]] = None, + pairwise: Optional[bool] = None, + ddof: int = 1, + **kwargs, + ): return super().cov(other=other, pairwise=pairwise, ddof=ddof, **kwargs) @Substitution(name="expanding") @Appender(_shared_docs["corr"]) - def corr(self, other=None, pairwise=None, **kwargs): + def corr( + self, + other: Optional[Union[np.ndarray, FrameOrSeries]] = None, + pairwise: Optional[bool] = None, + **kwargs, + ): return super().corr(other=other, pairwise=pairwise, **kwargs) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index fdeb91d37724d..4077e4e7e039e 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -8,6 +8,7 @@ from textwrap import dedent from typing import ( TYPE_CHECKING, + Any, Callable, Dict, List, @@ -73,23 +74,6 @@ from pandas.core.internals import Block # noqa:F401 -def calculate_center_offset(window: np.ndarray) -> int: - """ - Calculate an offset necessary to have the window label to be centered - for weighted windows. - - Parameters - ---------- - window: ndarray - window weights - - Returns - ------- - int - """ - return (len(window) - 1) // 2 - - def calculate_min_periods( window: int, min_periods: Optional[int], @@ -125,28 +109,6 @@ def calculate_min_periods( return max(min_periods, floor) -def get_weighted_roll_func(cfunc: Callable) -> Callable: - """ - Wrap weighted rolling cython function with min periods argument. - - Parameters - ---------- - cfunc : function - Cython weighted rolling function - - Returns - ------- - function - """ - - def func(arg, window, min_periods=None): - if min_periods is None: - min_periods = len(window) - return cfunc(arg, window, min_periods) - - return func - - class BaseWindow(ShallowMixin, SelectionMixin): """Provides utilities for performing windowing operations.""" @@ -213,25 +175,19 @@ def validate(self) -> None: if not isinstance(self.obj, (ABCSeries, ABCDataFrame)): raise TypeError(f"invalid type: {type(self)}") if isinstance(self.window, BaseIndexer): - self._validate_get_window_bounds_signature(self.window) - - @staticmethod - def _validate_get_window_bounds_signature(window: BaseIndexer) -> None: - """ - Validate that the passed BaseIndexer subclass has - a get_window_bounds with the correct signature. - """ - get_window_bounds_signature = inspect.signature( - window.get_window_bounds - ).parameters.keys() - expected_signature = inspect.signature( - BaseIndexer().get_window_bounds - ).parameters.keys() - if get_window_bounds_signature != expected_signature: - raise ValueError( - f"{type(window).__name__} does not implement the correct signature for " - f"get_window_bounds" - ) + # Validate that the passed BaseIndexer subclass has + # a get_window_bounds with the correct signature. + get_window_bounds_signature = inspect.signature( + self.window.get_window_bounds + ).parameters.keys() + expected_signature = inspect.signature( + BaseIndexer().get_window_bounds + ).parameters.keys() + if get_window_bounds_signature != expected_signature: + raise ValueError( + f"{type(self.window).__name__} does not implement " + f"the correct signature for get_window_bounds" + ) def _create_data(self, obj: FrameOrSeries) -> FrameOrSeries: """ @@ -285,31 +241,16 @@ def __getattr__(self, attr: str): def _dir_additions(self): return self.obj._dir_additions() - def _get_win_type(self, kwargs: Dict): - """ - Exists for compatibility, overridden by subclass Window. - - Parameters - ---------- - kwargs : dict - ignored, exists for compatibility - - Returns - ------- - None - """ - return None - - def _get_window(self, other=None, win_type: Optional[str] = None) -> int: + def _get_window( + self, other: Optional[Union[np.ndarray, FrameOrSeries]] = None + ) -> int: """ Return window length. Parameters ---------- other : - ignored, exists for compatibility - win_type : - ignored, exists for compatibility + Used in Expanding Returns ------- @@ -336,7 +277,7 @@ def __repr__(self) -> str: return f"{self._window_type} [{attrs}]" def __iter__(self): - window = self._get_window(win_type=None) + window = self._get_window() obj = self._create_data(self._selected_obj) index = self._get_window_indexer(window=window) @@ -382,14 +323,6 @@ def _prep_values(self, values: Optional[np.ndarray] = None) -> np.ndarray: return values - def _wrap_result(self, result: np.ndarray) -> "Series": - """ - Wrap a single 1D result. - """ - obj = self._selected_obj - - return obj._constructor(result, obj.index, name=obj.name) - def _insert_on_column(self, result: "DataFrame", obj: "DataFrame"): # if we have an 'on' column we want to put it back into # the results in the same location @@ -415,21 +348,7 @@ def _insert_on_column(self, result: "DataFrame", obj: "DataFrame"): # insert at the end result[name] = extra_col - def _center_window(self, result: np.ndarray, window: np.ndarray) -> np.ndarray: - """ - Center the result in the window for weighted rolling aggregations. - """ - if self.axis > result.ndim - 1: - raise ValueError("Requested axis is larger then no. of argument dimensions") - - offset = calculate_center_offset(window) - if offset > 0: - lead_indexer = [slice(None)] * result.ndim - lead_indexer[self.axis] = slice(offset, None) - result = np.copy(result[tuple(lead_indexer)]) - return result - - def _get_roll_func(self, func_name: str) -> Callable: + def _get_roll_func(self, func_name: str) -> Callable[..., Any]: """ Wrap rolling function to check values passed. @@ -513,10 +432,9 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike: def _apply( self, - func: Callable, + func: Callable[..., Any], require_min_periods: int = 0, floor: int = 1, - is_weighted: bool = False, name: Optional[str] = None, use_numba_cache: bool = False, **kwargs, @@ -531,9 +449,7 @@ def _apply( func : callable function to apply require_min_periods : int floor : int - is_weighted : bool name : str, - compatibility with groupby.rolling use_numba_cache : bool whether to cache a numba compiled function. Only available for numba enabled methods (so far only apply) @@ -544,8 +460,7 @@ def _apply( ------- y : type of input """ - win_type = self._get_win_type(kwargs) - window = self._get_window(win_type=win_type) + window = self._get_window() window_indexer = self._get_window_indexer(window) def homogeneous_func(values: np.ndarray): @@ -554,36 +469,26 @@ def homogeneous_func(values: np.ndarray): if values.size == 0: return values.copy() - if not is_weighted: - - def calc(x): - if not isinstance(self.window, BaseIndexer): - min_periods = calculate_min_periods( - window, self.min_periods, len(x), require_min_periods, floor - ) - else: - min_periods = calculate_min_periods( - window_indexer.window_size, - self.min_periods, - len(x), - require_min_periods, - floor, - ) - start, end = window_indexer.get_window_bounds( - num_values=len(x), - min_periods=self.min_periods, - center=self.center, - closed=self.closed, + def calc(x): + if not isinstance(self.window, BaseIndexer): + min_periods = calculate_min_periods( + window, self.min_periods, len(x), require_min_periods, floor ) - return func(x, start, end, min_periods) - - else: - - def calc(x): - offset = calculate_center_offset(window) if self.center else 0 - additional_nans = np.array([np.nan] * offset) - x = np.concatenate((x, additional_nans)) - return func(x, window, self.min_periods) + else: + min_periods = calculate_min_periods( + window_indexer.window_size, + self.min_periods, + len(x), + require_min_periods, + floor, + ) + start, end = window_indexer.get_window_bounds( + num_values=len(x), + min_periods=self.min_periods, + center=self.center, + closed=self.closed, + ) + return func(x, start, end, min_periods) with np.errstate(all="ignore"): if values.ndim > 1: @@ -595,9 +500,6 @@ def calc(x): if use_numba_cache: NUMBA_FUNC_CACHE[(kwargs["original_func"], "rolling_apply")] = func - if self.center and is_weighted: - result = self._center_window(result, window) - return result return self._apply_blockwise(homogeneous_func, name) @@ -890,19 +792,17 @@ def __init__(self, obj, *args, **kwargs): def _apply( self, - func: Callable, + func: Callable[..., Any], require_min_periods: int = 0, floor: int = 1, - is_weighted: bool = False, name: Optional[str] = None, use_numba_cache: bool = False, **kwargs, - ): + ) -> FrameOrSeries: result = super()._apply( func, require_min_periods, floor, - is_weighted, name, use_numba_cache, **kwargs, @@ -1166,7 +1066,7 @@ def validate(self): else: raise ValueError(f"Invalid window {window}") - def _get_win_type(self, kwargs: Dict) -> Union[str, Tuple]: + def _get_win_type(self, kwargs: Dict[str, Any]) -> Union[str, Tuple]: """ Extract arguments for the window type, provide validation for it and return the validated window type. @@ -1210,16 +1110,27 @@ def _pop_args(win_type, arg_names, kwargs): return _validate_win_type(self.win_type, kwargs) - def _get_window( - self, other=None, win_type: Optional[Union[str, Tuple]] = None + def _center_window(self, result: np.ndarray, offset: int) -> np.ndarray: + """ + Center the result in the window for weighted rolling aggregations. + """ + if self.axis > result.ndim - 1: + raise ValueError("Requested axis is larger then no. of argument dimensions") + + if offset > 0: + lead_indexer = [slice(None)] * result.ndim + lead_indexer[self.axis] = slice(offset, None) + result = np.copy(result[tuple(lead_indexer)]) + return result + + def _get_window_weights( + self, win_type: Optional[Union[str, Tuple]] = None ) -> np.ndarray: """ Get the window, weights. Parameters ---------- - other : - ignored, exists for compatibility win_type : str, or tuple type of window to create @@ -1237,6 +1148,65 @@ def _get_window( # GH #15662. `False` makes symmetric window, rather than periodic. return sig.get_window(win_type, window, False).astype(float) + def _apply( + self, + func: Callable[[np.ndarray, int, int], np.ndarray], + require_min_periods: int = 0, + floor: int = 1, + name: Optional[str] = None, + use_numba_cache: bool = False, + **kwargs, + ): + """ + Rolling with weights statistical measure using supplied function. + + Designed to be used with passed-in Cython array-based functions. + + Parameters + ---------- + func : callable function to apply + require_min_periods : int + floor : int + name : str, + use_numba_cache : bool + whether to cache a numba compiled function. Only available for numba + enabled methods (so far only apply) + **kwargs + additional arguments for rolling function and window function + + Returns + ------- + y : type of input + """ + win_type = self._get_win_type(kwargs) + window = self._get_window_weights(win_type=win_type) + offset = (len(window) - 1) // 2 if self.center else 0 + + def homogeneous_func(values: np.ndarray): + # calculation function + + if values.size == 0: + return values.copy() + + def calc(x): + additional_nans = np.array([np.nan] * offset) + x = np.concatenate((x, additional_nans)) + return func(x, window, self.min_periods or len(window)) + + with np.errstate(all="ignore"): + if values.ndim > 1: + result = np.apply_along_axis(calc, self.axis, values) + else: + result = calc(values) + result = np.asarray(result) + + if self.center: + result = self._center_window(result, offset) + + return result + + return self._apply_blockwise(homogeneous_func, name) + _agg_see_also_doc = dedent( """ See Also @@ -1288,29 +1258,26 @@ def aggregate(self, func, *args, **kwargs): def sum(self, *args, **kwargs): nv.validate_window_func("sum", args, kwargs) window_func = self._get_roll_func("roll_weighted_sum") - window_func = get_weighted_roll_func(window_func) - return self._apply(window_func, is_weighted=True, name="sum", **kwargs) + return self._apply(window_func, name="sum", **kwargs) @Substitution(name="window") @Appender(_shared_docs["mean"]) def mean(self, *args, **kwargs): nv.validate_window_func("mean", args, kwargs) window_func = self._get_roll_func("roll_weighted_mean") - window_func = get_weighted_roll_func(window_func) - return self._apply(window_func, is_weighted=True, name="mean", **kwargs) + return self._apply(window_func, name="mean", **kwargs) @Substitution(name="window", versionadded="\n.. versionadded:: 1.0.0\n") @Appender(_shared_docs["var"]) - def var(self, ddof=1, *args, **kwargs): + def var(self, ddof: int = 1, *args, **kwargs): nv.validate_window_func("var", args, kwargs) window_func = partial(self._get_roll_func("roll_weighted_var"), ddof=ddof) - window_func = get_weighted_roll_func(window_func) kwargs.pop("name", None) - return self._apply(window_func, is_weighted=True, name="var", **kwargs) + return self._apply(window_func, name="var", **kwargs) @Substitution(name="window", versionadded="\n.. versionadded:: 1.0.0\n") @Appender(_shared_docs["std"]) - def std(self, ddof=1, *args, **kwargs): + def std(self, ddof: int = 1, *args, **kwargs): nv.validate_window_func("std", args, kwargs) return zsqrt(self.var(ddof=ddof, name="std", **kwargs)) @@ -1425,12 +1392,12 @@ def count(self): def apply( self, - func, + func: Callable[..., Any], raw: bool = False, engine: Optional[str] = None, - engine_kwargs: Optional[Dict] = None, - args: Optional[Tuple] = None, - kwargs: Optional[Dict] = None, + engine_kwargs: Optional[Dict[str, bool]] = None, + args: Optional[Tuple[Any, ...]] = None, + kwargs: Optional[Dict[str, Any]] = None, ): if args is None: args = () @@ -1460,7 +1427,13 @@ def apply( kwargs=kwargs, ) - def _generate_cython_apply_func(self, args, kwargs, raw, func): + def _generate_cython_apply_func( + self, + args: Tuple[Any, ...], + kwargs: Dict[str, Any], + raw: bool, + function: Callable[..., Any], + ) -> Callable[[np.ndarray, np.ndarray, np.ndarray, int], np.ndarray]: from pandas import Series window_func = partial( @@ -1468,7 +1441,7 @@ def _generate_cython_apply_func(self, args, kwargs, raw, func): args=args, kwargs=kwargs, raw=raw, - func=func, + function=function, ) def apply_func(values, begin, end, min_periods, raw=raw): @@ -1589,7 +1562,7 @@ def median(self, **kwargs): # the median function implementation return self._apply(window_func, name="median", **kwargs) - def std(self, ddof=1, *args, **kwargs): + def std(self, ddof: int = 1, *args, **kwargs): nv.validate_window_func("std", args, kwargs) window_func = self._get_roll_func("roll_var") @@ -1603,7 +1576,7 @@ def zsqrt_func(values, begin, end, min_periods): **kwargs, ) - def var(self, ddof=1, *args, **kwargs): + def var(self, ddof: int = 1, *args, **kwargs): nv.validate_window_func("var", args, kwargs) window_func = partial(self._get_roll_func("roll_var"), ddof=ddof) return self._apply( @@ -1665,7 +1638,7 @@ def skew(self, **kwargs): """ ) - def sem(self, ddof=1, *args, **kwargs): + def sem(self, ddof: int = 1, *args, **kwargs): return self.std(*args, **kwargs) / (self.count() - ddof).pow(0.5) _shared_docs["sem"] = dedent( @@ -1781,7 +1754,7 @@ def kurt(self, **kwargs): """ ) - def quantile(self, quantile, interpolation="linear", **kwargs): + def quantile(self, quantile: float, interpolation: str = "linear", **kwargs): if quantile == 1.0: window_func = self._get_roll_func("roll_max") elif quantile == 0.0: @@ -1789,14 +1762,10 @@ def quantile(self, quantile, interpolation="linear", **kwargs): else: window_func = partial( self._get_roll_func("roll_quantile"), - win=self._get_window(), quantile=quantile, interpolation=interpolation, ) - # Pass through for groupby.rolling - kwargs["quantile"] = quantile - kwargs["interpolation"] = interpolation return self._apply(window_func, name="quantile", **kwargs) _shared_docs[ @@ -2305,7 +2274,7 @@ def _get_window_indexer(self, window: int) -> GroupbyIndexer: GroupbyIndexer """ rolling_indexer: Type[BaseIndexer] - indexer_kwargs: Optional[Dict] = None + indexer_kwargs: Optional[Dict[str, Any]] = None index_array = self._on.asi8 if isinstance(self.window, BaseIndexer): rolling_indexer = type(self.window)