diff --git a/doc/source/user_guide/io.rst b/doc/source/user_guide/io.rst index ab233f653061a..35403b5c8b66f 100644 --- a/doc/source/user_guide/io.rst +++ b/doc/source/user_guide/io.rst @@ -1649,29 +1649,72 @@ options include: Specifying any of the above options will produce a ``ParserWarning`` unless the python engine is selected explicitly using ``engine='python'``. -Reading remote files -'''''''''''''''''''' +.. _io.remote: + +Reading/writing remote files +'''''''''''''''''''''''''''' -You can pass in a URL to a CSV file: +You can pass in a URL to read or write remote files to many of Pandas' IO +functions - the following example shows reading a CSV file: .. code-block:: python df = pd.read_csv('https://download.bls.gov/pub/time.series/cu/cu.item', sep='\t') -S3 URLs are handled as well but require installing the `S3Fs +All URLs which are not local files or HTTP(s) are handled by +`fsspec`_, if installed, and its various filesystem implementations +(including Amazon S3, Google Cloud, SSH, FTP, webHDFS...). +Some of these implementations will require additional packages to be +installed, for example +S3 URLs require the `s3fs `_ library: .. code-block:: python - df = pd.read_csv('s3://pandas-test/tips.csv') + df = pd.read_json('s3://pandas-test/adatafile.json') + +When dealing with remote storage systems, you might need +extra configuration with environment variables or config files in +special locations. For example, to access data in your S3 bucket, +you will need to define credentials in one of the several ways listed in +the `S3Fs documentation +`_. The same is true +for several of the storage backends, and you should follow the links +at `fsimpl1`_ for implementations built into ``fsspec`` and `fsimpl2`_ +for those not included in the main ``fsspec`` +distribution. + +You can also pass parameters directly to the backend driver. For example, +if you do *not* have S3 credentials, you can still access public data by +specifying an anonymous connection, such as + +.. versionadded:: 1.2.0 + +.. code-block:: python + + pd.read_csv("s3://ncei-wcsd-archive/data/processed/SH1305/18kHz/SaKe2013" + "-D20130523-T080854_to_SaKe2013-D20130523-T085643.csv", + storage_options={"anon": True}) + +``fsspec`` also allows complex URLs, for accessing data in compressed +archives, local caching of files, and more. To locally cache the above +example, you would modify the call to + +.. code-block:: python -If your S3 bucket requires credentials you will need to set them as environment -variables or in the ``~/.aws/credentials`` config file, refer to the `S3Fs -documentation on credentials -`_. + pd.read_csv("simplecache::s3://ncei-wcsd-archive/data/processed/SH1305/18kHz/" + "SaKe2013-D20130523-T080854_to_SaKe2013-D20130523-T085643.csv", + storage_options={"s3": {"anon": True}}) +where we specify that the "anon" parameter is meant for the "s3" part of +the implementation, not to the caching implementation. Note that this caches to a temporary +directory for the duration of the session only, but you can also specify +a permanent store. +.. _fsspec: https://filesystem-spec.readthedocs.io/en/latest/ +.. _fsimpl1: https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations +.. _fsimpl2: https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations Writing out data '''''''''''''''' diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 15616f4a6f27c..c0182654e97fd 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -13,6 +13,20 @@ including other versions of pandas. Enhancements ~~~~~~~~~~~~ +Passing arguments to fsspec backends +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Many read/write functions have acquired the ``storage_options`` optional argument, +to pass a dictionary of parameters to the storage backend. This allows, for +example, for passing credentials to S3 and GCS storage. The details of what +parameters can be passed to which backends can be found in the documentation +of the individual storage backends (detailed from the fsspec docs for +`builtin implementations`_ and linked to `external ones`_). See +Section :ref:`io.remote`. + +.. _builtin implementations: https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations +.. _external ones: https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations + .. _whatsnew_120.binary_handle_to_csv: Support for binary file handles in ``to_csv`` diff --git a/pandas/_typing.py b/pandas/_typing.py index 76ec527e6e258..47a102ddc70e0 100644 --- a/pandas/_typing.py +++ b/pandas/_typing.py @@ -106,3 +106,6 @@ List[AggFuncTypeBase], Dict[Label, Union[AggFuncTypeBase, List[AggFuncTypeBase]]], ] + +# for arbitrary kwargs passed during reading/writing files +StorageOptions = Optional[Dict[str, Any]] diff --git a/pandas/conftest.py b/pandas/conftest.py index e0adb37e7d2f5..10124b09ae249 100644 --- a/pandas/conftest.py +++ b/pandas/conftest.py @@ -1224,3 +1224,25 @@ def sort_by_key(request): Tests None (no key) and the identity key. """ return request.param + + +@pytest.fixture() +def fsspectest(): + pytest.importorskip("fsspec") + from fsspec import register_implementation + from fsspec.implementations.memory import MemoryFileSystem + from fsspec.registry import _registry as registry + + class TestMemoryFS(MemoryFileSystem): + protocol = "testmem" + test = [None] + + def __init__(self, **kwargs): + self.test[0] = kwargs.pop("test", None) + super().__init__(**kwargs) + + register_implementation("testmem", TestMemoryFS, clobber=True) + yield TestMemoryFS() + registry.pop("testmem", None) + TestMemoryFS.test[0] = None + TestMemoryFS.store.clear() diff --git a/pandas/core/frame.py b/pandas/core/frame.py index aabdac16e9a1a..39fcf1a3fd573 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -55,6 +55,7 @@ Label, Level, Renamer, + StorageOptions, ValueKeyFunc, ) from pandas.compat import PY37 @@ -2056,6 +2057,7 @@ def to_stata( version: Optional[int] = 114, convert_strl: Optional[Sequence[Label]] = None, compression: Union[str, Mapping[str, str], None] = "infer", + storage_options: StorageOptions = None, ) -> None: """ Export DataFrame object to Stata dta format. @@ -2132,6 +2134,16 @@ def to_stata( .. versionadded:: 1.1.0 + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values. + + .. versionadded:: 1.2.0 + Raises ------ NotImplementedError @@ -2192,6 +2204,7 @@ def to_stata( write_index=write_index, variable_labels=variable_labels, compression=compression, + storage_options=storage_options, **kwargs, ) writer.write_file() @@ -2244,9 +2257,10 @@ def to_feather(self, path, **kwargs) -> None: ) def to_markdown( self, - buf: Optional[IO[str]] = None, - mode: Optional[str] = None, + buf: Optional[Union[IO[str], str]] = None, + mode: str = "wt", index: bool = True, + storage_options: StorageOptions = None, **kwargs, ) -> Optional[str]: if "showindex" in kwargs: @@ -2264,9 +2278,14 @@ def to_markdown( result = tabulate.tabulate(self, **kwargs) if buf is None: return result - buf, _, _, _ = get_filepath_or_buffer(buf, mode=mode) + buf, _, _, should_close = get_filepath_or_buffer( + buf, mode=mode, storage_options=storage_options + ) assert buf is not None # Help mypy. + assert not isinstance(buf, str) buf.writelines(result) + if should_close: + buf.close() return None @deprecate_kwarg(old_arg_name="fname", new_arg_name="path") @@ -2277,6 +2296,7 @@ def to_parquet( compression: Optional[str] = "snappy", index: Optional[bool] = None, partition_cols: Optional[List[str]] = None, + storage_options: StorageOptions = None, **kwargs, ) -> None: """ @@ -2325,6 +2345,16 @@ def to_parquet( .. versionadded:: 0.24.0 + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 + **kwargs Additional arguments passed to the parquet library. See :ref:`pandas io ` for more details. @@ -2371,6 +2401,7 @@ def to_parquet( compression=compression, index=index, partition_cols=partition_cols, + storage_options=storage_options, **kwargs, ) diff --git a/pandas/core/generic.py b/pandas/core/generic.py index 843b602a12823..5ac8a6b165171 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -40,6 +40,7 @@ Label, Level, Renamer, + StorageOptions, TimedeltaConvertibleTypes, TimestampConvertibleTypes, ValueKeyFunc, @@ -2042,6 +2043,7 @@ def to_json( compression: Optional[str] = "infer", index: bool_t = True, indent: Optional[int] = None, + storage_options: StorageOptions = None, ) -> Optional[str]: """ Convert the object to a JSON string. @@ -2125,6 +2127,16 @@ def to_json( .. versionadded:: 1.0.0 + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 + Returns ------- None or str @@ -2303,6 +2315,7 @@ def to_json( compression=compression, index=index, indent=indent, + storage_options=storage_options, ) def to_hdf( @@ -2617,6 +2630,7 @@ def to_pickle( path, compression: Optional[str] = "infer", protocol: int = pickle.HIGHEST_PROTOCOL, + storage_options: StorageOptions = None, ) -> None: """ Pickle (serialize) object to file. @@ -2637,6 +2651,16 @@ def to_pickle( .. [1] https://docs.python.org/3/library/pickle.html. + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 + See Also -------- read_pickle : Load pickled pandas object (or any object) from file. @@ -2670,7 +2694,13 @@ def to_pickle( """ from pandas.io.pickle import to_pickle - to_pickle(self, path, compression=compression, protocol=protocol) + to_pickle( + self, + path, + compression=compression, + protocol=protocol, + storage_options=storage_options, + ) def to_clipboard( self, excel: bool_t = True, sep: Optional[str] = None, **kwargs @@ -3015,6 +3045,7 @@ def to_csv( escapechar: Optional[str] = None, decimal: Optional[str] = ".", errors: str = "strict", + storage_options: StorageOptions = None, ) -> Optional[str]: r""" Write object to a comma-separated values (csv) file. @@ -3126,6 +3157,16 @@ def to_csv( .. versionadded:: 1.1.0 + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 + Returns ------- None or str @@ -3178,6 +3219,7 @@ def to_csv( doublequote=doublequote, escapechar=escapechar, decimal=decimal, + storage_options=storage_options, ) formatter.save() diff --git a/pandas/core/series.py b/pandas/core/series.py index 9e70120f67969..37558ed5e99a3 100644 --- a/pandas/core/series.py +++ b/pandas/core/series.py @@ -31,6 +31,7 @@ FrameOrSeriesUnion, IndexKeyFunc, Label, + StorageOptions, ValueKeyFunc, ) from pandas.compat.numpy import function as nv @@ -1422,8 +1423,9 @@ def to_string( def to_markdown( self, buf: Optional[IO[str]] = None, - mode: Optional[str] = None, + mode: str = "wt", index: bool = True, + storage_options: StorageOptions = None, **kwargs, ) -> Optional[str]: """ @@ -1436,12 +1438,22 @@ def to_markdown( buf : str, Path or StringIO-like, optional, default None Buffer to write to. If None, the output is returned as a string. mode : str, optional - Mode in which file is opened. + Mode in which file is opened, "wt" by default. index : bool, optional, default True Add index (row) labels. .. versionadded:: 1.1.0 + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 + **kwargs These parameters will be passed to `tabulate \ `_. @@ -1477,7 +1489,9 @@ def to_markdown( | 3 | quetzal | +----+----------+ """ - return self.to_frame().to_markdown(buf, mode, index, **kwargs) + return self.to_frame().to_markdown( + buf, mode, index, storage_options=storage_options, **kwargs + ) # ---------------------------------------------------------------------- diff --git a/pandas/io/common.py b/pandas/io/common.py index 34e4425c657f1..9ac642e58b544 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -29,7 +29,7 @@ ) import zipfile -from pandas._typing import FilePathOrBuffer +from pandas._typing import FilePathOrBuffer, StorageOptions from pandas.compat import _get_lzma_file, _import_lzma from pandas.compat._optional import import_optional_dependency @@ -162,7 +162,7 @@ def get_filepath_or_buffer( encoding: Optional[str] = None, compression: Optional[str] = None, mode: Optional[str] = None, - storage_options: Optional[Dict[str, Any]] = None, + storage_options: StorageOptions = None, ): """ If the filepath_or_buffer is a url, translate and return the buffer. @@ -175,8 +175,16 @@ def get_filepath_or_buffer( compression : {{'gzip', 'bz2', 'zip', 'xz', None}}, optional encoding : the encoding to use to decode bytes, default is 'utf-8' mode : str, optional - storage_options: dict, optional - passed on to fsspec, if using it; this is not yet accessed by the public API + + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 Returns ------- @@ -188,6 +196,10 @@ def get_filepath_or_buffer( if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer): # TODO: fsspec can also handle HTTP via requests, but leaving this unchanged + if storage_options: + raise ValueError( + "storage_options passed with file object or non-fsspec file path" + ) req = urlopen(filepath_or_buffer) content_encoding = req.headers.get("Content-Encoding", None) if content_encoding == "gzip": @@ -242,6 +254,10 @@ def get_filepath_or_buffer( ).open() return file_obj, encoding, compression, True + elif storage_options: + raise ValueError( + "storage_options passed with file object or non-fsspec file path" + ) if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)): return _expand_user(filepath_or_buffer), None, compression, False diff --git a/pandas/io/feather_format.py b/pandas/io/feather_format.py index dfa43942fc8b3..2c664e73b9463 100644 --- a/pandas/io/feather_format.py +++ b/pandas/io/feather_format.py @@ -4,10 +4,10 @@ from pandas import DataFrame, Int64Index, RangeIndex -from pandas.io.common import get_filepath_or_buffer, stringify_path +from pandas.io.common import get_filepath_or_buffer -def to_feather(df: DataFrame, path, **kwargs): +def to_feather(df: DataFrame, path, storage_options=None, **kwargs): """ Write a DataFrame to the binary Feather format. @@ -15,6 +15,17 @@ def to_feather(df: DataFrame, path, **kwargs): ---------- df : DataFrame path : string file path, or file-like object + + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 + **kwargs : Additional keywords passed to `pyarrow.feather.write_feather`. @@ -23,7 +34,9 @@ def to_feather(df: DataFrame, path, **kwargs): import_optional_dependency("pyarrow") from pyarrow import feather - path = stringify_path(path) + path, _, _, should_close = get_filepath_or_buffer( + path, mode="wb", storage_options=storage_options + ) if not isinstance(df, DataFrame): raise ValueError("feather only support IO with DataFrames") @@ -64,7 +77,7 @@ def to_feather(df: DataFrame, path, **kwargs): feather.write_feather(df, path, **kwargs) -def read_feather(path, columns=None, use_threads: bool = True): +def read_feather(path, columns=None, use_threads: bool = True, storage_options=None): """ Load a feather-format object from the file path. @@ -98,7 +111,9 @@ def read_feather(path, columns=None, use_threads: bool = True): import_optional_dependency("pyarrow") from pyarrow import feather - path, _, _, should_close = get_filepath_or_buffer(path) + path, _, _, should_close = get_filepath_or_buffer( + path, storage_options=storage_options + ) df = feather.read_feather(path, columns=columns, use_threads=bool(use_threads)) diff --git a/pandas/io/formats/csvs.py b/pandas/io/formats/csvs.py index b10946a20d041..6eceb94387171 100644 --- a/pandas/io/formats/csvs.py +++ b/pandas/io/formats/csvs.py @@ -11,7 +11,7 @@ import numpy as np from pandas._libs import writers as libwriters -from pandas._typing import FilePathOrBuffer +from pandas._typing import FilePathOrBuffer, StorageOptions from pandas.core.dtypes.generic import ( ABCDatetimeIndex, @@ -53,6 +53,7 @@ def __init__( doublequote: bool = True, escapechar: Optional[str] = None, decimal=".", + storage_options: StorageOptions = None, ): self.obj = obj @@ -63,7 +64,11 @@ def __init__( compression, self.compression_args = get_compression_method(compression) self.path_or_buf, _, _, self.should_close = get_filepath_or_buffer( - path_or_buf, encoding=encoding, compression=compression, mode=mode + path_or_buf, + encoding=encoding, + compression=compression, + mode=mode, + storage_options=storage_options, ) self.sep = sep self.na_rep = na_rep diff --git a/pandas/io/json/_json.py b/pandas/io/json/_json.py index 0b06a26d4aa3c..0d2b351926343 100644 --- a/pandas/io/json/_json.py +++ b/pandas/io/json/_json.py @@ -9,7 +9,7 @@ import pandas._libs.json as json from pandas._libs.tslibs import iNaT -from pandas._typing import JSONSerializable +from pandas._typing import JSONSerializable, StorageOptions from pandas.errors import AbstractMethodError from pandas.util._decorators import deprecate_kwarg, deprecate_nonkeyword_arguments @@ -44,6 +44,7 @@ def to_json( compression: Optional[str] = "infer", index: bool = True, indent: int = 0, + storage_options: StorageOptions = None, ): if not index and orient not in ["split", "table"]: @@ -52,8 +53,11 @@ def to_json( ) if path_or_buf is not None: - path_or_buf, _, _, _ = get_filepath_or_buffer( - path_or_buf, compression=compression, mode="w" + path_or_buf, _, _, should_close = get_filepath_or_buffer( + path_or_buf, + compression=compression, + mode="wt", + storage_options=storage_options, ) if lines and orient != "records": @@ -97,6 +101,8 @@ def to_json( return s else: path_or_buf.write(s) + if should_close: + path_or_buf.close() class Writer: @@ -365,6 +371,7 @@ def read_json( chunksize: Optional[int] = None, compression="infer", nrows: Optional[int] = None, + storage_options: StorageOptions = None, ): """ Convert a JSON string to pandas object. @@ -510,6 +517,16 @@ def read_json( .. versionadded:: 1.1 + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 + Returns ------- Series or DataFrame @@ -592,7 +609,10 @@ def read_json( compression = infer_compression(path_or_buf, compression) filepath_or_buffer, _, compression, should_close = get_filepath_or_buffer( - path_or_buf, encoding=encoding, compression=compression + path_or_buf, + encoding=encoding, + compression=compression, + storage_options=storage_options, ) json_reader = JsonReader( diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 8c4b63767ac06..7f0eef039a1e8 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -3,7 +3,7 @@ from typing import Any, AnyStr, Dict, List, Optional from warnings import catch_warnings -from pandas._typing import FilePathOrBuffer +from pandas._typing import FilePathOrBuffer, StorageOptions from pandas.compat._optional import import_optional_dependency from pandas.errors import AbstractMethodError @@ -89,6 +89,7 @@ def write( path: FilePathOrBuffer[AnyStr], compression: Optional[str] = "snappy", index: Optional[bool] = None, + storage_options: StorageOptions = None, partition_cols: Optional[List[str]] = None, **kwargs, ): @@ -105,9 +106,13 @@ def write( import_optional_dependency("fsspec") import fsspec.core - fs, path = fsspec.core.url_to_fs(path) + fs, path = fsspec.core.url_to_fs(path, **(storage_options or {})) kwargs["filesystem"] = fs else: + if storage_options: + raise ValueError( + "storage_options passed with file object or non-fsspec file path" + ) path = _expand_user(path) if partition_cols is not None: # writes to multiple files under the given path @@ -122,14 +127,20 @@ def write( # write to single output file self.api.parquet.write_table(table, path, compression=compression, **kwargs) - def read(self, path, columns=None, **kwargs): + def read( + self, path, columns=None, storage_options: StorageOptions = None, **kwargs, + ): if is_fsspec_url(path) and "filesystem" not in kwargs: import_optional_dependency("fsspec") import fsspec.core - fs, path = fsspec.core.url_to_fs(path) + fs, path = fsspec.core.url_to_fs(path, **(storage_options or {})) should_close = False else: + if storage_options: + raise ValueError( + "storage_options passed with buffer or non-fsspec filepath" + ) fs = kwargs.pop("filesystem", None) should_close = False path = _expand_user(path) @@ -163,6 +174,7 @@ def write( compression="snappy", index=None, partition_cols=None, + storage_options: StorageOptions = None, **kwargs, ): self.validate_dataframe(df) @@ -185,8 +197,14 @@ def write( fsspec = import_optional_dependency("fsspec") # if filesystem is provided by fsspec, file must be opened in 'wb' mode. - kwargs["open_with"] = lambda path, _: fsspec.open(path, "wb").open() + kwargs["open_with"] = lambda path, _: fsspec.open( + path, "wb", **(storage_options or {}) + ).open() else: + if storage_options: + raise ValueError( + "storage_options passed with file object or non-fsspec file path" + ) path, _, _, _ = get_filepath_or_buffer(path) with catch_warnings(record=True): @@ -199,11 +217,15 @@ def write( **kwargs, ) - def read(self, path, columns=None, **kwargs): + def read( + self, path, columns=None, storage_options: StorageOptions = None, **kwargs, + ): if is_fsspec_url(path): fsspec = import_optional_dependency("fsspec") - open_with = lambda path, _: fsspec.open(path, "rb").open() + open_with = lambda path, _: fsspec.open( + path, "rb", **(storage_options or {}) + ).open() parquet_file = self.api.ParquetFile(path, open_with=open_with) else: path, _, _, _ = get_filepath_or_buffer(path) @@ -218,6 +240,7 @@ def to_parquet( engine: str = "auto", compression: Optional[str] = "snappy", index: Optional[bool] = None, + storage_options: StorageOptions = None, partition_cols: Optional[List[str]] = None, **kwargs, ): @@ -261,6 +284,16 @@ def to_parquet( .. versionadded:: 0.24.0 + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 + kwargs Additional keyword arguments passed to the engine """ @@ -273,6 +306,7 @@ def to_parquet( compression=compression, index=index, partition_cols=partition_cols, + storage_options=storage_options, **kwargs, ) diff --git a/pandas/io/parsers.py b/pandas/io/parsers.py index d4f346f8c1087..9dc0e1f71d13b 100644 --- a/pandas/io/parsers.py +++ b/pandas/io/parsers.py @@ -420,6 +420,7 @@ def _validate_names(names): def _read(filepath_or_buffer: FilePathOrBuffer, kwds): """Generic reader of line files.""" encoding = kwds.get("encoding", None) + storage_options = kwds.get("storage_options", None) if encoding is not None: encoding = re.sub("_", "-", encoding).lower() kwds["encoding"] = encoding @@ -432,7 +433,7 @@ def _read(filepath_or_buffer: FilePathOrBuffer, kwds): # though mypy handling of conditional imports is difficult. # See https://github.com/python/mypy/issues/1297 fp_or_buf, _, compression, should_close = get_filepath_or_buffer( - filepath_or_buffer, encoding, compression + filepath_or_buffer, encoding, compression, storage_options=storage_options ) kwds["compression"] = compression @@ -595,6 +596,7 @@ def read_csv( low_memory=_c_parser_defaults["low_memory"], memory_map=False, float_precision=None, + storage_options=None, ): # gh-23761 # @@ -681,6 +683,7 @@ def read_csv( mangle_dupe_cols=mangle_dupe_cols, infer_datetime_format=infer_datetime_format, skip_blank_lines=skip_blank_lines, + storage_options=storage_options, ) return _read(filepath_or_buffer, kwds) diff --git a/pandas/io/pickle.py b/pandas/io/pickle.py index 3b35b54a6dc16..549d55e65546d 100644 --- a/pandas/io/pickle.py +++ b/pandas/io/pickle.py @@ -3,7 +3,7 @@ from typing import Any, Optional import warnings -from pandas._typing import FilePathOrBuffer +from pandas._typing import FilePathOrBuffer, StorageOptions from pandas.compat import pickle_compat as pc from pandas.io.common import get_filepath_or_buffer, get_handle @@ -14,6 +14,7 @@ def to_pickle( filepath_or_buffer: FilePathOrBuffer, compression: Optional[str] = "infer", protocol: int = pickle.HIGHEST_PROTOCOL, + storage_options: StorageOptions = None, ): """ Pickle (serialize) object to file. @@ -42,6 +43,16 @@ def to_pickle( protocol parameter is equivalent to setting its value to HIGHEST_PROTOCOL. + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 + .. [1] https://docs.python.org/3/library/pickle.html See Also @@ -76,7 +87,10 @@ def to_pickle( >>> os.remove("./dummy.pkl") """ fp_or_buf, _, compression, should_close = get_filepath_or_buffer( - filepath_or_buffer, compression=compression, mode="wb" + filepath_or_buffer, + compression=compression, + mode="wb", + storage_options=storage_options, ) if not isinstance(fp_or_buf, str) and compression == "infer": compression = None @@ -97,7 +111,9 @@ def to_pickle( def read_pickle( - filepath_or_buffer: FilePathOrBuffer, compression: Optional[str] = "infer" + filepath_or_buffer: FilePathOrBuffer, + compression: Optional[str] = "infer", + storage_options: StorageOptions = None, ): """ Load pickled pandas object (or any object) from file. @@ -121,6 +137,16 @@ def read_pickle( compression) If 'infer' and 'path_or_url' is not path-like, then use None (= no decompression). + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 + Returns ------- unpickled : same type as object stored in file @@ -162,7 +188,7 @@ def read_pickle( >>> os.remove("./dummy.pkl") """ fp_or_buf, _, compression, should_close = get_filepath_or_buffer( - filepath_or_buffer, compression=compression + filepath_or_buffer, compression=compression, storage_options=storage_options ) if not isinstance(fp_or_buf, str) and compression == "infer": compression = None diff --git a/pandas/io/sas/sas_xport.py b/pandas/io/sas/sas_xport.py index 7fc1bc6d3eb6c..6cf248b748107 100644 --- a/pandas/io/sas/sas_xport.py +++ b/pandas/io/sas/sas_xport.py @@ -244,7 +244,7 @@ class XportReader(ReaderBase, abc.Iterator): __doc__ = _xport_reader_doc def __init__( - self, filepath_or_buffer, index=None, encoding="ISO-8859-1", chunksize=None + self, filepath_or_buffer, index=None, encoding="ISO-8859-1", chunksize=None, ): self._encoding = encoding diff --git a/pandas/io/stata.py b/pandas/io/stata.py index cb23b781a7ad2..7a25617885839 100644 --- a/pandas/io/stata.py +++ b/pandas/io/stata.py @@ -11,7 +11,7 @@ """ from collections import abc import datetime -from io import BytesIO, IOBase +from io import BytesIO import os from pathlib import Path import struct @@ -35,7 +35,7 @@ from pandas._libs.lib import infer_dtype from pandas._libs.writers import max_len_string_array -from pandas._typing import FilePathOrBuffer, Label +from pandas._typing import FilePathOrBuffer, Label, StorageOptions from pandas.util._decorators import Appender from pandas.core.dtypes.common import ( @@ -1035,6 +1035,7 @@ def __init__( columns: Optional[Sequence[str]] = None, order_categoricals: bool = True, chunksize: Optional[int] = None, + storage_options: StorageOptions = None, ): super().__init__() self.col_sizes: List[int] = [] @@ -1068,13 +1069,16 @@ def __init__( self._native_byteorder = _set_endianness(sys.byteorder) path_or_buf = stringify_path(path_or_buf) if isinstance(path_or_buf, str): - path_or_buf, encoding, _, should_close = get_filepath_or_buffer(path_or_buf) + path_or_buf, encoding, _, should_close = get_filepath_or_buffer( + path_or_buf, storage_options=storage_options + ) if isinstance(path_or_buf, (str, bytes)): self.path_or_buf = open(path_or_buf, "rb") - elif isinstance(path_or_buf, IOBase): + elif hasattr(path_or_buf, "read"): # Copy to BytesIO, and ensure no encoding - contents = path_or_buf.read() + pb: Any = path_or_buf + contents = pb.read() self.path_or_buf = BytesIO(contents) self._read_header() @@ -1906,6 +1910,7 @@ def read_stata( order_categoricals: bool = True, chunksize: Optional[int] = None, iterator: bool = False, + storage_options: StorageOptions = None, ) -> Union[DataFrame, StataReader]: reader = StataReader( @@ -1918,6 +1923,7 @@ def read_stata( columns=columns, order_categoricals=order_categoricals, chunksize=chunksize, + storage_options=storage_options, ) if iterator or chunksize: @@ -1931,7 +1937,9 @@ def read_stata( def _open_file_binary_write( - fname: FilePathOrBuffer, compression: Union[str, Mapping[str, str], None], + fname: FilePathOrBuffer, + compression: Union[str, Mapping[str, str], None], + storage_options: StorageOptions = None, ) -> Tuple[BinaryIO, bool, Optional[Union[str, Mapping[str, str]]]]: """ Open a binary file or no-op if file-like. @@ -1943,6 +1951,16 @@ def _open_file_binary_write( compression : {str, dict, None} The compression method to use. + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 + Returns ------- file : file-like object @@ -1961,7 +1979,10 @@ def _open_file_binary_write( compression_typ, compression_args = get_compression_method(compression) compression_typ = infer_compression(fname, compression_typ) path_or_buf, _, compression_typ, _ = get_filepath_or_buffer( - fname, compression=compression_typ + fname, + mode="wb", + compression=compression_typ, + storage_options=storage_options, ) if compression_typ is not None: compression = compression_args @@ -2158,6 +2179,16 @@ class StataWriter(StataParser): .. versionadded:: 1.1.0 + storage_options : dict, optional + Extra options that make sense for a particular storage connection, e.g. + host, port, username, password, etc., if using a URL that will + be parsed by ``fsspec``, e.g., starting "s3://", "gcs://". An error + will be raised if providing this argument with a local path or + a file-like buffer. See the fsspec and backend storage implementation + docs for the set of allowed keys and values + + .. versionadded:: 1.2.0 + Returns ------- writer : StataWriter instance @@ -2207,6 +2238,7 @@ def __init__( data_label: Optional[str] = None, variable_labels: Optional[Dict[Label, str]] = None, compression: Union[str, Mapping[str, str], None] = "infer", + storage_options: StorageOptions = None, ): super().__init__() self._convert_dates = {} if convert_dates is None else convert_dates @@ -2219,6 +2251,7 @@ def __init__( self._output_file: Optional[BinaryIO] = None # attach nobs, nvars, data, varlist, typlist self._prepare_pandas(data) + self.storage_options = storage_options if byteorder is None: byteorder = sys.byteorder @@ -2505,7 +2538,7 @@ def _encode_strings(self) -> None: def write_file(self) -> None: self._file, self._own_file, compression = _open_file_binary_write( - self._fname, self._compression + self._fname, self._compression, storage_options=self.storage_options ) if compression is not None: self._output_file = self._file @@ -3088,6 +3121,7 @@ def __init__( variable_labels: Optional[Dict[Label, str]] = None, convert_strl: Optional[Sequence[Label]] = None, compression: Union[str, Mapping[str, str], None] = "infer", + storage_options: StorageOptions = None, ): # Copy to new list since convert_strl might be modified later self._convert_strl: List[Label] = [] @@ -3104,6 +3138,7 @@ def __init__( data_label=data_label, variable_labels=variable_labels, compression=compression, + storage_options=storage_options, ) self._map: Dict[str, int] = {} self._strl_blob = b"" @@ -3491,6 +3526,7 @@ def __init__( convert_strl: Optional[Sequence[Label]] = None, version: Optional[int] = None, compression: Union[str, Mapping[str, str], None] = "infer", + storage_options: StorageOptions = None, ): if version is None: version = 118 if data.shape[1] <= 32767 else 119 @@ -3513,6 +3549,7 @@ def __init__( variable_labels=variable_labels, convert_strl=convert_strl, compression=compression, + storage_options=storage_options, ) # Override version set in StataWriter117 init self._dta_version = version diff --git a/pandas/tests/io/test_fsspec.py b/pandas/tests/io/test_fsspec.py index a0723452ccb70..3e89f6ca4ae16 100644 --- a/pandas/tests/io/test_fsspec.py +++ b/pandas/tests/io/test_fsspec.py @@ -1,7 +1,18 @@ +import io + import numpy as np import pytest -from pandas import DataFrame, date_range, read_csv, read_parquet +from pandas import ( + DataFrame, + date_range, + read_csv, + read_feather, + read_json, + read_parquet, + read_pickle, + read_stata, +) import pandas._testing as tm from pandas.util import _test_decorators as td @@ -63,6 +74,16 @@ def test_to_csv(cleared_fs): tm.assert_frame_equal(df1, df2) +def test_csv_options(fsspectest): + df = DataFrame({"a": [0]}) + df.to_csv( + "testmem://test/test.csv", storage_options={"test": "csv_write"}, index=False + ) + assert fsspectest.test[0] == "csv_write" + read_csv("testmem://test/test.csv", storage_options={"test": "csv_read"}) + assert fsspectest.test[0] == "csv_read" + + @td.skip_if_no("fastparquet") def test_to_parquet_new_file(monkeypatch, cleared_fs): """Regression test for writing to a not-yet-existent GCS Parquet file.""" @@ -71,6 +92,44 @@ def test_to_parquet_new_file(monkeypatch, cleared_fs): ) +@td.skip_if_no("pyarrow") +def test_arrowparquet_options(fsspectest): + """Regression test for writing to a not-yet-existent GCS Parquet file.""" + df = DataFrame({"a": [0]}) + df.to_parquet( + "testmem://test/test.csv", + engine="pyarrow", + compression=None, + storage_options={"test": "parquet_write"}, + ) + assert fsspectest.test[0] == "parquet_write" + read_parquet( + "testmem://test/test.csv", + engine="pyarrow", + storage_options={"test": "parquet_read"}, + ) + assert fsspectest.test[0] == "parquet_read" + + +@td.skip_if_no("fastparquet") +def test_fastparquet_options(fsspectest): + """Regression test for writing to a not-yet-existent GCS Parquet file.""" + df = DataFrame({"a": [0]}) + df.to_parquet( + "testmem://test/test.csv", + engine="fastparquet", + compression=None, + storage_options={"test": "parquet_write"}, + ) + assert fsspectest.test[0] == "parquet_write" + read_parquet( + "testmem://test/test.csv", + engine="fastparquet", + storage_options={"test": "parquet_read"}, + ) + assert fsspectest.test[0] == "parquet_read" + + @td.skip_if_no("s3fs") def test_from_s3_csv(s3_resource, tips_file): tm.assert_equal(read_csv("s3://pandas-test/tips.csv"), read_csv(tips_file)) @@ -101,3 +160,67 @@ def test_not_present_exception(): with pytest.raises(ImportError) as e: read_csv("memory://test/test.csv") assert "fsspec library is required" in str(e.value) + + +@td.skip_if_no("pyarrow") +def test_feather_options(fsspectest): + df = DataFrame({"a": [0]}) + df.to_feather("testmem://afile", storage_options={"test": "feather_write"}) + assert fsspectest.test[0] == "feather_write" + out = read_feather("testmem://afile", storage_options={"test": "feather_read"}) + assert fsspectest.test[0] == "feather_read" + tm.assert_frame_equal(df, out) + + +def test_pickle_options(fsspectest): + df = DataFrame({"a": [0]}) + df.to_pickle("testmem://afile", storage_options={"test": "pickle_write"}) + assert fsspectest.test[0] == "pickle_write" + out = read_pickle("testmem://afile", storage_options={"test": "pickle_read"}) + assert fsspectest.test[0] == "pickle_read" + tm.assert_frame_equal(df, out) + + +def test_json_options(fsspectest): + df = DataFrame({"a": [0]}) + df.to_json("testmem://afile", storage_options={"test": "json_write"}) + assert fsspectest.test[0] == "json_write" + out = read_json("testmem://afile", storage_options={"test": "json_read"}) + assert fsspectest.test[0] == "json_read" + tm.assert_frame_equal(df, out) + + +def test_stata_options(fsspectest): + df = DataFrame({"a": [0]}) + df.to_stata( + "testmem://afile", storage_options={"test": "stata_write"}, write_index=False + ) + assert fsspectest.test[0] == "stata_write" + out = read_stata("testmem://afile", storage_options={"test": "stata_read"}) + assert fsspectest.test[0] == "stata_read" + tm.assert_frame_equal(df, out.astype("int64")) + + +@td.skip_if_no("tabulate") +def test_markdown_options(fsspectest): + df = DataFrame({"a": [0]}) + df.to_markdown("testmem://afile", storage_options={"test": "md_write"}) + assert fsspectest.test[0] == "md_write" + assert fsspectest.cat("afile") + + +@td.skip_if_no("pyarrow") +def test_non_fsspec_options(): + with pytest.raises(ValueError, match="storage_options"): + read_csv("localfile", storage_options={"a": True}) + with pytest.raises(ValueError, match="storage_options"): + # separate test for parquet, which has a different code path + read_parquet("localfile", storage_options={"a": True}) + by = io.BytesIO() + + with pytest.raises(ValueError, match="storage_options"): + read_csv(by, storage_options={"a": True}) + + df = DataFrame({"a": [0]}) + with pytest.raises(ValueError, match="storage_options"): + df.to_parquet("nonfsspecpath", storage_options={"a": True}) diff --git a/pandas/tests/io/test_s3.py b/pandas/tests/io/test_s3.py index 5e0f7edf4d8ae..a137e76b1696b 100644 --- a/pandas/tests/io/test_s3.py +++ b/pandas/tests/io/test_s3.py @@ -32,7 +32,7 @@ def test_read_without_creds_from_pub_bucket(): @tm.network @td.skip_if_no("s3fs") -def test_read_with_creds_from_pub_bucke(): +def test_read_with_creds_from_pub_bucket(): # Ensure we can read from a public bucket with credentials # GH 34626 # Use Amazon Open Data Registry - https://registry.opendata.aws/gdelt