Skip to content

ENH: Add lazy copy to concat and round #50501

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions pandas/core/internals/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
Sequence,
cast,
)
import weakref

import numpy as np

Expand Down Expand Up @@ -61,7 +62,10 @@
ensure_block_shape,
new_block_2d,
)
from pandas.core.internals.managers import BlockManager
from pandas.core.internals.managers import (
BlockManager,
using_copy_on_write,
)

if TYPE_CHECKING:
from pandas import Index
Expand Down Expand Up @@ -267,6 +271,8 @@ def _concat_managers_axis0(

offset = 0
blocks = []
refs: list[weakref.ref | None] = []
parents: list = []
for i, mgr in enumerate(mgrs):
# If we already reindexed, then we definitely don't need another copy
made_copy = had_reindexers[i]
Expand All @@ -283,8 +289,16 @@ def _concat_managers_axis0(
nb._mgr_locs = nb._mgr_locs.add(offset)
blocks.append(nb)

if not made_copy and not copy and using_copy_on_write():
refs.extend([weakref.ref(blk) for blk in mgr.blocks])
parents.append(mgr)

offset += len(mgr.items)
return BlockManager(tuple(blocks), axes)

result_parents = parents if parents else None
result_ref = refs if refs else None
result = BlockManager(tuple(blocks), axes, parent=result_parents, refs=result_ref)
return result


def _maybe_reindex_columns_na_proxy(
Expand Down
27 changes: 24 additions & 3 deletions pandas/core/reshape/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
cast,
overload,
)
import weakref

import numpy as np

from pandas._config import get_option
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from pandas._config import get_option
from pandas._config import get_option, using_copy_on_write

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah this is annoying, I thought that ci should fail, but makes sense that the import continues to work through there...


from pandas._typing import (
Axis,
AxisInt,
Expand Down Expand Up @@ -47,6 +50,8 @@
get_unanimous_names,
)
from pandas.core.internals import concatenate_managers
from pandas.core.internals.construction import dict_to_mgr
from pandas.core.internals.managers import using_copy_on_write
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from pandas.core.internals.managers import using_copy_on_write


if TYPE_CHECKING:
from pandas import (
Expand Down Expand Up @@ -155,7 +160,7 @@ def concat(
names=None,
verify_integrity: bool = False,
sort: bool = False,
copy: bool = True,
copy: bool | None = None,
) -> DataFrame | Series:
"""
Concatenate pandas objects along a particular axis.
Expand Down Expand Up @@ -363,6 +368,12 @@ def concat(
0 1 2
1 3 4
"""
if copy is None:
if using_copy_on_write():
copy = False
else:
copy = True

op = _Concatenator(
objs,
axis=axis,
Expand Down Expand Up @@ -584,7 +595,17 @@ def get_result(self):
cons = sample._constructor_expanddim

index, columns = self.new_axes
df = cons(data, index=index, copy=self.copy)
mgr = dict_to_mgr(
data,
index,
None,
copy=self.copy,
typ=get_option("mode.data_manager"),
)
if using_copy_on_write() and not self.copy:
mgr.parent = [obj._mgr for obj in self.objs]
mgr.refs = [weakref.ref(obj._mgr.blocks[0]) for obj in self.objs]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would very much prefer that we can keep this ref handling only to the internals ..
I was thinking, in theory it should be possible to let dict_to_mgr do this? (that might actually also help for the DataFrame(dict of Series) constructor case)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(that might be quite a bit more complicated though (since that function needs to handle more than just this case), so we could also leave that for a follow-up)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree in general, but we have to be very careful.

Would you be ok with pushing this to the internals after finishing the constructors or when doing the constructors? We don't have many tests in the constructor area yet and I don't want to break anything without noticing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly because this is tricky, I think it would be good to have a general rule that only the internals need to care about this, and that outside of the internals all the APIs we use take care of that for you.

But yes, let's do this when tackling those functions for the constructors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pushing this to the internals after finishing the constructors or when doing the constructors?

I am doing this in #50777. Will update that PR to also refactor the above once this is merged

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! I'll fix the typing stuff and then merge so that we can continue with the methods blocked by concat

df = cons(mgr, copy=False)
df.columns = columns
return df.__finalize__(self, method="concat")

Expand All @@ -611,7 +632,7 @@ def get_result(self):
new_data = concatenate_managers(
mgrs_indexers, self.new_axes, concat_axis=self.bm_axis, copy=self.copy
)
if not self.copy:
if not self.copy and not using_copy_on_write():
new_data._consolidate_inplace()

cons = sample._constructor
Expand Down
133 changes: 133 additions & 0 deletions pandas/tests/copy_view/test_functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import numpy as np

from pandas import (
DataFrame,
Series,
concat,
)
import pandas._testing as tm
from pandas.tests.copy_view.util import get_array


def test_concat_frames(using_copy_on_write):
df = DataFrame({"b": ["a"] * 3})
df2 = DataFrame({"a": ["a"] * 3})
df_orig = df.copy()
result = concat([df, df2], axis=1)

if using_copy_on_write:
assert np.shares_memory(get_array(result, "b"), get_array(df, "b"))
assert np.shares_memory(get_array(result, "a"), get_array(df2, "a"))
else:
assert not np.shares_memory(get_array(result, "b"), get_array(df, "b"))
assert not np.shares_memory(get_array(result, "a"), get_array(df2, "a"))

result.iloc[0, 0] = "d"
if using_copy_on_write:
assert not np.shares_memory(get_array(result, "b"), get_array(df, "b"))
assert np.shares_memory(get_array(result, "a"), get_array(df2, "a"))

result.iloc[0, 1] = "d"
if using_copy_on_write:
assert not np.shares_memory(get_array(result, "a"), get_array(df2, "a"))
tm.assert_frame_equal(df, df_orig)


def test_concat_frames_updating_input(using_copy_on_write):
df = DataFrame({"b": ["a"] * 3})
df2 = DataFrame({"a": ["a"] * 3})
result = concat([df, df2], axis=1)

if using_copy_on_write:
assert np.shares_memory(get_array(result, "b"), get_array(df, "b"))
assert np.shares_memory(get_array(result, "a"), get_array(df2, "a"))
else:
assert not np.shares_memory(get_array(result, "b"), get_array(df, "b"))
assert not np.shares_memory(get_array(result, "a"), get_array(df2, "a"))

expected = result.copy()
df.iloc[0, 0] = "d"
if using_copy_on_write:
assert not np.shares_memory(get_array(result, "b"), get_array(df, "b"))
assert np.shares_memory(get_array(result, "a"), get_array(df2, "a"))

df2.iloc[0, 0] = "d"
if using_copy_on_write:
assert not np.shares_memory(get_array(result, "a"), get_array(df2, "a"))
tm.assert_frame_equal(result, expected)


def test_concat_series(using_copy_on_write):
ser = Series([1, 2], name="a")
ser2 = Series([3, 4], name="b")
ser_orig = ser.copy()
ser2_orig = ser2.copy()
result = concat([ser, ser2], axis=1)

if using_copy_on_write:
assert np.shares_memory(get_array(result, "a"), ser.values)
assert np.shares_memory(get_array(result, "b"), ser2.values)
else:
assert not np.shares_memory(get_array(result, "a"), ser.values)
assert not np.shares_memory(get_array(result, "b"), ser2.values)

result.iloc[0, 0] = 100
if using_copy_on_write:
assert not np.shares_memory(get_array(result, "a"), ser.values)
assert np.shares_memory(get_array(result, "b"), ser2.values)

result.iloc[0, 1] = 1000
if using_copy_on_write:
assert not np.shares_memory(get_array(result, "b"), ser2.values)
tm.assert_series_equal(ser, ser_orig)
tm.assert_series_equal(ser2, ser2_orig)


def test_chained_concat(using_copy_on_write):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def test_chained_concat(using_copy_on_write):
def test_concat_frames_chained(using_copy_on_write):

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need a similar test for series with chained concat?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

df1 = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3]})

df2 = DataFrame({"c": [4, 5, 6]})

df3 = DataFrame({"d": [4, 5, 6]})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
df1 = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3]})
df2 = DataFrame({"c": [4, 5, 6]})
df3 = DataFrame({"d": [4, 5, 6]})
df1 = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3]})
df2 = DataFrame({"c": [4, 5, 6]})
df3 = DataFrame({"d": [4, 5, 6]})

