forked from pandas-dev/pandas
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy patharrow_parser_wrapper.py
164 lines (145 loc) · 5.67 KB
/
arrow_parser_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from __future__ import annotations
from pandas._typing import ReadBuffer
from pandas.compat._optional import import_optional_dependency
from pandas.core.dtypes.inference import is_integer
from pandas import (
DataFrame,
arrays,
get_option,
)
from pandas.io.parsers.base_parser import ParserBase
class ArrowParserWrapper(ParserBase):
    """
    Wrapper for the pyarrow engine for read_csv()

    The wrapper stores the keyword arguments it is constructed with,
    translates the ones pyarrow understands into ``pyarrow.csv`` option
    objects, reads the source into a pyarrow table and converts that table
    into a DataFrame.
    """

    def __init__(self, src: ReadBuffer[bytes], **kwds) -> None:
        """
        Store the source and keyword arguments and validate the keywords.

        Parameters
        ----------
        src : ReadBuffer[bytes]
            The object handed to ``pyarrow.csv.read_csv`` in ``read``.
        **kwds
            Parser keyword arguments. ``usecols`` and ``na_values`` must be
            present; ``skiprows`` and ``use_nullable_dtypes`` are looked up
            later by ``read``.
        """
        super().__init__(kwds)
        self.kwds = kwds
        self.src = src

        self._parse_kwds()

    def _parse_kwds(self) -> None:
        """
        Validates keywords before passing to pyarrow.

        Raises
        ------
        ValueError
            If ``na_values`` is a dict.
        """
        encoding: str | None = self.kwds.get("encoding")
        self.encoding = "utf-8" if encoding is None else encoding

        self.usecols, self.usecols_dtype = self._validate_usecols_arg(
            self.kwds["usecols"]
        )
        na_values = self.kwds["na_values"]
        if isinstance(na_values, dict):
            raise ValueError(
                "The pyarrow engine doesn't support passing a dict for na_values"
            )
        self.na_values = list(na_values)

    def _get_pyarrow_options(self) -> None:
        """
        Rename some arguments to pass to pyarrow

        Populates ``self.parse_options``, ``self.convert_options`` and
        ``self.read_options`` with the keyword dictionaries that ``read``
        unpacks into the matching ``pyarrow.csv`` option classes. Keys of
        ``self.kwds`` that have a pyarrow equivalent are renamed in place.
        """
        mapping = {
            "usecols": "include_columns",
            "na_values": "null_values",
            "escapechar": "escape_char",
            "skip_blank_lines": "ignore_empty_lines",
        }
        for pandas_name, pyarrow_name in mapping.items():
            # Only rename options that were actually supplied; ``None`` values
            # keep their pandas name and are filtered out below.
            if self.kwds.get(pandas_name) is not None:
                self.kwds[pyarrow_name] = self.kwds.pop(pandas_name)

        self.parse_options = {
            option_name: option_value
            for option_name, option_value in self.kwds.items()
            if option_value is not None
            and option_name
            in ("delimiter", "quote_char", "escape_char", "ignore_empty_lines")
        }
        self.convert_options = {
            option_name: option_value
            for option_name, option_value in self.kwds.items()
            if option_value is not None
            and option_name
            in ("include_columns", "null_values", "true_values", "false_values")
        }
        self.read_options = {
            "autogenerate_column_names": self.header is None,
            "skip_rows": self.header
            if self.header is not None
            else self.kwds["skiprows"],
            # Forward the encoding resolved in _parse_kwds; without it the
            # user-supplied ``encoding`` would be silently ignored.
            "encoding": self.encoding,
        }

    def _finalize_pandas_output(self, frame: DataFrame) -> DataFrame:
        """
        Processes data read in based on kwargs.

        Parameters
        ----------
        frame: DataFrame
            The DataFrame to process.

        Returns
        -------
        DataFrame
            The processed DataFrame.

        Raises
        ------
        ValueError
            If a non-integer ``index_col`` entry is not a column of the
            frame, or if casting to the requested ``dtype`` raises a
            ``TypeError``.
        """
        num_cols = len(frame.columns)
        multi_index_named = True
        if self.header is None:
            if self.names is None:
                self.names = range(num_cols)
            if len(self.names) != num_cols:
                # usecols is passed through to pyarrow, we only handle index col here
                # The only way self.names is not the same length as number of cols is
                # if we have int index_col. We should just pad the names(they will get
                # removed anyways) to expected length then.
                self.names = list(range(num_cols - len(self.names))) + list(
                    self.names
                )
                multi_index_named = False
            frame.columns = self.names
        # we only need the frame not the names
        frame.columns, frame = self._do_date_conversions(frame.columns, frame)
        if self.index_col is not None:
            for i, item in enumerate(self.index_col):
                if is_integer(item):
                    # Resolve positional entries to the column label.
                    self.index_col[i] = frame.columns[item]
                else:
                    # String case
                    if item not in frame.columns:
                        raise ValueError(f"Index {item} invalid")
            frame.set_index(self.index_col, drop=True, inplace=True)
            # Clear names if headerless and no name given
            if self.header is None and not multi_index_named:
                frame.index.names = [None] * len(frame.index.names)

        if self.kwds.get("dtype") is not None:
            try:
                frame = frame.astype(self.kwds.get("dtype"))
            except TypeError as e:
                # GH#44901 reraise to keep api consistent
                raise ValueError(e) from e
        return frame

    def read(self) -> DataFrame:
        """
        Reads the contents of a CSV file into a DataFrame and
        processes it according to the kwargs passed in the
        constructor.

        Returns
        -------
        DataFrame
            The DataFrame created from the CSV file.
        """
        pyarrow_csv = import_optional_dependency("pyarrow.csv")
        self._get_pyarrow_options()

        table = pyarrow_csv.read_csv(
            self.src,
            read_options=pyarrow_csv.ReadOptions(**self.read_options),
            parse_options=pyarrow_csv.ParseOptions(**self.parse_options),
            convert_options=pyarrow_csv.ConvertOptions(**self.convert_options),
        )
        if (
            self.kwds["use_nullable_dtypes"]
            and get_option("io.nullable_backend") == "pyarrow"
        ):
            # Wrap each pyarrow column in an ArrowExtensionArray instead of
            # converting through ``Table.to_pandas``.
            frame = DataFrame(
                {
                    col_name: arrays.ArrowExtensionArray(pa_col)
                    for col_name, pa_col in zip(table.column_names, table.itercolumns())
                }
            )
        else:
            frame = table.to_pandas()
        return self._finalize_pandas_output(frame)