diff --git a/doc/source/whatsnew/v1.4.0.rst b/doc/source/whatsnew/v1.4.0.rst index 9a17b04b23c18..487ef5c226f94 100644 --- a/doc/source/whatsnew/v1.4.0.rst +++ b/doc/source/whatsnew/v1.4.0.rst @@ -294,6 +294,8 @@ Performance improvements - Performance improvement in :meth:`to_datetime` with ``uint`` dtypes (:issue:`42606`) - Performance improvement in :meth:`Series.sparse.to_coo` (:issue:`42880`) - Performance improvement in indexing with a :class:`MultiIndex` indexer on another :class:`MultiIndex` (:issue:43370`) +- Performance improvement in :meth:`GroupBy.quantile` (:issue:`43469`) +- .. --------------------------------------------------------------------------- diff --git a/pandas/_libs/groupby.pyi b/pandas/_libs/groupby.pyi index b363524e4e592..b51cc9f4500c1 100644 --- a/pandas/_libs/groupby.pyi +++ b/pandas/_libs/groupby.pyi @@ -84,11 +84,11 @@ def group_ohlc( min_count: int = ..., ) -> None: ... def group_quantile( - out: np.ndarray, # ndarray[float64_t] + out: np.ndarray, # ndarray[float64_t, ndim=2] values: np.ndarray, # ndarray[numeric, ndim=1] labels: np.ndarray, # ndarray[int64_t] mask: np.ndarray, # ndarray[uint8_t] - q: float, # float64_t + qs: np.ndarray, # const float64_t[:] interpolation: Literal["linear", "lower", "higher", "nearest", "midpoint"], ) -> None: ... def group_last( diff --git a/pandas/_libs/groupby.pyx b/pandas/_libs/groupby.pyx index 40e1049c39588..8ba49e5753bd5 100644 --- a/pandas/_libs/groupby.pyx +++ b/pandas/_libs/groupby.pyx @@ -770,25 +770,25 @@ def group_ohlc(floating[:, ::1] out, @cython.boundscheck(False) @cython.wraparound(False) -def group_quantile(ndarray[float64_t] out, +def group_quantile(ndarray[float64_t, ndim=2] out, ndarray[numeric, ndim=1] values, ndarray[intp_t] labels, ndarray[uint8_t] mask, - float64_t q, + const float64_t[:] qs, str interpolation) -> None: """ Calculate the quantile per group. 
Parameters ---------- - out : np.ndarray[np.float64] + out : np.ndarray[np.float64, ndim=2] Array of aggregated values that will be written to. values : np.ndarray Array containing the values to apply the function against. labels : ndarray[np.intp] Array containing the unique group labels. - q : float - The quantile value to search for. + qs : ndarray[float64_t] + The quantile values to search for. interpolation : {'linear', 'lower', 'highest', 'nearest', 'midpoint'} Notes @@ -797,17 +797,20 @@ def group_quantile(ndarray[float64_t] out, provided `out` parameter. """ cdef: - Py_ssize_t i, N=len(labels), ngroups, grp_sz, non_na_sz + Py_ssize_t i, N=len(labels), ngroups, grp_sz, non_na_sz, k, nqs Py_ssize_t grp_start=0, idx=0 intp_t lab uint8_t interp - float64_t q_idx, frac, val, next_val + float64_t q_val, q_idx, frac, val, next_val ndarray[int64_t] counts, non_na_counts, sort_arr assert values.shape[0] == N - if not (0 <= q <= 1): - raise ValueError(f"'q' must be between 0 and 1. Got '{q}' instead") + if any(not (0 <= q <= 1) for q in qs): + wrong = [x for x in qs if not (0 <= x <= 1)][0] + raise ValueError( + f"Each 'q' must be between 0 and 1. 
Got '{wrong}' instead" + ) inter_methods = { 'linear': INTERPOLATION_LINEAR, @@ -818,9 +821,10 @@ def group_quantile(ndarray[float64_t] out, } interp = inter_methods[interpolation] - counts = np.zeros_like(out, dtype=np.int64) - non_na_counts = np.zeros_like(out, dtype=np.int64) - ngroups = len(counts) + nqs = len(qs) + ngroups = len(out) + counts = np.zeros(ngroups, dtype=np.int64) + non_na_counts = np.zeros(ngroups, dtype=np.int64) # First figure out the size of every group with nogil: @@ -850,33 +854,37 @@ def group_quantile(ndarray[float64_t] out, non_na_sz = non_na_counts[i] if non_na_sz == 0: - out[i] = NaN + for k in range(nqs): + out[i, k] = NaN else: - # Calculate where to retrieve the desired value - # Casting to int will intentionally truncate result - idx = grp_start + <int64_t>(q * <float64_t>(non_na_sz - 1)) - - val = values[sort_arr[idx]] - # If requested quantile falls evenly on a particular index - # then write that index's value out. Otherwise interpolate - q_idx = q * (non_na_sz - 1) - frac = q_idx % 1 - - if frac == 0.0 or interp == INTERPOLATION_LOWER: - out[i] = val - else: - next_val = values[sort_arr[idx + 1]] - if interp == INTERPOLATION_LINEAR: - out[i] = val + (next_val - val) * frac - elif interp == INTERPOLATION_HIGHER: - out[i] = next_val - elif interp == INTERPOLATION_MIDPOINT: - out[i] = (val + next_val) / 2.0 - elif interp == INTERPOLATION_NEAREST: - if frac > .5 or (frac == .5 and q > .5): # Always OK? - out[i] = next_val - else: - out[i] = val + for k in range(nqs): + q_val = qs[k] + + # Calculate where to retrieve the desired value + # Casting to int will intentionally truncate result + idx = grp_start + <int64_t>(q_val * <float64_t>(non_na_sz - 1)) + + val = values[sort_arr[idx]] + # If requested quantile falls evenly on a particular index + # then write that index's value out. 
Otherwise interpolate + q_idx = q_val * (non_na_sz - 1) + frac = q_idx % 1 + + if frac == 0.0 or interp == INTERPOLATION_LOWER: + out[i, k] = val + else: + next_val = values[sort_arr[idx + 1]] + if interp == INTERPOLATION_LINEAR: + out[i, k] = val + (next_val - val) * frac + elif interp == INTERPOLATION_HIGHER: + out[i, k] = next_val + elif interp == INTERPOLATION_MIDPOINT: + out[i, k] = (val + next_val) / 2.0 + elif interp == INTERPOLATION_NEAREST: + if frac > .5 or (frac == .5 and q_val > .5): # Always OK? + out[i, k] = next_val + else: + out[i, k] = val # Increment the index reference in sorted_arr for the next group grp_start += grp_sz diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index 9eea81d1aa152..2923b5b2e1f7c 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -50,6 +50,7 @@ class providing the base-class of operations. RandomState, Scalar, T, + npt, ) from pandas.compat.numpy import function as nv from pandas.errors import AbstractMethodError @@ -59,6 +60,7 @@ class providing the base-class of operations. cache_readonly, doc, ) +from pandas.util._exceptions import find_stack_level from pandas.core.dtypes.common import ( is_bool_dtype, @@ -1102,7 +1104,9 @@ def _indexed_output_to_ndframe( @final def _wrap_aggregated_output( - self, output: Series | DataFrame | Mapping[base.OutputKey, ArrayLike] + self, + output: Series | DataFrame | Mapping[base.OutputKey, ArrayLike], + qs: npt.NDArray[np.float64] | None = None, ): """ Wraps the output of GroupBy aggregations into the expected result. 
@@ -1131,8 +1135,17 @@ def _wrap_aggregated_output( # enforced in __init__ self._insert_inaxis_grouper_inplace(result) result = result._consolidate() + index = Index(range(self.grouper.ngroups)) + else: - result.index = self.grouper.result_index + index = self.grouper.result_index + + if qs is not None: + # We get here with len(qs) != 1 and not self.as_index + # in test_pass_args_kwargs + index = _insert_quantile_level(index, qs) + + result.index = index if self.axis == 1: # Only relevant for DataFrameGroupBy, no-op for SeriesGroupBy @@ -1142,7 +1155,7 @@ def _wrap_aggregated_output( result.index = self.obj.index.copy() # TODO: Do this more systematically - return self._reindex_output(result) + return self._reindex_output(result, qs=qs) @final def _wrap_transformed_output( @@ -2513,7 +2526,6 @@ def quantile(self, q=0.5, interpolation: str = "linear"): a 2.0 b 3.0 """ - from pandas import concat def pre_processor(vals: ArrayLike) -> tuple[np.ndarray, np.dtype | None]: if is_object_dtype(vals): @@ -2544,7 +2556,7 @@ def pre_processor(vals: ArrayLike) -> tuple[np.ndarray, np.dtype | None]: return out, inference - def post_processor(vals: np.ndarray, inference: type | None) -> np.ndarray: + def post_processor(vals: np.ndarray, inference: np.dtype | None) -> np.ndarray: if inference: # Check for edge case if not ( @@ -2556,63 +2568,71 @@ def post_processor(vals: np.ndarray, inference: type | None) -> np.ndarray: return vals if is_scalar(q): - return self._get_cythonized_result( - libgroupby.group_quantile, - aggregate=True, - numeric_only=False, - needs_values=True, - needs_mask=True, - cython_dtype=np.dtype(np.float64), - pre_processing=pre_processor, - post_processing=post_processor, - q=q, - interpolation=interpolation, - ) - else: - results = [ - self._get_cythonized_result( - libgroupby.group_quantile, - aggregate=True, - needs_values=True, - needs_mask=True, - cython_dtype=np.dtype(np.float64), - pre_processing=pre_processor, - post_processing=post_processor, - 
q=qi, - interpolation=interpolation, - ) - for qi in q - ] - result = concat(results, axis=self.axis, keys=q) - # fix levels to place quantiles on the inside - # TODO(GH-10710): Ideally, we could write this as - # >>> result.stack(0).loc[pd.IndexSlice[:, ..., q], :] - # but this hits https://github.com/pandas-dev/pandas/issues/10710 - # which doesn't reorder the list-like `q` on the inner level. - order = list(range(1, result.axes[self.axis].nlevels)) + [0] - - # temporarily saves the index names - index_names = np.array(result.axes[self.axis].names) - - # set index names to positions to avoid confusion - result.axes[self.axis].names = np.arange(len(index_names)) - - # place quantiles on the inside - if isinstance(result, Series): - result = result.reorder_levels(order) + res = self.quantile([q], interpolation=interpolation) + nlevels = res.index.nlevels + return res.droplevel(nlevels - 1, axis=0) + + qs = np.array(q, dtype=np.float64) + ids, _, ngroups = self.grouper.group_info + nqs = len(qs) + + func = partial( + libgroupby.group_quantile, labels=ids, qs=qs, interpolation=interpolation + ) + + def blk_func(values: ArrayLike) -> ArrayLike: + mask = isna(values) + vals, inference = pre_processor(values) + + ncols = 1 + if vals.ndim == 2: + ncols = vals.shape[0] + + out = np.empty((ncols, ngroups, nqs), dtype=np.float64) + + if vals.ndim == 1: + func(out[0], values=vals, mask=mask) else: - result = result.reorder_levels(order, axis=self.axis) + for i in range(ncols): + func(out[i], values=vals[i], mask=mask[i]) - # restore the index names in order - result.axes[self.axis].names = index_names[order] + if vals.ndim == 1: + out = out[0].ravel("K") + else: + out = out.reshape(ncols, ngroups * nqs) + return post_processor(out, inference) - # reorder rows to keep things sorted - indices = ( - np.arange(result.shape[self.axis]) - .reshape([len(q), self.ngroups]) - .T.flatten() + obj = self._obj_with_exclusions + is_ser = obj.ndim == 1 + if is_ser: + # i.e. 
SeriesGroupBy + mgr = obj.to_frame()._mgr + else: + mgr = self._get_data_to_aggregate() + + res_mgr = mgr.grouped_reduce(blk_func, ignore_failures=True) + if len(res_mgr.items) != len(mgr.items): + warnings.warn( + "Dropping invalid columns in " + f"{type(self).__name__}.quantile is deprecated. " + "In a future version, a TypeError will be raised. " + "Before calling .quantile, select only columns which " + "should be valid for the function.", + FutureWarning, + stacklevel=find_stack_level(), ) - return result.take(indices, axis=self.axis) + if len(res_mgr.items) == 0: + # re-call grouped_reduce to get the desired exception message + mgr.grouped_reduce(blk_func, ignore_failures=False) + + if is_ser: + res = obj._constructor_expanddim(res_mgr) + res = res[res.columns[0]] # aka res.squeeze() + res.name = obj.name + else: + res = obj._constructor(res_mgr) + + return self._wrap_aggregated_output(res, qs=qs) @final @Substitution(name="groupby") @@ -3027,6 +3047,7 @@ def blk_func(values: ArrayLike) -> ArrayLike: result = result.reshape((result_sz, ncols)) else: result = result.reshape(-1, 1) + func = partial(base_func, out=result) inferences = None @@ -3118,7 +3139,7 @@ def blk_func(values: ArrayLike) -> ArrayLike: f"Before calling .{howstr}, select only columns which " "should be valid for the function.", FutureWarning, - stacklevel=3, + stacklevel=find_stack_level(), ) continue @@ -3288,7 +3309,10 @@ def tail(self, n=5): @final def _reindex_output( - self, output: OutputFrameOrSeries, fill_value: Scalar = np.NaN + self, + output: OutputFrameOrSeries, + fill_value: Scalar = np.NaN, + qs: npt.NDArray[np.float64] | None = None, ) -> OutputFrameOrSeries: """ If we have categorical groupers, then we might want to make sure that @@ -3307,6 +3331,8 @@ def _reindex_output( Object resulting from grouping and applying an operation. fill_value : scalar, default np.NaN Value to use for unobserved categories if self.observed is False. 
+ qs : np.ndarray[float64] or None, default None + quantile values, only relevant for quantile. Returns ------- @@ -3330,9 +3356,11 @@ def _reindex_output( return output levels_list = [ping.group_index for ping in groupings] - index, _ = MultiIndex.from_product( - levels_list, names=self.grouper.names - ).sortlevel() + names = self.grouper.names + if qs is not None: + levels_list.append(qs) + names = names + [None] + index, _ = MultiIndex.from_product(levels_list, names=names).sortlevel() if self.as_index: d = { @@ -3547,3 +3575,31 @@ def get_groupby( mutated=mutated, dropna=dropna, ) + + +def _insert_quantile_level(idx: Index, qs: npt.NDArray[np.float64]) -> MultiIndex: + """ + Insert the sequence 'qs' of quantiles as the inner-most level of a MultiIndex. + + The quantile level in the MultiIndex is a repeated copy of 'qs'. + + Parameters + ---------- + idx : Index + qs : np.ndarray[float64] + + Returns + ------- + MultiIndex + """ + nqs = len(qs) + + if idx._is_multi: + idx = cast(MultiIndex, idx) + lev_codes, lev = Index(qs).factorize() + levels = list(idx.levels) + [lev] + codes = [np.repeat(x, nqs) for x in idx.codes] + [np.tile(lev_codes, len(idx))] + mi = MultiIndex(levels=levels, codes=codes, names=idx.names + [None]) + else: + mi = MultiIndex.from_product([idx, qs]) + return mi diff --git a/pandas/tests/groupby/test_groupby.py b/pandas/tests/groupby/test_groupby.py index f26f18c9c20a0..8ca10ba0eea32 100644 --- a/pandas/tests/groupby/test_groupby.py +++ b/pandas/tests/groupby/test_groupby.py @@ -243,6 +243,10 @@ def f(x, q=None, axis=0): tm.assert_frame_equal(apply_result, expected, check_names=False) tm.assert_frame_equal(agg_result, expected) + apply_result = df_grouped.apply(DataFrame.quantile, [0.4, 0.8]) + expected_seq = df_grouped.quantile([0.4, 0.8]) + tm.assert_frame_equal(apply_result, expected_seq, check_names=False) + agg_result = df_grouped.agg(f, q=80) apply_result = df_grouped.apply(DataFrame.quantile, q=0.8) tm.assert_frame_equal(agg_result, 
expected) diff --git a/pandas/tests/groupby/test_quantile.py b/pandas/tests/groupby/test_quantile.py index 83d6c20bcac24..bcb2abeed75e4 100644 --- a/pandas/tests/groupby/test_quantile.py +++ b/pandas/tests/groupby/test_quantile.py @@ -240,8 +240,7 @@ def test_groupby_quantile_nullable_array(values, q): def test_groupby_quantile_skips_invalid_dtype(q): df = DataFrame({"a": [1], "b": [2.0], "c": ["x"]}) - warn = None if isinstance(q, list) else FutureWarning - with tm.assert_produces_warning(warn, match="Dropping invalid columns"): + with tm.assert_produces_warning(FutureWarning, match="Dropping invalid columns"): result = df.groupby("a").quantile(q) expected = df.groupby("a")[["b"]].quantile(q)