result = concat([concat([df1, df2], axis=1), df3], axis=1)
expected = result.copy()

if using_copy_on_write:
assert np.shares_memory(get_array(result, "a"), get_array(df1, "a"))
assert np.shares_memory(get_array(result, "c"), get_array(df2, "c"))
assert np.shares_memory(get_array(result, "d"), get_array(df3, "d"))
else:
assert not np.shares_memory(get_array(result, "a"), get_array(df1, "a"))
assert not np.shares_memory(get_array(result, "c"), get_array(df2, "c"))
assert not np.shares_memory(get_array(result, "d"), get_array(df3, "d"))

df1.iloc[0, 0] = 100
if using_copy_on_write:
assert not np.shares_memory(get_array(result, "a"), get_array(df1, "a"))

tm.assert_frame_equal(result, expected)


def test_concat_series_parent_obj(using_copy_on_write):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def test_concat_series_parent_obj(using_copy_on_write):
def test_concat_series_updating_input(using_copy_on_write):

(that's the name pattern you used above for frames)

ser = Series([1, 2], name="a")
ser2 = Series([3, 4], name="b")
expected = DataFrame({"a": [1, 2], "b": [3, 4]})
result = concat([ser, ser2], axis=1)

if using_copy_on_write:
assert np.shares_memory(get_array(result, "a"), get_array(ser, "a"))
assert np.shares_memory(get_array(result, "b"), get_array(ser2, "b"))
else:
assert not np.shares_memory(get_array(result, "a"), get_array(ser, "a"))
assert not np.shares_memory(get_array(result, "b"), get_array(ser2, "b"))

ser.iloc[0] = 100
if using_copy_on_write:
assert not np.shares_memory(get_array(result, "a"), get_array(ser, "a"))
assert np.shares_memory(get_array(result, "b"), get_array(ser2, "b"))
tm.assert_frame_equal(result, expected)

ser2.iloc[0] = 1000
if using_copy_on_write:
assert not np.shares_memory(get_array(result, "b"), get_array(ser2, "b"))
tm.assert_frame_equal(result, expected)
17 changes: 17 additions & 0 deletions pandas/tests/copy_view/test_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,23 @@ def test_sort_index(using_copy_on_write):
tm.assert_series_equal(ser, ser_orig)


def test_round(using_copy_on_write):
df = DataFrame({"a": [1, 2], "b": "c"})
df2 = df.round()
df_orig = df.copy()

if using_copy_on_write:
assert np.shares_memory(get_array(df2, "b"), get_array(df, "b"))
assert np.shares_memory(get_array(df2, "a"), get_array(df, "a"))
else:
assert not np.shares_memory(get_array(df2, "b"), get_array(df, "b"))

df2.iloc[0, 1] = "d"
if using_copy_on_write:
assert not np.shares_memory(get_array(df2, "b"), get_array(df, "b"))
tm.assert_frame_equal(df, df_orig)


def test_reorder_levels(using_copy_on_write):
index = MultiIndex.from_tuples(
[(1, 1), (1, 2), (2, 1), (2, 2)], names=["one", "two"]
Expand Down
2 changes: 2 additions & 0 deletions pandas/tests/io/pytables/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
timedelta_range,
)
import pandas._testing as tm
from pandas.core.internals.managers import using_copy_on_write
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from pandas.core.internals.managers import using_copy_on_write
from pandas._config import using_copy_on_write

from pandas.tests.io.pytables.common import (
_maybe_remove,
ensure_clean_store,
Expand Down Expand Up @@ -1007,6 +1008,7 @@ def test_to_hdf_with_object_column_names(tmp_path, setup_path):
assert len(result)


@pytest.mark.skipif(using_copy_on_write(), reason="strides buggy with cow")
def test_hdfstore_strides(setup_path):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this caused by the concat changes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah strides changed from 8 to 16, this is really weird

Copy link
Member

@jorisvandenbossche jorisvandenbossche Jan 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking into this a little bit, I think the issue is that in

pandas/pandas/io/pytables.py

Lines 3207 to 3212 in a0071f9

if len(dfs) > 0:
out = concat(dfs, axis=1)
out = out.reindex(columns=items, copy=False)
return out
return DataFrame(columns=axes[0], index=axes[1])

where we are creating the DataFrame, with the changes in this PR that concat won't copy if CoW is enabled. But it seems that the data from the HDF store always come back as F contiguous, while in pandas we want it as C contiguous for optimal performance.
Maybe we should just add a copy=True in the concat call in the mentioned snippet. That's already the default for non-CoW so won't change anything, and for CoW enabled also ensures we get the desired memory layout (at the expense of an extra copy while reading in, but to fix that, that can be a follow-up optimization investigating why the HDF store always returns F contiguous arrays)

Also commented about that on the related issue (that triggered adding this test): #22073

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this seems to fix it

# GH22073
df = DataFrame({"a": [1, 2, 3, 4], "b": [5, 6, 7, 8]})
Expand Down
4 changes: 2 additions & 2 deletions pandas/tests/reshape/concat/test_concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_append_concat(self):
assert isinstance(result.index, PeriodIndex)
assert result.index[0] == s1.index[0]

def test_concat_copy(self, using_array_manager):
def test_concat_copy(self, using_array_manager, using_copy_on_write):
df = DataFrame(np.random.randn(4, 3))
df2 = DataFrame(np.random.randint(0, 10, size=4).reshape(4, 1))
df3 = DataFrame({5: "foo"}, index=range(4))
Expand Down Expand Up @@ -82,7 +82,7 @@ def test_concat_copy(self, using_array_manager):
result = concat([df, df2, df3, df4], axis=1, copy=False)
for arr in result._mgr.arrays:
if arr.dtype.kind == "f":
if using_array_manager:
if using_array_manager or using_copy_on_write:
# this is a view on some array in either df or df4
assert any(
np.shares_memory(arr, other)
Expand Down