From c1eb614fd081152dfb3edf2163c6f2e1d7cada92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torsten=20W=C3=B6rtwein?= Date: Sun, 16 Jan 2022 13:10:49 -0500 Subject: [PATCH] Backport PR #45389: REG/REF: close file handles engine-independently in read_csv --- doc/source/whatsnew/v1.4.0.rst | 1 + pandas/_typing.py | 3 + pandas/io/parsers/arrow_parser_wrapper.py | 30 +++----- pandas/io/parsers/base_parser.py | 30 +------- pandas/io/parsers/c_parser_wrapper.py | 21 +---- pandas/io/parsers/python_parser.py | 35 +++------ pandas/io/parsers/readers.py | 77 +++++++++++++++---- .../io/parser/common/test_file_buffer_url.py | 8 +- pandas/tests/io/parser/test_unsupported.py | 20 +++++ 9 files changed, 118 insertions(+), 107 deletions(-) diff --git a/doc/source/whatsnew/v1.4.0.rst b/doc/source/whatsnew/v1.4.0.rst index 64d8e36177051..aae19906fbc41 100644 --- a/doc/source/whatsnew/v1.4.0.rst +++ b/doc/source/whatsnew/v1.4.0.rst @@ -874,6 +874,7 @@ I/O - Bug in :func:`read_csv` when passing a ``tempfile.SpooledTemporaryFile`` opened in binary mode (:issue:`44748`) - Bug in :func:`read_json` raising ``ValueError`` when attempting to parse json strings containing "://" (:issue:`36271`) - Bug in :func:`read_csv` when ``engine="c"`` and ``encoding_errors=None`` which caused a segfault (:issue:`45180`) +- Bug in :func:`read_csv` an invalid value of ``usecols`` leading to an un-closed file handle (:issue:`45384`) Period ^^^^^^ diff --git a/pandas/_typing.py b/pandas/_typing.py index 5c8f0be39f712..fd099b3897bab 100644 --- a/pandas/_typing.py +++ b/pandas/_typing.py @@ -292,3 +292,6 @@ def closed(self) -> bool: # Windowing rank methods WindowingRankType = Literal["average", "min", "max"] + +# read_csv engines +CSVEngine = Literal["c", "python", "pyarrow", "python-fwf"] diff --git a/pandas/io/parsers/arrow_parser_wrapper.py b/pandas/io/parsers/arrow_parser_wrapper.py index 96f7f9b1738b8..618da9d33c490 100644 --- a/pandas/io/parsers/arrow_parser_wrapper.py +++ b/pandas/io/parsers/arrow_parser_wrapper.py @@ -1,16 +1,12 @@ from __future__ import annotations -from pandas._typing import ( - FilePath, - ReadBuffer, -) +from pandas._typing import ReadBuffer from pandas.compat._optional import import_optional_dependency from pandas.core.dtypes.inference import is_integer from pandas.core.frame import DataFrame -from pandas.io.common import get_handle from pandas.io.parsers.base_parser import ParserBase @@ -19,12 +15,11 @@ class ArrowParserWrapper(ParserBase): Wrapper for the pyarrow engine for read_csv() """ - def __init__(self, src: FilePath | ReadBuffer[bytes], **kwds): + def __init__(self, src: ReadBuffer[bytes], **kwds): + super().__init__(kwds) self.kwds = kwds self.src = src - ParserBase.__init__(self, kwds) - self._parse_kwds() def _parse_kwds(self): @@ -151,15 +146,12 @@ def read(self) -> DataFrame: pyarrow_csv = import_optional_dependency("pyarrow.csv") self._get_pyarrow_options() - with get_handle( - self.src, "rb", encoding=self.encoding, is_text=False - ) as handles: - table = pyarrow_csv.read_csv( - handles.handle, - read_options=pyarrow_csv.ReadOptions(**self.read_options), - parse_options=pyarrow_csv.ParseOptions(**self.parse_options), - convert_options=pyarrow_csv.ConvertOptions(**self.convert_options), - ) + table = pyarrow_csv.read_csv( + self.src, + read_options=pyarrow_csv.ReadOptions(**self.read_options), + parse_options=pyarrow_csv.ParseOptions(**self.parse_options), + convert_options=pyarrow_csv.ConvertOptions(**self.convert_options), + ) - frame = table.to_pandas() - return self._finalize_output(frame) + frame = table.to_pandas() + return self._finalize_output(frame) diff --git a/pandas/io/parsers/base_parser.py b/pandas/io/parsers/base_parser.py index 318dd659d46bf..2a3a2b064207e 100644 --- a/pandas/io/parsers/base_parser.py +++ b/pandas/io/parsers/base_parser.py @@ -7,7 +7,6 @@ from enum import Enum import itertools from typing import ( - Any, Callable, DefaultDict, Hashable, @@ -32,8 +31,6 @@ from pandas._typing import ( ArrayLike, DtypeArg, - FilePath, - ReadCsvBuffer, ) from pandas.errors import ( ParserError, @@ -71,10 +68,6 @@ from pandas.core.series import Series from pandas.core.tools import datetimes as tools -from pandas.io.common import ( - IOHandles, - get_handle, -) from pandas.io.date_converters import generic_parser @@ -176,30 +169,10 @@ def __init__(self, kwds): self.usecols, self.usecols_dtype = self._validate_usecols_arg(kwds["usecols"]) - self.handles: IOHandles[str] | None = None - # Fallback to error to pass a sketchy test(test_override_set_noconvert_columns) # Normally, this arg would get pre-processed earlier on self.on_bad_lines = kwds.get("on_bad_lines", self.BadLineHandleMethod.ERROR) - def _open_handles( - self, - src: FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str], - kwds: dict[str, Any], - ) -> None: - """ - Let the readers open IOHandles after they are done with their potential raises. - """ - self.handles = get_handle( - src, - "r", - encoding=kwds.get("encoding", None), - compression=kwds.get("compression", None), - memory_map=kwds.get("memory_map", False), - storage_options=kwds.get("storage_options", None), - errors=kwds.get("encoding_errors", "strict"), - ) - def _validate_parse_dates_presence(self, columns: Sequence[Hashable]) -> Iterable: """ Check if parse_dates are in columns. @@ -262,8 +235,7 @@ def _validate_parse_dates_presence(self, columns: Sequence[Hashable]) -> Iterabl ] def close(self): - if self.handles is not None: - self.handles.close() + pass @final @property diff --git a/pandas/io/parsers/c_parser_wrapper.py b/pandas/io/parsers/c_parser_wrapper.py index 4088bf8d854bc..fc0f572c79e6b 100644 --- a/pandas/io/parsers/c_parser_wrapper.py +++ b/pandas/io/parsers/c_parser_wrapper.py @@ -14,7 +14,6 @@ ArrayLike, DtypeArg, DtypeObj, - FilePath, ReadCsvBuffer, ) from pandas.errors import DtypeWarning @@ -43,12 +42,10 @@ class CParserWrapper(ParserBase): low_memory: bool _reader: parsers.TextReader - def __init__( - self, src: FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str], **kwds - ): + def __init__(self, src: ReadCsvBuffer[str], **kwds): + super().__init__(kwds) self.kwds = kwds kwds = kwds.copy() - ParserBase.__init__(self, kwds) self.low_memory = kwds.pop("low_memory", False) @@ -61,10 +58,6 @@ def __init__( # GH20529, validate usecol arg before TextReader kwds["usecols"] = self.usecols - # open handles - self._open_handles(src, kwds) - assert self.handles is not None - # Have to pass int, would break tests using TextReader directly otherwise :( kwds["on_bad_lines"] = self.on_bad_lines.value @@ -79,11 +72,7 @@ def __init__( kwds.pop(key, None) kwds["dtype"] = ensure_dtype_objs(kwds.get("dtype", None)) - try: - self._reader = parsers.TextReader(self.handles.handle, **kwds) - except Exception: - self.handles.close() - raise + self._reader = parsers.TextReader(src, **kwds) self.unnamed_cols = self._reader.unnamed_cols @@ -196,9 +185,7 @@ def __init__( self._implicit_index = self._reader.leading_cols > 0 def close(self) -> None: - super().close() - - # close additional handles opened by C parser + # close handles opened by C parser try: self._reader.close() except ValueError: diff --git a/pandas/io/parsers/python_parser.py b/pandas/io/parsers/python_parser.py index 55ad6be3100e7..52fa3be4ff418 100644 --- a/pandas/io/parsers/python_parser.py +++ b/pandas/io/parsers/python_parser.py @@ -25,7 +25,6 @@ import pandas._libs.lib as lib from pandas._typing import ( ArrayLike, - FilePath, ReadCsvBuffer, Scalar, ) @@ -51,13 +50,11 @@ class PythonParser(ParserBase): - def __init__( - self, f: FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str] | list, **kwds - ): + def __init__(self, f: ReadCsvBuffer[str] | list, **kwds): """ Workhorse function for processing nested list into DataFrame """ - ParserBase.__init__(self, kwds) + super().__init__(kwds) self.data: Iterator[str] | None = None self.buf: list = [] @@ -104,28 +101,18 @@ def __init__( # read_excel: f is a list self.data = cast(Iterator[str], f) else: - self._open_handles(f, kwds) - assert self.handles is not None - assert hasattr(self.handles.handle, "readline") - try: - self._make_reader(self.handles.handle) - except (csv.Error, UnicodeDecodeError): - self.close() - raise + assert hasattr(f, "readline") + self._make_reader(f) # Get columns in two steps: infer from data, then # infer column indices from self.usecols if it is specified. self._col_indices: list[int] | None = None columns: list[list[Scalar | None]] - try: - ( - columns, - self.num_original_columns, - self.unnamed_cols, - ) = self._infer_columns() - except (TypeError, ValueError): - self.close() - raise + ( + columns, + self.num_original_columns, + self.unnamed_cols, + ) = self._infer_columns() # Now self.columns has the set of columns that we will process. # The original set is stored in self.original_columns. @@ -1259,9 +1246,7 @@ class FixedWidthFieldParser(PythonParser): See PythonParser for details. """ - def __init__( - self, f: FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str], **kwds - ) -> None: + def __init__(self, f: ReadCsvBuffer[str], **kwds) -> None: # Support iterators, convert to a list. self.colspecs = kwds.pop("colspecs") self.infer_nrows = kwds.pop("infer_nrows") diff --git a/pandas/io/parsers/readers.py b/pandas/io/parsers/readers.py index c417a511a1d81..6d60f94e89291 100644 --- a/pandas/io/parsers/readers.py +++ b/pandas/io/parsers/readers.py @@ -8,6 +8,7 @@ import sys from textwrap import fill from typing import ( + IO, Any, Callable, NamedTuple, @@ -21,6 +22,7 @@ from pandas._typing import ( ArrayLike, CompressionOptions, + CSVEngine, DtypeArg, FilePath, ReadCsvBuffer, @@ -48,7 +50,11 @@ from pandas.core.indexes.api import RangeIndex from pandas.core.shared_docs import _shared_docs -from pandas.io.common import validate_header_arg +from pandas.io.common import ( + IOHandles, + get_handle, + validate_header_arg, +) from pandas.io.parsers.arrow_parser_wrapper import ArrowParserWrapper from pandas.io.parsers.base_parser import ( ParserBase, @@ -601,7 +607,7 @@ def read_csv( mangle_dupe_cols=True, # General Parsing Configuration dtype: DtypeArg | None = None, - engine=None, + engine: CSVEngine | None = None, converters=None, true_values=None, false_values=None, @@ -700,7 +706,7 @@ def read_table( mangle_dupe_cols=True, # General Parsing Configuration dtype: DtypeArg | None = None, - engine=None, + engine: CSVEngine | None = None, converters=None, true_values=None, false_values=None, @@ -877,10 +883,12 @@ class TextFileReader(abc.Iterator): """ - def __init__(self, f, engine=None, **kwds): - - self.f = f - + def __init__( + self, + f: FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str] | list, + engine: CSVEngine | None = None, + **kwds, + ): if engine is not None: engine_specified = True else: @@ -921,9 +929,12 @@ def __init__(self, f, engine=None, **kwds): if "has_index_names" in kwds: self.options["has_index_names"] = kwds["has_index_names"] - self._engine = self._make_engine(self.engine) + self.handles: IOHandles | None = None + self._engine = self._make_engine(f, self.engine) def close(self): + if self.handles is not None: + self.handles.close() self._engine.close() def _get_options_with_defaults(self, engine): @@ -1178,7 +1189,11 @@ def __next__(self): self.close() raise - def _make_engine(self, engine="c"): + def _make_engine( + self, + f: FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str] | list | IO, + engine: CSVEngine = "c", + ): mapping: dict[str, type[ParserBase]] = { "c": CParserWrapper, "python": PythonParser, @@ -1189,17 +1204,53 @@ def _make_engine(self, engine="c"): raise ValueError( f"Unknown engine: {engine} (valid options are {mapping.keys()})" ) - return mapping[engine](self.f, **self.options) + if not isinstance(f, list): + # open file here + is_text = True + mode = "r" + if engine == "pyarrow": + is_text = False + mode = "rb" + # error: No overload variant of "get_handle" matches argument types + # "Union[str, PathLike[str], ReadCsvBuffer[bytes], ReadCsvBuffer[str]]" + # , "str", "bool", "Any", "Any", "Any", "Any", "Any" + self.handles = get_handle( # type: ignore[call-overload] + f, + mode, + encoding=self.options.get("encoding", None), + compression=self.options.get("compression", None), + memory_map=self.options.get("memory_map", False), + is_text=is_text, + errors=self.options.get("encoding_errors", "strict"), + storage_options=self.options.get("storage_options", None), + ) + assert self.handles is not None + f = self.handles.handle + + try: + return mapping[engine](f, **self.options) + except Exception: + if self.handles is not None: + self.handles.close() + raise def _failover_to_python(self): raise AbstractMethodError(self) def read(self, nrows=None): if self.engine == "pyarrow": - df = self._engine.read() + try: + df = self._engine.read() + except Exception: + self.close() + raise else: nrows = validate_integer("nrows", nrows) - index, columns, col_dict = self._engine.read(nrows) + try: + index, columns, col_dict = self._engine.read(nrows) + except Exception: + self.close() + raise if index is None: if col_dict: @@ -1374,7 +1425,7 @@ def _refine_defaults_read( dialect: str | csv.Dialect, delimiter: str | object, delim_whitespace: bool, - engine: str, + engine: CSVEngine | None, sep: str | object, error_bad_lines: bool | None, warn_bad_lines: bool | None, diff --git a/pandas/tests/io/parser/common/test_file_buffer_url.py b/pandas/tests/io/parser/common/test_file_buffer_url.py index 37b0239516b72..2c0f1b01b00cb 100644 --- a/pandas/tests/io/parser/common/test_file_buffer_url.py +++ b/pandas/tests/io/parser/common/test_file_buffer_url.py @@ -371,13 +371,13 @@ def test_context_manager(all_parsers, datapath): path = datapath("io", "data", "csv", "iris.csv") reader = parser.read_csv(path, chunksize=1) - assert not reader._engine.handles.handle.closed + assert not reader.handles.handle.closed try: with reader: next(reader) assert False except AssertionError: - assert reader._engine.handles.handle.closed + assert reader.handles.handle.closed def test_context_manageri_user_provided(all_parsers, datapath): @@ -387,13 +387,13 @@ def test_context_manageri_user_provided(all_parsers, datapath): with open(datapath("io", "data", "csv", "iris.csv")) as path: reader = parser.read_csv(path, chunksize=1) - assert not reader._engine.handles.handle.closed + assert not reader.handles.handle.closed try: with reader: next(reader) assert False except AssertionError: - assert not reader._engine.handles.handle.closed + assert not reader.handles.handle.closed def test_file_descriptor_leak(all_parsers): diff --git a/pandas/tests/io/parser/test_unsupported.py b/pandas/tests/io/parser/test_unsupported.py index f359a73382f32..0d383cc4bde8f 100644 --- a/pandas/tests/io/parser/test_unsupported.py +++ b/pandas/tests/io/parser/test_unsupported.py @@ -7,6 +7,8 @@ test suite as new feature support is added to the parsers. """ from io import StringIO +import os +from pathlib import Path import pytest @@ -161,3 +163,21 @@ def test_on_bad_lines_callable_python_only(self, all_parsers): parser.read_csv(sio, on_bad_lines=bad_lines_func) else: parser.read_csv(sio, on_bad_lines=bad_lines_func) + + +def test_close_file_handle_on_invalide_usecols(all_parsers): + # GH 45384 + parser = all_parsers + + error = ValueError + if parser.engine == "pyarrow": + pyarrow = pytest.importorskip("pyarrow") + error = pyarrow.lib.ArrowKeyError + + with tm.ensure_clean("test.csv") as fname: + Path(fname).write_text("col1,col2\na,b\n1,2") + with tm.assert_produces_warning(False): + with pytest.raises(error, match="col3"): + parser.read_csv(fname, usecols=["col1", "col2", "col3"]) + # unlink fails on windows if file handles still point to it + os.unlink(fname)