Skip to content

CoW: Delay copy when setting Series into DataFrame #51698

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@
if TYPE_CHECKING:
import datetime

from pandas._libs.internals import BlockValuesRefs
from pandas._typing import (
AggFuncType,
AlignJoin,
Expand Down Expand Up @@ -3944,11 +3945,11 @@ def isetitem(self, loc, value) -> None:
)

for i, idx in enumerate(loc):
arraylike = self._sanitize_column(value.iloc[:, i])
arraylike, _ = self._sanitize_column(value.iloc[:, i])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Abstraction-wise, I've been thinking of refs as being handled at the Manager/Block level. Is this wrong? Is there a viable way to keep this at that level?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not if we don't want to move `_sanitize_column` to the same level. We have to keep track of the refs when we extract the array from the values to set.

self._iset_item_mgr(idx, arraylike, inplace=False)
return

arraylike = self._sanitize_column(value)
arraylike, _ = self._sanitize_column(value)
self._iset_item_mgr(loc, arraylike, inplace=False)

def __setitem__(self, key, value):
Expand Down Expand Up @@ -4117,7 +4118,7 @@ def _set_item_frame_value(self, key, value: DataFrame) -> None:
return

# now align rows
arraylike = _reindex_for_setitem(value, self.index)
arraylike, _ = _reindex_for_setitem(value, self.index)
self._set_item_mgr(key, arraylike)
return

Expand All @@ -4130,20 +4131,26 @@ def _set_item_frame_value(self, key, value: DataFrame) -> None:
self[key] = value[value.columns[0]]

def _iset_item_mgr(
self, loc: int | slice | np.ndarray, value, inplace: bool = False
self,
loc: int | slice | np.ndarray,
value,
inplace: bool = False,
refs: BlockValuesRefs | None = None,
) -> None:
# when called from _set_item_mgr loc can be anything returned from get_loc
self._mgr.iset(loc, value, inplace=inplace)
self._mgr.iset(loc, value, inplace=inplace, refs=refs)
self._clear_item_cache()

def _set_item_mgr(self, key, value: ArrayLike) -> None:
def _set_item_mgr(
self, key, value: ArrayLike, refs: BlockValuesRefs | None = None
) -> None:
try:
loc = self._info_axis.get_loc(key)
except KeyError:
# This item wasn't present, just insert at end
self._mgr.insert(len(self._info_axis), key, value)
self._mgr.insert(len(self._info_axis), key, value, refs)
else:
self._iset_item_mgr(loc, value)
self._iset_item_mgr(loc, value, refs=refs)

# check if we are modifying a copy
# try to set first as we want an invalid
Expand Down Expand Up @@ -4173,7 +4180,7 @@ def _set_item(self, key, value) -> None:
Series/TimeSeries will be conformed to the DataFrames index to
ensure homogeneity.
"""
value = self._sanitize_column(value)
value, refs = self._sanitize_column(value, using_cow=using_copy_on_write())

if (
key in self.columns
Expand All @@ -4185,8 +4192,9 @@ def _set_item(self, key, value) -> None:
existing_piece = self[key]
if isinstance(existing_piece, DataFrame):
value = np.tile(value, (len(existing_piece.columns), 1)).T
refs = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if this would still be a view in case we are only setting a single column (i.e. len(existing_piece.columns) is equal to 1). But it seems that also in that case np.tile consistently returns a copy, so that should be fine.
(it might still be an opportunity for an optimization, although for a corner case)


self._set_item_mgr(key, value)
self._set_item_mgr(key, value, refs)

def _set_value(
self, index: IndexLabel, col, value: Scalar, takeable: bool = False
Expand Down Expand Up @@ -4813,7 +4821,7 @@ def insert(
elif isinstance(value, DataFrame):
value = value.iloc[:, 0]

value = self._sanitize_column(value)
value, _ = self._sanitize_column(value)
self._mgr.insert(loc, column, value)

def assign(self, **kwargs) -> DataFrame:
Expand Down Expand Up @@ -4884,7 +4892,9 @@ def assign(self, **kwargs) -> DataFrame:
data[k] = com.apply_if_callable(v, data)
return data

def _sanitize_column(self, value) -> ArrayLike:
def _sanitize_column(
self, value, using_cow: bool = False
) -> tuple[ArrayLike, BlockValuesRefs | None]:
"""
Ensures new columns (which go into the BlockManager as new blocks) are
always copied and converted into an array.
Expand All @@ -4902,11 +4912,13 @@ def _sanitize_column(self, value) -> ArrayLike:
# Using a DataFrame would mean coercing values to one dtype
assert not isinstance(value, DataFrame)
if is_dict_like(value):
return _reindex_for_setitem(Series(value), self.index)
if not isinstance(value, Series):
value = Series(value)
return _reindex_for_setitem(value, self.index, using_cow=using_cow)

