forked from pandas-dev/pandas
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathops.py
129 lines (99 loc) · 4.31 KB
/
ops.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from collections import namedtuple
from typing import TYPE_CHECKING, List, Tuple
import numpy as np
from pandas._typing import ArrayLike
if TYPE_CHECKING:
from pandas.core.internals.blocks import Block # noqa:F401
from pandas.core.internals.managers import BlockManager # noqa:F401
# Record produced by ``_iter_block_pairs`` for one (left block, right block)
# pairing.
BlockPairInfo = namedtuple(
    "BlockPairInfo",
    (
        "lvals",  # values taken from the left block, aligned with ``rvals``
        "rvals",  # values taken from the right block, aligned with ``lvals``
        "locs",  # ``mgr_locs`` of the left block
        "left_ea",  # True when the left block's values are not an np.ndarray
        "right_ea",  # True when the right block's values are not an np.ndarray
        "rblk",  # the right-hand block of the pair
    ),
)
def _iter_block_pairs(left: "BlockManager", right: "BlockManager"):
    """
    Yield one BlockPairInfo per aligned (left block, right block) pair.

    For every block in ``left.blocks``, ``right`` is sliced at that block's
    ``mgr_locs`` and each resulting right-hand block is paired with the left
    block.  The values of each pair are aligned with
    ``_get_same_shape_values`` before being yielded.

    Parameters
    ----------
    left : BlockManager
    right : BlockManager

    Yields
    ------
    BlockPairInfo
        ``(lvals, rvals, locs, left_ea, right_ea, rblk)`` where ``locs`` is
        the left block's ``mgr_locs`` and the ``*_ea`` flags are True when the
        corresponding block's values are not an ``np.ndarray``.
    """
    # At this point we have already checked the parent DataFrames for
    #  assert rframe._indexed_same(lframe)

    for blk in left.blocks:
        locs = blk.mgr_locs
        left_ea = not isinstance(blk.values, np.ndarray)

        rblks = right._slice_take_blocks_ax0(locs.indexer, only_slice=True)

        # Assertions are disabled for performance, but should hold:
        # if left_ea:
        #    assert len(locs) == 1, locs
        #    assert len(rblks) == 1, rblks
        #    assert rblks[0].shape[0] == 1, rblks[0].shape

        for rblk in rblks:
            right_ea = not isinstance(rblk.values, np.ndarray)

            lvals, rvals = _get_same_shape_values(blk, rblk, left_ea, right_ea)
            yield BlockPairInfo(lvals, rvals, locs, left_ea, right_ea, rblk)
def operate_blockwise(
    left: "BlockManager", right: "BlockManager", array_op
) -> "BlockManager":
    """
    Apply ``array_op`` to every aligned block pair and build a new manager.

    Parameters
    ----------
    left : BlockManager
    right : BlockManager
    array_op : callable
        Called as ``array_op(lvals, rvals)`` for each pair produced by
        ``_iter_block_pairs``.

    Returns
    -------
    BlockManager
        Constructed as ``type(right)`` from the result blocks, reusing
        ``right.axes`` and skipping the integrity check.
    """
    # The parent DataFrames have already been checked for
    #  assert rframe._indexed_same(lframe)
    # before we get here.

    collected: List["Block"] = []
    for pair in _iter_block_pairs(left, right):
        outcome = array_op(pair.lvals, pair.rvals)

        ea_on_left_only = pair.left_ea and not pair.right_ea
        if ea_on_left_only and hasattr(outcome, "reshape"):
            # In this case _get_same_shape_values squeezed the single-row
            # right-hand values to 1D; give the result a leading axis of
            # length 1 again before handing it to the right-hand block.
            outcome = outcome.reshape(1, -1)

        new_blocks = pair.rblk._split_op_result(outcome)

        # Assertions are disabled for performance, but should hold:
        # if right_ea or left_ea:
        #    assert len(nbs) == 1
        # else:
        #    assert res_values.shape == lvals.shape, (res_values.shape, lvals.shape)

        _reset_block_mgr_locs(new_blocks, pair.locs)
        collected.extend(new_blocks)

    # Assertions are disabled for performance, but should hold:
    #  slocs = {y for nb in res_blks for y in nb.mgr_locs.as_array}
    #  nlocs = sum(len(nb.mgr_locs.as_array) for nb in res_blks)
    #  assert nlocs == len(left.items), (nlocs, len(left.items))
    #  assert len(slocs) == nlocs, (len(slocs), nlocs)
    #  assert slocs == set(range(nlocs)), slocs

    return type(right)(collected, axes=right.axes, do_integrity_check=False)
