Skip to content

Commit 02b0a86

Browse files
authored
REF: make window _apply_blockwise actually blockwise (#35861)
1 parent 0fde208 commit 02b0a86

File tree

1 file changed

+46
-25
lines changed

1 file changed

+46
-25
lines changed

pandas/core/window/rolling.py

+46-25
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@
6666
from pandas.core.window.numba_ import generate_numba_apply_func
6767

6868
if TYPE_CHECKING:
69-
from pandas import Series
69+
from pandas import DataFrame, Series
70+
from pandas.core.internals import Block # noqa:F401
7071

7172

7273
def calculate_center_offset(window) -> int:
@@ -418,35 +419,40 @@ def _wrap_results(self, results, obj, skipped: List[int]) -> FrameOrSeriesUnion:
418419
for i in skipped:
419420
exclude.extend(orig_blocks[i].columns)
420421

421-
kept_blocks = [blk for i, blk in enumerate(orig_blocks) if i not in skipped]
422-
423-
final = []
424-
for result, block in zip(results, kept_blocks):
425-
426-
result = type(obj)(result, index=obj.index, columns=block.columns)
427-
final.append(result)
428-
429-
exclude = exclude or []
430422
columns = [c for c in self._selected_obj.columns if c not in exclude]
431-
if not columns and not len(final) and exclude:
423+
if not columns and not len(results) and exclude:
432424
raise DataError("No numeric types to aggregate")
433-
elif not len(final):
425+
elif not len(results):
434426
return obj.astype("float64")
435427

436-
df = concat(final, axis=1).reindex(columns=columns, copy=False)
428+
df = concat(results, axis=1).reindex(columns=columns, copy=False)
429+
self._insert_on_column(df, obj)
430+
return df
437431

432+
def _insert_on_column(self, result: "DataFrame", obj: "DataFrame"):
438433
# if we have an 'on' column we want to put it back into
439434
# the results in the same location
435+
from pandas import Series
436+
440437
if self.on is not None and not self._on.equals(obj.index):
441438
name = self._on.name
442439
extra_col = Series(self._on, index=obj.index, name=name)
443-
if name not in df.columns and name not in df.index.names:
444-
new_loc = len(df.columns)
445-
df.insert(new_loc, name, extra_col)
446-
elif name in df.columns:
440+
if name in result.columns:
447441
# TODO: sure we want to overwrite results?
448-
df[name] = extra_col
449-
return df
442+
result[name] = extra_col
443+
elif name in result.index.names:
444+
pass
445+
elif name in self._selected_obj.columns:
446+
# insert in the same location as we had in _selected_obj
447+
old_cols = self._selected_obj.columns
448+
new_cols = result.columns
449+
old_loc = old_cols.get_loc(name)
450+
overlap = new_cols.intersection(old_cols[:old_loc])
451+
new_loc = len(overlap)
452+
result.insert(new_loc, name, extra_col)
453+
else:
454+
# insert at the end
455+
result[name] = extra_col
450456

451457
def _center_window(self, result, window) -> np.ndarray:
452458
"""
@@ -530,21 +536,36 @@ def _apply_blockwise(
530536
# This isn't quite blockwise, since `blocks` is actually a collection
531537
# of homogenenous DataFrames.
532538
blocks, obj = self._create_blocks(self._selected_obj)
539+
mgr = obj._mgr
540+
541+
def hfunc(bvalues: ArrayLike) -> ArrayLike:
542+
# TODO(EA2D): getattr unnecessary with 2D EAs
543+
values = self._prep_values(getattr(bvalues, "T", bvalues))
544+
res_values = homogeneous_func(values)
545+
return getattr(res_values, "T", res_values)
533546

534547
skipped: List[int] = []
535-
results: List[ArrayLike] = []
536-
for i, b in enumerate(blocks):
548+
res_blocks: List["Block"] = []
549+
for i, blk in enumerate(mgr.blocks):
537550
try:
538-
values = self._prep_values(b.values)
551+
nbs = blk.apply(hfunc)
539552

540553
except (TypeError, NotImplementedError):
541554
skipped.append(i)
542555
continue
543556

544-
result = homogeneous_func(values)
545-
results.append(result)
557+
res_blocks.extend(nbs)
558+
559+
if not len(res_blocks) and skipped:
560+
raise DataError("No numeric types to aggregate")
561+
elif not len(res_blocks):
562+
return obj.astype("float64")
546563

547-
return self._wrap_results(results, obj, skipped)
564+
new_cols = mgr.reset_dropped_locs(res_blocks, skipped)
565+
new_mgr = type(mgr).from_blocks(res_blocks, [new_cols, obj.index])
566+
out = obj._constructor(new_mgr)
567+
self._insert_on_column(out, obj)
568+
return out
548569

549570
def _apply(
550571
self,

0 commit comments

Comments
 (0)