
BUG/ENH: compression for google cloud storage in to_csv #35681


Merged (3 commits) on Sep 3, 2020
2 changes: 2 additions & 0 deletions doc/source/whatsnew/v1.2.0.rst
@@ -240,6 +240,8 @@ I/O
- In :meth:`read_csv` `float_precision='round_trip'` now handles `decimal` and `thousands` parameters (:issue:`35365`)
- :meth:`to_pickle` and :meth:`read_pickle` were closing user-provided file objects (:issue:`35679`)
- :meth:`to_csv` passes compression arguments for `'gzip'` always to `gzip.GzipFile` (:issue:`28103`)
- :meth:`to_csv` did not support zip compression for binary file objects that do not have a filename (:issue:`35058`)
- :meth:`to_csv` and :meth:`read_csv` did not honor `compression` and `encoding` for path-like objects that are internally converted to file-like objects (:issue:`35677`, :issue:`26124`, and :issue:`32392`)

Plotting
^^^^^^^^
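Taken together, the I/O entries above mean compression and encoding now survive the internal conversion of fsspec-backed paths into file-like objects. A minimal sketch of the pattern this PR fixes for Google Cloud Storage (the bucket name is a placeholder; gcsfs must be installed):

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})

# The gs:// path is converted internally to a binary file-like object;
# the gzip compression inferred from the extension is now applied to it.
df.to_csv("gs://my-bucket/frame.csv.gz", index=False)  # hypothetical bucket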
29 changes: 28 additions & 1 deletion pandas/_typing.py
@@ -1,4 +1,6 @@
from dataclasses import dataclass
from datetime import datetime, timedelta, tzinfo
from io import IOBase
from pathlib import Path
from typing import (
IO,
@@ -8,6 +10,7 @@
Callable,
Collection,
Dict,
Generic,
Hashable,
List,
Mapping,
@@ -62,7 +65,8 @@
"ExtensionDtype", str, np.dtype, Type[Union[str, float, int, complex, bool]]
]
DtypeObj = Union[np.dtype, "ExtensionDtype"]
FilePathOrBuffer = Union[str, Path, IO[AnyStr]]
FilePathOrBuffer = Union[str, Path, IO[AnyStr], IOBase]
FileOrBuffer = Union[str, IO[AnyStr], IOBase]

# FrameOrSeriesUnion means either a DataFrame or a Series. E.g.
# `def func(a: FrameOrSeriesUnion) -> FrameOrSeriesUnion: ...` means that if a Series
@@ -114,3 +118,26 @@
# compression keywords and compression
CompressionDict = Mapping[str, Optional[Union[str, int, bool]]]
CompressionOptions = Optional[Union[str, CompressionDict]]


# let's bind types
ModeVar = TypeVar("ModeVar", str, None, Optional[str])
EncodingVar = TypeVar("EncodingVar", str, None, Optional[str])


@dataclass
class IOargs(Generic[ModeVar, EncodingVar]):
"""
Return value of io/common.py:get_filepath_or_buffer.

Note (copied from io/parsers):
filepath_or_buffer can be Union[FilePathOrBuffer, s3fs.S3File, gcsfs.GCSFile]
though mypy handling of conditional imports is difficult.
See https://github.com/python/mypy/issues/1297
"""

filepath_or_buffer: FileOrBuffer
encoding: EncodingVar
compression: CompressionOptions
should_close: bool
mode: Union[ModeVar, str]
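The generic dataclass lets the concrete mode and encoding types a caller passes in flow back out through the return value. A minimal, self-contained sketch of the pattern (names are illustrative, not part of the diff):

from dataclasses import dataclass
from typing import Generic, TypeVar

# Mirrors ModeVar above: a value-restricted TypeVar (typing normalizes the
# None constraint to type(None)).
M = TypeVar("M", str, None)


@dataclass
class Args(Generic[M]):
    mode: M


def make_args(mode: M) -> Args[M]:
    return Args(mode=mode)


# mypy binds M per call site, so the concrete type survives the round trip:
a = make_args("wb")  # Args[str]
b = make_args(None)  # Args[None]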
13 changes: 5 additions & 8 deletions pandas/core/frame.py
@@ -2281,14 +2281,11 @@ def to_markdown(
result = tabulate.tabulate(self, **kwargs)
if buf is None:
return result
buf, _, _, should_close = get_filepath_or_buffer(
buf, mode=mode, storage_options=storage_options
)
assert buf is not None # Help mypy.
assert not isinstance(buf, str)
buf.writelines(result)
if should_close:
buf.close()
ioargs = get_filepath_or_buffer(buf, mode=mode, storage_options=storage_options)
assert not isinstance(ioargs.filepath_or_buffer, str)
ioargs.filepath_or_buffer.writelines(result)
if ioargs.should_close:
ioargs.filepath_or_buffer.close()
return None

@deprecate_kwarg(old_arg_name="fname", new_arg_name="path")
2 changes: 2 additions & 0 deletions pandas/core/generic.py
@@ -2,6 +2,7 @@
from datetime import timedelta
import functools
import gc
from io import StringIO
import json
import operator
import pickle
@@ -3249,6 +3250,7 @@ def to_csv(
formatter.save()

if path_or_buf is None:
assert isinstance(formatter.path_or_buf, StringIO)
return formatter.path_or_buf.getvalue()

return None
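The new assert encodes an existing invariant rather than changing behavior: when path_or_buf is None, the formatter writes into an internal StringIO, so getvalue() returns the CSV text. For example:

import pandas as pd

df = pd.DataFrame({"a": [1, 2]})
text = df.to_csv(path_or_buf=None, index=False)  # formatter buffers into a StringIO
assert text == "a\n1\n2\n"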
104 changes: 82 additions & 22 deletions pandas/io/common.py
@@ -27,12 +27,17 @@
uses_params,
uses_relative,
)
import warnings
import zipfile

from pandas._typing import (
CompressionDict,
CompressionOptions,
EncodingVar,
FileOrBuffer,
FilePathOrBuffer,
IOargs,
ModeVar,
StorageOptions,
)
from pandas.compat import _get_lzma_file, _import_lzma
@@ -69,9 +74,7 @@ def is_url(url) -> bool:
return parse_url(url).scheme in _VALID_URLS


def _expand_user(
filepath_or_buffer: FilePathOrBuffer[AnyStr],
) -> FilePathOrBuffer[AnyStr]:
def _expand_user(filepath_or_buffer: FileOrBuffer[AnyStr]) -> FileOrBuffer[AnyStr]:
"""
Return the argument with an initial component of ~ or ~user
replaced by that user's home directory.
@@ -101,7 +104,7 @@ def validate_header_arg(header) -> None:

def stringify_path(
filepath_or_buffer: FilePathOrBuffer[AnyStr],
) -> FilePathOrBuffer[AnyStr]:
) -> FileOrBuffer[AnyStr]:
"""
Attempt to convert a path-like object to a string.

@@ -134,9 +137,9 @@ def stringify_path(
# "__fspath__" [union-attr]
# error: Item "IO[bytes]" of "Union[str, Path, IO[bytes]]" has no
# attribute "__fspath__" [union-attr]
return filepath_or_buffer.__fspath__() # type: ignore[union-attr]
filepath_or_buffer = filepath_or_buffer.__fspath__() # type: ignore[union-attr]
elif isinstance(filepath_or_buffer, pathlib.Path):
return str(filepath_or_buffer)
filepath_or_buffer = str(filepath_or_buffer)
return _expand_user(filepath_or_buffer)


@@ -162,13 +165,13 @@ def is_fsspec_url(url: FilePathOrBuffer) -> bool:
)


def get_filepath_or_buffer(
def get_filepath_or_buffer( # type: ignore[assignment]
Member Author: my local mypy needs this for lines 170 and 172, but the CI mypy apparently needs it at this line (TypeVars cannot have default values; this could be fixed with @overload).
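A sketch of the @overload alternative mentioned in the comment (illustrative only, not part of this PR; it reuses the types from pandas/_typing.py above and omits storage_options for brevity):

from typing import Optional, overload

from pandas._typing import CompressionOptions, FilePathOrBuffer, IOargs


@overload
def get_filepath_or_buffer(
    filepath_or_buffer: FilePathOrBuffer,
    encoding: str,
    compression: CompressionOptions = ...,
    mode: str = ...,
) -> "IOargs[str, str]":
    ...


@overload
def get_filepath_or_buffer(
    filepath_or_buffer: FilePathOrBuffer,
    encoding: None = ...,
    compression: CompressionOptions = ...,
    mode: Optional[str] = ...,
) -> "IOargs[Optional[str], None]":
    ...


def get_filepath_or_buffer(filepath_or_buffer, encoding=None, compression=None, mode=None):
    # Single runtime implementation; the overloads only refine the static
    # types, so no "# type: ignore[assignment]" is needed on the defaults.
    ...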

filepath_or_buffer: FilePathOrBuffer,
encoding: Optional[str] = None,
encoding: EncodingVar = None,
compression: CompressionOptions = None,
mode: Optional[str] = None,
mode: ModeVar = None,
storage_options: StorageOptions = None,
):
) -> IOargs[ModeVar, EncodingVar]:
"""
If the filepath_or_buffer is a url, translate and return the buffer.
Otherwise passthrough.
@@ -191,14 +194,35 @@

.. versionadded:: 1.2.0

Returns
-------
Tuple[FilePathOrBuffer, str, CompressionOptions, bool]
Tuple containing the filepath or buffer, the encoding, the compression
and should_close.
.. versionchanged:: 1.2.0

Returns the dataclass IOargs.
"""
filepath_or_buffer = stringify_path(filepath_or_buffer)

# bz2 and xz do not write the byte order mark for utf-16 and utf-32
# print a warning when writing such files
compression_method = infer_compression(
filepath_or_buffer, get_compression_method(compression)[0]
)
if (
mode
and "w" in mode
and compression_method in ["bz2", "xz"]
and encoding in ["utf-16", "utf-32"]
):
warnings.warn(
f"{compression} will not write the byte order mark for {encoding}",
UnicodeWarning,
)

# Use binary mode when converting path-like objects to file-like objects (fsspec)
# except when text mode is explicitly requested. The original mode is returned if
# fsspec is not used.
fsspec_mode = mode or "rb"
if "t" not in fsspec_mode and "b" not in fsspec_mode:
fsspec_mode += "b"

if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
# TODO: fsspec can also handle HTTP via requests, but leaving this unchanged
if storage_options:
@@ -212,7 +236,13 @@
compression = "gzip"
reader = BytesIO(req.read())
req.close()
return reader, encoding, compression, True
return IOargs(
filepath_or_buffer=reader,
encoding=encoding,
compression=compression,
should_close=True,
mode=fsspec_mode,
)

if is_fsspec_url(filepath_or_buffer):
assert isinstance(
@@ -244,7 +274,7 @@

try:
file_obj = fsspec.open(
filepath_or_buffer, mode=mode or "rb", **(storage_options or {})
filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
).open()
# GH 34626 Reads from Public Buckets without Credentials needs anon=True
except tuple(err_types_to_retry_with_anon):
@@ -255,23 +285,41 @@
storage_options = dict(storage_options)
storage_options["anon"] = True
file_obj = fsspec.open(
filepath_or_buffer, mode=mode or "rb", **(storage_options or {})
filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
).open()

return file_obj, encoding, compression, True
return IOargs(
filepath_or_buffer=file_obj,
encoding=encoding,
compression=compression,
should_close=True,
mode=fsspec_mode,
)
elif storage_options:
raise ValueError(
"storage_options passed with file object or non-fsspec file path"
)

if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
return _expand_user(filepath_or_buffer), None, compression, False
return IOargs(
filepath_or_buffer=_expand_user(filepath_or_buffer),
encoding=encoding,
compression=compression,
should_close=False,
mode=mode,
)

if not is_file_like(filepath_or_buffer):
msg = f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
raise ValueError(msg)

return filepath_or_buffer, None, compression, False
return IOargs(
filepath_or_buffer=filepath_or_buffer,
encoding=encoding,
compression=compression,
should_close=False,
mode=mode,
)


def file_path_to_url(path: str) -> str:
@@ -452,6 +500,15 @@ def get_handle(
need_text_wrapping = (BufferedIOBase, RawIOBase, S3File)
except ImportError:
need_text_wrapping = (BufferedIOBase, RawIOBase)
# fsspec is an optional dependency. If it is available, add its file-object
# class to the list of classes that need text wrapping. If fsspec is too old and is
# needed, get_filepath_or_buffer would already have thrown an exception.
try:
from fsspec.spec import AbstractFileSystem

need_text_wrapping = (*need_text_wrapping, AbstractFileSystem)
except ImportError:
pass

handles: List[Union[IO, _MMapWrapper]] = list()
f = path_or_buf
@@ -583,12 +640,15 @@ def __init__(
self.archive_name = archive_name
kwargs_zip: Dict[str, Any] = {"compression": zipfile.ZIP_DEFLATED}
kwargs_zip.update(kwargs)
super().__init__(file, mode, **kwargs_zip)
super().__init__(file, mode, **kwargs_zip) # type: ignore[arg-type]
Member Author: mypy complains about `file` being IOBase, but we cannot add an `assert not isinstance(file, IOBase)` since io.StringIO inherits from IOBase.
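The constraint described in the comment is easy to verify:

import io

# StringIO inherits from IOBase, so `assert not isinstance(file, IOBase)`
# would wrongly reject valid in-memory text buffers:
assert isinstance(io.StringIO(), io.IOBase)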


def write(self, data):
archive_name = self.filename
if self.archive_name is not None:
archive_name = self.archive_name
if archive_name is None:
# ZipFile needs a non-empty string
archive_name = "zip"
super().writestr(archive_name, data)

@property
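With the fallback archive name in place, zip output to an anonymous binary buffer works. A minimal sketch of the pattern fixed by :issue:`35058` (behavior as of pandas 1.2; mode must contain "b" for binary handles):

import io
import zipfile

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})

buf = io.BytesIO()  # a binary file object with no filename
df.to_csv(buf, mode="wb", compression={"method": "zip"}, index=False)

# With no filename to infer from, the archive entry falls back to "zip".
with zipfile.ZipFile(io.BytesIO(buf.getvalue())) as zf:
    assert zf.namelist() == ["zip"]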
4 changes: 2 additions & 2 deletions pandas/io/excel/_base.py
@@ -352,9 +352,9 @@ def __init__(self, filepath_or_buffer, storage_options: StorageOptions = None):
if is_url(filepath_or_buffer):
filepath_or_buffer = BytesIO(urlopen(filepath_or_buffer).read())
elif not isinstance(filepath_or_buffer, (ExcelFile, self._workbook_class)):
filepath_or_buffer, _, _, _ = get_filepath_or_buffer(
filepath_or_buffer = get_filepath_or_buffer(
filepath_or_buffer, storage_options=storage_options
)
).filepath_or_buffer

if isinstance(filepath_or_buffer, self._workbook_class):
self.book = filepath_or_buffer
23 changes: 13 additions & 10 deletions pandas/io/feather_format.py
@@ -34,9 +34,7 @@ def to_feather(df: DataFrame, path, storage_options: StorageOptions = None, **kw
import_optional_dependency("pyarrow")
from pyarrow import feather

path, _, _, should_close = get_filepath_or_buffer(
path, mode="wb", storage_options=storage_options
)
ioargs = get_filepath_or_buffer(path, mode="wb", storage_options=storage_options)

if not isinstance(df, DataFrame):
raise ValueError("feather only support IO with DataFrames")
@@ -74,7 +72,11 @@ def to_feather(df: DataFrame, path, storage_options: StorageOptions = None, **kw
if df.columns.inferred_type not in valid_types:
raise ValueError("feather must have string column names")

feather.write_feather(df, path, **kwargs)
feather.write_feather(df, ioargs.filepath_or_buffer, **kwargs)

if ioargs.should_close:
assert not isinstance(ioargs.filepath_or_buffer, str)
ioargs.filepath_or_buffer.close()


def read_feather(
@@ -122,14 +124,15 @@ def read_feather(
import_optional_dependency("pyarrow")
from pyarrow import feather

path, _, _, should_close = get_filepath_or_buffer(
path, storage_options=storage_options
)
ioargs = get_filepath_or_buffer(path, storage_options=storage_options)

df = feather.read_feather(path, columns=columns, use_threads=bool(use_threads))
df = feather.read_feather(
ioargs.filepath_or_buffer, columns=columns, use_threads=bool(use_threads)
)

# s3fs only validates the credentials when the file is closed.
if should_close:
path.close()
if ioargs.should_close:
assert not isinstance(ioargs.filepath_or_buffer, str)
ioargs.filepath_or_buffer.close()

return df
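The explicit close matters because, as the comment notes, s3fs only validates credentials when the file is closed. A hedged usage sketch (the bucket is a placeholder; s3fs must be installed):

import pandas as pd

# get_filepath_or_buffer opens the fsspec file with should_close=True,
# so read_feather closes the handle after reading, as above.
df = pd.read_feather(
    "s3://my-bucket/frame.feather",  # hypothetical path
    storage_options={"anon": True},
)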