Skip to content

REF: reindex_indexer use np.void to avoid JoinUnit #43376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions pandas/core/internals/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ def concatenate_managers(
if isinstance(mgrs_indexers[0][0], ArrayManager):
return _concatenate_array_managers(mgrs_indexers, axes, concat_axis, copy)

mgrs_indexers = _maybe_reindex_columns_na_proxy(axes, mgrs_indexers)

concat_plans = [
_get_mgr_concatenation_plan(mgr, indexers) for mgr, indexers in mgrs_indexers
]
Expand Down Expand Up @@ -245,6 +247,38 @@ def concatenate_managers(
return BlockManager(tuple(blocks), axes)


def _maybe_reindex_columns_na_proxy(
axes: list[Index], mgrs_indexers: list[tuple[BlockManager, dict[int, np.ndarray]]]
) -> list[tuple[BlockManager, dict[int, np.ndarray]]]:
"""
Reindex along columns so that all of the BlockManagers being concatenated
have matching columns.

Columns added in this reindexing have dtype=np.void, indicating they
should be ignored when choosing a column's final dtype.
"""
new_mgrs_indexers = []
for mgr, indexers in mgrs_indexers:
# We only reindex for axis=0 (i.e. columns), as this can be done cheaply
if 0 in indexers:
new_mgr = mgr.reindex_indexer(
axes[0],
indexers[0],
axis=0,
copy=False,
only_slice=True,
allow_dups=True,
use_na_proxy=True,
)
new_indexers = indexers.copy()
del new_indexers[0]
new_mgrs_indexers.append((new_mgr, new_indexers))
else:
new_mgrs_indexers.append((mgr, indexers))

return new_mgrs_indexers


def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarray]):
"""
Construct concatenation plan for given block manager and indexers.
Expand Down Expand Up @@ -375,6 +409,8 @@ def _is_valid_na_for(self, dtype: DtypeObj) -> bool:
return False
if self.block is None:
return True
if self.block.dtype.kind == "V":
return True

if self.dtype == object:
values = self.block.values
Expand All @@ -401,6 +437,8 @@ def is_na(self) -> bool:
blk = self.block
if blk is None:
return True
if blk.dtype.kind == "V":
return True

if not blk._can_hold_na:
return False
Expand All @@ -426,7 +464,7 @@ def is_na(self) -> bool:
return all(isna_all(row) for row in values)

def get_reindexed_values(self, empty_dtype: DtypeObj, upcasted_na) -> ArrayLike:
if upcasted_na is None:
if upcasted_na is None and self.block.dtype.kind != "V":
# No upcasting is necessary
fill_value = self.block.fill_value
values = self.block.get_values()
Expand All @@ -435,6 +473,7 @@ def get_reindexed_values(self, empty_dtype: DtypeObj, upcasted_na) -> ArrayLike:

if self._is_valid_na_for(empty_dtype):
# note: always holds when self.block is None
# or self.block.dtype.kind == "V"
blk_dtype = getattr(self.block, "dtype", None)

if blk_dtype == np.dtype("object"):
Expand Down Expand Up @@ -512,7 +551,9 @@ def _concatenate_join_units(

empty_dtype = _get_empty_dtype(join_units)

has_none_blocks = any(unit.block is None for unit in join_units)
has_none_blocks = any(
unit.block is None or unit.block.dtype.kind == "V" for unit in join_units
)
upcasted_na = _dtype_to_na_value(empty_dtype, has_none_blocks)

to_concat = [
Expand Down Expand Up @@ -597,13 +638,19 @@ def _get_empty_dtype(join_units: Sequence[JoinUnit]) -> DtypeObj:
empty_dtype = join_units[0].block.dtype
return empty_dtype

has_none_blocks = any(unit.block is None for unit in join_units)
has_none_blocks = any(
unit.block is None or unit.block.dtype.kind == "V" for unit in join_units
)

dtypes = [
unit.dtype for unit in join_units if unit.block is not None and not unit.is_na
]
if not len(dtypes):
dtypes = [unit.dtype for unit in join_units if unit.block is not None]
dtypes = [
unit.dtype
for unit in join_units
if unit.block is not None and unit.block.dtype.kind != "V"
]

dtype = find_common_type(dtypes)
if has_none_blocks:
Expand All @@ -619,7 +666,7 @@ def _is_uniform_join_units(join_units: list[JoinUnit]) -> bool:

"""
first = join_units[0].block
if first is None:
if first is None or first.dtype.kind == "V":
return False
return (
# exclude cases where a) ju.block is None or b) we have e.g. Int64+int64
Expand Down
31 changes: 28 additions & 3 deletions pandas/core/internals/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from pandas.core.internals.blocks import (
Block,
DatetimeTZBlock,
NumpyBlock,
ensure_block_shape,
extend_blocks,
get_block_type,
Expand Down Expand Up @@ -613,6 +614,8 @@ def reindex_indexer(
copy: bool = True,
consolidate: bool = True,
only_slice: bool = False,
*,
use_na_proxy: bool = False,
) -> T:
"""
Parameters
Expand All @@ -627,6 +630,8 @@ def reindex_indexer(
Whether to consolidate inplace before reindexing.
only_slice : bool, default False
Whether to take views, not copies, along columns.
use_na_proxy : bool, default False
Whether to use a np.void ndarray for newly introduced columns.

pandas-indexer with -1's only.
"""
Expand All @@ -651,7 +656,10 @@ def reindex_indexer(

if axis == 0:
new_blocks = self._slice_take_blocks_ax0(
indexer, fill_value=fill_value, only_slice=only_slice
indexer,
fill_value=fill_value,
only_slice=only_slice,
use_na_proxy=use_na_proxy,
)
else:
new_blocks = [
Expand All @@ -675,6 +683,8 @@ def _slice_take_blocks_ax0(
slice_or_indexer: slice | np.ndarray,
fill_value=lib.no_default,
only_slice: bool = False,
*,
use_na_proxy: bool = False,
) -> list[Block]:
"""
Slice/take blocks along axis=0.
Expand All @@ -688,6 +698,8 @@ def _slice_take_blocks_ax0(
only_slice : bool, default False
If True, we always return views on existing arrays, never copies.
This is used when called from ops.blockwise.operate_blockwise.
use_na_proxy : bool, default False
Whether to use a np.void ndarray for newly introduced columns.

Returns
-------
Expand Down Expand Up @@ -756,7 +768,11 @@ def _slice_take_blocks_ax0(
# If we've got here, fill_value was not lib.no_default

blocks.append(
self._make_na_block(placement=mgr_locs, fill_value=fill_value)
self._make_na_block(
placement=mgr_locs,
fill_value=fill_value,
use_na_proxy=use_na_proxy,
)
)
else:
blk = self.blocks[blkno]
Expand Down Expand Up @@ -798,7 +814,16 @@ def _slice_take_blocks_ax0(

return blocks

def _make_na_block(self, placement: BlockPlacement, fill_value=None) -> Block:
def _make_na_block(
self, placement: BlockPlacement, fill_value=None, use_na_proxy: bool = False
) -> Block:

if use_na_proxy:
assert fill_value is None
shape = (len(placement), self.shape[1])
vals = np.empty(shape, dtype=np.void)
nb = NumpyBlock(vals, placement, ndim=2)
return nb

if fill_value is None:
fill_value = np.nan
Expand Down