ENH: Add Arrow CSV Reader #43072

Merged
merged 13 commits on Aug 27, 2021
4 changes: 2 additions & 2 deletions asv_bench/benchmarks/io/csv.py
@@ -206,7 +206,7 @@ def time_read_csv(self, bad_date_value):
class ReadCSVSkipRows(BaseIO):

    fname = "__test__.csv"
    params = ([None, 10000], ["c", "python"])
    params = ([None, 10000], ["c", "python", "pyarrow"])
    param_names = ["skiprows", "engine"]

    def setup(self, skiprows, engine):
@@ -320,7 +320,7 @@ def time_read_csv_python_engine(self, sep, decimal, float_precision):


class ReadCSVEngine(StringIORewind):
    params = ["c", "python"]
    params = ["c", "python", "pyarrow"]
    param_names = ["engine"]

    def setup(self, engine):
54 changes: 46 additions & 8 deletions doc/source/user_guide/io.rst
@@ -160,9 +160,15 @@ dtype : Type name or dict of column -> type, default ``None``
    (unsupported with ``engine='python'``). Use ``str`` or ``object`` together
    with suitable ``na_values`` settings to preserve and
    not interpret dtype.
engine : {``'c'``, ``'python'``}
    Parser engine to use. The C engine is faster while the Python engine is
    currently more feature-complete.
engine : {``'c'``, ``'python'``, ``'pyarrow'``}
    Parser engine to use. The C and pyarrow engines are faster, while the python engine
    is currently more feature-complete. Multithreading is currently only supported by
    the pyarrow engine.

    .. versionadded:: 1.4.0

        The "pyarrow" engine was added as an *experimental* engine, and some features
        are unsupported, or may not work correctly, with this engine.
converters : dict, default ``None``
    Dict of functions for converting values in certain columns. Keys can either be
    integers or column labels.
@@ -1622,11 +1628,17 @@ Specifying ``iterator=True`` will also return the ``TextFileReader`` object:
Specifying the parser engine
''''''''''''''''''''''''''''

Under the hood pandas uses a fast and efficient parser implemented in C as well
as a Python implementation which is currently more feature-complete. Where
possible pandas uses the C parser (specified as ``engine='c'``), but may fall
back to Python if C-unsupported options are specified. Currently, C-unsupported
options include:
pandas currently supports three engines: the C engine, the Python engine, and an experimental
pyarrow engine (which requires the ``pyarrow`` package). In general, the pyarrow engine is fastest
on larger workloads and is roughly equivalent in speed to the C engine on most other workloads,
while the Python engine tends to be the slowest of the three. However, the pyarrow engine is much
less robust than the C engine, and the C engine itself lacks a few features available in the
Python engine.

Where possible, pandas uses the C parser (specified as ``engine='c'``), but it may fall
back to Python if C-unsupported options are specified.

Currently, options unsupported by the C and pyarrow engines include:

* ``sep`` other than a single character (e.g. regex separators)
* ``skipfooter``
@@ -1635,6 +1647,32 @@ options include:
Specifying any of the above options will produce a ``ParserWarning`` unless the
python engine is selected explicitly using ``engine='python'``.

Options unsupported by the pyarrow engine that are not covered by the list above include:

* ``float_precision``
* ``chunksize``
* ``comment``
* ``nrows``
* ``thousands``
* ``memory_map``
* ``dialect``
* ``warn_bad_lines``
* ``error_bad_lines``
* ``on_bad_lines``
* ``delim_whitespace``
* ``quoting``
* ``lineterminator``
* ``converters``
* ``decimal``
* ``iterator``
* ``dayfirst``
* ``infer_datetime_format``
* ``verbose``
* ``skipinitialspace``
* ``low_memory``

Specifying these options with ``engine='pyarrow'`` will raise a ``ValueError``.
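
As a minimal sketch (``data.csv`` is a placeholder path, and pyarrow must be installed), selecting
the pyarrow engine, and the error raised for an unsupported option, could look like this:

.. code-block:: python

    import pandas as pd

    # multithreaded parse via the experimental pyarrow engine
    df = pd.read_csv("data.csv", engine="pyarrow")

    # options from the list above raise a ValueError with this engine
    try:
        pd.read_csv("data.csv", engine="pyarrow", chunksize=1000)
    except ValueError as err:
        print(err)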

.. _io.remote:

Reading/writing remote files
9 changes: 6 additions & 3 deletions doc/source/whatsnew/v1.4.0.rst
@@ -78,10 +78,13 @@ Styler

There are also bug fixes and deprecations listed below.

.. _whatsnew_140.enhancements.enhancement2:
.. _whatsnew_140.enhancements.pyarrow_csv_engine:

enhancement2
^^^^^^^^^^^^
Multithreaded CSV reading with a new CSV Engine based on pyarrow
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

:func:`pandas.read_csv` now accepts ``engine="pyarrow"`` (requires at least ``pyarrow`` 0.17.0) as an argument, allowing for faster CSV parsing on multicore machines
with pyarrow installed. See the :doc:`I/O docs </user_guide/io>` for more info. (:issue:`23697`)

.. _whatsnew_140.enhancements.other:

138 changes: 138 additions & 0 deletions pandas/io/parsers/arrow_parser_wrapper.py
@@ -0,0 +1,138 @@
from __future__ import annotations

from pandas._typing import FilePathOrBuffer
from pandas.compat._optional import import_optional_dependency

from pandas.core.dtypes.inference import is_integer

from pandas.core.frame import DataFrame

from pandas.io.common import get_handle
from pandas.io.parsers.base_parser import ParserBase


class ArrowParserWrapper(ParserBase):
    """
    Wrapper for the pyarrow engine for read_csv()
    """

    def __init__(self, src: FilePathOrBuffer, **kwds):
        self.kwds = kwds
        self.src = src

        ParserBase.__init__(self, kwds)

        self._parse_kwds()

    def _parse_kwds(self):
        """
        Validates keywords before passing to pyarrow.
        """
        encoding: str | None = self.kwds.get("encoding")
        self.encoding = "utf-8" if encoding is None else encoding

        self.usecols, self.usecols_dtype = self._validate_usecols_arg(
            self.kwds["usecols"]
        )
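        # pyarrow's null_values option takes a flat list of strings, so a
        # per-column dict of na_values cannot be forwarded and is rejected.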
        na_values = self.kwds["na_values"]
        if isinstance(na_values, dict):
            raise ValueError(
                "The pyarrow engine doesn't support passing a dict for na_values"
            )
        self.na_values = list(self.kwds["na_values"])

    def _get_pyarrow_options(self):
        """
        Rename some arguments to pass to pyarrow
        """
        mapping = {
            "usecols": "include_columns",
            "na_values": "null_values",
            "escapechar": "escape_char",
            "skip_blank_lines": "ignore_empty_lines",
        }
        for pandas_name, pyarrow_name in mapping.items():
            if pandas_name in self.kwds and self.kwds.get(pandas_name) is not None:
                self.kwds[pyarrow_name] = self.kwds.pop(pandas_name)

        self.parse_options = {
            option_name: option_value
            for option_name, option_value in self.kwds.items()
            if option_value is not None
            and option_name
            in ("delimiter", "quote_char", "escape_char", "ignore_empty_lines")
        }
        self.convert_options = {
            option_name: option_value
            for option_name, option_value in self.kwds.items()
            if option_value is not None
            and option_name
            in ("include_columns", "null_values", "true_values", "false_values")
        }
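        # When an explicit header row is given, skip the rows above it so that
        # pyarrow reads that row as the column names; otherwise honor skiprows
        # and let pyarrow autogenerate column names.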
        self.read_options = {
            "autogenerate_column_names": self.header is None,
            "skip_rows": self.header
            if self.header is not None
            else self.kwds["skiprows"],
        }

    def _finalize_output(self, frame: DataFrame) -> DataFrame:
        """
        Processes data read in based on kwargs.

        Parameters
        ----------
        frame: DataFrame
            The DataFrame to process.

        Returns
        -------
        DataFrame
            The processed DataFrame.
        """
        num_cols = len(frame.columns)
        if self.header is None:
            if self.names is None:
                if self.prefix is not None:
                    self.names = [f"{self.prefix}{i}" for i in range(num_cols)]
                elif self.header is None:
                    self.names = range(num_cols)
            frame.columns = self.names
        # we only need the frame not the names
        frame.columns, frame = self._do_date_conversions(frame.columns, frame)
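        # Positional index_col values are translated to column labels before
        # the index is set.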
        if self.index_col is not None:
            for i, item in enumerate(self.index_col):
                if is_integer(item):
                    self.index_col[i] = frame.columns[item]
            frame.set_index(self.index_col, drop=True, inplace=True)

        if self.kwds.get("dtype") is not None:
            frame = frame.astype(self.kwds.get("dtype"))
        return frame

    def read(self) -> DataFrame:
        """
        Reads the contents of a CSV file into a DataFrame and
        processes it according to the kwargs passed in the
        constructor.

        Returns
        -------
        DataFrame
            The DataFrame created from the CSV file.
        """
        pyarrow_csv = import_optional_dependency("pyarrow.csv")
        self._get_pyarrow_options()

        with get_handle(
            self.src, "rb", encoding=self.encoding, is_text=False
        ) as handles:
            table = pyarrow_csv.read_csv(
                handles.handle,
                read_options=pyarrow_csv.ReadOptions(**self.read_options),
                parse_options=pyarrow_csv.ParseOptions(**self.parse_options),
                convert_options=pyarrow_csv.ConvertOptions(**self.convert_options),
            )

        frame = table.to_pandas()
        return self._finalize_output(frame)
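
For intuition, ArrowParserWrapper.read() boils down to a direct pyarrow.csv.read_csv call followed
by Table.to_pandas(). A minimal standalone sketch (the file path and null values below are
hypothetical placeholders, not values taken from this PR):

    import pyarrow.csv as pyarrow_csv

    # roughly what read() assembles from the translated pandas keywords
    table = pyarrow_csv.read_csv(
        "data.csv",
        read_options=pyarrow_csv.ReadOptions(autogenerate_column_names=False, skip_rows=0),
        parse_options=pyarrow_csv.ParseOptions(delimiter=","),
        convert_options=pyarrow_csv.ConvertOptions(null_values=["NA", ""]),
    )
    frame = table.to_pandas()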