def _reset_block_mgr_locs(nbs: List["Block"], locs):
    """
    Reset mgr_locs to correspond to our original DataFrame.

    Each block's current ``mgr_locs.indexer`` is used to index into
    ``locs.as_array``; the selected entries become the block's new
    ``mgr_locs``.

    Parameters
    ----------
    nbs : list of Block
        Blocks to update.  They are modified in place.
    locs
        Placement object exposing ``as_array``.

    Returns
    -------
    None
    """
    # ``locs.as_array`` does not depend on the block being processed, so look
    # it up once instead of on every iteration.
    locs_arr = locs.as_array

    for nb in nbs:
        nblocs = locs_arr[nb.mgr_locs.indexer]
        nb.mgr_locs = nblocs
        # Assertions are disabled for performance, but should hold:
        #  assert len(nblocs) == nb.shape[0], (len(nblocs), nb.shape)
        #  assert all(x in locs.as_array for x in nb.mgr_locs.as_array)
def _get_same_shape_values(
    lblk: "Block", rblk: "Block", left_ea: bool, right_ea: bool
) -> Tuple[ArrayLike, ArrayLike]:
    """
    Slice lblk.values to align with rblk. Squeeze if we have EAs.

    Parameters
    ----------
    lblk : Block
    rblk : Block
        ``rblk.mgr_locs`` must be slice-like.
    left_ea : bool
        Whether ``lblk.values`` is to be treated as 1D (EA) rather than 2D.
    right_ea : bool
        Whether ``rblk.values`` is to be treated as 1D (EA) rather than 2D.

    Returns
    -------
    tuple
        ``(lvals, rvals)`` ready to be passed to the array op.

    Raises
    ------
    AssertionError
        If ``rblk.mgr_locs`` is not slice-like or the shapes do not line up.
    """
    lvals = lblk.values
    rvals = rblk.values

    # Require that the indexing into lvals be slice-like
    rlocs = rblk.mgr_locs
    assert rlocs.is_slice_like, rlocs

    # TODO(EA2D): with 2D EAs only the ndarray/ndarray case would be needed
    if not left_ea:
        # lvals are 2D: keep only the rows that rblk covers
        lvals = lvals[rlocs.indexer, :]
        if right_ea:
            # rvals are 1D, so the single selected row is squeezed to match
            assert lvals.shape[0] == 1, lvals.shape
            lvals = lvals[0, :]
        else:
            assert lvals.shape == rvals.shape, (lvals.shape, rvals.shape)
    elif right_ea:
        # both sides are 1D; nothing to slice
        assert lvals.shape == rvals.shape, (lvals.shape, rvals.shape)
    else:
        # lvals are 1D, rvals are 2D
        assert rvals.shape[0] == 1, rvals.shape
        rvals = rvals[0, :]

    return lvals, rvals
def blockwise_all(left: "BlockManager", right: "BlockManager", op) -> bool:
    """
    Blockwise `all` reduction.

    ``op`` is called as ``op(lvals, rvals)`` on each aligned block pair.
    Evaluation stops at the first falsy result.

    Returns
    -------
    bool
        False as soon as any pair yields a falsy result, True otherwise
        (including when there are no pairs).
    """
    return all(
        op(pair.lvals, pair.rvals) for pair in _iter_block_pairs(left, right)
    )