Skip to content

Commit 22de62c

Browse files
authored
ENH: memory_map for compressed files (#37621)
1 parent e0699ca commit 22de62c

File tree

4 files changed

+109
-91
lines changed

4 files changed

+109
-91
lines changed

doc/source/whatsnew/v1.2.0.rst

+1
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ Other enhancements
230230
- :class:`DatetimeIndex` and :class:`Series` with ``datetime64`` or ``datetime64tz`` dtypes now support ``std`` (:issue:`37436`)
231231
- :class:`Window` now supports all Scipy window types in ``win_type`` with flexible keyword argument support (:issue:`34556`)
232232
- :meth:`testing.assert_index_equal` now has a ``check_order`` parameter that allows indexes to be checked in an order-insensitive manner (:issue:`37478`)
233+
- :func:`read_csv` supports memory-mapping for compressed files (:issue:`37621`)
233234

234235
.. _whatsnew_120.api_breaking.python:
235236

pandas/io/common.py

+85-50
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ class IOHandles:
107107
handle: Buffer
108108
created_handles: List[Buffer] = dataclasses.field(default_factory=list)
109109
is_wrapped: bool = False
110+
is_mmap: bool = False
110111

111112
def close(self) -> None:
112113
"""
@@ -604,49 +605,49 @@ def get_handle(
604605
except ImportError:
605606
pass
606607

607-
handles: List[Buffer] = list()
608-
609608
# Windows does not default to utf-8. Set to utf-8 for a consistent behavior
610609
if encoding is None:
611610
encoding = "utf-8"
612611

613612
# Convert pathlib.Path/py.path.local or string
614-
path_or_buf = stringify_path(path_or_buf)
615-
is_path = isinstance(path_or_buf, str)
616-
f = path_or_buf
613+
handle = stringify_path(path_or_buf)
617614

618615
compression, compression_args = get_compression_method(compression)
619-
if is_path:
620-
compression = infer_compression(path_or_buf, compression)
616+
compression = infer_compression(handle, compression)
621617

622-
if compression:
618+
# memory mapping needs to be the first step
619+
handle, memory_map, handles = _maybe_memory_map(
620+
handle, memory_map, encoding, mode, errors
621+
)
623622

623+
is_path = isinstance(handle, str)
624+
if compression:
624625
# GZ Compression
625626
if compression == "gzip":
626627
if is_path:
627-
assert isinstance(path_or_buf, str)
628-
f = gzip.GzipFile(filename=path_or_buf, mode=mode, **compression_args)
628+
assert isinstance(handle, str)
629+
handle = gzip.GzipFile(filename=handle, mode=mode, **compression_args)
629630
else:
630-
f = gzip.GzipFile(
631-
fileobj=path_or_buf, # type: ignore[arg-type]
631+
handle = gzip.GzipFile(
632+
fileobj=handle, # type: ignore[arg-type]
632633
mode=mode,
633634
**compression_args,
634635
)
635636

636637
# BZ Compression
637638
elif compression == "bz2":
638-
f = bz2.BZ2File(
639-
path_or_buf, mode=mode, **compression_args # type: ignore[arg-type]
639+
handle = bz2.BZ2File(
640+
handle, mode=mode, **compression_args # type: ignore[arg-type]
640641
)
641642

642643
# ZIP Compression
643644
elif compression == "zip":
644-
f = _BytesZipFile(path_or_buf, mode, **compression_args)
645-
if f.mode == "r":
646-
handles.append(f)
647-
zip_names = f.namelist()
645+
handle = _BytesZipFile(handle, mode, **compression_args)
646+
if handle.mode == "r":
647+
handles.append(handle)
648+
zip_names = handle.namelist()
648649
if len(zip_names) == 1:
649-
f = f.open(zip_names.pop())
650+
handle = handle.open(zip_names.pop())
650651
elif len(zip_names) == 0:
651652
raise ValueError(f"Zero files found in ZIP file {path_or_buf}")
652653
else:
@@ -657,64 +658,52 @@ def get_handle(
657658

658659
# XZ Compression
659660
elif compression == "xz":
660-
f = get_lzma_file(lzma)(path_or_buf, mode)
661+
handle = get_lzma_file(lzma)(handle, mode)
661662

662663
# Unrecognized Compression
663664
else:
664665
msg = f"Unrecognized compression type: {compression}"
665666
raise ValueError(msg)
666667

667-
assert not isinstance(f, str)
668-
handles.append(f)
668+
assert not isinstance(handle, str)
669+
handles.append(handle)
669670

670671
elif is_path:
671672
# Check whether the filename is to be opened in binary mode.
672673
# Binary mode does not support 'encoding' and 'newline'.
673-
is_binary_mode = "b" in mode
674-
assert isinstance(path_or_buf, str)
675-
if encoding and not is_binary_mode:
674+
assert isinstance(handle, str)
675+
if encoding and "b" not in mode:
676676
# Encoding
677-
f = open(path_or_buf, mode, encoding=encoding, errors=errors, newline="")
677+
handle = open(handle, mode, encoding=encoding, errors=errors, newline="")
678678
else:
679679
# Binary mode
680-
f = open(path_or_buf, mode)
681-
handles.append(f)
680+
handle = open(handle, mode)
681+
handles.append(handle)
682682

683683
# Convert BytesIO or file objects passed with an encoding
684684
is_wrapped = False
685685
if is_text and (
686686
compression
687-
or isinstance(f, need_text_wrapping)
688-
or "b" in getattr(f, "mode", "")
687+
or isinstance(handle, need_text_wrapping)
688+
or "b" in getattr(handle, "mode", "")
689689
):
690-
f = TextIOWrapper(
691-
f, encoding=encoding, errors=errors, newline="" # type: ignore[arg-type]
690+
handle = TextIOWrapper(
691+
handle, # type: ignore[arg-type]
692+
encoding=encoding,
693+
errors=errors,
694+
newline="",
692695
)
693-
handles.append(f)
696+
handles.append(handle)
694697
# do not mark as wrapped when the user provided a string
695698
is_wrapped = not is_path
696699

697-
if memory_map and hasattr(f, "fileno"):
698-
assert not isinstance(f, str)
699-
try:
700-
wrapped = cast(mmap.mmap, _MMapWrapper(f)) # type: ignore[arg-type]
701-
f.close()
702-
handles.remove(f)
703-
handles.append(wrapped)
704-
f = wrapped
705-
except Exception:
706-
# we catch any errors that may have occurred
707-
# because that is consistent with the lower-level
708-
# functionality of the C engine (pd.read_csv), so
709-
# leave the file handler as is then
710-
pass
711-
712700
handles.reverse() # close the most recently added buffer first
713-
assert not isinstance(f, str)
701+
assert not isinstance(handle, str)
714702
return IOHandles(
715-
handle=f,
703+
handle=handle,
716704
created_handles=handles,
717705
is_wrapped=is_wrapped,
706+
is_mmap=memory_map,
718707
)
719708

720709

@@ -778,9 +767,16 @@ class _MMapWrapper(abc.Iterator):
778767
"""
779768

780769
def __init__(self, f: IO):
    """
    Memory-map ``f`` read-only and snapshot its stream capabilities.

    Parameters
    ----------
    f : IO
        Open file object. ``f.fileno()`` is passed to ``mmap.mmap``, so the
        object needs to be backed by a real file descriptor.

    Notes
    -----
    Any error raised by ``f.fileno()`` or ``mmap.mmap`` propagates to the
    caller unchanged.
    """
    # Record the answers of the io-style capability queries while the
    # original handle is still available; ``__getattr__`` hands them back
    # later as zero-argument callables.
    self.attributes = {}
    # "writable" is the io API spelling; the former "writeable" never matched
    # a method on standard file objects, so that answer was never captured.
    for attribute in ("seekable", "readable", "writable"):
        if not hasattr(f, attribute):
            continue
        self.attributes[attribute] = getattr(f, attribute)()
    # length 0 maps the whole file
    self.mmap = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
782776

783777
def __getattr__(self, name: str):
    """
    Resolve attributes that normal lookup did not find.

    Capability names recorded in ``self.attributes`` are returned as
    zero-argument callables yielding the stored value; every other name is
    forwarded to the underlying ``mmap`` object.

    Raises
    ------
    AttributeError
        If ``name`` is one of this wrapper's own fields and it has not been
        set yet, or if the underlying ``mmap`` object lacks ``name``.
    """
    # __getattr__ only runs after regular lookup failed, so landing here for
    # one of our own fields means it is not set (e.g. the instance was made
    # via __new__). Reading it below would re-enter this method forever.
    if name in ("attributes", "mmap"):
        raise AttributeError(name)
    if name in self.attributes:
        return lambda: self.attributes[name]
    return getattr(self.mmap, name)
785781

786782
def __iter__(self) -> "_MMapWrapper":
@@ -799,3 +795,42 @@ def __next__(self) -> str:
799795
if newline == "":
800796
raise StopIteration
801797
return newline
798+
799+
800+
def _maybe_memory_map(
    handle: FileOrBuffer,
    memory_map: bool,
    encoding: str,
    mode: str,
    errors: Optional[str],
) -> Tuple[FileOrBuffer, bool, List[Buffer]]:
    """
    Try to replace a path or file object with a memory-mapped wrapper.

    Parameters
    ----------
    handle : str or file-like
        Path to open, or an object that may expose ``fileno``.
    memory_map : bool
        Whether memory mapping was requested.
    encoding : str
        Encoding used when a path is opened in text mode.
    mode : str
        Mode used when a path is opened; a ``"b"`` in it selects binary mode.
    errors : str or None
        Error handler passed to ``open`` in text mode.

    Returns
    -------
    handle : str or file-like
        The memory-mapped wrapper on success, otherwise the (possibly newly
        opened) original handle.
    memory_map : bool
        True only if the returned handle is memory-mapped.
    handles : list
        Objects created by this function.
    """
    handles: List[Buffer] = []
    memory_map &= hasattr(handle, "fileno") or isinstance(handle, str)
    if not memory_map:
        return handle, memory_map, handles

    # need to open the file first
    opened_here = False
    if isinstance(handle, str):
        if encoding and "b" not in mode:
            # Encoding
            handle = open(handle, mode, encoding=encoding, errors=errors, newline="")
        else:
            # Binary mode
            handle = open(handle, mode)
        handles.append(handle)
        opened_here = True

    try:
        wrapped = cast(mmap.mmap, _MMapWrapper(handle))  # type: ignore[arg-type]
    except Exception:
        # we catch any errors that may have occurred
        # because that is consistent with the lower-level
        # functionality of the C engine (pd.read_csv), so
        # leave the file handler as is then
        memory_map = False
    else:
        # Only a file opened above is ours to close. A handle supplied by the
        # caller must stay open, and it was never added to ``handles``, so
        # removing it would raise and silently undo a successful mapping.
        if opened_here:
            handle.close()
            handles.remove(handle)
        handles.append(wrapped)
        handle = wrapped

    return handle, memory_map, handles

pandas/io/parsers.py

+2-18
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,7 @@
6363
from pandas.core.series import Series
6464
from pandas.core.tools import datetimes as tools
6565

66-
from pandas.io.common import (
67-
get_compression_method,
68-
get_filepath_or_buffer,
69-
get_handle,
70-
stringify_path,
71-
validate_header_arg,
72-
)
66+
from pandas.io.common import get_filepath_or_buffer, get_handle, validate_header_arg
7367
from pandas.io.date_converters import generic_parser
7468

7569
# BOM character (byte order mark)
@@ -1834,16 +1828,6 @@ def __init__(self, src, **kwds):
18341828

18351829
ParserBase.__init__(self, kwds)
18361830

1837-
if kwds.get("memory_map", False):
1838-
# memory-mapped files are directly handled by the TextReader.
1839-
src = stringify_path(src)
1840-
1841-
if get_compression_method(kwds.get("compression", None))[0] is not None:
1842-
raise ValueError(
1843-
"read_csv does not support compression with memory_map=True. "
1844-
+ "Please use memory_map=False instead."
1845-
)
1846-
18471831
self.handles = get_handle(
18481832
src,
18491833
mode="r",
@@ -1855,7 +1839,7 @@ def __init__(self, src, **kwds):
18551839
kwds.pop("encoding", None)
18561840
kwds.pop("memory_map", None)
18571841
kwds.pop("compression", None)
1858-
if kwds.get("memory_map", False) and hasattr(self.handles.handle, "mmap"):
1842+
if self.handles.is_mmap and hasattr(self.handles.handle, "mmap"):
18591843
self.handles.handle = self.handles.handle.mmap
18601844

18611845
# #2442

pandas/tests/io/parser/test_common.py

+21-23
Original file line numberDiff line numberDiff line change
@@ -2275,40 +2275,38 @@ def test_read_csv_file_handle(all_parsers, io_class, encoding):
22752275
assert not handle.closed
22762276

22772277

2278-
def test_memory_map_compression_error(c_parser_only):
2278+
def test_memory_map_file_handle_silent_fallback(all_parsers, compression):
22792279
"""
2280-
c-parsers do not support memory_map=True with compression.
2280+
Do not fail for buffers with memory_map=True (cannot memory map BytesIO).
22812281
2282-
GH 36997
2282+
GH 37621
22832283
"""
2284-
parser = c_parser_only
2285-
df = DataFrame({"a": [1], "b": [2]})
2286-
msg = (
2287-
"read_csv does not support compression with memory_map=True. "
2288-
+ "Please use memory_map=False instead."
2289-
)
2284+
parser = all_parsers
2285+
expected = DataFrame({"a": [1], "b": [2]})
22902286

2291-
with tm.ensure_clean() as path:
2292-
df.to_csv(path, compression="gzip", index=False)
2287+
handle = BytesIO()
2288+
expected.to_csv(handle, index=False, compression=compression, mode="wb")
2289+
handle.seek(0)
22932290

2294-
with pytest.raises(ValueError, match=msg):
2295-
parser.read_csv(path, memory_map=True, compression="gzip")
2291+
tm.assert_frame_equal(
2292+
parser.read_csv(handle, memory_map=True, compression=compression),
2293+
expected,
2294+
)
22962295

22972296

2298-
def test_memory_map_file_handle(all_parsers):
2297+
def test_memory_map_compression(all_parsers, compression):
22992298
"""
2300-
Support some buffers with memory_map=True.
2299+
Support memory map for compressed files.
23012300
2302-
GH 36997
2301+
GH 37621
23032302
"""
23042303
parser = all_parsers
23052304
expected = DataFrame({"a": [1], "b": [2]})
23062305

2307-
handle = StringIO()
2308-
expected.to_csv(handle, index=False)
2309-
handle.seek(0)
2306+
with tm.ensure_clean() as path:
2307+
expected.to_csv(path, index=False, compression=compression)
23102308

2311-
tm.assert_frame_equal(
2312-
parser.read_csv(handle, memory_map=True),
2313-
expected,
2314-
)
2309+
tm.assert_frame_equal(
2310+
parser.read_csv(path, memory_map=True, compression=compression),
2311+
expected,
2312+
)

0 commit comments

Comments
 (0)