if is_list_like(value):
com.require_length_match(value, self.index)
return sanitize_array(value, self.index, copy=True, allow_2d=True)
return sanitize_array(value, self.index, copy=True, allow_2d=True), None

@property
def _series(self):
Expand Down Expand Up @@ -12031,11 +12043,15 @@ def _from_nested_dict(data) -> collections.defaultdict:
return new_data


def _reindex_for_setitem(value: DataFrame | Series, index: Index) -> ArrayLike:
def _reindex_for_setitem(
value: DataFrame | Series, index: Index, using_cow: bool = False
) -> tuple[ArrayLike, BlockValuesRefs | None]:
# reindex if necessary

if value.index.equals(index) or not len(index):
return value._values.copy()
if using_cow and isinstance(value, Series):
return value._values, value._references
return value._values.copy(), None

# GH#4107
try:
Expand All @@ -12049,4 +12065,4 @@ def _reindex_for_setitem(value: DataFrame | Series, index: Index) -> ArrayLike:
raise TypeError(
"incompatible index of inserted column with frame index"
) from err
return reindexed_value
return reindexed_value, None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return here the result of value.reindex(index)._references, since in theory reindex can now return a view in certain corner cases?

8 changes: 6 additions & 2 deletions pandas/core/internals/array_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,11 @@ def column_arrays(self) -> list[ArrayLike]:
return [np.asarray(arr) for arr in self.arrays]

def iset(
self, loc: int | slice | np.ndarray, value: ArrayLike, inplace: bool = False
self,
loc: int | slice | np.ndarray,
value: ArrayLike,
inplace: bool = False,
refs=None,
) -> None:
"""
Set new column(s).
Expand Down Expand Up @@ -876,7 +880,7 @@ def column_setitem(
# update existing ArrayManager in-place
self.arrays[loc] = new_mgr.arrays[0]

def insert(self, loc: int, item: Hashable, value: ArrayLike) -> None:
def insert(self, loc: int, item: Hashable, value: ArrayLike, refs=None) -> None:
"""
Insert item at selected position.

Expand Down
34 changes: 26 additions & 8 deletions pandas/core/internals/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
internals as libinternals,
lib,
)
from pandas._libs.internals import BlockPlacement
from pandas._libs.internals import (
BlockPlacement,
BlockValuesRefs,
)
from pandas.errors import PerformanceWarning
from pandas.util._decorators import cache_readonly
from pandas.util._exceptions import find_stack_level
Expand Down Expand Up @@ -1125,7 +1128,11 @@ def column_arrays(self) -> list[np.ndarray]:
return result # type: ignore[return-value]

