diff --git a/pandas/io/common.py b/pandas/io/common.py index 33f83d7c66433..5eca7eea7eec6 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -7,7 +7,6 @@ ) import bz2 import codecs -from collections import abc import dataclasses import functools import gzip @@ -103,7 +102,6 @@ class IOHandles(Generic[AnyStr]): compression: CompressionDict created_handles: list[IO[bytes] | IO[str]] = dataclasses.field(default_factory=list) is_wrapped: bool = False - is_mmap: bool = False def close(self) -> None: """ @@ -687,14 +685,7 @@ def get_handle( # memory mapping needs to be the first step # only used for read_csv - handle, memory_map, handles = _maybe_memory_map( - handle, - memory_map, - ioargs.encoding, - ioargs.mode, - errors, - ioargs.compression["method"] not in _supported_compressions, - ) + handle, memory_map, handles = _maybe_memory_map(handle, memory_map) is_path = isinstance(handle, str) compression_args = dict(ioargs.compression) @@ -841,12 +832,19 @@ def get_handle( handle, encoding=ioargs.encoding, ) - elif is_text and (compression or _is_binary_mode(handle, ioargs.mode)): + elif is_text and ( + compression or memory_map or _is_binary_mode(handle, ioargs.mode) + ): + if ( + not hasattr(handle, "readable") + or not hasattr(handle, "writable") + or not hasattr(handle, "seekable") + ): + handle = _IOWrapper(handle) + # error: Argument 1 to "TextIOWrapper" has incompatible type + # "_IOWrapper"; expected "IO[bytes]" handle = TextIOWrapper( - # error: Argument 1 to "TextIOWrapper" has incompatible type - # "Union[IO[bytes], IO[Any], RawIOBase, BufferedIOBase, TextIOBase, mmap]"; - # expected "IO[bytes]" - _IOWrapper(handle), # type: ignore[arg-type] + handle, # type: ignore[arg-type] encoding=ioargs.encoding, errors=errors, newline="", @@ -877,7 +875,6 @@ def get_handle( # "List[BaseBuffer]"; expected "List[Union[IO[bytes], IO[str]]]" created_handles=handles, # type: ignore[arg-type] is_wrapped=is_wrapped, - is_mmap=memory_map, compression=ioargs.compression, ) @@ -1001,75 +998,6 @@ def write_to_buffer(self) -> None: self.buffer.writestr(archive_name, self.getvalue()) -class _CSVMMapWrapper(abc.Iterator): - """ - Wrapper for the Python's mmap class so that it can be properly read in - by Python's csv.reader class. - - Parameters - ---------- - f : file object - File object to be mapped onto memory. Must support the 'fileno' - method or have an equivalent attribute - - """ - - def __init__( - self, - f: ReadBuffer[bytes], - encoding: str = "utf-8", - errors: str = "strict", - decode: bool = True, - ) -> None: - self.encoding = encoding - self.errors = errors - self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors) - self.decode = decode - - # needed for compression libraries and TextIOWrapper - self.attributes = {} - for attribute in ("seekable", "readable"): - if not hasattr(f, attribute): - continue - self.attributes[attribute] = getattr(f, attribute)() - - self.mmap = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) - - def __getattr__(self, name: str): - if name in self.attributes: - return lambda: self.attributes[name] - return getattr(self.mmap, name) - - def __iter__(self) -> _CSVMMapWrapper: - return self - - def read(self, size: int = -1) -> str | bytes: - # CSV c-engine uses read instead of iterating - content: bytes = self.mmap.read(size) - if self.decode and self.encoding != "utf-8": - # memory mapping is applied before compression. Encoding should - # be applied to the de-compressed data. - final = size == -1 or len(content) < size - return self.decoder.decode(content, final=final) - return content - - def __next__(self) -> str: - newbytes = self.mmap.readline() - - # readline returns bytes, not str, but Python's CSV reader - # expects str, so convert the output to str before continuing - newline = self.decoder.decode(newbytes) - - # mmap doesn't raise if reading past the allocated - # data but instead returns an empty string, so raise - # if that is returned - if newline == "": - raise StopIteration - - # IncrementalDecoder seems to push newline to the next line - return newline.lstrip("\n") - - class _IOWrapper: # TextIOWrapper is overly strict: it request that the buffer has seekable, readable, # and writable. If we have a read-only buffer, we shouldn't need writable and vice @@ -1131,12 +1059,7 @@ def read(self, n: int | None = -1) -> bytes: def _maybe_memory_map( - handle: str | BaseBuffer, - memory_map: bool, - encoding: str, - mode: str, - errors: str, - decode: bool, + handle: str | BaseBuffer, memory_map: bool ) -> tuple[str | BaseBuffer, bool, list[BaseBuffer]]: """Try to memory map file/buffer.""" handles: list[BaseBuffer] = [] @@ -1149,22 +1072,21 @@ def _maybe_memory_map( handle = open(handle, "rb") handles.append(handle) - # error: Argument 1 to "_MMapWrapper" has incompatible type "Union[IO[Any], - # RawIOBase, BufferedIOBase, TextIOBase, mmap]"; expected "IO[Any]" try: - # open mmap, adds *-able, and convert to string - wrapped = cast( - BaseBuffer, - _CSVMMapWrapper(handle, encoding, errors, decode), # type: ignore[arg-type] + # open mmap and adds *-able + # error: Argument 1 to "_IOWrapper" has incompatible type "mmap"; + # expected "BaseBuffer" + wrapped = _IOWrapper( + mmap.mmap( + handle.fileno(), 0, access=mmap.ACCESS_READ # type: ignore[arg-type] + ) ) finally: for handle in reversed(handles): # error: "BaseBuffer" has no attribute "close" handle.close() # type: ignore[attr-defined] - handles = [] - handles.append(wrapped) - return wrapped, memory_map, handles + return wrapped, memory_map, [wrapped] def file_exists(filepath_or_buffer: FilePath | BaseBuffer) -> bool: diff --git a/pandas/io/parsers/c_parser_wrapper.py b/pandas/io/parsers/c_parser_wrapper.py index d18abc477665a..91c37a0e43505 100644 --- a/pandas/io/parsers/c_parser_wrapper.py +++ b/pandas/io/parsers/c_parser_wrapper.py @@ -1,6 +1,7 @@ from __future__ import annotations from collections import defaultdict +from io import TextIOWrapper from typing import ( Hashable, Mapping, @@ -62,6 +63,17 @@ def __init__(self, src: ReadCsvBuffer[str], **kwds) -> None: # Have to pass int, would break tests using TextReader directly otherwise :( kwds["on_bad_lines"] = self.on_bad_lines.value + # c-engine can cope with utf-8 bytes. Remove TextIOWrapper when its errors + # policy is the same as the one given to read_csv + if ( + isinstance(src, TextIOWrapper) + and src.encoding == "utf-8" + and (src.errors or "strict") == kwds["encoding_errors"] + ): + # error: Incompatible types in assignment (expression has type "BinaryIO", + # variable has type "ReadCsvBuffer[str]") + src = src.buffer # type: ignore[assignment] + for key in ( "storage_options", "encoding", diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py index 22399917f2bf7..4df4e351faf92 100644 --- a/pandas/tests/io/test_common.py +++ b/pandas/tests/io/test_common.py @@ -413,39 +413,31 @@ def test_constructor_bad_file(self, mmap_file): err = mmap.error with pytest.raises(err, match=msg): - icom._CSVMMapWrapper(non_file) + icom._maybe_memory_map(non_file, True) with open(mmap_file) as target: pass msg = "I/O operation on closed file" with pytest.raises(ValueError, match=msg): - icom._CSVMMapWrapper(target) - - def test_get_attr(self, mmap_file): - with open(mmap_file) as target: - wrapper = icom._CSVMMapWrapper(target) - - attrs = dir(wrapper.mmap) - attrs = [attr for attr in attrs if not attr.startswith("__")] - attrs.append("__next__") - - for attr in attrs: - assert hasattr(wrapper, attr) - - assert not hasattr(wrapper, "foo") + icom._maybe_memory_map(target, True) def test_next(self, mmap_file): with open(mmap_file) as target: - wrapper = icom._CSVMMapWrapper(target) lines = target.readlines() - for line in lines: - next_line = next(wrapper) - assert next_line.strip() == line.strip() + with icom.get_handle( + target, "r", is_text=True, memory_map=True + ) as wrappers: + wrapper = wrappers.handle + assert isinstance(wrapper.buffer.buffer, mmap.mmap) + + for line in lines: + next_line = next(wrapper) + assert next_line.strip() == line.strip() - with pytest.raises(StopIteration, match=r"^$"): - next(wrapper) + with pytest.raises(StopIteration, match=r"^$"): + next(wrapper) def test_unknown_engine(self): with tm.ensure_clean() as path: