diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index f7e81f41b8675..a70247d9f7f9c 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -66,7 +66,8 @@ from pandas.core.window.numba_ import generate_numba_apply_func if TYPE_CHECKING: - from pandas import Series + from pandas import DataFrame, Series + from pandas.core.internals import Block # noqa:F401 def calculate_center_offset(window) -> int: @@ -418,35 +419,40 @@ def _wrap_results(self, results, obj, skipped: List[int]) -> FrameOrSeriesUnion: for i in skipped: exclude.extend(orig_blocks[i].columns) - kept_blocks = [blk for i, blk in enumerate(orig_blocks) if i not in skipped] - - final = [] - for result, block in zip(results, kept_blocks): - - result = type(obj)(result, index=obj.index, columns=block.columns) - final.append(result) - - exclude = exclude or [] columns = [c for c in self._selected_obj.columns if c not in exclude] - if not columns and not len(final) and exclude: + if not columns and not len(results) and exclude: raise DataError("No numeric types to aggregate") - elif not len(final): + elif not len(results): return obj.astype("float64") - df = concat(final, axis=1).reindex(columns=columns, copy=False) + df = concat(results, axis=1).reindex(columns=columns, copy=False) + self._insert_on_column(df, obj) + return df + def _insert_on_column(self, result: "DataFrame", obj: "DataFrame"): # if we have an 'on' column we want to put it back into # the results in the same location + from pandas import Series + if self.on is not None and not self._on.equals(obj.index): name = self._on.name extra_col = Series(self._on, index=obj.index, name=name) - if name not in df.columns and name not in df.index.names: - new_loc = len(df.columns) - df.insert(new_loc, name, extra_col) - elif name in df.columns: + if name in result.columns: # TODO: sure we want to overwrite results? - df[name] = extra_col - return df + result[name] = extra_col + elif name in result.index.names: + pass + elif name in self._selected_obj.columns: + # insert in the same location as we had in _selected_obj + old_cols = self._selected_obj.columns + new_cols = result.columns + old_loc = old_cols.get_loc(name) + overlap = new_cols.intersection(old_cols[:old_loc]) + new_loc = len(overlap) + result.insert(new_loc, name, extra_col) + else: + # insert at the end + result[name] = extra_col def _center_window(self, result, window) -> np.ndarray: """ @@ -530,21 +536,36 @@ def _apply_blockwise( # This isn't quite blockwise, since `blocks` is actually a collection # of homogenenous DataFrames. blocks, obj = self._create_blocks(self._selected_obj) + mgr = obj._mgr + + def hfunc(bvalues: ArrayLike) -> ArrayLike: + # TODO(EA2D): getattr unnecessary with 2D EAs + values = self._prep_values(getattr(bvalues, "T", bvalues)) + res_values = homogeneous_func(values) + return getattr(res_values, "T", res_values) skipped: List[int] = [] - results: List[ArrayLike] = [] - for i, b in enumerate(blocks): + res_blocks: List["Block"] = [] + for i, blk in enumerate(mgr.blocks): try: - values = self._prep_values(b.values) + nbs = blk.apply(hfunc) except (TypeError, NotImplementedError): skipped.append(i) continue - result = homogeneous_func(values) - results.append(result) + res_blocks.extend(nbs) + + if not len(res_blocks) and skipped: + raise DataError("No numeric types to aggregate") + elif not len(res_blocks): + return obj.astype("float64") - return self._wrap_results(results, obj, skipped) + new_cols = mgr.reset_dropped_locs(res_blocks, skipped) + new_mgr = type(mgr).from_blocks(res_blocks, [new_cols, obj.index]) + out = obj._constructor(new_mgr) + self._insert_on_column(out, obj) + return out def _apply( self,