def iset(
self, loc: int | slice | np.ndarray, value: ArrayLike, inplace: bool = False
self,
loc: int | slice | np.ndarray,
value: ArrayLike,
inplace: bool = False,
refs: BlockValuesRefs | None = None,
):
"""
Set new item in-place. Does not consolidate. Adds new Block if not
Expand Down Expand Up @@ -1166,6 +1173,7 @@ def iset(
inplace=inplace,
blkno=blkno,
blk=blk,
refs=refs,
)

# error: Incompatible types in assignment (expression has type
Expand Down Expand Up @@ -1211,7 +1219,7 @@ def value_getitem(placement):
continue
else:
# Defer setting the new values to enable consolidation
self._iset_split_block(blkno_l, blk_locs)
self._iset_split_block(blkno_l, blk_locs, refs=refs)

if len(removed_blknos):
# Remove blocks & update blknos accordingly
Expand Down Expand Up @@ -1241,6 +1249,7 @@ def value_getitem(placement):
new_block_2d(
values=value,
placement=BlockPlacement(slice(mgr_loc, mgr_loc + 1)),
refs=refs,
)
for mgr_loc in unfit_idxr
)
Expand All @@ -1256,6 +1265,7 @@ def value_getitem(placement):
new_block_2d(
values=value_getitem(unfit_val_items),
placement=BlockPlacement(unfit_idxr),
refs=refs,
)
)

Expand All @@ -1272,6 +1282,7 @@ def _iset_split_block(
blkno_l: int,
blk_locs: np.ndarray | list[int],
value: ArrayLike | None = None,
refs: BlockValuesRefs | None = None,
) -> None:
"""Removes columns from a block by splitting the block.

