diff --git a/ci/deps/azure-36-locale.yaml b/ci/deps/azure-36-locale.yaml
index 56da56b45b702..a9b9a5a47ccf5 100644
--- a/ci/deps/azure-36-locale.yaml
+++ b/ci/deps/azure-36-locale.yaml
@@ -15,7 +15,6 @@ dependencies:
 
   # pandas dependencies
   - beautifulsoup4
-  - gcsfs
   - html5lib
   - ipython
   - jinja2
@@ -31,7 +30,6 @@ dependencies:
   - pytables
   - python-dateutil
   - pytz
-  - s3fs
   - scipy
   - xarray
   - xlrd
diff --git a/ci/deps/azure-37-locale.yaml b/ci/deps/azure-37-locale.yaml
index 31155ac93931a..81e336cf1ed7f 100644
--- a/ci/deps/azure-37-locale.yaml
+++ b/ci/deps/azure-37-locale.yaml
@@ -27,7 +27,6 @@ dependencies:
   - pytables
   - python-dateutil
   - pytz
-  - s3fs
   - scipy
   - xarray
   - xlrd
diff --git a/ci/deps/azure-windows-37.yaml b/ci/deps/azure-windows-37.yaml
index 889d5c1bcfcdd..5bbd0e2795d7e 100644
--- a/ci/deps/azure-windows-37.yaml
+++ b/ci/deps/azure-windows-37.yaml
@@ -15,7 +15,8 @@ dependencies:
   # pandas dependencies
   - beautifulsoup4
   - bottleneck
-  - gcsfs
+  - fsspec>=0.7.4
+  - gcsfs>=0.6.0
   - html5lib
   - jinja2
   - lxml
@@ -28,7 +29,7 @@ dependencies:
   - pytables
   - python-dateutil
   - pytz
-  - s3fs
+  - s3fs>=0.4.0
   - scipy
   - sqlalchemy
   - xlrd
diff --git a/ci/deps/travis-36-cov.yaml b/ci/deps/travis-36-cov.yaml
index 2968c8f188d49..177e0d3f4c0af 100644
--- a/ci/deps/travis-36-cov.yaml
+++ b/ci/deps/travis-36-cov.yaml
@@ -18,7 +18,8 @@ dependencies:
   - cython>=0.29.16
   - dask
   - fastparquet>=0.3.2
-  - gcsfs
+  - fsspec>=0.7.4
+  - gcsfs>=0.6.0
   - geopandas
   - html5lib
   - matplotlib
@@ -35,7 +36,7 @@ dependencies:
   - pytables
   - python-snappy
   - pytz
-  - s3fs
+  - s3fs>=0.4.0
   - scikit-learn
   - scipy
   - sqlalchemy
diff --git a/ci/deps/travis-36-locale.yaml b/ci/deps/travis-36-locale.yaml
index 2c8403acf6971..03a1e751b6a86 100644
--- a/ci/deps/travis-36-locale.yaml
+++ b/ci/deps/travis-36-locale.yaml
@@ -16,7 +16,6 @@ dependencies:
  - blosc=1.14.3
  - python-blosc
  - fastparquet=0.3.2
-  - gcsfs=0.2.2
  - html5lib
  - ipython
  - jinja2
@@ -33,7 +32,6 @@ dependencies:
  - pytables
  - python-dateutil
  - pytz
-  - s3fs=0.3.0
  - scipy
  - sqlalchemy=1.1.4
  - xarray=0.10
diff --git a/ci/deps/travis-36-slow.yaml b/ci/deps/travis-36-slow.yaml
index df693f0e22c71..87bad59fa4873 100644
--- a/ci/deps/travis-36-slow.yaml
+++ b/ci/deps/travis-36-slow.yaml
@@ -13,6 +13,7 @@ dependencies:
 
   # pandas dependencies
   - beautifulsoup4
+  - fsspec>=0.7.4
   - html5lib
   - lxml
   - matplotlib
@@ -25,7 +26,7 @@ dependencies:
   - pytables
   - python-dateutil
   - pytz
-  - s3fs
+  - s3fs>=0.4.0
   - scipy
   - sqlalchemy
   - xlrd
diff --git a/ci/deps/travis-37.yaml b/ci/deps/travis-37.yaml
index 986728d0a4a40..e896233aac63c 100644
--- a/ci/deps/travis-37.yaml
+++ b/ci/deps/travis-37.yaml
@@ -13,12 +13,13 @@ dependencies:
 
   # pandas dependencies
   - botocore>=1.11
+  - fsspec>=0.7.4
   - numpy
   - python-dateutil
   - nomkl
   - pyarrow
   - pytz
-  - s3fs
+  - s3fs>=0.4.0
   - tabulate
   - pyreadstat
   - pip
