CLN: memory-mapping code #44766

Closed · wants to merge 3 commits
3 changes: 3 additions & 0 deletions doc/source/whatsnew/v1.4.0.rst
@@ -746,6 +746,9 @@ I/O
- Bug in :func:`read_csv` raising ``AttributeError`` when attempting to read a .csv file and infer index column dtype from a nullable integer type (:issue:`44079`)
- :meth:`DataFrame.to_csv` and :meth:`Series.to_csv` with ``compression`` set to ``'zip'`` no longer create a zip file containing a file ending with ".zip". Instead, they try to infer the inner file name more smartly. (:issue:`39465`)
- Bug in :func:`read_csv` when passing simultaneously a parser in ``date_parser`` and ``parse_dates=False``, the parsing was still called (:issue:`44366`)
- Bug in :func:`read_csv` when passing a ``tempfile.SpooledTemporaryFile`` opened in binary mode (:issue:`44748`)
- Bug in :func:`read_csv` silently ignoring errors when failing to create a memory-mapped file (:issue:`44766`)
-

Period
^^^^^^
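Note on the new whatsnew entry above: a minimal sketch of the behaviour change, using an empty file as the simplest way to make mmap creation fail (the file name is made up for the example). Before this change the mmap error was swallowed and `read_csv` quietly fell back to a regular file handle; afterwards the error reaches the caller.

```python
import pandas as pd

# Create an empty file so that mmap.mmap(fileno, 0) fails with
# "cannot mmap an empty file".
with open("empty.csv", "w"):
    pass

try:
    pd.read_csv("empty.csv", memory_map=True)
except ValueError as err:
    # After this change the mmap failure surfaces here; previously the silent
    # fallback hid it and an EmptyDataError was raised instead.
    print(err)
```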
158 changes: 32 additions & 126 deletions pandas/io/common.py
@@ -3,10 +3,8 @@

import bz2
import codecs
from collections import abc
import dataclasses
import gzip
import io
from io import (
BufferedIOBase,
BytesIO,
@@ -18,7 +16,6 @@
import mmap
import os
from pathlib import Path
import tempfile
from typing import (
IO,
Any,
@@ -104,7 +101,7 @@ def close(self) -> None:
avoid closing the potentially user-created buffer.
"""
if self.is_wrapped:
assert isinstance(self.handle, (TextIOWrapper, BytesIOWrapper))
assert isinstance(self.handle, TextIOWrapper)
self.handle.flush()
self.handle.detach()
self.created_handles.remove(self.handle)
@@ -673,14 +670,7 @@ def get_handle(
handles: list[BaseBuffer]

# memory mapping needs to be the first step
handle, memory_map, handles = _maybe_memory_map(
handle,
memory_map,
ioargs.encoding,
ioargs.mode,
errors,
ioargs.compression["method"] not in _compression_to_extension,
)
handle, memory_map, handles = _maybe_memory_map(handle, memory_map)

is_path = isinstance(handle, str)
compression_args = dict(ioargs.compression)
@@ -779,20 +769,19 @@ def get_handle(
# Convert BytesIO or file objects passed with an encoding
is_wrapped = False
if not is_text and ioargs.mode == "rb" and isinstance(handle, TextIOBase):
handle = BytesIOWrapper(
# not added to handles as it does not open/buffer resources
handle = _BytesIOWrapper(
handle,
encoding=ioargs.encoding,
)
handles.append(handle)
# the (text) handle is always provided by the caller
# since get_handle would have opened it in binary mode
is_wrapped = True
elif is_text and (compression or _is_binary_mode(handle, ioargs.mode)):
elif is_text and (
compression or _is_binary_mode(handle, ioargs.mode) or memory_map
):
handle = TextIOWrapper(
# error: Argument 1 to "TextIOWrapper" has incompatible type
# "Union[IO[bytes], IO[Any], RawIOBase, BufferedIOBase, TextIOBase, mmap]";
# expected "IO[bytes]"
handle, # type: ignore[arg-type]
_IOWrapper(handle), # type: ignore[arg-type]
encoding=ioargs.encoding,
errors=errors,
newline="",
@@ -909,78 +898,26 @@ def closed(self):
return self.fp is None


class _MMapWrapper(abc.Iterator):
"""
Wrapper for the Python's mmap class so that it can be properly read in
by Python's csv.reader class.

Parameters
----------
f : file object
File object to be mapped onto memory. Must support the 'fileno'
method or have an equivalent attribute

"""
class _IOWrapper:
# Lies that certain attributes exist and are True.

def __init__(
self,
f: IO,
encoding: str = "utf-8",
errors: str = "strict",
decode: bool = True,
):
self.encoding = encoding
self.errors = errors
self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)
self.decode = decode

self.attributes = {}
for attribute in ("seekable", "readable", "writeable"):
if not hasattr(f, attribute):
continue
self.attributes[attribute] = getattr(f, attribute)()
self.mmap = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
def __init__(self, buffer: BaseBuffer):
self.buffer = buffer
self.attributes = tuple(
attr
for attr in ("seekable", "readable", "writable")
if not hasattr(self.buffer, attr)
)

def __getattr__(self, name: str):
if name in self.attributes:
return lambda: self.attributes[name]
return getattr(self.mmap, name)
return lambda: True
return getattr(self.buffer, name)

def __iter__(self) -> _MMapWrapper:
return self

def read(self, size: int = -1) -> str | bytes:
# CSV c-engine uses read instead of iterating
content: bytes = self.mmap.read(size)
if self.decode and self.encoding != "utf-8":
# memory mapping is applied before compression. Encoding should
# be applied to the de-compressed data.
final = size == -1 or len(content) < size
return self.decoder.decode(content, final=final)
return content

def __next__(self) -> str:
newbytes = self.mmap.readline()

# readline returns bytes, not str, but Python's CSV reader
# expects str, so convert the output to str before continuing
newline = self.decoder.decode(newbytes)

# mmap doesn't raise if reading past the allocated
# data but instead returns an empty string, so raise
# if that is returned
if newline == "":
raise StopIteration

# IncrementalDecoder seems to push newline to the next line
return newline.lstrip("\n")


# Wrapper that wraps a StringIO buffer and reads bytes from it
# Created for compat with pyarrow read_csv
class BytesIOWrapper(io.BytesIO):
buffer: StringIO | TextIOBase | None

class _BytesIOWrapper:
# Wrapper that wraps a StringIO buffer and reads bytes from it
# Created for compat with pyarrow read_csv
def __init__(self, buffer: StringIO | TextIOBase, encoding: str = "utf-8"):
self.buffer = buffer
self.encoding = encoding
@@ -1006,23 +943,9 @@ def read(self, n: int | None = -1) -> bytes:
self.overflow = combined_bytestring[n:]
return to_return

def detach(self):
# Slightly modified from Python's TextIOWrapper detach method
if self.buffer is None:
raise ValueError("buffer is already detached")
self.flush()
buffer = self.buffer
self.buffer = None
return buffer


def _maybe_memory_map(
handle: str | BaseBuffer,
memory_map: bool,
encoding: str,
mode: str,
errors: str | None,
decode: bool,
handle: str | BaseBuffer, memory_map: bool
) -> tuple[str | BaseBuffer, bool, list[BaseBuffer]]:
"""Try to memory map file/buffer."""
handles: list[BaseBuffer] = []
@@ -1032,32 +955,17 @@ def _maybe_memory_map(

# need to open the file first
if isinstance(handle, str):
if encoding and "b" not in mode:
# Encoding
handle = open(handle, mode, encoding=encoding, errors=errors, newline="")
else:
# Binary mode
handle = open(handle, mode)
# encoding will be handled later in TextIOWrapper
handle = open(handle, "rb")
handles.append(handle)

try:
# error: Argument 1 to "_MMapWrapper" has incompatible type "Union[IO[Any],
# RawIOBase, BufferedIOBase, TextIOBase, mmap]"; expected "IO[Any]"
wrapped = cast(
BaseBuffer,
_MMapWrapper(handle, encoding, errors, decode), # type: ignore[arg-type]
)
# error: "BaseBuffer" has no attribute "close"
handle.close() # type: ignore[attr-defined]
handles.remove(handle)
handles.append(wrapped)
handle = wrapped
except Exception:
# we catch any errors that may have occurred
# because that is consistent with the lower-level
# functionality of the C engine (pd.read_csv), so
# leave the file handler as is then
memory_map = False
mmap_handle = mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ)

# error: Argument 1 to "_IOWrapper" has incompatible type "mmap";
# expected "BaseBuffer"
wrapped = _IOWrapper(mmap_handle) # type: ignore[arg-type]
handles.append(wrapped)
handle = wrapped

return handle, memory_map, handles

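For context on the rewritten `_maybe_memory_map` above: memory mapping is now attempted whenever the input is a path or exposes a `fileno` attribute, and failures are no longer swallowed, which is also why the silent-fallback test is removed further down. A hedged sketch of both outcomes (the file name is an assumption):

```python
from io import BytesIO

import pandas as pd

csv_bytes = b"a,b\n1,2\n"

# A real path: opened in binary mode, memory-mapped, and decoded later by
# TextIOWrapper (or handed to the C engine as raw utf-8 bytes).
with open("mapped.csv", "wb") as fh:
    fh.write(csv_bytes)
print(pd.read_csv("mapped.csv", memory_map=True))

# An in-memory buffer: BytesIO.fileno() raises, and that error now propagates
# instead of being silently ignored with a fallback to a regular read.
try:
    pd.read_csv(BytesIO(csv_bytes), memory_map=True)
except Exception as err:  # io.UnsupportedOperation in this case
    print(type(err).__name__, err)
```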
@@ -1088,8 +996,6 @@ def _is_binary_mode(handle: FilePath | BaseBuffer, mode: str) -> bool:
codecs.StreamWriter,
codecs.StreamReader,
codecs.StreamReaderWriter,
# cannot be wrapped in TextIOWrapper GH43439
tempfile.SpooledTemporaryFile,
)
if issubclass(type(handle), text_classes):
return False
2 changes: 2 additions & 0 deletions pandas/io/parsers/base_parser.py
@@ -231,6 +231,7 @@ def _open_handles(
self,
src: FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str],
kwds: dict[str, Any],
is_text: bool = True,
) -> None:
"""
Let the readers open IOHandles after they are done with their potential raises.
@@ -243,6 +244,7 @@ def _open_handles(
memory_map=kwds.get("memory_map", False),
storage_options=kwds.get("storage_options", None),
errors=kwds.get("encoding_errors", "strict"),
is_text=is_text,
)

def _validate_parse_dates_presence(self, columns: Sequence[Hashable]) -> Iterable:
4 changes: 3 additions & 1 deletion pandas/io/parsers/c_parser_wrapper.py
@@ -51,7 +51,9 @@ def __init__(
kwds["usecols"] = self.usecols

# open handles
self._open_handles(src, kwds)
# The c-engine can cope with utf-8 bytes: no need to use TextIOWrapper when the
# encoding is utf-8: get_handle(..., is_text=Flase)
self._open_handles(src, kwds, is_text=kwds.get("encoding") != "utf-8")
assert self.handles is not None

# Have to pass int, would break tests using TextReader directly otherwise :(
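A short usage illustration of the comment above (file names are made up): with the C engine, a utf-8 file is fed to the parser as raw memory-mapped bytes (`is_text=False`, no `TextIOWrapper`), while any other encoding is decoded through the text layer first; both calls should produce the same frame.

```python
import pandas as pd

row = "name,motto\nLéonardo,ciao\n"

# utf-8: the C engine consumes the raw (memory-mapped) bytes itself.
with open("utf8.csv", "w", encoding="utf-8") as fh:
    fh.write(row)
df_utf8 = pd.read_csv("utf8.csv", engine="c", encoding="utf-8", memory_map=True)

# latin-1: decoded by TextIOWrapper before the parser sees it.
with open("latin1.csv", "w", encoding="latin-1") as fh:
    fh.write(row)
df_latin = pd.read_csv("latin1.csv", engine="c", encoding="latin-1", memory_map=True)

assert df_utf8.equals(df_latin)
```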
19 changes: 0 additions & 19 deletions pandas/tests/io/parser/common/test_file_buffer_url.py
@@ -347,25 +347,6 @@ def test_read_csv_file_handle(all_parsers, io_class, encoding):
assert not handle.closed


def test_memory_map_file_handle_silent_fallback(all_parsers, compression):
"""
Do not fail for buffers with memory_map=True (cannot memory map BytesIO).

GH 37621
"""
parser = all_parsers
expected = DataFrame({"a": [1], "b": [2]})

handle = BytesIO()
expected.to_csv(handle, index=False, compression=compression, mode="wb")
handle.seek(0)

tm.assert_frame_equal(
parser.read_csv(handle, memory_map=True, compression=compression),
expected,
)


def test_memory_map_compression(all_parsers, compression):
"""
Support memory map for compressed files.
23 changes: 16 additions & 7 deletions pandas/tests/io/parser/test_encoding.py
@@ -236,9 +236,14 @@ def test_parse_encoded_special_characters(encoding):
def test_encoding_memory_map(all_parsers, encoding):
# GH40986
parser = all_parsers

# add one entry with a special character
encoding_ = encoding or "utf-8"
leonardo = "Léonardo".encode(encoding_, errors="ignore").decode(encoding_)
[Member Author comment] This stricter version of the test would have failed on master with the python engine.

expected = DataFrame(
{
"name": ["Raphael", "Donatello", "Miguel Angel", "Leonardo"],
"name": ["Raphael", "Donatello", "Miguel Angel", leonardo],
"mask": ["red", "purple", "orange", "blue"],
"weapon": ["sai", "bo staff", "nunchunk", "katana"],
}
@@ -299,17 +304,21 @@ def test_readcsv_memmap_utf8(all_parsers):
tm.assert_frame_equal(df, dfr)


def test_not_readable(all_parsers, request):
@pytest.mark.usefixtures("pyarrow_xfail")
@pytest.mark.parametrize("mode", ["w+b", "w+t"])
def test_not_readable(all_parsers, mode, request):
# GH43439
parser = all_parsers
if parser.engine in ("python", "pyarrow"):
if parser.engine == "pyarrow":
mark = pytest.mark.xfail(
reason="SpooledTemporaryFile does only work with the c-engine"
reason="SpooledTemporaryFile does not work with the pyarrow engine"
)
request.node.add_marker(mark)

with tempfile.SpooledTemporaryFile() as handle:
handle.write(b"abcd")
content = b"abcd"
if "t" in mode:
content = "abcd"
with tempfile.SpooledTemporaryFile(mode=mode) as handle:
handle.write(content)
handle.seek(0)
df = parser.read_csv(handle)
expected = DataFrame([], columns=["abcd"])
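For reference, a hedged sketch of what the reworked test exercises: with `tempfile.SpooledTemporaryFile` no longer special-cased in `_is_binary_mode`, spooled files opened in binary (`"w+b"`) and text (`"w+t"`) mode can both be read, the pyarrow engine being the remaining exception.

```python
import tempfile

import pandas as pd

for mode, content in (("w+b", b"a,b\n1,2\n"), ("w+t", "a,b\n1,2\n")):
    with tempfile.SpooledTemporaryFile(mode=mode) as handle:
        handle.write(content)
        handle.seek(0)
        # Works with the default C engine and, after this PR, the python engine.
        print(pd.read_csv(handle))
```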
18 changes: 9 additions & 9 deletions pandas/tests/io/parser/test_read_fwf.py
@@ -699,15 +699,15 @@ def test_encoding_mmap(memory_map):
GH 23254.
"""
encoding = "iso8859_1"
data = BytesIO(" 1 A Ä 2\n".encode(encoding))
[Member Author comment] This test wasn't using memory_map because it silently failed.

df = read_fwf(
data,
header=None,
widths=[2, 2, 2, 2],
encoding=encoding,
memory_map=memory_map,
)
data.seek(0)
with tm.ensure_clean() as path:
Path(path).write_bytes(" 1 A Ä 2\n".encode(encoding))
df = read_fwf(
path,
header=None,
widths=[2, 2, 2, 2],
encoding=encoding,
memory_map=memory_map,
)
df_reference = DataFrame([[1, "A", "Ä", 2]])
tm.assert_frame_equal(df, df_reference)