Expand All @@ -1284,6 +1295,7 @@ def _iset_split_block(
blkno_l: The block number to operate on, relevant for updating the manager
blk_locs: The locations of our block that should be deleted.
value: The value to set as a replacement.
refs: The reference tracking object of the value to set.
"""
blk = self.blocks[blkno_l]

Expand All @@ -1293,7 +1305,7 @@ def _iset_split_block(
nbs_tup = tuple(blk.delete(blk_locs))
if value is not None:
locs = blk.mgr_locs.as_array[blk_locs]
first_nb = new_block_2d(value, BlockPlacement(locs))
first_nb = new_block_2d(value, BlockPlacement(locs), refs=refs)
else:
first_nb = nbs_tup[0]
nbs_tup = tuple(nbs_tup[1:])
Expand All @@ -1315,7 +1327,13 @@ def _iset_split_block(
self._blknos[nb.mgr_locs.indexer] = i + nr_blocks

def _iset_single(
self, loc: int, value: ArrayLike, inplace: bool, blkno: int, blk: Block
self,
loc: int,
value: ArrayLike,
inplace: bool,
blkno: int,
blk: Block,
refs: BlockValuesRefs | None = None,
) -> None:
"""
Fastpath for iset when we are only setting a single position and
Expand All @@ -1335,7 +1353,7 @@ def _iset_single(
blk.set_inplace(slice(iloc, iloc + 1), value, copy=copy)
return

nb = new_block_2d(value, placement=blk._mgr_locs)
nb = new_block_2d(value, placement=blk._mgr_locs, refs=refs)
old_blocks = self.blocks
new_blocks = old_blocks[:blkno] + (nb,) + old_blocks[blkno + 1 :]
self.blocks = new_blocks
Expand Down Expand Up @@ -1373,7 +1391,7 @@ def column_setitem(
new_mgr = col_mgr.setitem((idx,), value)
self.iset(loc, new_mgr._block.values, inplace=True)

def insert(self, loc: int, item: Hashable, value: ArrayLike) -> None:
def insert(self, loc: int, item: Hashable, value: ArrayLike, refs=None) -> None:
"""
Insert item at selected position.

Expand All @@ -1397,7 +1415,7 @@ def insert(self, loc: int, item: Hashable, value: ArrayLike) -> None:

bp = BlockPlacement(slice(loc, loc + 1))
# TODO(CoW) do we always "own" the passed `value`?
block = new_block_2d(values=value, placement=bp)
block = new_block_2d(values=value, placement=bp, refs=refs)

if not len(self.blocks):
# Fastpath
Expand Down
7 changes: 5 additions & 2 deletions pandas/tests/copy_view/test_indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ def test_column_as_series_no_item_cache(
# TODO add tests for other indexing methods on the Series


def test_dataframe_add_column_from_series(backend):
def test_dataframe_add_column_from_series(backend, using_copy_on_write):
# Case: adding a new column to a DataFrame from an existing column/series
# -> always already takes a copy on assignment
# (no change in behaviour here)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can update the TODO on the line below here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Expand All @@ -987,7 +987,10 @@ def test_dataframe_add_column_from_series(backend):

s = Series([10, 11, 12])
df["new"] = s
assert not np.shares_memory(get_array(df, "new"), s.values)
if using_copy_on_write:
assert np.shares_memory(get_array(df, "new"), get_array(s))
else:
assert not np.shares_memory(get_array(df, "new"), get_array(s))

# editing series -> doesn't modify column in frame
s[0] = 0
Expand Down
53 changes: 47 additions & 6 deletions pandas/tests/copy_view/test_setitem.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
from pandas import (
DataFrame,
Index,
MultiIndex,
RangeIndex,
Series,
)
import pandas._testing as tm
from pandas.tests.copy_view.util import get_array

# -----------------------------------------------------------------------------
# Copy/view behaviour for the values that are set in a DataFrame
Expand Down Expand Up @@ -34,9 +36,7 @@ def test_set_column_with_series(using_copy_on_write):
df["c"] = ser

if using_copy_on_write:
# TODO(CoW) with CoW we can delay the copy
# assert np.shares_memory(df["c"].values, ser.values)
assert not np.shares_memory(df["c"].values, ser.values)
assert np.shares_memory(df["c"].values, ser.values)
else:
# the series data is copied
assert not np.shares_memory(df["c"].values, ser.values)
Expand Down Expand Up @@ -79,13 +79,54 @@ def test_set_columns_with_dataframe(using_copy_on_write):
df[["c", "d"]] = df2

if using_copy_on_write:
# TODO(CoW) with CoW we can delay the copy
# assert np.shares_memory(df["c"].values, df2["c"].values)
assert not np.shares_memory(df["c"].values, df2["c"].values)
assert np.shares_memory(df["c"].values, df2["c"].values)
else:
# the data is copied
assert not np.shares_memory(df["c"].values, df2["c"].values)

# and modifying the set DataFrame does not modify the original DataFrame
df2.iloc[0, 0] = 0
tm.assert_series_equal(df["c"], Series([7, 8, 9], name="c"))


def test_setitem_series_no_copy(using_copy_on_write):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this test different from the other ones above that you edited?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed that — but there is actually a difference: the existing test is a bit buggy. I will copy this one over and modify the other one as appropriate.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah no I keep it here and modify the other test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the other test: As soon as you update s, a copy is triggered and updating the df afterwards never has an effect if the initial copy worked because they don't share data anymore. The df-update is now handled by the new test.

Regarding the different cases: overwriting an existing column takes a different code path than adding a new one, so we have to test both

df = DataFrame({"a": [1, 2, 3]})
rhs = Series([4, 5, 6])
rhs_orig = rhs.copy()
df["b"] = rhs
if using_copy_on_write:
assert np.shares_memory(get_array(rhs), get_array(df, "b"))

df.iloc[0, 1] = 100
tm.assert_series_equal(rhs, rhs_orig)

df["a"] = rhs
if using_copy_on_write:
assert np.shares_memory(get_array(rhs), get_array(df, "a"))

df.iloc[0, 0] = 100
tm.assert_series_equal(rhs, rhs_orig)


def test_setitem_series_no_copy_split_block(using_copy_on_write):
df = DataFrame({"a": [1, 2, 3], "b": 1})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just thinking about the other issue to no longer use copy=True as default for dict (#52967), that such test cases as the above could start to no longer test what it is actually meant to test ..
I know that here it's not a dict of Series, so this example might not change, but writing it as DataFrame(np.array(..), columns=["a", "b"]) might be more robust?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or set copy true explicitly?

rhs = Series([4, 5, 6])
rhs_orig = rhs.copy()
df["b"] = rhs
if using_copy_on_write:
assert np.shares_memory(get_array(rhs), get_array(df, "b"))

df.iloc[0, 1] = 100
tm.assert_series_equal(rhs, rhs_orig)


def test_setitem_series_column_midx_broadcasting(using_copy_on_write):
df = DataFrame(
[[1, 2, 3], [3, 4, 5]],
columns=MultiIndex.from_arrays([["a", "a", "b"], [1, 2, 3]]),
)
rhs = Series([10, 11])
df["a"] = rhs
assert not np.shares_memory(get_array(rhs), df._get_column_array(0))
if using_copy_on_write:
assert df._mgr._has_no_reference(0)