# arrow_parser_wrapper.py (forked from pandas-dev/pandas)
from __future__ import annotations

from typing import TYPE_CHECKING

from pandas._config import using_pyarrow_string_dtype

from pandas._libs import lib
from pandas.compat._optional import import_optional_dependency

from pandas.core.dtypes.inference import is_integer

import pandas as pd
from pandas import DataFrame

from pandas.io._util import (
    _arrow_dtype_mapping,
    arrow_string_types_mapper,
)
from pandas.io.parsers.base_parser import ParserBase

if TYPE_CHECKING:
    from pandas._typing import ReadBuffer

class ArrowParserWrapper(ParserBase):
"""
Wrapper for the pyarrow engine for read_csv()
"""

    def __init__(self, src: ReadBuffer[bytes], **kwds) -> None:
        super().__init__(kwds)
        self.kwds = kwds
        self.src = src

        self._parse_kwds()

    def _parse_kwds(self):
        """
        Validates keywords before passing to pyarrow.
        """
        encoding: str | None = self.kwds.get("encoding")
        self.encoding = "utf-8" if encoding is None else encoding

        na_values = self.kwds["na_values"]
        if isinstance(na_values, dict):
            raise ValueError(
                "The pyarrow engine doesn't support passing a dict for na_values"
            )
        self.na_values = list(self.kwds["na_values"])
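
    # Example of the validation above (illustrative, not in the original
    # file): per-column NA markers are rejected for this engine, so
    #
    #     pd.read_csv(buf, engine="pyarrow", na_values={"a": ["n/a"]})
    #
    # raises ValueError, while a flat list such as na_values=["n/a", "NULL"]
    # is passed through to pyarrow as null_values.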

    def _get_pyarrow_options(self) -> None:
        """
        Rename some arguments to pass to pyarrow.
        """
        mapping = {
            "usecols": "include_columns",
            "na_values": "null_values",
            "escapechar": "escape_char",
            "skip_blank_lines": "ignore_empty_lines",
            "decimal": "decimal_point",
        }
for pandas_name, pyarrow_name in mapping.items():
if pandas_name in self.kwds and self.kwds.get(pandas_name) is not None:
                self.kwds[pyarrow_name] = self.kwds.pop(pandas_name)

        # Date format handling
# If we get a string, we need to convert it into a list for pyarrow
# If we get a dict, we want to parse those separately
date_format = self.date_format
if isinstance(date_format, str):
date_format = [date_format]
        else:
            # In the dict case we don't want to propagate the formats through
            # to pyarrow, so just set the pyarrow default of None.
            # Ideally, in the future we would disable pyarrow dtype inference
            # (read everything in as string) to prevent misreads.
            date_format = None
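        # For example (illustrative, not in the original comments):
        # date_format="%d/%m/%Y" is passed on as ["%d/%m/%Y"], while
        # date_format={"ts": "%d/%m/%Y"} becomes None here and is left to
        # pandas' own date conversion in _finalize_pandas_output.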
self.kwds["timestamp_parsers"] = date_format
self.parse_options = {
option_name: option_value
for option_name, option_value in self.kwds.items()
if option_value is not None
and option_name
in ("delimiter", "quote_char", "escape_char", "ignore_empty_lines")
}
self.convert_options = {
option_name: option_value
for option_name, option_value in self.kwds.items()
if option_value is not None
and option_name
in (
"include_columns",
"null_values",
"true_values",
"false_values",
"decimal_point",
"timestamp_parsers",
)
}
self.convert_options["strings_can_be_null"] = "" in self.kwds["null_values"]
self.read_options = {
"autogenerate_column_names": self.header is None,
"skip_rows": self.header
if self.header is not None
else self.kwds["skiprows"],
"encoding": self.encoding,
}
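
    # Illustrative result (not part of the original source): for a call like
    # pd.read_csv(buf, engine="pyarrow", sep=";", decimal=","), the dicts
    # built above come out roughly as
    #
    #     parse_options   = {"delimiter": ";", "ignore_empty_lines": True}
    #     convert_options = {"decimal_point": ",", "null_values": [...],
    #                        "strings_can_be_null": True}
    #     read_options    = {"autogenerate_column_names": False,
    #                        "skip_rows": 0, "encoding": "utf-8"}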

    def _finalize_pandas_output(self, frame: DataFrame) -> DataFrame:
        """
        Processes data read in based on kwargs.

        Parameters
        ----------
        frame: DataFrame
            The DataFrame to process.

        Returns
        -------
        DataFrame
            The processed DataFrame.
        """
num_cols = len(frame.columns)
multi_index_named = True
        if self.header is None:
            if self.names is None:
                # Headerless file with no explicit names: fall back to
                # positional integer labels.
                self.names = range(num_cols)
            if len(self.names) != num_cols:
                # usecols is passed through to pyarrow; we only handle index_col
                # here. The only way self.names can differ in length from the
                # number of columns is an integer index_col, so just pad the
                # names (they will be removed anyway) to the expected length.
                self.names = list(range(num_cols - len(self.names))) + self.names
                multi_index_named = False
            frame.columns = self.names
        # we only need the frame, not the names
        _, frame = self._do_date_conversions(frame.columns, frame)
if self.index_col is not None:
index_to_set = self.index_col.copy()
for i, item in enumerate(self.index_col):
if is_integer(item):
index_to_set[i] = frame.columns[item]
# String case
elif item not in frame.columns:
raise ValueError(f"Index {item} invalid")
# Process dtype for index_col and drop from dtypes
if self.dtype is not None:
key, new_dtype = (
(item, self.dtype.get(item))
if self.dtype.get(item) is not None
else (frame.columns[item], self.dtype.get(frame.columns[item]))
)
if new_dtype is not None:
frame[key] = frame[key].astype(new_dtype)
del self.dtype[key]
frame.set_index(index_to_set, drop=True, inplace=True)
# Clear names if headerless and no name given
if self.header is None and not multi_index_named:
frame.index.names = [None] * len(frame.index.names)
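
        # Illustrative effect (not part of the original comments): with
        # columns ["a", "b"] and index_col=[0], the loop above resolves the
        # integer position to the label "a", so the set_index call leaves
        # "b" as the only data column.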
if self.dtype is not None:
# Ignore non-existent columns from dtype mapping
# like other parsers do
if isinstance(self.dtype, dict):
self.dtype = {k: v for k, v in self.dtype.items() if k in frame.columns}
try:
frame = frame.astype(self.dtype)
except TypeError as e:
# GH#44901 reraise to keep api consistent
raise ValueError(e)
return frame

    def read(self) -> DataFrame:
        """
        Reads the contents of a CSV file into a DataFrame and
        processes it according to the kwargs passed in the
        constructor.

        Returns
        -------
        DataFrame
            The DataFrame created from the CSV file.
        """
pa = import_optional_dependency("pyarrow")
pyarrow_csv = import_optional_dependency("pyarrow.csv")
        self._get_pyarrow_options()

        table = pyarrow_csv.read_csv(
self.src,
read_options=pyarrow_csv.ReadOptions(**self.read_options),
parse_options=pyarrow_csv.ParseOptions(**self.parse_options),
convert_options=pyarrow_csv.ConvertOptions(**self.convert_options),
        )

        dtype_backend = self.kwds["dtype_backend"]

        # Convert all pa.null() columns -> float64 (non-nullable),
        # else Int64 (nullable case, see below).
        if dtype_backend is lib.no_default:
new_schema = table.schema
new_type = pa.float64()
for i, arrow_type in enumerate(table.schema.types):
if pa.types.is_null(arrow_type):
new_schema = new_schema.set(
i, new_schema.field(i).with_type(new_type)
)
            table = table.cast(new_schema)

        if dtype_backend == "pyarrow":
frame = table.to_pandas(types_mapper=pd.ArrowDtype)
elif dtype_backend == "numpy_nullable":
# Modify the default mapping to also
# map null to Int64 (to match other engines)
dtype_mapping = _arrow_dtype_mapping()
dtype_mapping[pa.null()] = pd.Int64Dtype()
frame = table.to_pandas(types_mapper=dtype_mapping.get)
elif using_pyarrow_string_dtype():
frame = table.to_pandas(types_mapper=arrow_string_types_mapper())
else:
frame = table.to_pandas()
return self._finalize_pandas_output(frame)
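

# Illustrative usage of the module (not part of the original file): this
# wrapper backs the public read_csv pyarrow engine, e.g.
#
#     import io
#     import pandas as pd
#
#     buf = io.BytesIO(b"a,b\n1,x\n2,y\n")
#     df = pd.read_csv(buf, engine="pyarrow", dtype_backend="pyarrow")
#     # expect pyarrow-backed dtypes such as int64[pyarrow]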