Skip to content

CLN: Simplify mmap code #47175

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 22 additions & 100 deletions pandas/io/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
)
import bz2
import codecs
from collections import abc
import dataclasses
import functools
import gzip
Expand Down Expand Up @@ -103,7 +102,6 @@ class IOHandles(Generic[AnyStr]):
compression: CompressionDict
created_handles: list[IO[bytes] | IO[str]] = dataclasses.field(default_factory=list)
is_wrapped: bool = False
is_mmap: bool = False

def close(self) -> None:
"""
Expand Down Expand Up @@ -687,14 +685,7 @@ def get_handle(

# memory mapping needs to be the first step
# only used for read_csv
handle, memory_map, handles = _maybe_memory_map(
handle,
memory_map,
ioargs.encoding,
ioargs.mode,
errors,
ioargs.compression["method"] not in _supported_compressions,
)
handle, memory_map, handles = _maybe_memory_map(handle, memory_map)

is_path = isinstance(handle, str)
compression_args = dict(ioargs.compression)
Expand Down Expand Up @@ -841,12 +832,19 @@ def get_handle(
handle,
encoding=ioargs.encoding,
)
elif is_text and (compression or _is_binary_mode(handle, ioargs.mode)):
elif is_text and (
compression or memory_map or _is_binary_mode(handle, ioargs.mode)
):
if (
not hasattr(handle, "readable")
or not hasattr(handle, "writable")
or not hasattr(handle, "seekable")
):
handle = _IOWrapper(handle)
# error: Argument 1 to "TextIOWrapper" has incompatible type
# "_IOWrapper"; expected "IO[bytes]"
handle = TextIOWrapper(
# error: Argument 1 to "TextIOWrapper" has incompatible type
# "Union[IO[bytes], IO[Any], RawIOBase, BufferedIOBase, TextIOBase, mmap]";
# expected "IO[bytes]"
_IOWrapper(handle), # type: ignore[arg-type]
handle, # type: ignore[arg-type]
encoding=ioargs.encoding,
errors=errors,
newline="",
Expand Down Expand Up @@ -877,7 +875,6 @@ def get_handle(
# "List[BaseBuffer]"; expected "List[Union[IO[bytes], IO[str]]]"
created_handles=handles, # type: ignore[arg-type]
is_wrapped=is_wrapped,
is_mmap=memory_map,
compression=ioargs.compression,
)

Expand Down Expand Up @@ -1001,75 +998,6 @@ def write_to_buffer(self) -> None:
self.buffer.writestr(archive_name, self.getvalue())


class _CSVMMapWrapper(abc.Iterator):
"""
Wrapper for the Python's mmap class so that it can be properly read in
by Python's csv.reader class.

Parameters
----------
f : file object
File object to be mapped onto memory. Must support the 'fileno'
method or have an equivalent attribute

