From 174c167d40217c1e9ba109c99d7909fdbd23324b Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 2 Jan 2022 19:42:57 -0800 Subject: [PATCH 1/4] ENH: Add min_max kernel for rolling --- pandas/_libs/window/aggregations.pyx | 2 +- pandas/core/_numba/kernels/__init__.py | 3 +- pandas/core/_numba/kernels/min_max_.py | 70 ++++++++++++++++++++++++++ pandas/core/window/rolling.py | 34 +++++++------ pandas/tests/window/test_numba.py | 11 ++-- 5 files changed, 99 insertions(+), 21 deletions(-) create mode 100644 pandas/core/_numba/kernels/min_max_.py diff --git a/pandas/_libs/window/aggregations.pyx b/pandas/_libs/window/aggregations.pyx index be8bb61092362..5ebb60dc7e41b 100644 --- a/pandas/_libs/window/aggregations.pyx +++ b/pandas/_libs/window/aggregations.pyx @@ -960,7 +960,7 @@ cdef _roll_min_max(ndarray[numeric_t] values, with nogil: # This is using a modified version of the C++ code in this - # SO post: http://bit.ly/2nOoHlY + # SO post: https://stackoverflow.com/a/12239580 # The original impl didn't deal with variable window sizes # So the code was optimized for that diff --git a/pandas/core/_numba/kernels/__init__.py b/pandas/core/_numba/kernels/__init__.py index 2753a1e01161d..219ff023bf7dc 100644 --- a/pandas/core/_numba/kernels/__init__.py +++ b/pandas/core/_numba/kernels/__init__.py @@ -1,5 +1,6 @@ from pandas.core._numba.kernels.mean_ import sliding_mean +from pandas.core._numba.kernels.min_max_ import sliding_min_max from pandas.core._numba.kernels.sum_ import sliding_sum from pandas.core._numba.kernels.var_ import sliding_var -__all__ = ["sliding_mean", "sliding_sum", "sliding_var"] +__all__ = ["sliding_mean", "sliding_sum", "sliding_var", "sliding_min_max"] diff --git a/pandas/core/_numba/kernels/min_max_.py b/pandas/core/_numba/kernels/min_max_.py new file mode 100644 index 0000000000000..bd394750dcc09 --- /dev/null +++ b/pandas/core/_numba/kernels/min_max_.py @@ -0,0 +1,70 @@ +""" +Numba 1D min/max kernels that can be shared by +* Dataframe / Series +* groupby +* rolling / expanding + +Mirrors pandas/_libs/window/aggregation.pyx +""" +from __future__ import annotations + +import numba +import numpy as np + + +@numba.jit(nopython=True, nogil=True, parallel=False) +def sliding_min_max( + values: np.ndarray, + start: np.ndarray, + end: np.ndarray, + min_periods: int, + is_max: bool, +) -> np.ndarray: + N = len(start) + nobs = 0 + output = np.empty(N, dtype=np.float64) + # Use deque once numba supports it + # https://github.com/numba/numba/issues/7417 + Q = [] + W = [] + for i in range(N): + + curr_win_size = end[i] - start[i] + if i == 0: + start = start[i] + else: + start = end[i - 1] + + for k in range(start, end[i]): + ai = values[k] + if not np.isnan(ai): + nobs += 1 + elif is_max: + ai = -np.inf + else: + ai = np.inf + # Discard previous entries if we find new min or max + if is_max: + while Q and ((ai >= values[Q[-1]]) or values[Q[-1]] != values[Q[-1]]): + Q.pop() + else: + while Q and ((ai <= values[Q[-1]]) or values[Q[-1]] != values[Q[-1]]): + Q.pop() + Q.append(k) + W.append(k) + + # Discard entries outside and left of current window + while Q and Q[0] <= start[i] - 1: + Q.pop(0) + while W and W[0] <= start[i] - 1: + if not np.isnan(values[W[0]]): + nobs -= 1 + W.pop(0) + + # Save output based on index in input value array + if Q and curr_win_size > 0 and nobs >= min_periods: + output[i] = values[Q[0]] + else: + output[i] = np.nan + + return output diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index fec7f9468d7f7..2a9bd7de2b31e 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -1385,15 +1385,18 @@ def max( if maybe_use_numba(engine): if self.method == "table": func = generate_manual_numpy_nan_agg_with_axis(np.nanmax) + return self.apply( + func, + raw=True, + engine=engine, + engine_kwargs=engine_kwargs, + ) else: - func = np.nanmax + from pandas.core._numba.kernels import sliding_min_max - return self.apply( - func, - raw=True, - engine=engine, - engine_kwargs=engine_kwargs, - ) + return self._numba_apply( + sliding_min_max, "rolling_max", engine_kwargs, True + ) window_func = window_aggregations.roll_max return self._apply(window_func, name="max", **kwargs) @@ -1408,15 +1411,18 @@ def min( if maybe_use_numba(engine): if self.method == "table": func = generate_manual_numpy_nan_agg_with_axis(np.nanmin) + return self.apply( + func, + raw=True, + engine=engine, + engine_kwargs=engine_kwargs, + ) else: - func = np.nanmin + from pandas.core._numba.kernels import sliding_min_max - return self.apply( - func, - raw=True, - engine=engine, - engine_kwargs=engine_kwargs, - ) + return self._numba_apply( + sliding_min_max, "rolling_max", engine_kwargs, False + ) window_func = window_aggregations.roll_min return self._apply(window_func, name="min", **kwargs) diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 8cae9c0182724..425e87021cb30 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -61,7 +61,7 @@ def test_numba_vs_cython_rolling_methods( expected = getattr(roll, method)(engine="cython", **kwargs) # Check the cache - if method not in ("mean", "sum", "var", "std"): + if method not in ("mean", "sum", "var", "std", "max", "min"): assert ( getattr(np, f"nan{method}"), "Rolling_apply_single", @@ -88,7 +88,7 @@ def test_numba_vs_cython_expanding_methods( expected = getattr(expand, method)(engine="cython", **kwargs) # Check the cache - if method not in ("mean", "sum", "var", "std"): + if method not in ("mean", "sum", "var", "std", "max", "min"): assert ( getattr(np, f"nan{method}"), "Expanding_apply_single", @@ -150,15 +150,16 @@ def test_dont_cache_args( def add(values, x): return np.sum(values) + x + engine_kwargs = {"nopython": nopython, "nogil": nogil, "parallel": parallel} df = DataFrame({"value": [0, 0, 0]}) result = getattr(df, window)(**window_kwargs).apply( - add, raw=True, engine="numba", args=(1,) + add, raw=True, engine="numba", args=(1,), **engine_kwargs ) expected = DataFrame({"value": [1.0, 1.0, 1.0]}) tm.assert_frame_equal(result, expected) - result = getattr(df, window)(**window_kwargs).apply( - add, raw=True, engine="numba", args=(2,) + result = getattr(df, window)(method=method, **window_kwargs).apply( + add, raw=True, engine="numba", args=(2,), **engine_kwargs ) expected = DataFrame({"value": [2.0, 2.0, 2.0]}) tm.assert_frame_equal(result, expected) From 3e6e2ddea58fd6a3d0591daf781fd824699e1859 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 2 Jan 2022 20:23:44 -0800 Subject: [PATCH 2/4] PERF: expanding/rolling.min/max with engine='numba' --- doc/source/whatsnew/v1.4.0.rst | 2 +- pandas/core/_numba/kernels/min_max_.py | 6 +++--- pandas/tests/window/test_numba.py | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/doc/source/whatsnew/v1.4.0.rst b/doc/source/whatsnew/v1.4.0.rst index 4c3e53ddcfa26..b4bd77de05e96 100644 --- a/doc/source/whatsnew/v1.4.0.rst +++ b/doc/source/whatsnew/v1.4.0.rst @@ -664,7 +664,7 @@ Performance improvements - :meth:`SparseArray.min` and :meth:`SparseArray.max` no longer require converting to a dense array (:issue:`43526`) - Indexing into a :class:`SparseArray` with a ``slice`` with ``step=1`` no longer requires converting to a dense array (:issue:`43777`) - Performance improvement in :meth:`SparseArray.take` with ``allow_fill=False`` (:issue:`43654`) -- Performance improvement in :meth:`.Rolling.mean`, :meth:`.Expanding.mean`, :meth:`.Rolling.sum`, :meth:`.Expanding.sum` with ``engine="numba"`` (:issue:`43612`, :issue:`44176`) +- Performance improvement in :meth:`.Rolling.mean`, :meth:`.Expanding.mean`, :meth:`.Rolling.sum`, :meth:`.Expanding.sum`, :meth:`.Rolling.max`, :meth:`.Expanding.max`, :meth:`.Rolling.min` and :meth:`.Expanding.min` with ``engine="numba"`` (:issue:`43612`, :issue:`44176`, :issue:``) - Improved performance of :meth:`pandas.read_csv` with ``memory_map=True`` when file encoding is UTF-8 (:issue:`43787`) - Performance improvement in :meth:`RangeIndex.sort_values` overriding :meth:`Index.sort_values` (:issue:`43666`) - Performance improvement in :meth:`RangeIndex.insert` (:issue:`43988`) diff --git a/pandas/core/_numba/kernels/min_max_.py b/pandas/core/_numba/kernels/min_max_.py index bd394750dcc09..3fe57f667a21d 100644 --- a/pandas/core/_numba/kernels/min_max_.py +++ b/pandas/core/_numba/kernels/min_max_.py @@ -31,11 +31,11 @@ def sliding_min_max( curr_win_size = end[i] - start[i] if i == 0: - start = start[i] + st = start[i] else: - start = end[i - 1] + st = end[i - 1] - for k in range(start, end[i]): + for k in range(st, end[i]): ai = values[k] if not np.isnan(ai): nobs += 1 diff --git a/pandas/tests/window/test_numba.py b/pandas/tests/window/test_numba.py index 425e87021cb30..a14515ca9c018 100644 --- a/pandas/tests/window/test_numba.py +++ b/pandas/tests/window/test_numba.py @@ -152,14 +152,14 @@ def add(values, x): engine_kwargs = {"nopython": nopython, "nogil": nogil, "parallel": parallel} df = DataFrame({"value": [0, 0, 0]}) - result = getattr(df, window)(**window_kwargs).apply( - add, raw=True, engine="numba", args=(1,), **engine_kwargs + result = getattr(df, window)(method=method, **window_kwargs).apply( + add, raw=True, engine="numba", engine_kwargs=engine_kwargs, args=(1,) ) expected = DataFrame({"value": [1.0, 1.0, 1.0]}) tm.assert_frame_equal(result, expected) result = getattr(df, window)(method=method, **window_kwargs).apply( - add, raw=True, engine="numba", args=(2,), **engine_kwargs + add, raw=True, engine="numba", engine_kwargs=engine_kwargs, args=(2,) ) expected = DataFrame({"value": [2.0, 2.0, 2.0]}) tm.assert_frame_equal(result, expected) From 7fa52c159c12c003653cece4fbb24d2e7dd4541a Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 2 Jan 2022 20:26:44 -0800 Subject: [PATCH 3/4] Fix cache str, whatsnew number --- doc/source/whatsnew/v1.4.0.rst | 2 +- pandas/core/window/rolling.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/source/whatsnew/v1.4.0.rst b/doc/source/whatsnew/v1.4.0.rst index b4bd77de05e96..bf9837ac73926 100644 --- a/doc/source/whatsnew/v1.4.0.rst +++ b/doc/source/whatsnew/v1.4.0.rst @@ -664,7 +664,7 @@ Performance improvements - :meth:`SparseArray.min` and :meth:`SparseArray.max` no longer require converting to a dense array (:issue:`43526`) - Indexing into a :class:`SparseArray` with a ``slice`` with ``step=1`` no longer requires converting to a dense array (:issue:`43777`) - Performance improvement in :meth:`SparseArray.take` with ``allow_fill=False`` (:issue:`43654`) -- Performance improvement in :meth:`.Rolling.mean`, :meth:`.Expanding.mean`, :meth:`.Rolling.sum`, :meth:`.Expanding.sum`, :meth:`.Rolling.max`, :meth:`.Expanding.max`, :meth:`.Rolling.min` and :meth:`.Expanding.min` with ``engine="numba"`` (:issue:`43612`, :issue:`44176`, :issue:``) +- Performance improvement in :meth:`.Rolling.mean`, :meth:`.Expanding.mean`, :meth:`.Rolling.sum`, :meth:`.Expanding.sum`, :meth:`.Rolling.max`, :meth:`.Expanding.max`, :meth:`.Rolling.min` and :meth:`.Expanding.min` with ``engine="numba"`` (:issue:`43612`, :issue:`44176`, :issue:`45170`) - Improved performance of :meth:`pandas.read_csv` with ``memory_map=True`` when file encoding is UTF-8 (:issue:`43787`) - Performance improvement in :meth:`RangeIndex.sort_values` overriding :meth:`Index.sort_values` (:issue:`43666`) - Performance improvement in :meth:`RangeIndex.insert` (:issue:`43988`) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index 2a9bd7de2b31e..c8538f0cf7c90 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -1421,7 +1421,7 @@ def min( from pandas.core._numba.kernels import sliding_min_max return self._numba_apply( - sliding_min_max, "rolling_max", engine_kwargs, False + sliding_min_max, "rolling_min", engine_kwargs, False ) window_func = window_aggregations.roll_min return self._apply(window_func, name="min", **kwargs) From 46b19faf1720db5e53eacbe0df60c56191526853 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 2 Jan 2022 21:50:19 -0800 Subject: [PATCH 4/4] Address mypy --- pandas/core/_numba/kernels/min_max_.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandas/core/_numba/kernels/min_max_.py b/pandas/core/_numba/kernels/min_max_.py index 3fe57f667a21d..4f237fc1a0559 100644 --- a/pandas/core/_numba/kernels/min_max_.py +++ b/pandas/core/_numba/kernels/min_max_.py @@ -25,8 +25,8 @@ def sliding_min_max( output = np.empty(N, dtype=np.float64) # Use deque once numba supports it # https://github.com/numba/numba/issues/7417 - Q = [] - W = [] + Q: list = [] + W: list = [] for i in range(N): curr_win_size = end[i] - start[i]