forked from pandas-dev/pandas
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathquantile.py
190 lines (160 loc) · 5.03 KB
/
quantile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
from __future__ import annotations
import numpy as np
from pandas._typing import (
ArrayLike,
Scalar,
npt,
)
from pandas.compat.numpy import np_percentile_argname
from pandas.core.dtypes.missing import (
isna,
na_value_for_dtype,
)
def quantile_compat(
    values: ArrayLike, qs: npt.NDArray[np.float64], interpolation: str
) -> ArrayLike:
    """
    Compute the quantiles of `values` for every entry of `qs`.

    NumPy arrays are handled here via ``quantile_with_mask``; any other
    array type is asked to compute its own quantiles through its
    ``_quantile`` method.

    Parameters
    ----------
    values : np.ndarray or ExtensionArray
    qs : np.ndarray[float64]
    interpolation : str

    Returns
    -------
    np.ndarray or ExtensionArray
    """
    if not isinstance(values, np.ndarray):
        # Non-ndarray inputs implement their own quantile logic.
        return values._quantile(qs, interpolation)

    # For ndarrays, derive the NA mask and the dtype-appropriate NA fill
    # value, then defer to the masked implementation.
    return quantile_with_mask(
        values,
        mask=isna(values),
        fill_value=na_value_for_dtype(values.dtype, compat=False),
        qs=qs,
        interpolation=interpolation,
    )
def quantile_with_mask(
    values: np.ndarray,
    mask: npt.NDArray[np.bool_],
    fill_value,
    qs: npt.NDArray[np.float64],
    interpolation: str,
) -> np.ndarray:
    """
    Compute the quantiles of the given values for each quantile in `qs`.

    Parameters
    ----------
    values : np.ndarray
        For ExtensionArray, this is _values_for_factorize()[0]
    mask : np.ndarray[bool]
        mask = isna(values)
        For ExtensionArray, this is computed before calling
        _values_for_factorize
    fill_value : Scalar
        The value placed in the result where there is no data to compute a
        quantile from (no columns at all, or forwarded as ``na_value`` to
        ``_nanpercentile``).
        For ExtensionArray, this is _values_for_factorize()[1]
    qs : np.ndarray[float64]
        Quantiles expressed as fractions; they are multiplied by 100 before
        being passed on as percentiles.
    interpolation : str
        Type of interpolation

    Returns
    -------
    np.ndarray
        2D array of shape ``(len(values), len(qs))``.

    Notes
    -----
    Assumes values is already 2D.  For ExtensionArray this means np.atleast_2d
    has been called on _values_for_factorize()[0]

    Quantile is computed along axis=1.
    """
    assert values.ndim == 2

    if values.shape[1] == 0:
        # No columns to aggregate over: build a 2D block of fill values,
        # len(values) * len(qs).
        flat = np.array([fill_value] * len(qs))
        return np.repeat(flat, len(values)).reshape(len(values), len(qs))

    result = _nanpercentile(
        values,
        qs * 100.0,
        na_value=fill_value,
        mask=mask,
        interpolation=interpolation,
    )

    # np.asarray rather than np.array(..., copy=False): under NumPy >= 2.0
    # copy=False means "never copy, raise if one is needed", whereas the
    # intent here is "copy only if needed".
    result = np.asarray(result)

    # _nanpercentile yields one row per quantile; flip to one row per input row.
    return result.T
def _nanpercentile_1d(
    values: np.ndarray,
    mask: npt.NDArray[np.bool_],
    qs: npt.NDArray[np.float64],
    na_value: Scalar,
    interpolation,
) -> Scalar | np.ndarray:
    """
    Wrapper for np.percentile that skips missing values, specialized to
    1-dimensional case.

    Parameters
    ----------
    values : array over which to find quantiles
    mask : ndarray[bool]
        locations in values that should be considered missing
    qs : np.ndarray[float64] of quantile indices to find
    na_value : scalar
        value to return for empty or all-null values
    interpolation : str

    Returns
    -------
    quantiles : scalar or array
    """
    # Keep only the entries that are not flagged as missing.
    present = values[~mask]

    if len(present) > 0:
        # The keyword under which np.percentile receives the interpolation
        # choice is supplied by np_percentile_argname.
        percentile_kwargs = {np_percentile_argname: interpolation}
        return np.percentile(present, qs, **percentile_kwargs)

    # Nothing left to aggregate: answer with na_value for every quantile.
    # Can't pass dtype=values.dtype here bc we might have na_value=np.nan
    #  with values.dtype=int64 see test_quantile_empty
    # equiv: 'np.array([na_value] * len(qs))' but much faster
    return np.full(len(qs), na_value)
def _nanpercentile(
    values: np.ndarray,
    qs: npt.NDArray[np.float64],
    *,
    na_value,
    mask: npt.NDArray[np.bool_],
    interpolation,
):
    """
    Wrapper for np.percentile that skips missing values.

    Parameters
    ----------
    values : np.ndarray[ndim=2] over which to find quantiles
    qs : np.ndarray[float64] of quantile indices to find
    na_value : scalar
        value to return for empty or all-null values
    mask : np.ndarray[bool]
        locations in values that should be considered missing
    interpolation : str

    Returns
    -------
    quantiles : np.ndarray
        Percentiles computed along axis=1 of `values`; for 1-D `qs` the
        leading axis indexes `qs` and the trailing axis indexes the rows
        of `values`.
    """
    if values.dtype.kind in ("m", "M"):
        # need to cast to integer to avoid rounding errors in numpy
        result = _nanpercentile(
            values.view("i8"),
            qs=qs,
            na_value=na_value.view("i8"),
            mask=mask,
            interpolation=interpolation,
        )

        # Note: we have to do `astype` and not view because in general we
        #  have float result at this point, not i8
        return result.astype(values.dtype)

    if not mask.any():
        # Nothing is missing, so a single vectorized call handles every row.
        return np.percentile(
            values, qs, axis=1, **{np_percentile_argname: interpolation}
        )

    # Caller is responsible for ensuring mask shape match
    assert mask.shape == values.shape

    # Rows may have different numbers of valid entries, so each row is
    # reduced on its own after dropping its masked positions.
    per_row = [
        _nanpercentile_1d(row, row_mask, qs, na_value, interpolation=interpolation)
        for row, row_mask in zip(values, mask)
    ]

    # np.asarray rather than np.array(..., copy=False): building an array
    # from a list always needs a copy, and NumPy >= 2.0 raises ValueError
    # for copy=False in that situation.
    return np.asarray(per_row, dtype=values.dtype).T