diff --git a/doc/source/getting_started/install.rst b/doc/source/getting_started/install.rst
index da1161c8f68b4..b79a9cd872c47 100644
--- a/doc/source/getting_started/install.rst
+++ b/doc/source/getting_started/install.rst
@@ -267,8 +267,9 @@ SQLAlchemy                1.1.4              SQL support for databases other tha
 SciPy                     0.19.0             Miscellaneous statistical functions
 XLsxWriter                0.9.8              Excel writing
 blosc                                        Compression for HDF5
+fsspec                    0.7.4              Handling files aside from local and HTTP
 fastparquet               0.3.2              Parquet reading / writing
-gcsfs                     0.2.2              Google Cloud Storage access
+gcsfs                     0.6.0              Google Cloud Storage access
 html5lib                                     HTML parser for read_html (see :ref:`note <optional_html>`)
 lxml                      3.8.0              HTML parser for read_html (see :ref:`note <optional_html>`)
 matplotlib                2.2.2              Visualization
@@ -282,7 +283,7 @@ pyreadstat                                   SPSS files (.sav) reading
 pytables                  3.4.3              HDF5 reading / writing
 pyxlsb                    1.0.6              Reading for xlsb files
 qtpy                                         Clipboard I/O
-s3fs                      0.3.0              Amazon S3 access
+s3fs                      0.4.0              Amazon S3 access
 tabulate                  0.8.3              Printing in Markdown-friendly format (see `tabulate`_)
 xarray                    0.8.2              pandas-like API for N-dimensional data
 xclip                                        Clipboard I/O on linux
diff --git a/doc/source/whatsnew/v1.1.0.rst b/doc/source/whatsnew/v1.1.0.rst
index 10522ff797c59..ce82fa61a67f3 100644
--- a/doc/source/whatsnew/v1.1.0.rst
+++ b/doc/source/whatsnew/v1.1.0.rst
@@ -245,6 +245,22 @@ If needed you can adjust the bins with the argument ``offset`` (a Timedelta) tha
 
 For a full example, see: :ref:`timeseries.adjust-the-start-of-the-bins`.
 
+fsspec now used for filesystem handling
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+For reading and writing to filesystems other than local and reading from HTTP(S),
+the optional dependency ``fsspec`` will be used to dispatch operations (:issue:`33452`).
+This will give unchanged
+functionality for S3 and GCS storage, which were already supported, but also add
+support for several other storage implementations such as `Azure Datalake and Blob`_,
+SSH, FTP, Dropbox and GitHub. For docs and capabilities, see the `fsspec docs`_.
+
+The existing capability to interface with S3 and GCS will be unaffected by this
+change, as ``fsspec`` will still bring in the same packages as before.
+
+.. _Azure Datalake and Blob: https://github.com/dask/adlfs
+
+.. _fsspec docs: https://filesystem-spec.readthedocs.io/en/latest/
 
 .. _whatsnew_110.enhancements.other:
 
@@ -696,7 +712,9 @@ Optional libraries below the lowest tested version may still work, but are not c
 +-----------------+-----------------+---------+
 | fastparquet     | 0.3.2           |         |
 +-----------------+-----------------+---------+
-| gcsfs           | 0.2.2           |         |
+| fsspec          | 0.7.4           |         |
++-----------------+-----------------+---------+
+| gcsfs           | 0.6.0           | X       |
 +-----------------+-----------------+---------+
 | lxml            | 3.8.0           |         |
 +-----------------+-----------------+---------+
@@ -712,7 +730,7 @@ Optional libraries below the lowest tested version may still work, but are not c
 +-----------------+-----------------+---------+
 | pytables        | 3.4.3           | X       |
 +-----------------+-----------------+---------+
-| s3fs            | 0.3.0           |         |
+| s3fs            | 0.4.0           | X       |
 +-----------------+-----------------+---------+
 | scipy           | 1.2.0           | X       |
 +-----------------+-----------------+---------+
diff --git a/environment.yml b/environment.yml
index b81404094fa4c..3783b7d360f1a 100644
--- a/environment.yml
+++ b/environment.yml
@@ -98,7 +98,9 @@ dependencies:
 
   - pyqt>=5.9.2  # pandas.read_clipboard
   - pytables>=3.4.3  # pandas.read_hdf, DataFrame.to_hdf
-  - s3fs  # pandas.read_csv... when using 's3://...' path
+  - s3fs>=0.4.0  # file IO when using 's3://...' path
+  - fsspec>=0.7.4  # for generic remote file operations
+  - gcsfs>=0.6.0  # file IO when using 'gcs://...' path
   - sqlalchemy  # pandas.read_sql, DataFrame.to_sql
   - xarray  # DataFrame.to_xarray
   - cftime  # Needed for downstream xarray.CFTimeIndex test
