Skip to content

REF: make window _apply_blockwise actually blockwise #35861

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 24, 2020
71 changes: 46 additions & 25 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@
from pandas.core.window.numba_ import generate_numba_apply_func

if TYPE_CHECKING:
from pandas import Series
from pandas import DataFrame, Series
from pandas.core.internals import Block # noqa:F401


def calculate_center_offset(window) -> int:
Expand Down Expand Up @@ -418,35 +419,40 @@ def _wrap_results(self, results, obj, skipped: List[int]) -> FrameOrSeriesUnion:
for i in skipped:
exclude.extend(orig_blocks[i].columns)

kept_blocks = [blk for i, blk in enumerate(orig_blocks) if i not in skipped]

final = []
for result, block in zip(results, kept_blocks):

result = type(obj)(result, index=obj.index, columns=block.columns)
final.append(result)

exclude = exclude or []
columns = [c for c in self._selected_obj.columns if c not in exclude]
if not columns and not len(final) and exclude:
if not columns and not len(results) and exclude:
raise DataError("No numeric types to aggregate")
elif not len(final):
elif not len(results):
return obj.astype("float64")

df = concat(final, axis=1).reindex(columns=columns, copy=False)
df = concat(results, axis=1).reindex(columns=columns, copy=False)
self._insert_on_column(df, obj)
return df

def _insert_on_column(self, result: "DataFrame", obj: "DataFrame"):
# if we have an 'on' column we want to put it back into
# the results in the same location
from pandas import Series

if self.on is not None and not self._on.equals(obj.index):
name = self._on.name
extra_col = Series(self._on, index=obj.index, name=name)
if name not in df.columns and name not in df.index.names:
new_loc = len(df.columns)
df.insert(new_loc, name, extra_col)
elif name in df.columns:
if name in result.columns:
# TODO: sure we want to overwrite results?
df[name] = extra_col
return df
result[name] = extra_col
elif name in result.index.names:
pass
elif name in self._selected_obj.columns:
# insert in the same location as we had in _selected_obj
old_cols = self._selected_obj.columns
new_cols = result.columns
old_loc = old_cols.get_loc(name)
overlap = new_cols.intersection(old_cols[:old_loc])
new_loc = len(overlap)
result.insert(new_loc, name, extra_col)
else:
# insert at the end
result[name] = extra_col

def _center_window(self, result, window) -> np.ndarray:
"""
Expand Down Expand Up @@ -530,21 +536,36 @@ def _apply_blockwise(
# This isn't quite blockwise, since `blocks` is actually a collection
# of homogenenous DataFrames.
blocks, obj = self._create_blocks(self._selected_obj)
mgr = obj._mgr

def hfunc(bvalues: ArrayLike) -> ArrayLike:
# TODO(EA2D): getattr unnecessary with 2D EAs
values = self._prep_values(getattr(bvalues, "T", bvalues))
res_values = homogeneous_func(values)
return getattr(res_values, "T", res_values)

skipped: List[int] = []
results: List[ArrayLike] = []
for i, b in enumerate(blocks):
res_blocks: List["Block"] = []
for i, blk in enumerate(mgr.blocks):
try:
values = self._prep_values(b.values)
nbs = blk.apply(hfunc)

except (TypeError, NotImplementedError):
skipped.append(i)
continue

result = homogeneous_func(values)
results.append(result)
res_blocks.extend(nbs)

if not len(res_blocks) and skipped:
raise DataError("No numeric types to aggregate")
elif not len(res_blocks):
return obj.astype("float64")

return self._wrap_results(results, obj, skipped)
new_cols = mgr.reset_dropped_locs(res_blocks, skipped)
new_mgr = type(mgr).from_blocks(res_blocks, [new_cols, obj.index])
out = obj._constructor(new_mgr)
self._insert_on_column(out, obj)
return out

def _apply(
self,
Expand Down