Commit 361166f

BUG/ENH: compression for google cloud storage in to_csv (#35681)
1 parent 76eb314 commit 361166f

20 files changed: +321, -130 lines

doc/source/whatsnew/v1.2.0.rst (+2)

@@ -290,6 +290,8 @@ I/O
 - In :meth:`read_csv` `float_precision='round_trip'` now handles `decimal` and `thousands` parameters (:issue:`35365`)
 - :meth:`to_pickle` and :meth:`read_pickle` were closing user-provided file objects (:issue:`35679`)
 - :meth:`to_csv` passes compression arguments for `'gzip'` always to `gzip.GzipFile` (:issue:`28103`)
+- :meth:`to_csv` did not support zip compression for binary file objects not having a filename (:issue:`35058`)
+- :meth:`to_csv` and :meth:`read_csv` did not honor `compression` and `encoding` for path-like objects that are internally converted to file-like objects (:issue:`35677`, :issue:`26124`, and :issue:`32392`)

 Plotting
 ^^^^^^^^
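The two new entries describe the user-visible behavior. A minimal sketch of what now works, assuming pandas >= 1.2; the GCS line additionally requires gcsfs and credentials, and "my-bucket" is a placeholder, not part of the commit:

import io
import zipfile

import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

# GH 35058: zip compression to a binary handle that has no filename
buf = io.BytesIO()
df.to_csv(buf, mode="wb", compression={"method": "zip", "archive_name": "out.csv"})
buf.seek(0)
print(zipfile.ZipFile(buf).namelist())  # ['out.csv']

# GH 35681: compression is now honored for fsspec paths such as GCS
# df.to_csv("gs://my-bucket/out.csv.gz", compression="gzip")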

pandas/_typing.py (+28, -1)

@@ -1,4 +1,6 @@
+from dataclasses import dataclass
 from datetime import datetime, timedelta, tzinfo
+from io import IOBase
 from pathlib import Path
 from typing import (
     IO,
@@ -8,6 +10,7 @@
     Callable,
     Collection,
     Dict,
+    Generic,
     Hashable,
     List,
     Mapping,
@@ -62,7 +65,8 @@
     "ExtensionDtype", str, np.dtype, Type[Union[str, float, int, complex, bool]]
 ]
 DtypeObj = Union[np.dtype, "ExtensionDtype"]
-FilePathOrBuffer = Union[str, Path, IO[AnyStr]]
+FilePathOrBuffer = Union[str, Path, IO[AnyStr], IOBase]
+FileOrBuffer = Union[str, IO[AnyStr], IOBase]

 # FrameOrSeriesUnion means either a DataFrame or a Series. E.g.
 # `def func(a: FrameOrSeriesUnion) -> FrameOrSeriesUnion: ...` means that if a Series
@@ -114,3 +118,26 @@
 # compression keywords and compression
 CompressionDict = Mapping[str, Optional[Union[str, int, bool]]]
 CompressionOptions = Optional[Union[str, CompressionDict]]
+
+
+# let's bind types
+ModeVar = TypeVar("ModeVar", str, None, Optional[str])
+EncodingVar = TypeVar("EncodingVar", str, None, Optional[str])
+
+
+@dataclass
+class IOargs(Generic[ModeVar, EncodingVar]):
+    """
+    Return value of io/common.py:get_filepath_or_buffer.
+
+    Note (copy & paste from io/parsers):
+    filepath_or_buffer can be Union[FilePathOrBuffer, s3fs.S3File, gcsfs.GCSFile]
+    though mypy handling of conditional imports is difficult.
+    See https://github.com/python/mypy/issues/1297
+    """
+
+    filepath_or_buffer: FileOrBuffer
+    encoding: EncodingVar
+    compression: CompressionOptions
+    should_close: bool
+    mode: Union[ModeVar, str]
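The value-restricted TypeVars make the dataclass generic over the caller's mode and encoding types, so a concrete str passed in propagates to the fields. A self-contained sketch of the same pattern; the names mirror the diff, but the demo function is illustrative and not part of the commit:

from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

ModeVar = TypeVar("ModeVar", str, None, Optional[str])
EncodingVar = TypeVar("EncodingVar", str, None, Optional[str])


@dataclass
class IOargs(Generic[ModeVar, EncodingVar]):
    encoding: EncodingVar
    mode: ModeVar


def demo(mode: ModeVar, encoding: EncodingVar) -> IOargs[ModeVar, EncodingVar]:
    # called with str arguments, mypy infers IOargs[str, str], so
    # downstream code may treat .mode and .encoding as plain str
    return IOargs(encoding=encoding, mode=mode)


args = demo("wb", "utf-8")  # inferred as IOargs[str, str]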

pandas/core/frame.py (+5, -8)

@@ -2284,14 +2284,11 @@ def to_markdown(
         result = tabulate.tabulate(self, **kwargs)
         if buf is None:
             return result
-        buf, _, _, should_close = get_filepath_or_buffer(
-            buf, mode=mode, storage_options=storage_options
-        )
-        assert buf is not None  # Help mypy.
-        assert not isinstance(buf, str)
-        buf.writelines(result)
-        if should_close:
-            buf.close()
+        ioargs = get_filepath_or_buffer(buf, mode=mode, storage_options=storage_options)
+        assert not isinstance(ioargs.filepath_or_buffer, str)
+        ioargs.filepath_or_buffer.writelines(result)
+        if ioargs.should_close:
+            ioargs.filepath_or_buffer.close()
         return None

     @deprecate_kwarg(old_arg_name="fname", new_arg_name="path")
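Every call site in the commit follows this same migration: positional unpacking of a 4-tuple becomes attribute access on the returned dataclass, so unused values no longer need placeholder underscores. A runnable sketch of the before/after shape; the IOargs class and stub function here are trimmed stand-ins, not the pandas implementations:

from dataclasses import dataclass
from io import StringIO


@dataclass
class IOargs:  # trimmed stand-in for pandas._typing.IOargs
    filepath_or_buffer: StringIO
    should_close: bool


def get_filepath_or_buffer(buf):  # stub mimicking the new return type
    return IOargs(filepath_or_buffer=buf, should_close=False)


# before: buf, _, _, should_close = get_filepath_or_buffer(buf)
# after: named attributes replace positional unpacking
ioargs = get_filepath_or_buffer(StringIO())
ioargs.filepath_or_buffer.writelines(["| a |\n"])
if ioargs.should_close:
    ioargs.filepath_or_buffer.close()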

pandas/core/generic.py (+2)

@@ -4,6 +4,7 @@
 from datetime import timedelta
 import functools
 import gc
+from io import StringIO
 import json
 import operator
 import pickle
@@ -3357,6 +3358,7 @@ def to_csv(
         formatter.save()

         if path_or_buf is None:
+            assert isinstance(formatter.path_or_buf, StringIO)
             return formatter.path_or_buf.getvalue()

         return None
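The assert exists purely for the type checker: when the caller passes no target, the formatter writes into a StringIO it created itself, and the assert narrows the attribute's union type so .getvalue() type-checks. A self-contained sketch of the narrowing idiom; the Formatter class is an invented stand-in, not the pandas CSVFormatter:

from io import StringIO
from typing import IO, Union


class Formatter:  # invented stand-in for illustration only
    def __init__(self, path_or_buf: Union[IO[str], None]):
        # with no target given, write into an internally owned StringIO
        self.path_or_buf: Union[IO[str], StringIO] = path_or_buf or StringIO()


def to_csv_result(user_target: Union[IO[str], None]) -> Union[str, None]:
    fmt = Formatter(user_target)
    fmt.path_or_buf.write("a,b\n1,2\n")
    if user_target is None:
        # narrows the union so mypy accepts .getvalue()
        assert isinstance(fmt.path_or_buf, StringIO)
        return fmt.path_or_buf.getvalue()
    return None


print(to_csv_result(None))  # a,b / 1,2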

pandas/io/common.py (+82, -22)

@@ -27,12 +27,17 @@
     uses_params,
     uses_relative,
 )
+import warnings
 import zipfile

 from pandas._typing import (
     CompressionDict,
     CompressionOptions,
+    EncodingVar,
+    FileOrBuffer,
     FilePathOrBuffer,
+    IOargs,
+    ModeVar,
     StorageOptions,
 )
 from pandas.compat import _get_lzma_file, _import_lzma
@@ -69,9 +74,7 @@ def is_url(url) -> bool:
     return parse_url(url).scheme in _VALID_URLS


-def _expand_user(
-    filepath_or_buffer: FilePathOrBuffer[AnyStr],
-) -> FilePathOrBuffer[AnyStr]:
+def _expand_user(filepath_or_buffer: FileOrBuffer[AnyStr]) -> FileOrBuffer[AnyStr]:
     """
     Return the argument with an initial component of ~ or ~user
     replaced by that user's home directory.
@@ -101,7 +104,7 @@ def validate_header_arg(header) -> None:

 def stringify_path(
     filepath_or_buffer: FilePathOrBuffer[AnyStr],
-) -> FilePathOrBuffer[AnyStr]:
+) -> FileOrBuffer[AnyStr]:
     """
     Attempt to convert a path-like object to a string.

@@ -134,9 +137,9 @@ def stringify_path(
         # "__fspath__" [union-attr]
         # error: Item "IO[bytes]" of "Union[str, Path, IO[bytes]]" has no
         # attribute "__fspath__" [union-attr]
-        return filepath_or_buffer.__fspath__()  # type: ignore[union-attr]
+        filepath_or_buffer = filepath_or_buffer.__fspath__()  # type: ignore[union-attr]
     elif isinstance(filepath_or_buffer, pathlib.Path):
-        return str(filepath_or_buffer)
+        filepath_or_buffer = str(filepath_or_buffer)
     return _expand_user(filepath_or_buffer)
@@ -162,13 +165,13 @@ def is_fsspec_url(url: FilePathOrBuffer) -> bool:
     )


-def get_filepath_or_buffer(
+def get_filepath_or_buffer(  # type: ignore[assignment]
     filepath_or_buffer: FilePathOrBuffer,
-    encoding: Optional[str] = None,
+    encoding: EncodingVar = None,
     compression: CompressionOptions = None,
-    mode: Optional[str] = None,
+    mode: ModeVar = None,
     storage_options: StorageOptions = None,
-):
+) -> IOargs[ModeVar, EncodingVar]:
     """
     If the filepath_or_buffer is a url, translate and return the buffer.
     Otherwise passthrough.
@@ -191,14 +194,35 @@ def get_filepath_or_buffer(

         .. versionadded:: 1.2.0

-    Returns
-    -------
-    Tuple[FilePathOrBuffer, str, CompressionOptions, bool]
-        Tuple containing the filepath or buffer, the encoding, the compression
-        and should_close.
+    .. versionchanged:: 1.2.0
+
+        Returns the dataclass IOargs.
     """
     filepath_or_buffer = stringify_path(filepath_or_buffer)

+    # bz2 and xz do not write the byte order mark for utf-16 and utf-32
+    # print a warning when writing such files
+    compression_method = infer_compression(
+        filepath_or_buffer, get_compression_method(compression)[0]
+    )
+    if (
+        mode
+        and "w" in mode
+        and compression_method in ["bz2", "xz"]
+        and encoding in ["utf-16", "utf-32"]
+    ):
+        warnings.warn(
+            f"{compression} will not write the byte order mark for {encoding}",
+            UnicodeWarning,
+        )
+
+    # Use binary mode when converting path-like objects to file-like objects (fsspec)
+    # except when text mode is explicitly requested. The original mode is returned if
+    # fsspec is not used.
+    fsspec_mode = mode or "rb"
+    if "t" not in fsspec_mode and "b" not in fsspec_mode:
+        fsspec_mode += "b"
+
     if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
         # TODO: fsspec can also handle HTTP via requests, but leaving this unchanged
         if storage_options:
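The mode normalization is the heart of the GCS fix: fsspec handles are always opened in binary unless text mode was explicitly requested, so the compression wrappers downstream receive raw bytes. A runnable sketch of just that rule; the helper name is mine, not the commit's:

from typing import Optional


def normalize_fsspec_mode(mode: Optional[str]) -> str:
    # default to binary reads, and force "b" unless "t" was explicit
    fsspec_mode = mode or "rb"
    if "t" not in fsspec_mode and "b" not in fsspec_mode:
        fsspec_mode += "b"
    return fsspec_mode


assert normalize_fsspec_mode(None) == "rb"
assert normalize_fsspec_mode("w") == "wb"   # to_csv writes become binary
assert normalize_fsspec_mode("wt") == "wt"  # explicit text mode wins
assert normalize_fsspec_mode("rb") == "rb"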
@@ -212,7 +236,13 @@ def get_filepath_or_buffer(
             compression = "gzip"
         reader = BytesIO(req.read())
         req.close()
-        return reader, encoding, compression, True
+        return IOargs(
+            filepath_or_buffer=reader,
+            encoding=encoding,
+            compression=compression,
+            should_close=True,
+            mode=fsspec_mode,
+        )

     if is_fsspec_url(filepath_or_buffer):
         assert isinstance(
@@ -244,7 +274,7 @@ def get_filepath_or_buffer(

         try:
             file_obj = fsspec.open(
-                filepath_or_buffer, mode=mode or "rb", **(storage_options or {})
+                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
             ).open()
         # GH 34626 Reads from Public Buckets without Credentials needs anon=True
         except tuple(err_types_to_retry_with_anon):
@@ -255,23 +285,41 @@ def get_filepath_or_buffer(
                 storage_options = dict(storage_options)
             storage_options["anon"] = True
             file_obj = fsspec.open(
-                filepath_or_buffer, mode=mode or "rb", **(storage_options or {})
+                filepath_or_buffer, mode=fsspec_mode, **(storage_options or {})
             ).open()

-        return file_obj, encoding, compression, True
+        return IOargs(
+            filepath_or_buffer=file_obj,
+            encoding=encoding,
+            compression=compression,
+            should_close=True,
+            mode=fsspec_mode,
+        )
     elif storage_options:
         raise ValueError(
             "storage_options passed with file object or non-fsspec file path"
         )

     if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
-        return _expand_user(filepath_or_buffer), None, compression, False
+        return IOargs(
+            filepath_or_buffer=_expand_user(filepath_or_buffer),
+            encoding=encoding,
+            compression=compression,
+            should_close=False,
+            mode=mode,
+        )

     if not is_file_like(filepath_or_buffer):
         msg = f"Invalid file path or buffer object type: {type(filepath_or_buffer)}"
         raise ValueError(msg)

-    return filepath_or_buffer, None, compression, False
+    return IOargs(
+        filepath_or_buffer=filepath_or_buffer,
+        encoding=encoding,
+        compression=compression,
+        should_close=False,
+        mode=mode,
+    )


 def file_path_to_url(path: str) -> str:
@@ -452,6 +500,15 @@ def get_handle(
         need_text_wrapping = (BufferedIOBase, RawIOBase, S3File)
     except ImportError:
         need_text_wrapping = (BufferedIOBase, RawIOBase)
+    # fsspec is an optional dependency. If it is available, add its file-object
+    # class to the list of classes that need text wrapping. If fsspec is too old and is
+    # needed, get_filepath_or_buffer would already have thrown an exception.
+    try:
+        from fsspec.spec import AbstractFileSystem
+
+        need_text_wrapping = (*need_text_wrapping, AbstractFileSystem)
+    except ImportError:
+        pass

     handles: List[Union[IO, _MMapWrapper]] = list()
     f = path_or_buf
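Text wrapping matters here because fsspec returns binary handles while to_csv produces str when an encoding is involved; get_handle bridges the two with io.TextIOWrapper. A minimal demonstration of that bridging, independent of pandas and fsspec:

import io

# stand-in for a binary fsspec/GCS handle
binary_handle = io.BytesIO()

# wrap it so text (str) writes are encoded on the way through
text_handle = io.TextIOWrapper(binary_handle, encoding="utf-8", newline="")
text_handle.write("a,b\n1,2\n")
text_handle.flush()

print(binary_handle.getvalue())  # b'a,b\n1,2\n'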
@@ -583,12 +640,15 @@ def __init__(
         self.archive_name = archive_name
         kwargs_zip: Dict[str, Any] = {"compression": zipfile.ZIP_DEFLATED}
         kwargs_zip.update(kwargs)
-        super().__init__(file, mode, **kwargs_zip)
+        super().__init__(file, mode, **kwargs_zip)  # type: ignore[arg-type]

     def write(self, data):
         archive_name = self.filename
         if self.archive_name is not None:
             archive_name = self.archive_name
+        if archive_name is None:
+            # ZipFile needs a non-empty string
+            archive_name = "zip"
         super().writestr(archive_name, data)

     @property
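The archive_name fallback is what fixes GH 35058: for a nameless binary buffer, ZipFile.filename is None, yet writestr requires a usable member name. A standalone sketch of the failure mode and the fallback, using only the standard library:

import io
import zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, mode="w") as zf:
    # zf.filename is None for an in-memory buffer, so a member name
    # must be supplied explicitly; the commit falls back to "zip"
    member = zf.filename or "zip"
    zf.writestr(member, "a,b\n1,2\n")

print(zipfile.ZipFile(io.BytesIO(buf.getvalue())).namelist())  # ['zip']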

pandas/io/excel/_base.py (+2, -2)

@@ -352,9 +352,9 @@ def __init__(self, filepath_or_buffer, storage_options: StorageOptions = None):
         if is_url(filepath_or_buffer):
             filepath_or_buffer = BytesIO(urlopen(filepath_or_buffer).read())
         elif not isinstance(filepath_or_buffer, (ExcelFile, self._workbook_class)):
-            filepath_or_buffer, _, _, _ = get_filepath_or_buffer(
+            filepath_or_buffer = get_filepath_or_buffer(
                 filepath_or_buffer, storage_options=storage_options
-            )
+            ).filepath_or_buffer

         if isinstance(filepath_or_buffer, self._workbook_class):
             self.book = filepath_or_buffer

pandas/io/feather_format.py (+13, -10)

@@ -34,9 +34,7 @@ def to_feather(df: DataFrame, path, storage_options: StorageOptions = None, **kw
     import_optional_dependency("pyarrow")
     from pyarrow import feather

-    path, _, _, should_close = get_filepath_or_buffer(
-        path, mode="wb", storage_options=storage_options
-    )
+    ioargs = get_filepath_or_buffer(path, mode="wb", storage_options=storage_options)

     if not isinstance(df, DataFrame):
         raise ValueError("feather only support IO with DataFrames")
@@ -74,7 +72,11 @@ def to_feather(df: DataFrame, path, storage_options: StorageOptions = None, **kw
     if df.columns.inferred_type not in valid_types:
         raise ValueError("feather must have string column names")

-    feather.write_feather(df, path, **kwargs)
+    feather.write_feather(df, ioargs.filepath_or_buffer, **kwargs)
+
+    if ioargs.should_close:
+        assert not isinstance(ioargs.filepath_or_buffer, str)
+        ioargs.filepath_or_buffer.close()


 def read_feather(
@@ -122,14 +124,15 @@ def read_feather(
     import_optional_dependency("pyarrow")
     from pyarrow import feather

-    path, _, _, should_close = get_filepath_or_buffer(
-        path, storage_options=storage_options
-    )
+    ioargs = get_filepath_or_buffer(path, storage_options=storage_options)

-    df = feather.read_feather(path, columns=columns, use_threads=bool(use_threads))
+    df = feather.read_feather(
+        ioargs.filepath_or_buffer, columns=columns, use_threads=bool(use_threads)
+    )

     # s3fs only validates the credentials when the file is closed.
-    if should_close:
-        path.close()
+    if ioargs.should_close:
+        assert not isinstance(ioargs.filepath_or_buffer, str)
+        ioargs.filepath_or_buffer.close()

     return df
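For fsspec targets the handle is opened by pandas itself, so should_close is True and the writer now closes it, which is what makes remote writes flush reliably. A usage sketch assuming pandas >= 1.2 with pyarrow installed; the paths and bucket name are placeholders, not part of the commit:

import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})

# local round-trip; "data.feather" is an illustrative path
df.to_feather("data.feather")
print(pd.read_feather("data.feather"))

# remote targets go through fsspec and are closed by pandas afterwards
# (requires s3fs/gcsfs and credentials)
# df.to_feather("gs://my-bucket/data.feather")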