diff --git a/pandas/compat/_optional.py b/pandas/compat/_optional.py
index 0a5e0f5050040..6423064732def 100644
--- a/pandas/compat/_optional.py
+++ b/pandas/compat/_optional.py
@@ -8,8 +8,9 @@
 VERSIONS = {
     "bs4": "4.6.0",
     "bottleneck": "1.2.1",
+    "fsspec": "0.7.4",
     "fastparquet": "0.3.2",
-    "gcsfs": "0.2.2",
+    "gcsfs": "0.6.0",
     "lxml.etree": "3.8.0",
     "matplotlib": "2.2.2",
     "numexpr": "2.6.2",
@@ -20,7 +21,7 @@
     "pytables": "3.4.3",
     "pytest": "5.0.1",
     "pyxlsb": "1.0.6",
-    "s3fs": "0.3.0",
+    "s3fs": "0.4.0",
     "scipy": "1.2.0",
     "sqlalchemy": "1.1.4",
     "tables": "3.4.3",
diff --git a/pandas/io/common.py b/pandas/io/common.py
index 055f84970e916..51323c5ff3ef5 100644
--- a/pandas/io/common.py
+++ b/pandas/io/common.py
@@ -31,6 +31,7 @@
 
 from pandas._typing import FilePathOrBuffer
 from pandas.compat import _get_lzma_file, _import_lzma
+from pandas.compat._optional import import_optional_dependency
 
 from pandas.core.dtypes.common import is_file_like
 
@@ -126,20 +127,6 @@ def stringify_path(
     return _expand_user(filepath_or_buffer)
 
 
-def is_s3_url(url) -> bool:
-    """Check for an s3, s3n, or s3a url"""
-    if not isinstance(url, str):
-        return False
-    return parse_url(url).scheme in ["s3", "s3n", "s3a"]
-
-
-def is_gcs_url(url) -> bool:
-    """Check for a gcs url"""
-    if not isinstance(url, str):
-        return False
-    return parse_url(url).scheme in ["gcs", "gs"]
-
-
 def urlopen(*args, **kwargs):
     """
     Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of
@@ -150,31 +137,16 @@ def urlopen(*args, **kwargs):
     return urllib.request.urlopen(*args, **kwargs)
 
 
-def get_fs_for_path(filepath: str):
+def is_fsspec_url(url: FilePathOrBuffer) -> bool:
     """
-    Get appropriate filesystem given a filepath.
-    Supports s3fs, gcs and local file system.
-
-    Parameters
-    ----------
-    filepath : str
-        File path. e.g s3://bucket/object, /local/path, gcs://pandas/obj
-
-    Returns
-    -------
-    s3fs.S3FileSystem, gcsfs.GCSFileSystem, None
-        Appropriate FileSystem to use. None for local filesystem.
+    Returns true if the given URL looks like
+    something fsspec can handle
     """
-    if is_s3_url(filepath):
-        from pandas.io import s3
-
-        return s3.get_fs()
-    elif is_gcs_url(filepath):
-        from pandas.io import gcs
-
-        return gcs.get_fs()
-    else:
-        return None
+    return (
+        isinstance(url, str)
+        and "://" in url
+        and not url.startswith(("http://", "https://"))
+    )
 
 
 def get_filepath_or_buffer(
@@ -182,6 +154,7 @@
     encoding: Optional[str] = None,
     compression: Optional[str] = None,
     mode: Optional[str] = None,
+    storage_options: Optional[Dict[str, Any]] = None,
 ):
     """
     If the filepath_or_buffer is a url, translate and return the buffer.
@@ -194,6 +167,8 @@ get_filepath_or_buffer(
     compression : {{'gzip', 'bz2', 'zip', 'xz', None}}, optional
     encoding : the encoding to use to decode bytes, default is 'utf-8'
     mode : str, optional
+    storage_options : dict, optional
+        passed on to fsspec, if using it; this is not yet accessed by the public API
 
     Returns
     -------
@@ -204,6 +179,7 @@
     filepath_or_buffer = stringify_path(filepath_or_buffer)
 
     if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
+        # TODO: fsspec can also handle HTTP via requests, but leaving this unchanged
         req = urlopen(filepath_or_buffer)
         content_encoding = req.headers.get("Content-Encoding", None)
         if content_encoding == "gzip":
@@ -213,19 +189,23 @@
        req.close()
        return reader, encoding, compression, True
 
-    if is_s3_url(filepath_or_buffer):
-        from pandas.io import s3
-
-        return s3.get_filepath_or_buffer(
-            filepath_or_buffer, encoding=encoding, compression=compression, mode=mode
-        )
-
-    if is_gcs_url(filepath_or_buffer):
-        from pandas.io import gcs
-
-        return gcs.get_filepath_or_buffer(
-            filepath_or_buffer, encoding=encoding, compression=compression, mode=mode
-        )
+    if is_fsspec_url(filepath_or_buffer):
+        assert isinstance(
+            filepath_or_buffer, str
+        )  # just to appease mypy for this branch
+        # two special-case s3-like protocols; these have special meaning in Hadoop,
+        # but are equivalent to just "s3" from fsspec's point of view
+        # cc #11071
+        if filepath_or_buffer.startswith("s3a://"):
+            filepath_or_buffer = filepath_or_buffer.replace("s3a://", "s3://")
+        if filepath_or_buffer.startswith("s3n://"):
+            filepath_or_buffer = filepath_or_buffer.replace("s3n://", "s3://")
+        fsspec = import_optional_dependency("fsspec")
+
+        file_obj = fsspec.open(
+            filepath_or_buffer, mode=mode or "rb", **(storage_options or {})
+        ).open()
+        return file_obj, encoding, compression, True
 
     if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
         return _expand_user(filepath_or_buffer), None, compression, False
diff --git a/pandas/io/gcs.py b/pandas/io/gcs.py
deleted file mode 100644
index d2d8fc2d2139f..0000000000000
--- a/pandas/io/gcs.py
+++ /dev/null
@@ -1,22 +0,0 @@
-""" GCS support for remote file interactivity """
-from pandas.compat._optional import import_optional_dependency
-
-gcsfs = import_optional_dependency(
-    "gcsfs", extra="The gcsfs library is required to handle GCS files"
-)
-
-
-def get_fs():
-    return gcsfs.GCSFileSystem()
-
-
-def get_filepath_or_buffer(
-    filepath_or_buffer, encoding=None, compression=None, mode=None
-):
-
-    if mode is None:
-        mode = "rb"
-
-    fs = get_fs()
-    filepath_or_buffer = fs.open(filepath_or_buffer, mode)
-    return filepath_or_buffer, None, compression, True
diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index de9a14c82b3cb..a0c9242684f0f 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -8,12 +8,7 @@
 
 from pandas import DataFrame, get_option
 
-from pandas.io.common import (
-    get_filepath_or_buffer,
-    get_fs_for_path,
-    is_gcs_url,
-    is_s3_url,
-)
+from pandas.io.common import _expand_user, get_filepath_or_buffer, is_fsspec_url
 
 
 def get_engine(engine: str) -> "BaseImpl":
@@ -97,16 +92,24 @@ def write(
         **kwargs,
     ):
         self.validate_dataframe(df)
-        file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
 
         from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
         if index is not None:
             from_pandas_kwargs["preserve_index"] = index
 
         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
-        # write_to_dataset does not support a file-like object when
-        # a directory path is used, so just pass the path string.
+
+        if is_fsspec_url(path) and "filesystem" not in kwargs:
+            # make fsspec instance, which pyarrow will use to open paths
+            import_optional_dependency("fsspec")
+            import fsspec.core
+
+            fs, path = fsspec.core.url_to_fs(path)
+            kwargs["filesystem"] = fs
+        else:
+            path = _expand_user(path)
         if partition_cols is not None:
+            # writes to multiple files under the given path
             self.api.parquet.write_to_dataset(
                 table,
                 path,
@@ -115,17 +118,21 @@ def write(
                 **kwargs,
             )
         else:
-            self.api.parquet.write_table(
-                table, file_obj_or_path, compression=compression, **kwargs
-            )
-            if should_close:
-                file_obj_or_path.close()
+            # write to single output file
+            self.api.parquet.write_table(table, path, compression=compression, **kwargs)
 
     def read(self, path, columns=None, **kwargs):
-        fs = get_fs_for_path(path)
-        should_close = None
-        # Avoid calling get_filepath_or_buffer for s3/gcs URLs since
-        # since it returns an S3File which doesn't support dir reads in arrow
+        if is_fsspec_url(path) and "filesystem" not in kwargs:
+            import_optional_dependency("fsspec")
+            import fsspec.core
+
+            fs, path = fsspec.core.url_to_fs(path)
+            should_close = False
+        else:
+            fs = kwargs.pop("filesystem", None)
+            should_close = False
+            path = _expand_user(path)
+
         if not fs:
             path, _, _, should_close = get_filepath_or_buffer(path)
 
@@ -173,13 +180,11 @@ def write(
         if partition_cols is not None:
             kwargs["file_scheme"] = "hive"
 
-        if is_s3_url(path) or is_gcs_url(path):
-            # if path is s3:// or gs:// we need to open the file in 'wb' mode.
-            # TODO: Support 'ab'
+        if is_fsspec_url(path):
+            fsspec = import_optional_dependency("fsspec")
 
-            path, _, _, _ = get_filepath_or_buffer(path, mode="wb")
-            # And pass the opened file to the fastparquet internal impl.
-            kwargs["open_with"] = lambda path, _: path
+            # if filesystem is provided by fsspec, file must be opened in 'wb' mode.
+            kwargs["open_with"] = lambda path, _: fsspec.open(path, "wb").open()
         else:
             path, _, _, _ = get_filepath_or_buffer(path)
 
@@ -194,17 +199,11 @@ def write(
             )
 
     def read(self, path, columns=None, **kwargs):
-        if is_s3_url(path):
-            from pandas.io.s3 import get_file_and_filesystem
+        if is_fsspec_url(path):
+            fsspec = import_optional_dependency("fsspec")
 
-            # When path is s3:// an S3File is returned.
-            # We need to retain the original path(str) while also
-            # pass the S3File().open function to fastparquet impl.
-            s3, filesystem = get_file_and_filesystem(path)
-            try:
-                parquet_file = self.api.ParquetFile(path, open_with=filesystem.open)
-            finally:
-                s3.close()
+            open_with = lambda path, _: fsspec.open(path, "rb").open()
+            parquet_file = self.api.ParquetFile(path, open_with=open_with)
         else:
             path, _, _, _ = get_filepath_or_buffer(path)
             parquet_file = self.api.ParquetFile(path)
diff --git a/pandas/io/s3.py b/pandas/io/s3.py
deleted file mode 100644
index 329c861d2386a..0000000000000
--- a/pandas/io/s3.py
+++ /dev/null
@@ -1,53 +0,0 @@
-""" s3 support for remote file interactivity """
-from typing import IO, Any, Optional, Tuple
-from urllib.parse import urlparse as parse_url
-
-from pandas._typing import FilePathOrBuffer
-from pandas.compat._optional import import_optional_dependency
-
-s3fs = import_optional_dependency(
-    "s3fs", extra="The s3fs package is required to handle s3 files."
-)
-
-
-def _strip_schema(url):
-    """Returns the url without the s3:// part"""
-    result = parse_url(url, allow_fragments=False)
-    return result.netloc + result.path
-
-
-def get_fs():
-    return s3fs.S3FileSystem(anon=False)
-
-
-def get_file_and_filesystem(
-    filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
-) -> Tuple[IO, Any]:
-    from botocore.exceptions import NoCredentialsError
-
-    if mode is None:
-        mode = "rb"
-
-    fs = get_fs()
-    try:
-        file = fs.open(_strip_schema(filepath_or_buffer), mode)
-    except (FileNotFoundError, NoCredentialsError):
-        # boto3 has troubles when trying to access a public file
-        # when credentialed...
-        # An OSError is raised if you have credentials, but they
-        # aren't valid for that bucket.
-        # A NoCredentialsError is raised if you don't have creds
-        # for that bucket.
-        fs = get_fs()
-        file = fs.open(_strip_schema(filepath_or_buffer), mode)
-    return file, fs
-
-
-def get_filepath_or_buffer(
-    filepath_or_buffer: FilePathOrBuffer,
-    encoding: Optional[str] = None,
-    compression: Optional[str] = None,
-    mode: Optional[str] = None,
-) -> Tuple[IO, Optional[str], Optional[str], bool]:
-    file, _fs = get_file_and_filesystem(filepath_or_buffer, mode=mode)
-    return file, None, compression, True
diff --git a/pandas/tests/io/json/test_pandas.py b/pandas/tests/io/json/test_pandas.py
index 137e4c991d080..fa92b7cfb8026 100644
--- a/pandas/tests/io/json/test_pandas.py
+++ b/pandas/tests/io/json/test_pandas.py
@@ -1665,13 +1665,21 @@ def test_json_multiindex(self, dataframe, expected):
         assert result == expected
 
     def test_to_s3(self, s3_resource):
+        import time
+
         # GH 28375
         mock_bucket_name, target_file = "pandas-test", "test.json"
         df = DataFrame({"x": [1, 2, 3], "y": [2, 4, 6]})
         df.to_json(f"s3://{mock_bucket_name}/{target_file}")
-        assert target_file in (
-            obj.key for obj in s3_resource.Bucket("pandas-test").objects.all()
-        )
+        timeout = 5
+        while True:
+            if target_file in (
+                obj.key for obj in s3_resource.Bucket("pandas-test").objects.all()
+            ):
+                break
+            time.sleep(0.1)
+            timeout -= 0.1
+            assert timeout > 0, "Timed out waiting for file to appear on moto"
 
     def test_json_pandas_na(self):
         # GH 31615
diff --git a/pandas/tests/io/test_common.py b/pandas/tests/io/test_common.py
index 6f1d4daeb39cb..e2f4ae04c1f9f 100644
--- a/pandas/tests/io/test_common.py
+++ b/pandas/tests/io/test_common.py
@@ -367,3 +367,13 @@ def test_unknown_engine(self):
             df.to_csv(path)
             with pytest.raises(ValueError, match="Unknown engine"):
                 pd.read_csv(path, engine="pyt")
+
+
+def test_is_fsspec_url():
+    assert icom.is_fsspec_url("gcs://pandas/somethingelse.com")
+    assert icom.is_fsspec_url("gs://pandas/somethingelse.com")
+    # the following is the only remote URL that is handled without fsspec
+    assert not icom.is_fsspec_url("http://pandas/somethingelse.com")
+    assert not icom.is_fsspec_url("random:pandas/somethingelse.com")
+    assert not icom.is_fsspec_url("/local/path")
+    assert not icom.is_fsspec_url("relative/local/path")
diff --git a/pandas/tests/io/test_fsspec.py b/pandas/tests/io/test_fsspec.py
new file mode 100644
index 0000000000000..c397a61616c1c
--- /dev/null
+++ b/pandas/tests/io/test_fsspec.py
@@ -0,0 +1,102 @@
+import numpy as np
+import pytest
+
+from pandas import DataFrame, date_range, read_csv, read_parquet
+import pandas._testing as tm
+from pandas.util import _test_decorators as td
+
+df1 = DataFrame(
+    {
+        "int": [1, 3],
+        "float": [2.0, np.nan],
+        "str": ["t", "s"],
+        "dt": date_range("2018-06-18", periods=2),
+    }
+)
+# the ignore on the following line accounts for to_csv returning Optional(str)
+# in general, but always str in the case we give no filename
+text = df1.to_csv(index=False).encode()  # type: ignore
+
+
+@pytest.fixture
+def cleared_fs():
+    fsspec = pytest.importorskip("fsspec")
+
+    memfs = fsspec.filesystem("memory")
+    yield memfs
+    memfs.store.clear()
+
+
+def test_read_csv(cleared_fs):
+    from fsspec.implementations.memory import MemoryFile
+
+    cleared_fs.store["test/test.csv"] = MemoryFile(data=text)
+    df2 = read_csv("memory://test/test.csv", parse_dates=["dt"])
+
+    tm.assert_frame_equal(df1, df2)
+
+
+def test_reasonable_error(monkeypatch, cleared_fs):
+    from fsspec.registry import known_implementations
+    from fsspec import registry
+
+    registry.target.clear()
+    with pytest.raises(ValueError) as e:
+        read_csv("nosuchprotocol://test/test.csv")
+    assert "nosuchprotocol" in str(e.value)
+    err_msg = "test error message"
+    monkeypatch.setitem(
+        known_implementations,
+        "couldexist",
+        {"class": "unimportable.CouldExist", "err": err_msg},
+    )
+    with pytest.raises(ImportError) as e:
+        read_csv("couldexist://test/test.csv")
+    assert err_msg in str(e.value)
+
+
+def test_to_csv(cleared_fs):
+    df1.to_csv("memory://test/test.csv", index=True)
+    df2 = read_csv("memory://test/test.csv", parse_dates=["dt"], index_col=0)
+
+    tm.assert_frame_equal(df1, df2)
+
+
+@td.skip_if_no("fastparquet")
+def test_to_parquet_new_file(monkeypatch, cleared_fs):
+    """Regression test for writing to a not-yet-existent GCS Parquet file."""
+    df1.to_parquet(
+        "memory://test/test.csv", index=True, engine="fastparquet", compression=None
+    )
+
+
+@td.skip_if_no("s3fs")
+def test_from_s3_csv(s3_resource, tips_file):
+    tm.assert_equal(read_csv("s3://pandas-test/tips.csv"), read_csv(tips_file))
+    # the following are decompressed by pandas, not fsspec
+    tm.assert_equal(read_csv("s3://pandas-test/tips.csv.gz"), read_csv(tips_file))
+    tm.assert_equal(read_csv("s3://pandas-test/tips.csv.bz2"), read_csv(tips_file))
+
+
+@pytest.mark.parametrize("protocol", ["s3", "s3a", "s3n"])
+@td.skip_if_no("s3fs")
+def test_s3_protocols(s3_resource, tips_file, protocol):
+    tm.assert_equal(
+        read_csv("%s://pandas-test/tips.csv" % protocol), read_csv(tips_file)
+    )
+
+
+@td.skip_if_no("s3fs")
+@td.skip_if_no("fastparquet")
+def test_s3_parquet(s3_resource):
+    fn = "s3://pandas-test/test.parquet"
+    df1.to_parquet(fn, index=False, engine="fastparquet", compression=None)
+    df2 = read_parquet(fn, engine="fastparquet")
+    tm.assert_equal(df1, df2)
+
+
+@td.skip_if_installed("fsspec")
+def test_not_present_exception():
+    with pytest.raises(ImportError) as e:
+        read_csv("memory://test/test.csv")
+    assert "fsspec library is required" in str(e.value)
diff --git a/pandas/tests/io/test_gcs.py b/pandas/tests/io/test_gcs.py
index cf745fcc492a1..4d93119ffa3f5 100644
--- a/pandas/tests/io/test_gcs.py
+++ b/pandas/tests/io/test_gcs.py
@@ -1,4 +1,4 @@
-from io import StringIO
+from io import BytesIO
 import os
 
 import numpy as np
@@ -8,17 +8,14 @@
 import pandas._testing as tm
 from pandas.util import _test_decorators as td
 
-from pandas.io.common import is_gcs_url
-
-
-def test_is_gcs_url():
-    assert is_gcs_url("gcs://pandas/somethingelse.com")
-    assert is_gcs_url("gs://pandas/somethingelse.com")
-    assert not is_gcs_url("s3://pandas/somethingelse.com")
-
 
 @td.skip_if_no("gcsfs")
 def test_read_csv_gcs(monkeypatch):
+    from fsspec import AbstractFileSystem
+    from fsspec import registry
+
+    registry.target.clear()  # noqa  # remove state
+
     df1 = DataFrame(
         {
             "int": [1, 3],
@@ -28,9 +25,9 @@ def test_read_csv_gcs(monkeypatch):
         }
     )
 
-    class MockGCSFileSystem:
-        def open(*args):
-            return StringIO(df1.to_csv(index=False))
+    class MockGCSFileSystem(AbstractFileSystem):
+        def open(*args, **kwargs):
+            return BytesIO(df1.to_csv(index=False).encode())
 
     monkeypatch.setattr("gcsfs.GCSFileSystem", MockGCSFileSystem)
     df2 = read_csv("gs://test/test.csv", parse_dates=["dt"])
@@ -40,6 +37,10 @@ def open(*args):
 
 @td.skip_if_no("gcsfs")
 def test_to_csv_gcs(monkeypatch):
+    from fsspec import AbstractFileSystem
+    from fsspec import registry
+
+    registry.target.clear()  # noqa  # remove state
     df1 = DataFrame(
         {
             "int": [1, 3],
@@ -48,20 +49,22 @@ def test_to_csv_gcs(monkeypatch):
             "dt": date_range("2018-06-18", periods=2),
         }
     )
-    s = StringIO()
+    s = BytesIO()
+    s.close = lambda: True
 
-    class MockGCSFileSystem:
-        def open(*args):
+    class MockGCSFileSystem(AbstractFileSystem):
+        def open(*args, **kwargs):
+            s.seek(0)
             return s
 
     monkeypatch.setattr("gcsfs.GCSFileSystem", MockGCSFileSystem)
     df1.to_csv("gs://test/test.csv", index=True)
 
     def mock_get_filepath_or_buffer(*args, **kwargs):
-        return StringIO(df1.to_csv()), None, None, False
+        return BytesIO(df1.to_csv(index=True).encode()), None, None, False
 
     monkeypatch.setattr(
-        "pandas.io.gcs.get_filepath_or_buffer", mock_get_filepath_or_buffer
+        "pandas.io.common.get_filepath_or_buffer", mock_get_filepath_or_buffer
     )
     df2 = read_csv("gs://test/test.csv", parse_dates=["dt"], index_col=0)
 
@@ -73,6 +76,10 @@ def mock_get_filepath_or_buffer(*args, **kwargs):
 @td.skip_if_no("gcsfs")
 def test_to_parquet_gcs_new_file(monkeypatch, tmpdir):
     """Regression test for writing to a not-yet-existent GCS Parquet file."""
+    from fsspec import AbstractFileSystem
+    from fsspec import registry
+
+    registry.target.clear()  # noqa  # remove state
     df1 = DataFrame(
         {
             "int": [1, 3],
@@ -82,7 +89,7 @@ def test_to_parquet_gcs_new_file(monkeypatch, tmpdir):
         }
     )
 
-    class MockGCSFileSystem:
+    class MockGCSFileSystem(AbstractFileSystem):
         def open(self, path, mode="r", *args):
             if "w" not in mode:
                 raise FileNotFoundError
diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index efd34c58d7d19..82157f3d722a9 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -537,6 +537,18 @@ def test_categorical(self, pa):
             expected = df.astype(object)
             check_round_trip(df, pa, expected=expected)
 
+    def test_s3_roundtrip_explicit_fs(self, df_compat, s3_resource, pa):
+        s3fs = pytest.importorskip("s3fs")
+        s3 = s3fs.S3FileSystem()
+        kw = dict(filesystem=s3)
+        check_round_trip(
+            df_compat,
+            pa,
+            path="pandas-test/pyarrow.parquet",
+            read_kwargs=kw,
+            write_kwargs=kw,
+        )
+
     def test_s3_roundtrip(self, df_compat, s3_resource, pa):
         # GH #19134
         check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")
@@ -544,8 +556,6 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa):
     @td.skip_if_no("s3fs")
     @pytest.mark.parametrize("partition_col", [["A"], []])
     def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
-        from pandas.io.s3 import get_fs as get_s3_fs
-
         # GH #26388
         # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716
         # As per pyarrow partitioned columns become 'categorical' dtypes
@@ -559,11 +569,7 @@ def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
             pa,
             expected=expected_df,
             path="s3://pandas-test/parquet_dir",
-            write_kwargs={
-                "partition_cols": partition_col,
-                "compression": None,
-                "filesystem": get_s3_fs(),
-            },
+            write_kwargs={"partition_cols": partition_col, "compression": None},
             check_like=True,
             repeat=1,
         )
@@ -585,6 +591,15 @@ def test_read_file_like_obj_support(self, df_compat):
         df_from_buf = pd.read_parquet(buffer)
         tm.assert_frame_equal(df_compat, df_from_buf)
 
+    @td.skip_if_no("pyarrow")
+    def test_expand_user(self, df_compat, monkeypatch):
+        monkeypatch.setenv("HOME", "TestingUser")
+        monkeypatch.setenv("USERPROFILE", "TestingUser")
+        with pytest.raises(OSError, match=r".*TestingUser.*"):
+            pd.read_parquet("~/file.parquet")
+        with pytest.raises(OSError, match=r".*TestingUser.*"):
+            df_compat.to_parquet("~/file.parquet")
+
     def test_partition_cols_supported(self, pa, df_full):
         # GH #23283
         partition_cols = ["bool", "int"]
diff --git a/pandas/tests/io/test_pickle.py b/pandas/tests/io/test_pickle.py
index 42b4ea5ad9aac..e4d43db7834e3 100644
--- a/pandas/tests/io/test_pickle.py
+++ b/pandas/tests/io/test_pickle.py
@@ -455,42 +455,10 @@ def mock_urlopen_read(*args, **kwargs):
     tm.assert_frame_equal(df, result)
 
 
-@td.skip_if_no("gcsfs")
-@pytest.mark.parametrize("mockurl", ["gs://gcs.com", "gcs://gcs.com"])
-def test_pickle_gcsurl_roundtrip(monkeypatch, mockurl):
-    with tm.ensure_clean() as path:
-
-        class MockGCSFileSystem:
-            def __init__(self, *args, **kwargs):
-                pass
-
-            def open(self, *args):
-                mode = args[1] or None
-                f = open(path, mode)
-                return f
-
-        monkeypatch.setattr("gcsfs.GCSFileSystem", MockGCSFileSystem)
-        df = tm.makeDataFrame()
-        df.to_pickle(mockurl)
-        result = pd.read_pickle(mockurl)
-        tm.assert_frame_equal(df, result)
-
-
-@td.skip_if_no("s3fs")
-@pytest.mark.parametrize("mockurl", ["s3://s3.com", "s3n://s3.com", "s3a://s3.com"])
-def test_pickle_s3url_roundtrip(monkeypatch, mockurl):
-    with tm.ensure_clean() as path:
-
-        class MockS3FileSystem:
-            def __init__(self, *args, **kwargs):
-                pass
-
-            def open(self, *args):
-                mode = args[1] or None
-                f = open(path, mode)
-                return f
-
-        monkeypatch.setattr("s3fs.S3FileSystem", MockS3FileSystem)
+@td.skip_if_no("fsspec")
+def test_pickle_fsspec_roundtrip():
+    with tm.ensure_clean():
+        mockurl = "memory://afile"
         df = tm.makeDataFrame()
         df.to_pickle(mockurl)
         result = pd.read_pickle(mockurl)
diff --git a/pandas/tests/io/test_s3.py b/pandas/tests/io/test_s3.py
index 04c6979596eca..a76be9465f62a 100644
--- a/pandas/tests/io/test_s3.py
+++ b/pandas/tests/io/test_s3.py
@@ -4,14 +4,6 @@
 
 from pandas import read_csv
 
-from pandas.io.common import is_s3_url
-
-
-class TestS3URL:
-    def test_is_s3_url(self):
-        assert is_s3_url("s3://pandas/somethingelse.com")
-        assert not is_s3_url("s4://pandas/somethingelse.com")
-
 
 def test_streaming_s3_objects():
     # GH17135
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 754ec7ae28748..90f9fec2f4bdf 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -65,7 +65,9 @@ pyarrow>=0.13.1
 python-snappy
 pyqt5>=5.9.2
 tables>=3.4.3
-s3fs
+s3fs>=0.4.0
+fsspec>=0.4.0
+gcsfs>=0.6.0
 sqlalchemy
 xarray
 cftime