diff --git a/doc/source/development/copy_on_write.rst b/doc/source/development/copy_on_write.rst new file mode 100644 index 0000000000000..34625ed645615 --- /dev/null +++ b/doc/source/development/copy_on_write.rst @@ -0,0 +1,41 @@ +.. _copy_on_write: + +{{ header }} + +************* +Copy on write +************* + +Copy on Write is a mechanism to simplify the indexing API and improve +performance through avoiding copies if possible. +CoW means that any DataFrame or Series derived from another in any way always +behaves as a copy. + +Reference tracking +------------------ + +To be able to determine whether we have to make a copy when writing into a DataFrame, +we have to be aware of whether the values are shared with another DataFrame. pandas +keeps track of all ``Blocks`` that share values with another block internally to +be able to tell when a copy needs to be triggered. The reference tracking +mechanism is implemented on the Block level. + +We use a custom reference tracker object, ``BlockValuesRefs``, that keeps +track of every block whose values share memory with each other. The reference +is held through a weak reference. Every two blocks that share some memory should +point to the same ``BlockValuesRefs`` object. If one block goes out of +scope, the reference to this block dies. As a consequence, the reference tracker +object always knows how many blocks are alive and share memory. + +Whenever a :class:`DataFrame` or :class:`Series` object is sharing data with another +object, it is required that each of those objects have its own BlockManager and Block +objects. Thus, in other words, one Block instance (that is held by a DataFrame, not +necessarily for intermediate objects) should always be uniquely used for only +a single DataFrame/Series object. 
For example, when you want to use the same +Block for another object, you can create a shallow copy of the Block instance +with ``block.copy(deep=False)`` (which will create a new Block instance with +the same underlying values and which will correctly set up the references). + +We can ask the reference tracking object if there is another block alive that shares +data with us before writing into the values. We can trigger a copy before +writing if there is in fact another block alive. diff --git a/doc/source/development/index.rst b/doc/source/development/index.rst index c741441cf67a1..69f04494a271c 100644 --- a/doc/source/development/index.rst +++ b/doc/source/development/index.rst @@ -18,6 +18,7 @@ Development contributing_codebase maintaining internals + copy_on_write debugging_extensions extending developer diff --git a/pandas/_libs/internals.pyi b/pandas/_libs/internals.pyi index 79bdbea71e4d8..5dfcc3726c84f 100644 --- a/pandas/_libs/internals.pyi +++ b/pandas/_libs/internals.pyi @@ -4,6 +4,7 @@ from typing import ( final, overload, ) +import weakref import numpy as np @@ -59,8 +60,13 @@ class SharedBlock: _mgr_locs: BlockPlacement ndim: int values: ArrayLike + refs: BlockValuesRefs def __init__( - self, values: ArrayLike, placement: BlockPlacement, ndim: int + self, + values: ArrayLike, + placement: BlockPlacement, + ndim: int, + refs: BlockValuesRefs | None = ..., ) -> None: ... class NumpyBlock(SharedBlock): @@ -87,3 +93,9 @@ class BlockManager: ) -> None: ... def get_slice(self: T, slobj: slice, axis: int = ...) -> T: ... def _rebuild_blknos_and_blklocs(self) -> None: ... + +class BlockValuesRefs: + referenced_blocks: list[weakref.ref] + def __init__(self, blk: SharedBlock) -> None: ... + def add_reference(self, blk: SharedBlock) -> None: ... + def has_reference(self) -> bool: ... 
diff --git a/pandas/_libs/internals.pyx b/pandas/_libs/internals.pyx index 3333ac1115177..b5ff69d92492f 100644 --- a/pandas/_libs/internals.pyx +++ b/pandas/_libs/internals.pyx @@ -580,9 +580,16 @@ cdef class SharedBlock: """ cdef: public BlockPlacement _mgr_locs + public BlockValuesRefs refs readonly int ndim - def __cinit__(self, values, placement: BlockPlacement, ndim: int): + def __cinit__( + self, + values, + placement: BlockPlacement, + ndim: int, + refs: BlockValuesRefs | None = None, + ): """ Parameters ---------- @@ -591,9 +598,22 @@ cdef class SharedBlock: placement : BlockPlacement ndim : int 1 for SingleBlockManager/Series, 2 for BlockManager/DataFrame + refs: BlockValuesRefs, optional + Ref tracking object or None if block does not have any refs. """ self._mgr_locs = placement self.ndim = ndim + if refs is None: + # if no refs are passed, that means we are creating a Block from + # new values that it uniquely owns -> start a new BlockValuesRefs + # object that only references this block + self.refs = BlockValuesRefs(self) + else: + # if refs are passed, this is the BlockValuesRefs object that is shared + # with the parent blocks which share the values, and a reference to this + # new block is added + refs.add_reference(self) + self.refs = refs cpdef __reduce__(self): args = (self.values, self.mgr_locs.indexer, self.ndim) @@ -619,9 +639,15 @@ cdef class NumpyBlock(SharedBlock): cdef: public ndarray values - def __cinit__(self, ndarray values, BlockPlacement placement, int ndim): + def __cinit__( + self, + ndarray values, + BlockPlacement placement, + int ndim, + refs: BlockValuesRefs | None = None, + ): # set values here; the (implicit) call to SharedBlock.__cinit__ will - # set placement and ndim + # set placement, ndim and refs self.values = values cpdef NumpyBlock getitem_block_index(self, slice slicer): @@ -631,7 +657,7 @@ cdef class NumpyBlock(SharedBlock): Assumes self.ndim == 2 """ new_values = self.values[..., slicer] - return 
type(self)(new_values, self._mgr_locs, ndim=self.ndim) + return type(self)(new_values, self._mgr_locs, ndim=self.ndim, refs=self.refs) cdef class NDArrayBackedBlock(SharedBlock): @@ -641,9 +667,15 @@ cdef class NDArrayBackedBlock(SharedBlock): cdef public: NDArrayBacked values - def __cinit__(self, NDArrayBacked values, BlockPlacement placement, int ndim): + def __cinit__( + self, + NDArrayBacked values, + BlockPlacement placement, + int ndim, + refs: BlockValuesRefs | None = None, + ): # set values here; the (implicit) call to SharedBlock.__cinit__ will - # set placement and ndim + # set placement, ndim and refs self.values = values cpdef NDArrayBackedBlock getitem_block_index(self, slice slicer): @@ -653,16 +685,22 @@ cdef class NDArrayBackedBlock(SharedBlock): Assumes self.ndim == 2 """ new_values = self.values[..., slicer] - return type(self)(new_values, self._mgr_locs, ndim=self.ndim) + return type(self)(new_values, self._mgr_locs, ndim=self.ndim, refs=self.refs) cdef class Block(SharedBlock): cdef: public object values - def __cinit__(self, object values, BlockPlacement placement, int ndim): + def __cinit__( + self, + object values, + BlockPlacement placement, + int ndim, + refs: BlockValuesRefs | None = None, + ): # set values here; the (implicit) call to SharedBlock.__cinit__ will - # set placement and ndim + # set placement, ndim and refs self.values = values @@ -673,15 +711,11 @@ cdef class BlockManager: public list axes public bint _known_consolidated, _is_consolidated public ndarray _blknos, _blklocs - public list refs - public object parent def __cinit__( self, blocks=None, axes=None, - refs=None, - parent=None, verify_integrity=True, ): # None as defaults for unpickling GH#42345 @@ -695,8 +729,6 @@ cdef class BlockManager: self.blocks = blocks self.axes = axes.copy() # copy to make sure we are not remotely-mutable - self.refs = refs - self.parent = parent # Populate known_consolidate, blknos, and blklocs lazily self._known_consolidated = False @@ 
-805,16 +837,12 @@ cdef class BlockManager: ndarray blknos, blklocs nbs = [] - nrefs = [] for blk in self.blocks: nb = blk.getitem_block_index(slobj) nbs.append(nb) - nrefs.append(weakref.ref(blk)) new_axes = [self.axes[0], self.axes[1]._getitem_slice(slobj)] - mgr = type(self)( - tuple(nbs), new_axes, nrefs, parent=self, verify_integrity=False - ) + mgr = type(self)(tuple(nbs), new_axes, verify_integrity=False) # We can avoid having to rebuild blklocs/blknos blklocs = self._blklocs @@ -827,7 +855,7 @@ cdef class BlockManager: def get_slice(self, slobj: slice, axis: int = 0) -> BlockManager: if axis == 0: - new_blocks, new_refs = self._slice_take_blocks_ax0(slobj) + new_blocks = self._slice_take_blocks_ax0(slobj) elif axis == 1: return self._get_index_slice(slobj) else: @@ -836,6 +864,40 @@ cdef class BlockManager: new_axes = list(self.axes) new_axes[axis] = new_axes[axis]._getitem_slice(slobj) - return type(self)( - tuple(new_blocks), new_axes, new_refs, parent=self, verify_integrity=False - ) + return type(self)(tuple(new_blocks), new_axes, verify_integrity=False) + + +cdef class BlockValuesRefs: + """Tracks all references to a given array. + + Keeps track of all blocks (through weak references) that reference the same + data. + """ + cdef: + public object referenced_blocks + + def __cinit__(self, blk: SharedBlock) -> None: + self.referenced_blocks = weakref.WeakSet([blk]) + + def add_reference(self, blk: SharedBlock) -> None: + """Adds a new reference to our reference collection. + + Parameters + ---------- + blk: SharedBlock + The block that the new references should point to. + """ + self.referenced_blocks.add(blk) + + def has_reference(self) -> bool: + """Checks if block has foreign references. + + A reference is only relevant if it is still alive. The reference to + ourselves does not count. 
+ + Returns + ------- + bool + """ + # Checking for more references than block pointing to itself + return len(self.referenced_blocks) > 1 diff --git a/pandas/compat/pickle_compat.py b/pandas/compat/pickle_compat.py index 2fbd3a6cdb046..bfee616d52aac 100644 --- a/pandas/compat/pickle_compat.py +++ b/pandas/compat/pickle_compat.py @@ -168,7 +168,7 @@ def load_newobj(self) -> None: arr = np.array([], dtype="m8[ns]") obj = cls.__new__(cls, arr, arr.dtype) elif cls is BlockManager and not args: - obj = cls.__new__(cls, (), [], None, False) + obj = cls.__new__(cls, (), [], False) else: obj = cls.__new__(cls, *args) diff --git a/pandas/core/generic.py b/pandas/core/generic.py index 6008e6b6cb566..9ad291c39cbc5 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -768,8 +768,10 @@ def swapaxes( ) assert isinstance(new_mgr, BlockManager) assert isinstance(self._mgr, BlockManager) - new_mgr.parent = self._mgr - new_mgr.refs = [weakref.ref(self._mgr.blocks[0])] + new_mgr.blocks[0].refs = self._mgr.blocks[0].refs + new_mgr.blocks[0].refs.add_reference( + new_mgr.blocks[0] # type: ignore[arg-type] + ) return self._constructor(new_mgr).__finalize__(self, method="swapaxes") elif (copy or copy is None) and self._mgr.is_single_block: diff --git a/pandas/core/internals/blocks.py b/pandas/core/internals/blocks.py index 22fe227d21727..15abc143cd081 100644 --- a/pandas/core/internals/blocks.py +++ b/pandas/core/internals/blocks.py @@ -20,7 +20,10 @@ lib, writers, ) -from pandas._libs.internals import BlockPlacement +from pandas._libs.internals import ( + BlockPlacement, + BlockValuesRefs, +) from pandas._libs.missing import NA from pandas._libs.tslibs import IncompatibleFrequency from pandas._typing import ( @@ -144,6 +147,7 @@ class Block(PandasObject): values: np.ndarray | ExtensionArray ndim: int + refs: BlockValuesRefs __init__: Callable __slots__ = () @@ -261,7 +265,8 @@ def getitem_block(self, slicer: slice | npt.NDArray[np.intp]) -> Block: new_mgr_locs = 
self._mgr_locs[slicer] new_values = self._slice(slicer) - return type(self)(new_values, new_mgr_locs, self.ndim) + refs = self.refs if isinstance(slicer, slice) else None + return type(self)(new_values, new_mgr_locs, self.ndim, refs=refs) @final def getitem_block_columns( @@ -277,7 +282,7 @@ def getitem_block_columns( if new_values.ndim != self.values.ndim: raise ValueError("Only same dim slicing is allowed") - return type(self)(new_values, new_mgr_locs, self.ndim) + return type(self)(new_values, new_mgr_locs, self.ndim, refs=self.refs) @final def _can_hold_element(self, element: Any) -> bool: @@ -502,9 +507,13 @@ def to_native_types(self, na_rep: str = "nan", quoting=None, **kwargs) -> Block: def copy(self, deep: bool = True) -> Block: """copy constructor""" values = self.values + refs: BlockValuesRefs | None if deep: values = values.copy() - return type(self)(values, placement=self._mgr_locs, ndim=self.ndim) + refs = None + else: + refs = self.refs + return type(self)(values, placement=self._mgr_locs, ndim=self.ndim, refs=refs) # --------------------------------------------------------------------- # Replace @@ -1337,6 +1346,10 @@ def delete(self, loc) -> list[Block]: new_blocks: list[Block] = [] previous_loc = -1 + # TODO(CoW): This is tricky, if parent block goes out of scope + # all split blocks are referencing each other even though they + # don't share data + refs = self.refs if self.refs.has_reference() else None for idx in loc: if idx == previous_loc + 1: @@ -1347,7 +1360,9 @@ def delete(self, loc) -> list[Block]: # argument type "Tuple[slice, slice]" values = self.values[previous_loc + 1 : idx, :] # type: ignore[call-overload] # noqa locs = mgr_locs_arr[previous_loc + 1 : idx] - nb = type(self)(values, placement=BlockPlacement(locs), ndim=self.ndim) + nb = type(self)( + values, placement=BlockPlacement(locs), ndim=self.ndim, refs=refs + ) new_blocks.append(nb) previous_loc = idx @@ -1804,7 +1819,7 @@ def getitem_block_index(self, slicer: slice) -> 
ExtensionBlock: # GH#42787 in principle this is equivalent to values[..., slicer], but we don't # require subclasses of ExtensionArray to support that form (for now). new_values = self.values[slicer] - return type(self)(new_values, self._mgr_locs, ndim=self.ndim) + return type(self)(new_values, self._mgr_locs, ndim=self.ndim, refs=self.refs) def diff(self, n: int, axis: AxisInt = 1) -> list[Block]: # only reached with ndim == 2 and axis == 1 @@ -2137,7 +2152,9 @@ def new_block_2d(values: ArrayLike, placement: BlockPlacement): return klass(values, ndim=2, placement=placement) -def new_block(values, placement, *, ndim: int) -> Block: +def new_block( + values, placement, *, ndim: int, refs: BlockValuesRefs | None = None +) -> Block: # caller is responsible for ensuring values is NOT a PandasArray if not isinstance(placement, BlockPlacement): @@ -2148,7 +2165,7 @@ def new_block(values, placement, *, ndim: int) -> Block: klass = get_block_type(values.dtype) values = maybe_coerce_values(values) - return klass(values, ndim=ndim, placement=placement) + return klass(values, ndim=ndim, placement=placement, refs=refs) def check_ndim(values, placement: BlockPlacement, ndim: int) -> None: diff --git a/pandas/core/internals/concat.py b/pandas/core/internals/concat.py index d46b51a2ee954..bedd4d92a1ea3 100644 --- a/pandas/core/internals/concat.py +++ b/pandas/core/internals/concat.py @@ -7,7 +7,6 @@ Sequence, cast, ) -import weakref import numpy as np @@ -62,10 +61,7 @@ ensure_block_shape, new_block_2d, ) -from pandas.core.internals.managers import ( - BlockManager, - using_copy_on_write, -) +from pandas.core.internals.managers import BlockManager if TYPE_CHECKING: from pandas import Index @@ -271,8 +267,6 @@ def _concat_managers_axis0( offset = 0 blocks = [] - refs: list[weakref.ref | None] = [] - parents: list = [] for i, mgr in enumerate(mgrs): # If we already reindexed, then we definitely don't need another copy made_copy = had_reindexers[i] @@ -289,17 +283,9 @@ def 
_concat_managers_axis0( nb._mgr_locs = nb._mgr_locs.add(offset) blocks.append(nb) - if not made_copy and not copy and using_copy_on_write(): - refs.extend([weakref.ref(blk) for blk in mgr.blocks]) - parents.append(mgr) - elif using_copy_on_write(): - refs.extend([None] * len(mgr.blocks)) - offset += len(mgr.items) - result_parents = parents if parents else None - result_ref = refs if refs else None - result = BlockManager(tuple(blocks), axes, parent=result_parents, refs=result_ref) + result = BlockManager(tuple(blocks), axes) return result diff --git a/pandas/core/internals/managers.py b/pandas/core/internals/managers.py index 8a4fa4c10bf5f..74116dd855e3e 100644 --- a/pandas/core/internals/managers.py +++ b/pandas/core/internals/managers.py @@ -11,7 +11,6 @@ cast, ) import warnings -import weakref import numpy as np @@ -146,8 +145,6 @@ class BaseBlockManager(DataManager): _blklocs: npt.NDArray[np.intp] blocks: tuple[Block, ...] axes: list[Index] - refs: list[weakref.ref | None] | None - parent: object @property def ndim(self) -> int: @@ -156,17 +153,11 @@ def ndim(self) -> int: _known_consolidated: bool _is_consolidated: bool - def __init__(self, blocks, axes, refs=None, verify_integrity: bool = True) -> None: + def __init__(self, blocks, axes, verify_integrity: bool = True) -> None: raise NotImplementedError @classmethod - def from_blocks( - cls: type_t[T], - blocks: list[Block], - axes: list[Index], - refs: list[weakref.ref | None] | None = None, - parent: object = None, - ) -> T: + def from_blocks(cls: type_t[T], blocks: list[Block], axes: list[Index]) -> T: raise NotImplementedError @property @@ -254,19 +245,7 @@ def _has_no_reference_block(self, blkno: int) -> bool: (whether it references another array or is itself being referenced) Returns True if the block has no references. """ - # TODO(CoW) include `or self.refs[blkno]() is None` ? 
- return ( - self.refs is None or self.refs[blkno] is None - ) and weakref.getweakrefcount(self.blocks[blkno]) == 0 - - def _clear_reference_block(self, blkno: int) -> None: - """ - Clear any reference for column `i`. - """ - if self.refs is not None: - self.refs[blkno] = None - if com.all_none(*self.refs): - self.parent = None + return not self.blocks[blkno].refs.has_reference() def get_dtypes(self): dtypes = np.array([blk.dtype for blk in self.blocks]) @@ -580,23 +559,17 @@ def _combine( # TODO(CoW) we could optimize here if we know that the passed blocks # are fully "owned" (eg created from an operation, not coming from # an existing manager) - new_refs: list[weakref.ref | None] | None = None if copy else [] for b in blocks: nb = b.copy(deep=copy) nb.mgr_locs = BlockPlacement(inv_indexer[nb.mgr_locs.indexer]) new_blocks.append(nb) - if not copy: - # None has no attribute "append" - new_refs.append(weakref.ref(b)) # type: ignore[union-attr] axes = list(self.axes) if index is not None: axes[-1] = index axes[0] = self.items.take(indexer) - return type(self).from_blocks( - new_blocks, axes, new_refs, parent=None if copy else self - ) + return type(self).from_blocks(new_blocks, axes) @property def nblocks(self) -> int: @@ -636,17 +609,7 @@ def copy_func(ax): new_axes = list(self.axes) res = self.apply("copy", deep=deep) - new_refs: list[weakref.ref | None] | None - if deep: - new_refs = None - parent = None - else: - new_refs = [weakref.ref(blk) for blk in self.blocks] - parent = self - res.axes = new_axes - res.refs = new_refs - res.parent = parent if self.ndim > 1: # Avoid needing to re-compute these @@ -670,7 +633,7 @@ def consolidate(self: T) -> T: if self.is_consolidated(): return self - bm = type(self)(self.blocks, self.axes, self.refs, verify_integrity=False) + bm = type(self)(self.blocks, self.axes, verify_integrity=False) bm._is_consolidated = False bm._consolidate_inplace() return bm @@ -732,13 +695,12 @@ def reindex_indexer( raise IndexError("Requested 
axis not found in manager") if axis == 0: - new_blocks, new_refs = self._slice_take_blocks_ax0( + new_blocks = self._slice_take_blocks_ax0( indexer, fill_value=fill_value, only_slice=only_slice, use_na_proxy=use_na_proxy, ) - parent = None if com.all_none(*new_refs) else self else: new_blocks = [ blk.take_nd( @@ -750,13 +712,11 @@ def reindex_indexer( ) for blk in self.blocks ] - new_refs = None - parent = None new_axes = list(self.axes) new_axes[axis] = new_axis - new_mgr = type(self).from_blocks(new_blocks, new_axes, new_refs, parent=parent) + new_mgr = type(self).from_blocks(new_blocks, new_axes) if axis == 1: # We can avoid the need to rebuild these new_mgr._blknos = self.blknos.copy() @@ -770,7 +730,7 @@ def _slice_take_blocks_ax0( only_slice: bool = False, *, use_na_proxy: bool = False, - ) -> tuple[list[Block], list[weakref.ref | None]]: + ) -> list[Block]: """ Slice/take blocks along axis=0. @@ -803,11 +763,9 @@ def _slice_take_blocks_ax0( # GH#32959 EABlock would fail since we can't make 0-width # TODO(EA2D): special casing unnecessary with 2D EAs if sllen == 0: - return [], [] + return [] bp = BlockPlacement(slice(0, sllen)) - return [blk.getitem_block_columns(slobj, new_mgr_locs=bp)], [ - weakref.ref(blk) - ] + return [blk.getitem_block_columns(slobj, new_mgr_locs=bp)] elif not allow_fill or self.ndim == 1: if allow_fill and fill_value is None: fill_value = blk.fill_value @@ -821,9 +779,7 @@ def _slice_take_blocks_ax0( ) for i, ml in enumerate(slobj) ] - # We have - # all(np.shares_memory(nb.values, blk.values) for nb in blocks) - return blocks, [weakref.ref(blk)] * len(blocks) + return blocks else: bp = BlockPlacement(slice(0, sllen)) return [ @@ -833,7 +789,7 @@ def _slice_take_blocks_ax0( new_mgr_locs=bp, fill_value=fill_value, ) - ], [None] + ] if sl_type == "slice": blknos = self.blknos[slobj] @@ -849,7 +805,6 @@ def _slice_take_blocks_ax0( # When filling blknos, make sure blknos is updated before appending to # blocks list, that way new blkno is 
exactly len(blocks). blocks = [] - refs: list[weakref.ref | None] = [] group = not only_slice for blkno, mgr_locs in libinternals.get_blkno_placements(blknos, group=group): if blkno == -1: @@ -862,7 +817,6 @@ def _slice_take_blocks_ax0( use_na_proxy=use_na_proxy, ) ) - refs.append(None) else: blk = self.blocks[blkno] @@ -876,7 +830,6 @@ def _slice_take_blocks_ax0( newblk = blk.copy(deep=False) newblk.mgr_locs = BlockPlacement(slice(mgr_loc, mgr_loc + 1)) blocks.append(newblk) - refs.append(weakref.ref(blk)) else: # GH#32779 to avoid the performance penalty of copying, @@ -889,7 +842,6 @@ def _slice_take_blocks_ax0( if isinstance(taker, slice): nb = blk.getitem_block_columns(taker, new_mgr_locs=mgr_locs) blocks.append(nb) - refs.append(weakref.ref(blk)) elif only_slice: # GH#33597 slice instead of take, so we get # views instead of copies @@ -899,13 +851,11 @@ def _slice_take_blocks_ax0( nb = blk.getitem_block_columns(slc, new_mgr_locs=bp) # We have np.shares_memory(nb.values, blk.values) blocks.append(nb) - refs.append(weakref.ref(blk)) else: nb = blk.take_nd(taker, axis=0, new_mgr_locs=mgr_locs) blocks.append(nb) - refs.append(None) - return blocks, refs + return blocks def _make_na_block( self, placement: BlockPlacement, fill_value=None, use_na_proxy: bool = False @@ -990,8 +940,6 @@ def __init__( self, blocks: Sequence[Block], axes: Sequence[Index], - refs: list[weakref.ref | None] | None = None, - parent: object = None, verify_integrity: bool = True, ) -> None: @@ -1023,28 +971,13 @@ def _verify_integrity(self) -> None: f"block items\n# manager items: {len(self.items)}, # " f"tot_items: {tot_items}" ) - if self.refs is not None: - if len(self.refs) != len(self.blocks): - raise AssertionError( - "Number of passed refs must equal the number of blocks: " - f"{len(self.refs)} refs vs {len(self.blocks)} blocks." 
- "\nIf you see this error, please report a bug at " - "https://github.com/pandas-dev/pandas/issues" - ) @classmethod - def from_blocks( - cls, - blocks: list[Block], - axes: list[Index], - refs: list[weakref.ref | None] | None = None, - parent: object = None, - ) -> BlockManager: + def from_blocks(cls, blocks: list[Block], axes: list[Index]) -> BlockManager: """ Constructor for BlockManager and SingleBlockManager with same signature. """ - parent = parent if using_copy_on_write() else None - return cls(blocks, axes, refs, parent, verify_integrity=False) + return cls(blocks, axes, verify_integrity=False) # ---------------------------------------------------------------- # Indexing @@ -1063,10 +996,14 @@ def fast_xs(self, loc: int) -> SingleBlockManager: """ if len(self.blocks) == 1: result = self.blocks[0].iget((slice(None), loc)) - block = new_block(result, placement=slice(0, len(result)), ndim=1) # in the case of a single block, the new block is a view - ref = weakref.ref(self.blocks[0]) - return SingleBlockManager(block, self.axes[0], [ref], parent=self) + block = new_block( + result, + placement=slice(0, len(result)), + ndim=1, + refs=self.blocks[0].refs, + ) + return SingleBlockManager(block, self.axes[0]) dtype = interleaved_dtype([blk.dtype for blk in self.blocks]) @@ -1109,10 +1046,10 @@ def iget(self, i: int, track_ref: bool = True) -> SingleBlockManager: # shortcut for select a single-dim from a 2-dim BM bp = BlockPlacement(slice(0, len(values))) - nb = type(block)(values, placement=bp, ndim=1) - ref = weakref.ref(block) if track_ref else None - parent = self if track_ref else None - return SingleBlockManager(nb, self.axes[1], [ref], parent) + nb = type(block)( + values, placement=bp, ndim=1, refs=block.refs if track_ref else None + ) + return SingleBlockManager(nb, self.axes[1]) def iget_values(self, i: int) -> ArrayLike: """ @@ -1244,7 +1181,7 @@ def value_getitem(placement): self._iset_split_block(blkno_l, blk_locs) if len(removed_blknos): - 
# Remove blocks & update blknos and refs accordingly + # Remove blocks & update blknos accordingly is_deleted = np.zeros(self.nblocks, dtype=np.bool_) is_deleted[removed_blknos] = True @@ -1255,18 +1192,14 @@ def value_getitem(placement): self.blocks = tuple( blk for i, blk in enumerate(self.blocks) if i not in set(removed_blknos) ) - if self.refs is not None: - self.refs = [ - ref - for i, ref in enumerate(self.refs) - if i not in set(removed_blknos) - ] if unfit_val_locs: unfit_idxr = np.concatenate(unfit_mgr_locs) unfit_count = len(unfit_idxr) new_blocks: list[Block] = [] + # TODO(CoW) is this always correct to assume that the new_blocks + # are not referencing anything else? if value_is_extension_type: # This code (ab-)uses the fact that EA blocks contain only # one item. @@ -1297,10 +1230,6 @@ def value_getitem(placement): self._blklocs[unfit_idxr] = np.arange(unfit_count) self.blocks += tuple(new_blocks) - # TODO(CoW) is this always correct to assume that the new_blocks - # are not referencing anything else? - if self.refs is not None: - self.refs = list(self.refs) + [None] * len(new_blocks) # Newly created block's dtype may already be present. 
self._known_consolidated = False @@ -1337,13 +1266,6 @@ def _iset_split_block( first_nb = nbs_tup[0] nbs_tup = tuple(nbs_tup[1:]) - if self.refs is not None: - self.refs.extend([self.refs[blkno_l]] * len(nbs_tup)) - - if value is not None: - # Only clear if we set new values - self._clear_reference_block(blkno_l) - nr_blocks = len(self.blocks) blocks_tup = ( self.blocks[:blkno_l] + (first_nb,) + self.blocks[blkno_l + 1 :] + nbs_tup @@ -1373,7 +1295,6 @@ def _iset_single( if using_copy_on_write() and not self._has_no_reference_block(blkno): # perform Copy-on-Write and clear the reference copy = True - self._clear_reference_block(blkno) iloc = self.blklocs[loc] blk.set_inplace(slice(iloc, iloc + 1), value, copy=copy) return @@ -1382,7 +1303,6 @@ def _iset_single( old_blocks = self.blocks new_blocks = old_blocks[:blkno] + (nb,) + old_blocks[blkno + 1 :] self.blocks = new_blocks - self._clear_reference_block(blkno) return def column_setitem( @@ -1400,7 +1320,6 @@ def column_setitem( blocks = list(self.blocks) blocks[blkno] = blocks[blkno].copy() self.blocks = tuple(blocks) - self._clear_reference_block(blkno) # this manager is only created temporarily to mutate the values in place # so don't track references, otherwise the `setitem` would perform CoW again @@ -1434,6 +1353,7 @@ def insert(self, loc: int, item: Hashable, value: ArrayLike) -> None: value = ensure_block_shape(value, ndim=self.ndim) bp = BlockPlacement(slice(loc, loc + 1)) + # TODO(CoW) do we always "own" the passed `value`? block = new_block_2d(values=value, placement=bp) if not len(self.blocks): @@ -1446,9 +1366,6 @@ def insert(self, loc: int, item: Hashable, value: ArrayLike) -> None: self.axes[0] = new_axis self.blocks += (block,) - # TODO(CoW) do we always "own" the passed `value`? 
- if self.refs is not None: - self.refs += [None] self._known_consolidated = False @@ -1502,12 +1419,10 @@ def idelete(self, indexer) -> BlockManager: is_deleted[indexer] = True taker = (~is_deleted).nonzero()[0] - nbs, new_refs = self._slice_take_blocks_ax0(taker, only_slice=True) + nbs = self._slice_take_blocks_ax0(taker, only_slice=True) new_columns = self.items[~is_deleted] axes = [new_columns, self.axes[1]] - # TODO this might not be needed (can a delete ever be done in chained manner?) - parent = None if com.all_none(*new_refs) else self - return type(self)(tuple(nbs), axes, new_refs, parent, verify_integrity=False) + return type(self)(tuple(nbs), axes, verify_integrity=False) # ---------------------------------------------------------------- # Block-wise Operation @@ -1854,10 +1769,7 @@ def _consolidate_inplace(self) -> None: # the DataFrame's _item_cache. The exception is for newly-created # BlockManager objects not yet attached to a DataFrame. if not self.is_consolidated(): - if self.refs is None: - self.blocks = _consolidate(self.blocks) - else: - self.blocks, self.refs = _consolidate_with_refs(self.blocks, self.refs) + self.blocks = _consolidate(self.blocks) self._is_consolidated = True self._known_consolidated = True self._rebuild_blknos_and_blklocs() @@ -1879,8 +1791,6 @@ def __init__( self, block: Block, axis: Index, - refs: list[weakref.ref | None] | None = None, - parent: object = None, verify_integrity: bool = False, ) -> None: # Assertions disabled for performance @@ -1889,25 +1799,19 @@ def __init__( self.axes = [axis] self.blocks = (block,) - self.refs = refs - self.parent = parent if using_copy_on_write() else None @classmethod def from_blocks( cls, blocks: list[Block], axes: list[Index], - refs: list[weakref.ref | None] | None = None, - parent: object = None, ) -> SingleBlockManager: """ Constructor for BlockManager and SingleBlockManager with same signature. 
""" assert len(blocks) == 1 assert len(axes) == 1 - if refs is not None: - assert len(refs) == 1 - return cls(blocks[0], axes[0], refs, parent, verify_integrity=False) + return cls(blocks[0], axes[0], verify_integrity=False) @classmethod def from_array(cls, array: ArrayLike, index: Index) -> SingleBlockManager: @@ -1924,13 +1828,9 @@ def to_2d_mgr(self, columns: Index) -> BlockManager: blk = self.blocks[0] arr = ensure_block_shape(blk.values, ndim=2) bp = BlockPlacement(0) - new_blk = type(blk)(arr, placement=bp, ndim=2) + new_blk = type(blk)(arr, placement=bp, ndim=2, refs=blk.refs) axes = [columns, self.axes[0]] - refs: list[weakref.ref | None] = [weakref.ref(blk)] - parent = self if using_copy_on_write() else None - return BlockManager( - [new_blk], axes=axes, refs=refs, parent=parent, verify_integrity=False - ) + return BlockManager([new_blk], axes=axes, verify_integrity=False) def _has_no_reference(self, i: int = 0) -> bool: """ @@ -1938,9 +1838,7 @@ def _has_no_reference(self, i: int = 0) -> bool: (whether it references another array or is itself being referenced) Returns True if the column has no references. 
""" - return (self.refs is None or self.refs[0] is None) and weakref.getweakrefcount( - self.blocks[0] - ) == 0 + return not self.blocks[0].refs.has_reference() def __getstate__(self): block_values = [b.values for b in self.blocks] @@ -2008,19 +1906,18 @@ def getitem_mgr(self, indexer: slice | np.ndarray) -> SingleBlockManager: and com.is_bool_indexer(indexer) and indexer.all() ): - return type(self)(blk, self.index, [weakref.ref(blk)], parent=self) + return type(self)(blk.copy(deep=False), self.index) array = blk._slice(indexer) if array.ndim > 1: # This will be caught by Series._get_values raise ValueError("dimension-expanding indexing not allowed") bp = BlockPlacement(slice(0, len(array))) - block = type(blk)(array, placement=bp, ndim=1) + # TODO(CoW) in theory only need to track reference if new_array is a view + block = type(blk)(array, placement=bp, ndim=1, refs=blk.refs) new_idx = self.index[indexer] - # TODO(CoW) in theory only need to track reference if new_array is a view - ref = weakref.ref(blk) - return type(self)(block, new_idx, [ref], parent=self) + return type(self)(block, new_idx) def get_slice(self, slobj: slice, axis: AxisInt = 0) -> SingleBlockManager: # Assertion disabled for performance @@ -2031,11 +1928,11 @@ def get_slice(self, slobj: slice, axis: AxisInt = 0) -> SingleBlockManager: blk = self._block array = blk._slice(slobj) bp = BlockPlacement(slice(0, len(array))) - block = type(blk)(array, placement=bp, ndim=1) - new_index = self.index._getitem_slice(slobj) # TODO this method is only used in groupby SeriesSplitter at the moment, - # so passing refs / parent is not yet covered by the tests - return type(self)(block, new_index, [weakref.ref(blk)], parent=self) + # so passing refs is not yet covered by the tests + block = type(blk)(array, placement=bp, ndim=1, refs=blk.refs) + new_index = self.index._getitem_slice(slobj) + return type(self)(block, new_index) @property def index(self) -> Index: @@ -2081,8 +1978,6 @@ def setitem_inplace(self, 
indexer, value) -> None: """ if using_copy_on_write() and not self._has_no_reference(0): self.blocks = (self._block.copy(),) - self.refs = None - self.parent = None self._cache.clear() super().setitem_inplace(indexer, value) @@ -2097,9 +1992,6 @@ def idelete(self, indexer) -> SingleBlockManager: self.blocks = (nb,) self.axes[0] = self.axes[0].delete(indexer) self._cache.clear() - # clear reference since delete always results in a new array - self.refs = None - self.parent = None return self def fast_xs(self, loc): @@ -2319,31 +2211,6 @@ def _consolidate(blocks: tuple[Block, ...]) -> tuple[Block, ...]: return tuple(new_blocks) -def _consolidate_with_refs( - blocks: tuple[Block, ...], refs -) -> tuple[tuple[Block, ...], list[weakref.ref | None]]: - """ - Merge blocks having same dtype, exclude non-consolidating blocks, handling - refs - """ - gkey = lambda x: x[0]._consolidate_key - grouper = itertools.groupby(sorted(zip(blocks, refs), key=gkey), gkey) - - new_blocks: list[Block] = [] - new_refs: list[weakref.ref | None] = [] - for (_can_consolidate, dtype), group_blocks_refs in grouper: - group_blocks, group_refs = list(zip(*list(group_blocks_refs))) - merged_blocks, consolidated = _merge_blocks( - list(group_blocks), dtype=dtype, can_consolidate=_can_consolidate - ) - new_blocks = extend_blocks(merged_blocks, new_blocks) - if consolidated: - new_refs.extend([None]) - else: - new_refs.extend(group_refs) - return tuple(new_blocks), new_refs - - def _merge_blocks( blocks: list[Block], dtype: DtypeObj, can_consolidate: bool ) -> tuple[list[Block], bool]: diff --git a/pandas/core/internals/ops.py b/pandas/core/internals/ops.py index 477dc98aa2b2b..24fc51a96d9df 100644 --- a/pandas/core/internals/ops.py +++ b/pandas/core/internals/ops.py @@ -36,7 +36,7 @@ def _iter_block_pairs( left_ea = blk_vals.ndim == 1 - rblks, _ = right._slice_take_blocks_ax0(locs.indexer, only_slice=True) + rblks = right._slice_take_blocks_ax0(locs.indexer, only_slice=True) # Assertions are 
disabled for performance, but should hold: # if left_ea: diff --git a/pandas/core/reshape/concat.py b/pandas/core/reshape/concat.py index f8220649bf890..43e0b77a90a85 100644 --- a/pandas/core/reshape/concat.py +++ b/pandas/core/reshape/concat.py @@ -14,7 +14,6 @@ cast, overload, ) -import weakref import numpy as np @@ -551,8 +550,9 @@ def __init__( obj = sample._constructor(obj, columns=[name], copy=False) if using_copy_on_write(): # TODO(CoW): Remove when ref tracking in constructors works - obj._mgr.parent = original_obj # type: ignore[union-attr] - obj._mgr.refs = [weakref.ref(original_obj._mgr.blocks[0])] # type: ignore[union-attr] # noqa: E501 + for i, block in enumerate(original_obj._mgr.blocks): # type: ignore[union-attr] # noqa + obj._mgr.blocks[i].refs = block.refs # type: ignore[union-attr] # noqa + obj._mgr.blocks[i].refs.add_reference(obj._mgr.blocks[i]) # type: ignore[arg-type, union-attr] # noqa obj.columns = [new_name] @@ -612,13 +612,9 @@ def get_result(self): typ=get_option("mode.data_manager"), ) if using_copy_on_write() and not self.copy: - parents = [obj._mgr for obj in self.objs] - mgr.parent = parents # type: ignore[union-attr] - refs = [ - weakref.ref(obj._mgr.blocks[0]) # type: ignore[union-attr] - for obj in self.objs - ] - mgr.refs = refs # type: ignore[union-attr] + for i, obj in enumerate(self.objs): + mgr.blocks[i].refs = obj._mgr.blocks[0].refs # type: ignore[union-attr] # noqa + mgr.blocks[i].refs.add_reference(mgr.blocks[i]) # type: ignore[arg-type, union-attr] # noqa df = cons(mgr, copy=False) df.columns = columns return df.__finalize__(self, method="concat") diff --git a/pandas/tests/copy_view/test_constructors.py b/pandas/tests/copy_view/test_constructors.py index c04c733e5ee1d..f5805455e326f 100644 --- a/pandas/tests/copy_view/test_constructors.py +++ b/pandas/tests/copy_view/test_constructors.py @@ -18,7 +18,7 @@ def test_series_from_series(using_copy_on_write): assert np.shares_memory(ser.values, result.values) if 
using_copy_on_write: - assert result._mgr.refs is not None + assert result._mgr.blocks[0].refs.has_reference() if using_copy_on_write: # mutating new series copy doesn't mutate original @@ -72,4 +72,4 @@ def test_series_from_series_with_reindex(using_copy_on_write): result = Series(ser, index=[0, 1, 2, 3]) assert not np.shares_memory(ser.values, result.values) if using_copy_on_write: - assert result._mgr.refs is None or result._mgr.refs[0] is None + assert not result._mgr.blocks[0].refs.has_reference() diff --git a/pandas/tests/copy_view/test_core_functionalities.py b/pandas/tests/copy_view/test_core_functionalities.py new file mode 100644 index 0000000000000..204e26b35d680 --- /dev/null +++ b/pandas/tests/copy_view/test_core_functionalities.py @@ -0,0 +1,88 @@ +import numpy as np +import pytest + +from pandas import DataFrame +import pandas._testing as tm +from pandas.tests.copy_view.util import get_array + + +def test_assigning_to_same_variable_removes_references(using_copy_on_write): + df = DataFrame({"a": [1, 2, 3]}) + df = df.reset_index() + if using_copy_on_write: + assert df._mgr._has_no_reference(1) + arr = get_array(df, "a") + df.iloc[0, 1] = 100 # Write into a + + assert np.shares_memory(arr, get_array(df, "a")) + + +def test_setitem_dont_track_unnecessary_references(using_copy_on_write): + df = DataFrame({"a": [1, 2, 3], "b": 1, "c": 1}) + + df["b"] = 100 + arr = get_array(df, "a") + # We split the block in setitem, if we are not careful the new blocks will + # reference each other triggering a copy + df.iloc[0, 0] = 100 + assert np.shares_memory(arr, get_array(df, "a")) + + +def test_setitem_with_view_copies(using_copy_on_write): + df = DataFrame({"a": [1, 2, 3], "b": 1, "c": 1}) + view = df[:] + expected = df.copy() + + df["b"] = 100 + arr = get_array(df, "a") + df.iloc[0, 0] = 100 # Check that we correctly track reference + if using_copy_on_write: + assert not np.shares_memory(arr, get_array(df, "a")) + tm.assert_frame_equal(view, expected) + + +def 
test_setitem_with_view_invalidated_does_not_copy(using_copy_on_write, request): + df = DataFrame({"a": [1, 2, 3], "b": 1, "c": 1}) + view = df[:] + + df["b"] = 100 + arr = get_array(df, "a") + view = None # noqa + df.iloc[0, 0] = 100 + if using_copy_on_write: + # Setitem split the block. Since the old block shared data with view + # all the new blocks are referencing view and each other. When view + # goes out of scope, they don't share data with any other block, + # so we should not trigger a copy + mark = pytest.mark.xfail( + reason="blk.delete does not track references correctly" + ) + request.node.add_marker(mark) + assert np.shares_memory(arr, get_array(df, "a")) + + +def test_out_of_scope(using_copy_on_write): + def func(): + df = DataFrame({"a": [1, 2], "b": 1.5, "c": 1}) + # create some subset + result = df[["a", "b"]] + return result + + result = func() + if using_copy_on_write: + assert not result._mgr.blocks[0].refs.has_reference() + assert not result._mgr.blocks[1].refs.has_reference() + + +def test_delete(using_copy_on_write): + df = DataFrame(np.random.randn(4, 3), columns=["a", "b", "c"]) + del df["b"] + if using_copy_on_write: + # TODO: This should not have references, delete makes a shallow copy + # but keeps the blocks alive + assert df._mgr.blocks[0].refs.has_reference() + assert df._mgr.blocks[1].refs.has_reference() + + df = df[["a"]] + if using_copy_on_write: + assert not df._mgr.blocks[0].refs.has_reference() diff --git a/pandas/tests/copy_view/test_internals.py b/pandas/tests/copy_view/test_internals.py index 506ae7d3465c5..67022e533dbc4 100644 --- a/pandas/tests/copy_view/test_internals.py +++ b/pandas/tests/copy_view/test_internals.py @@ -20,52 +20,32 @@ def test_consolidate(using_copy_on_write): subset = df[:] # each block of subset references a block of df - assert subset._mgr.refs is not None and all( - ref is not None for ref in subset._mgr.refs - ) + assert all(blk.refs.has_reference() for blk in subset._mgr.blocks) # consolidate the 
two int64 blocks subset._consolidate_inplace() # the float64 block still references the parent one because it still a view - assert subset._mgr.refs[0] is not None + assert subset._mgr.blocks[0].refs.has_reference() # equivalent of assert np.shares_memory(df["b"].values, subset["b"].values) # but avoids caching df["b"] assert np.shares_memory(get_array(df, "b"), get_array(subset, "b")) # the new consolidated int64 block does not reference another - assert subset._mgr.refs[1] is None + assert not subset._mgr.blocks[1].refs.has_reference() # the parent dataframe now also only is linked for the float column - assert df._mgr._has_no_reference(0) - assert not df._mgr._has_no_reference(1) - assert df._mgr._has_no_reference(2) + assert not df._mgr.blocks[0].refs.has_reference() + assert df._mgr.blocks[1].refs.has_reference() + assert not df._mgr.blocks[2].refs.has_reference() # and modifying subset still doesn't modify parent if using_copy_on_write: subset.iloc[0, 1] = 0.0 - assert df._mgr._has_no_reference(1) + assert not df._mgr.blocks[1].refs.has_reference() assert df.loc[0, "b"] == 0.1 -@td.skip_array_manager_invalid_test -def test_clear_parent(using_copy_on_write): - # ensure to clear parent reference if we are no longer viewing data from parent - if not using_copy_on_write: - pytest.skip("test only relevant when using copy-on-write") - - df = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3]}) - subset = df[:] - assert subset._mgr.parent is not None - - # replacing existing columns loses the references to the parent df - subset["a"] = 0 - assert subset._mgr.parent is not None - # when losing the last reference, also the parent should be reset - subset["b"] = 0 - assert subset._mgr.parent is None - - @pytest.mark.single_cpu @td.skip_array_manager_invalid_test def test_switch_options(): diff --git a/pandas/tests/copy_view/test_methods.py b/pandas/tests/copy_view/test_methods.py index 26038725c544f..b814b9089aabd 100644 --- a/pandas/tests/copy_view/test_methods.py +++ 
b/pandas/tests/copy_view/test_methods.py @@ -22,7 +22,8 @@ def test_copy(using_copy_on_write): # the deep copy doesn't share memory assert not np.shares_memory(get_array(df_copy, "a"), get_array(df, "a")) if using_copy_on_write: - assert df_copy._mgr.refs is None + assert not df_copy._mgr.blocks[0].refs.has_reference() + assert not df_copy._mgr.blocks[1].refs.has_reference() # mutating copy doesn't mutate original df_copy.iloc[0, 0] = 0 @@ -36,7 +37,8 @@ def test_copy_shallow(using_copy_on_write): # the shallow copy still shares memory assert np.shares_memory(get_array(df_copy, "a"), get_array(df, "a")) if using_copy_on_write: - assert df_copy._mgr.refs is not None + assert df_copy._mgr.blocks[0].refs.has_reference() + assert df_copy._mgr.blocks[1].refs.has_reference() if using_copy_on_write: # mutating shallow copy doesn't mutate original