forked from pandas-dev/pandas
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_ranges.py
196 lines (167 loc) · 6.67 KB
/
_ranges.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""
Helper functions to generate range-like data for DatetimeArray
(and possibly TimedeltaArray/PeriodArray)
"""
from __future__ import annotations
from typing import Literal
import numpy as np
from pandas._libs.lib import i8max
from pandas._libs.tslibs import (
BaseOffset,
OutOfBoundsDatetime,
Timedelta,
Timestamp,
iNaT,
)
def generate_regular_range(
start: Timestamp | Timedelta,
end: Timestamp | Timedelta,
periods: int,
freq: BaseOffset,
):
"""
Generate a range of dates or timestamps with the spans between dates
described by the given `freq` DateOffset.
Parameters
----------
start : Timedelta, Timestamp or None
First point of produced date range.
end : Timedelta, Timestamp or None
Last point of produced date range.
periods : int
Number of periods in produced date range.
freq : Tick
Describes space between dates in produced date range.
Returns
-------
ndarray[np.int64] Representing nanoseconds.
"""
istart = start.value if start is not None else None
iend = end.value if end is not None else None
stride = freq.nanos
if periods is None:
b = istart
# cannot just use e = Timestamp(end) + 1 because arange breaks when
# stride is too large, see GH10887
e = b + (iend - b) // stride * stride + stride // 2 + 1
elif istart is not None:
b = istart
e = _generate_range_overflow_safe(b, periods, stride, side="start")
elif iend is not None:
e = iend + stride
b = _generate_range_overflow_safe(e, periods, stride, side="end")
else:
raise ValueError(
"at least 'start' or 'end' should be specified if a 'period' is given."
)
with np.errstate(over="raise"):
# If the range is sufficiently large, np.arange may overflow
# and incorrectly return an empty array if not caught.
try:
values = np.arange(b, e, stride, dtype=np.int64)
except FloatingPointError:
xdr = [b]
while xdr[-1] != e:
xdr.append(xdr[-1] + stride)
values = np.array(xdr[:-1], dtype=np.int64)
return values
def _generate_range_overflow_safe(
endpoint: int, periods: int, stride: int, side: Literal["start", "end"] = "start"
) -> int:
"""
Calculate the second endpoint for passing to np.arange, checking
to avoid an integer overflow. Catch OverflowError and re-raise
as OutOfBoundsDatetime.
Parameters
----------
endpoint : int
nanosecond timestamp of the known endpoint of the desired range
periods : int
number of periods in the desired range
stride : int
nanoseconds between periods in the desired range
side : {'start', 'end'}
which end of the range `endpoint` refers to
Returns
-------
other_end : int
Raises
------
OutOfBoundsDatetime
"""
# GH#14187 raise instead of incorrectly wrapping around
assert side in ["start", "end"]
i64max = np.uint64(i8max)
msg = f"Cannot generate range with {side}={endpoint} and periods={periods}"
with np.errstate(over="raise"):
# if periods * strides cannot be multiplied within the *uint64* bounds,
# we cannot salvage the operation by recursing, so raise
try:
addend = np.uint64(periods) * np.uint64(np.abs(stride))
except FloatingPointError as err:
raise OutOfBoundsDatetime(msg) from err
if np.abs(addend) <= i64max:
# relatively easy case without casting concerns
return _generate_range_overflow_safe_signed(endpoint, periods, stride, side)
elif (endpoint > 0 and side == "start" and stride > 0) or (
endpoint < 0 and side == "end" and stride > 0
):
# no chance of not-overflowing
raise OutOfBoundsDatetime(msg)
elif side == "end" and endpoint > i64max and endpoint - stride <= i64max:
# in _generate_regular_range we added `stride` thereby overflowing
# the bounds. Adjust to fix this.
return _generate_range_overflow_safe(
endpoint - stride, periods - 1, stride, side
)
# split into smaller pieces
mid_periods = periods // 2
remaining = periods - mid_periods
assert 0 < remaining < periods, (remaining, periods, endpoint, stride)
midpoint = _generate_range_overflow_safe(endpoint, mid_periods, stride, side)
return _generate_range_overflow_safe(midpoint, remaining, stride, side)
def _generate_range_overflow_safe_signed(
endpoint: int, periods: int, stride: int, side: Literal["start", "end"]
) -> int:
"""
A special case for _generate_range_overflow_safe where `periods * stride`
can be calculated without overflowing int64 bounds.
"""
assert side in ["start", "end"]
if side == "end":
stride *= -1
with np.errstate(over="raise"):
addend = np.int64(periods) * np.int64(stride)
try:
# easy case with no overflows
result = np.int64(endpoint) + addend
if result == iNaT:
# Putting this into a DatetimeArray/TimedeltaArray
# would incorrectly be interpreted as NaT
raise OverflowError
# error: Incompatible return value type (got "signedinteger[_64Bit]",
# expected "int")
return result # type: ignore[return-value]
except (FloatingPointError, OverflowError):
# with endpoint negative and addend positive we risk
# FloatingPointError; with reversed signed we risk OverflowError
pass
# if stride and endpoint had opposite signs, then endpoint + addend
# should never overflow. so they must have the same signs
assert (stride > 0 and endpoint >= 0) or (stride < 0 and endpoint <= 0)
if stride > 0:
# watch out for very special case in which we just slightly
# exceed implementation bounds, but when passing the result to
# np.arange will get a result slightly within the bounds
# error: Incompatible types in assignment (expression has type
# "unsignedinteger[_64Bit]", variable has type "signedinteger[_64Bit]")
result = np.uint64(endpoint) + np.uint64(addend) # type: ignore[assignment]
i64max = np.uint64(i8max)
assert result > i64max
if result <= i64max + np.uint64(stride):
# error: Incompatible return value type (got "unsignedinteger", expected
# "int")
return result # type: ignore[return-value]
raise OutOfBoundsDatetime(
f"Cannot generate range with {side}={endpoint} and periods={periods}"
)