"""

def __init__(
self,
f: ReadBuffer[bytes],
encoding: str = "utf-8",
errors: str = "strict",
decode: bool = True,
) -> None:
self.encoding = encoding
self.errors = errors
self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)
self.decode = decode

# needed for compression libraries and TextIOWrapper
self.attributes = {}
for attribute in ("seekable", "readable"):
if not hasattr(f, attribute):
continue
self.attributes[attribute] = getattr(f, attribute)()

self.mmap = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)

def __getattr__(self, name: str):
if name in self.attributes:
return lambda: self.attributes[name]
return getattr(self.mmap, name)

def __iter__(self) -> _CSVMMapWrapper:
return self

def read(self, size: int = -1) -> str | bytes:
# CSV c-engine uses read instead of iterating
content: bytes = self.mmap.read(size)
if self.decode and self.encoding != "utf-8":
# memory mapping is applied before compression. Encoding should
# be applied to the de-compressed data.
final = size == -1 or len(content) < size
return self.decoder.decode(content, final=final)
return content

def __next__(self) -> str:
newbytes = self.mmap.readline()

# readline returns bytes, not str, but Python's CSV reader
# expects str, so convert the output to str before continuing
newline = self.decoder.decode(newbytes)

# mmap doesn't raise if reading past the allocated
# data but instead returns an empty string, so raise
# if that is returned
if newline == "":
raise StopIteration

# IncrementalDecoder seems to push newline to the next line
return newline.lstrip("\n")


class _IOWrapper:
# TextIOWrapper is overly strict: it request that the buffer has seekable, readable,
# and writable. If we have a read-only buffer, we shouldn't need writable and vice
Expand Down Expand Up @@ -1131,12 +1059,7 @@ def read(self, n: int | None = -1) -> bytes:


def _maybe_memory_map(
handle: str | BaseBuffer,
memory_map: bool,
encoding: str,
mode: str,
errors: str,
decode: bool,
handle: str | BaseBuffer, memory_map: bool
) -> tuple[str | BaseBuffer, bool, list[BaseBuffer]]:
"""Try to memory map file/buffer."""
handles: list[BaseBuffer] = []
Expand All @@ -1149,22 +1072,21 @@ def _maybe_memory_map(
handle = open(handle, "rb")
handles.append(handle)

# error: Argument 1 to "_MMapWrapper" has incompatible type "Union[IO[Any],
# RawIOBase, BufferedIOBase, TextIOBase, mmap]"; expected "IO[Any]"
try:
# open mmap, adds *-able, and convert to string
wrapped = cast(
BaseBuffer,
_CSVMMapWrapper(handle, encoding, errors, decode), # type: ignore[arg-type]
# open mmap and adds *-able
# error: Argument 1 to "_IOWrapper" has incompatible type "mmap";
# expected "BaseBuffer"
wrapped = _IOWrapper(
mmap.mmap(
handle.fileno(), 0, access=mmap.ACCESS_READ # type: ignore[arg-type]
)
)
finally:
for handle in reversed(handles):
# error: "BaseBuffer" has no attribute "close"
handle.close() # type: ignore[attr-defined]
handles = []
handles.append(wrapped)

return wrapped, memory_map, handles
return wrapped, memory_map, [wrapped]


def file_exists(filepath_or_buffer: FilePath | BaseBuffer) -> bool:
Expand Down
12 changes: 12 additions & 0 deletions pandas/io/parsers/c_parser_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from collections import defaultdict
from io import TextIOWrapper
from typing import (
Hashable,
Mapping,
Expand Down Expand Up @@ -62,6 +63,17 @@ def __init__(self, src: ReadCsvBuffer[str], **kwds) -> None:
# Have to pass int, would break tests using TextReader directly otherwise :(
kwds["on_bad_lines"] = self.on_bad_lines.value

# c-engine can cope with utf-8 bytes. Remove TextIOWrapper when its errors
# policy is the same as the one given to read_csv
if (
isinstance(src, TextIOWrapper)
and src.encoding == "utf-8"
and (src.errors or "strict") == kwds["encoding_errors"]
):
# error: Incompatible types in assignment (expression has type "BinaryIO",
# variable has type "ReadCsvBuffer[str]")
src = src.buffer # type: ignore[assignment]

for key in (
"storage_options",
"encoding",
Expand Down
34 changes: 13 additions & 21 deletions pandas/tests/io/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,39 +413,31 @@ def test_constructor_bad_file(self, mmap_file):
err = mmap.error

with pytest.raises(err, match=msg):
icom._CSVMMapWrapper(non_file)
icom._maybe_memory_map(non_file, True)

with open(mmap_file) as target:
pass

msg = "I/O operation on closed file"
with pytest.raises(ValueError, match=msg):
icom._CSVMMapWrapper(target)

def test_get_attr(self, mmap_file):
with open(mmap_file) as target:
wrapper = icom._CSVMMapWrapper(target)

attrs = dir(wrapper.mmap)
attrs = [attr for attr in attrs if not attr.startswith("__")]
attrs.append("__next__")

for attr in attrs:
assert hasattr(wrapper, attr)

assert not hasattr(wrapper, "foo")
icom._maybe_memory_map(target, True)

def test_next(self, mmap_file):
with open(mmap_file) as target:
wrapper = icom._CSVMMapWrapper(target)
lines = target.readlines()

for line in lines:
next_line = next(wrapper)
assert next_line.strip() == line.strip()
with icom.get_handle(
target, "r", is_text=True, memory_map=True
) as wrappers:
wrapper = wrappers.handle
assert isinstance(wrapper.buffer.buffer, mmap.mmap)

for line in lines:
next_line = next(wrapper)
assert next_line.strip() == line.strip()

with pytest.raises(StopIteration, match=r"^$"):
next(wrapper)
with pytest.raises(StopIteration, match=r"^$"):
next(wrapper)

def test_unknown_engine(self):
with tm.ensure_clean() as path:
Expand Down