forked from pandas-dev/pandas
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_arrow_utils.py
175 lines (138 loc) · 5.35 KB
/
_arrow_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
from __future__ import annotations
import json
import warnings
import numpy as np
import pyarrow
from pandas._libs import lib
from pandas._libs.interval import _warning_interval
from pandas.errors import PerformanceWarning
from pandas.util._exceptions import find_stack_level
from pandas.core.arrays.interval import VALID_CLOSED
def fallback_performancewarning(version: str | None = None):
    """
    Emit a PerformanceWarning signalling that a non-pyarrow ExtensionArray
    code path is being used as a fallback.

    Parameters
    ----------
    version : str or None, default None
        When given, the message additionally suggests upgrading to at least
        this pyarrow version.
    """
    parts = [
        "Falling back on a non-pyarrow code path which may decrease performance."
    ]
    if version is not None:
        parts.append(
            f" Upgrade to pyarrow >={version} to possibly suppress this warning."
        )
    warnings.warn("".join(parts), PerformanceWarning, stacklevel=find_stack_level())
def pyarrow_array_to_numpy_and_mask(
    arr, dtype: np.dtype
) -> tuple[np.ndarray, np.ndarray]:
    """
    Convert a primitive pyarrow.Array to a numpy array and boolean mask based
    on the buffers of the Array.

    At the moment pyarrow.BooleanArray is not supported.

    Parameters
    ----------
    arr : pyarrow.Array
    dtype : numpy.dtype

    Returns
    -------
    (data, mask)
        Tuple of two numpy arrays with the raw data (with specified dtype) and
        a boolean mask (validity mask, so False means missing)

    Notes
    -----
    For an array whose type is the Arrow null type there is no data buffer to
    read; in that case ``data`` is zero-filled and ``mask`` is all False.
    """
    dtype = np.dtype(dtype)
    n_values = len(arr)

    if pyarrow.types.is_null(arr.type):
        # A null-typed array exposes no data buffer, so indexing
        # ``buffers()[1]`` below would fail. Every element is missing.
        data = np.zeros(n_values, dtype=dtype)
        mask = np.zeros(n_values, dtype=bool)
        return data, mask

    buflist = arr.buffers()
    # Since Arrow buffers might contain padding and the data might be offset,
    # the buffer gets sliced here before handing it to numpy.
    # See also https://github.com/pandas-dev/pandas/issues/40896
    offset = arr.offset * dtype.itemsize
    length = n_values * dtype.itemsize
    data_buf = buflist[1][offset : offset + length]
    data = np.frombuffer(data_buf, dtype=dtype)

    bitmask = buflist[0]
    if bitmask is not None:
        # The validity bitmap is bit-packed; let pyarrow unpack it by viewing
        # it as a BooleanArray (honouring the same element offset).
        mask = pyarrow.BooleanArray.from_buffers(
            pyarrow.bool_(), n_values, [None, bitmask], offset=arr.offset
        )
        mask = np.asarray(mask)
    else:
        # No validity buffer: all values are valid.
        mask = np.ones(n_values, dtype=bool)
    return data, mask
class ArrowPeriodType(pyarrow.ExtensionType):
    """
    pyarrow extension type named "pandas.period".

    Values are stored as int64; the ``freq`` the type was built with is
    carried in the serialized metadata.
    """

    def __init__(self, freq) -> None:
        # ``_freq`` has to exist before the base initializer runs, because
        # that initializer calls ``__arrow_ext_serialize__``.
        self._freq = freq
        pyarrow.ExtensionType.__init__(self, pyarrow.int64(), "pandas.period")

    @property
    def freq(self):
        """The ``freq`` value passed at construction."""
        return self._freq

    def __arrow_ext_serialize__(self):
        """Encode the type parameters as UTF-8 JSON bytes."""
        return json.dumps({"freq": self.freq}).encode()

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        """Rebuild the type from bytes made by ``__arrow_ext_serialize__``."""
        params = json.loads(serialized.decode())
        return ArrowPeriodType(params["freq"])

    def __eq__(self, other):
        # Only other Arrow extension types are comparable; defer otherwise.
        if not isinstance(other, pyarrow.BaseExtensionType):
            return NotImplemented
        return type(self) == type(other) and self.freq == other.freq

    def __hash__(self):
        key = (str(self), self.freq)
        return hash(key)

    def to_pandas_dtype(self):
        """Return a ``pandas.PeriodDtype`` with this type's ``freq``."""
        # Imported here rather than at module level, as in the original.
        import pandas as pd

        return pd.PeriodDtype(freq=self.freq)
# Register the period extension type with pyarrow when this module is
# imported. Registration is given an instance rather than the class, so a
# dummy instance (freq "D") is created solely for that purpose.
_period_type = ArrowPeriodType("D")
pyarrow.register_extension_type(_period_type)
class ArrowIntervalType(pyarrow.ExtensionType):
    """
    pyarrow extension type named "pandas.interval".

    Storage is a struct with ``left`` and ``right`` fields sharing one
    subtype. The subtype and the ``inclusive`` setting are carried in the
    serialized metadata.

    Parameters
    ----------
    subtype : pyarrow.DataType or object
        Type of the interval bounds. Anything that is not a
        ``pyarrow.DataType`` is converted with
        ``pyarrow.type_for_alias(str(subtype))``.
    inclusive : str or None
        Which sides are closed; after reconciliation with ``closed`` it must
        be a member of ``VALID_CLOSED``.
    closed : optional
        Passed to ``_warning_interval`` together with ``inclusive``
        (presumably the deprecated spelling of ``inclusive`` -- the ``closed``
        property below is deprecated in its favor).
    """

    def __init__(
        self,
        subtype,
        inclusive: str | None = None,
        closed: None | lib.NoDefault = lib.no_default,
    ) -> None:
        # attributes need to be set first before calling
        # super init (as that calls serialize)
        inclusive, closed = _warning_interval(inclusive, closed)
        assert inclusive in VALID_CLOSED
        self._closed = inclusive
        if not isinstance(subtype, pyarrow.DataType):
            subtype = pyarrow.type_for_alias(str(subtype))
        self._subtype = subtype

        storage_type = pyarrow.struct([("left", subtype), ("right", subtype)])
        pyarrow.ExtensionType.__init__(self, storage_type, "pandas.interval")

    @property
    def subtype(self):
        """pyarrow type of the ``left`` / ``right`` bounds."""
        return self._subtype

    @property
    def inclusive(self):
        """The validated ``inclusive`` value given at construction."""
        return self._closed

    @property
    def closed(self):
        """Deprecated alias of ``inclusive``; emits a FutureWarning."""
        warnings.warn(
            "Attribute `closed` is deprecated in favor of `inclusive`.",
            FutureWarning,
            stacklevel=find_stack_level(),
        )
        return self._closed

    def __arrow_ext_serialize__(self):
        """Encode subtype and ``inclusive`` as UTF-8 JSON bytes."""
        metadata = {"subtype": str(self.subtype), "inclusive": self.inclusive}
        return json.dumps(metadata).encode()

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        """
        Rebuild the type from serialized metadata.

        Accepts the current ``"inclusive"`` key and, as a fallback, the
        legacy ``"closed"`` key so that metadata written under the older
        name can still be read.
        """
        metadata = json.loads(serialized.decode())
        subtype = pyarrow.type_for_alias(metadata["subtype"])
        try:
            inclusive = metadata["inclusive"]
        except KeyError:
            # Fall back to the deprecated key name; if that is absent too,
            # re-raise the original KeyError for "inclusive".
            if "closed" not in metadata:
                raise
            inclusive = metadata["closed"]
        # Pass the value positionally as ``inclusive`` so reading legacy
        # metadata does not go through the deprecated ``closed`` argument.
        return ArrowIntervalType(subtype, inclusive)

    def __eq__(self, other):
        # Only other Arrow extension types are comparable; defer otherwise.
        if not isinstance(other, pyarrow.BaseExtensionType):
            return NotImplemented
        return (
            type(self) == type(other)
            and self.subtype == other.subtype
            and self.inclusive == other.inclusive
        )

    def __hash__(self):
        return hash((str(self), str(self.subtype), self.inclusive))

    def to_pandas_dtype(self):
        """Return a ``pandas.IntervalDtype`` built from subtype and ``inclusive``."""
        # Imported here rather than at module level, as in the original.
        import pandas as pd

        return pd.IntervalDtype(self.subtype.to_pandas_dtype(), self.inclusive)
# Register the interval extension type with pyarrow when this module is
# imported. Registration is given an instance rather than the class, so a
# dummy instance (int64 subtype, "left") is created solely for that purpose.
_interval_type = ArrowIntervalType(pyarrow.int64(), "left")
pyarrow.register_extension_type(_interval_type)