ENH: add fsspec support #34266

Merged

Changes from 2 commits (39 commits in total)

Commits:
94e717f
Add remote file io using fsspec.
Apr 14, 2020
fd7e072
Attempt refactor and clean
May 19, 2020
302ba13
Merge branch 'master' into feature/add-fsspec-support
May 20, 2020
9e6d3b2
readd and adapt s3/gcs tests
May 21, 2020
4564c8d
remove gc from test
May 21, 2020
0654537
Simpler is_fsspec
May 21, 2020
8d45cbb
add test
May 21, 2020
006e736
Answered most points
May 28, 2020
724ebd8
Implemented suggestions
May 28, 2020
9da1689
lint
May 28, 2020
a595411
Add versions info
May 29, 2020
6dd1e92
Update some deps
May 29, 2020
6e13df7
issue link syntax
May 29, 2020
3262063
More specific test versions
Jun 2, 2020
4bc2411
Account for alternate S3 protocols, and ignore type error
Jun 2, 2020
68644ab
Add comment to mypy ignore instruction
Jun 2, 2020
32bc586
more mypy
Jun 2, 2020
037ef2c
more black
Jun 2, 2020
c3c3075
Make storage_options a dict rather than swallowing kwargs
Jun 3, 2020
85d6452
More requested changes
Jun 5, 2020
263dd3b
Remove fsspec from locale tests
Jun 10, 2020
d0afbc3
tweak
Jun 10, 2020
6a587a5
Merge branch 'master' into feature/add-fsspec-support
Jun 10, 2020
b2992c1
Merge branch 'master' into feature/add-fsspec-support
Jun 11, 2020
9c03745
requested changes
Jun 11, 2020
7982e7b
add gcsfs to environment.yml
Jun 12, 2020
946297b
rerun deps script
Jun 12, 2020
145306e
Merge branch 'master' into feature/add-fsspec-support
Jun 12, 2020
06e5a3a
account for passed filesystem again
Jun 12, 2020
8f3854c
specify should_close
Jun 12, 2020
50c08c8
lint
Jun 12, 2020
9b20dc6
Except http passed to fsspec in parquet
Jun 12, 2020
eb90fe8
lint
Jun 12, 2020
b3e2cd2
Merge branch 'master' into feature/add-fsspec-support
Jun 16, 2020
4977a00
redo whatsnew
Jun 16, 2020
29a9785
simplify parquet write
Jun 18, 2020
565031b
Retry S3 file probe with timeout, in test_to_s3
Jun 18, 2020
606ce11
expand user in non-fsspec paths for parquet; add test for this
Jun 19, 2020
60b80a6
reorder imports!
Jun 19, 2020
2 changes: 1 addition & 1 deletion environment.yml
@@ -98,7 +98,7 @@ dependencies:

- pyqt>=5.9.2 # pandas.read_clipboard
- pytables>=3.4.2 # pandas.read_hdf, DataFrame.to_hdf
- s3fs # pandas.read_csv... when using 's3://...' path
- s3fs # pandas.read_csv... when using 's3://...' path (also brings in fsspec)
Contributor:

I think we should explicitly list fsspec here, if we're importing it.

What should we set the minimum supported version to? We'll add this to a few places in the library (compat._optional, docs, ...).

Contributor:

Can update this, and the minimum versions?
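
For reference, a minimal sketch of how the minimum version could be wired into pandas' optional-dependency machinery; the VERSIONS table and import_optional_dependency helper already exist in pandas.compat._optional, but the "0.7.4" pin below is only a placeholder, not the version settled on in this thread.

# pandas/compat/_optional.py (sketch): add fsspec to the minimum-version table.
VERSIONS = {
    # ... existing entries (bs4, fastparquet, gcsfs, ...) ...
    "fsspec": "0.7.4",  # placeholder minimum version
}

# Call-site sketch: import and version-check in one step.
from pandas.compat._optional import import_optional_dependency

fsspec = import_optional_dependency("fsspec")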

- sqlalchemy # pandas.read_sql, DataFrame.to_sql
- xarray # DataFrame.to_xarray
- cftime # Needed for downstream xarray.CFTimeIndex test
49 changes: 23 additions & 26 deletions pandas/io/common.py
@@ -134,20 +134,6 @@ def stringify_path(
return _expand_user(filepath_or_buffer)


def is_s3_url(url) -> bool:
"""Check for an s3, s3n, or s3a url"""
if not isinstance(url, str):
return False
return parse_url(url).scheme in ["s3", "s3n", "s3a"]


def is_gcs_url(url) -> bool:
"""Check for a gcs url"""
if not isinstance(url, str):
return False
return parse_url(url).scheme in ["gcs", "gs"]


def urlopen(*args, **kwargs):
"""
Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of
@@ -158,11 +144,25 @@ def urlopen(*args, **kwargs):
return urllib.request.urlopen(*args, **kwargs)


def is_fsspec_url(url: FilePathOrBuffer) -> bool:
"""
Returns true if fsspec is installed and the given URL looks like
something fsspec can handle
"""
try:
import fsspec # noqa: F401

return isinstance(url, str) and ("::" in url or "://" in url)
Contributor:

Should this also check if there's an fsspec-compatible implementation for that protocol? What's the behavior / error message if you have fsspec installed but not s3fs and do read_csv("s3://...")?

Contributor Author:

fsspec gives specific error messages for known protocols that are not available due to a missing dependency (s3fs would be one such case), and a different message if the protocol is completely unknown.

Contributor:

Does that error surface to the user? Or does pandas swallow it somewhere along the way?

except ImportError:
return False
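
To make the question above concrete, here is a minimal way to observe what fsspec raises when it is installed but the protocol's backend is not; the exact wording comes from fsspec and may vary between versions, and the bucket name is made up.

import fsspec

try:
    # With fsspec installed but s3fs missing, this raises ImportError with a
    # protocol-specific hint (roughly "Install s3fs to access S3").
    fsspec.open("s3://example-bucket/data.csv").open()
except ImportError as err:
    print(err)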


def get_filepath_or_buffer(
filepath_or_buffer: FilePathOrBuffer,
encoding: Optional[str] = None,
compression: Optional[str] = None,
mode: Optional[str] = None,
**storage_options,
Contributor:

Add a type? Dict[str, Any] I think?

Contributor:

Does Dict[str, Any] work?

):
"""
If the filepath_or_buffer is a url, translate and return the buffer.
Expand All @@ -175,6 +175,7 @@ def get_filepath_or_buffer(
compression : {{'gzip', 'bz2', 'zip', 'xz', None}}, optional
encoding : the encoding to use to decode bytes, default is 'utf-8'
mode : str, optional
storage_options: passed on to fsspec, if using it
Contributor:

Suggested change:
- storage_options: passed on to fsspec, if using it
+ **storage_options : dict, optional
+     passed on to fsspec.open, if using it.

Contributor:

add a versionadded tag (1.1)

Contributor:

Just FYI, this isn't in the public API.

At some point we'll add this keyword to all of our IO routines, which would benefit from the versionadded. But that can be a separate PR.
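
For illustration only (as noted above, this keyword is not public API yet), a sketch of how extra keyword arguments are swept into storage_options and forwarded to fsspec.open with the signature as it stands in this diff; the bucket and the anon option are invented for the example.

from pandas.io.common import get_filepath_or_buffer

# Hypothetical internal call: anon=True lands in **storage_options and is passed
# straight through to fsspec.open (here requesting anonymous S3 access via s3fs).
buf, encoding, compression, should_close = get_filepath_or_buffer(
    "s3://example-bucket/data.csv", mode="rb", anon=True
)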


Returns
-------
@@ -185,6 +186,7 @@
filepath_or_buffer = stringify_path(filepath_or_buffer)

if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
# TODO: fsspec can also handle HTTP via requests, but leaving this unchanged
req = urlopen(filepath_or_buffer)
content_encoding = req.headers.get("Content-Encoding", None)
if content_encoding == "gzip":
@@ -194,19 +196,14 @@
req.close()
return reader, encoding, compression, True

if is_s3_url(filepath_or_buffer):
from pandas.io import s3

return s3.get_filepath_or_buffer(
filepath_or_buffer, encoding=encoding, compression=compression, mode=mode
)
if is_fsspec_url(filepath_or_buffer):
import fsspec

if is_gcs_url(filepath_or_buffer):
from pandas.io import gcs

return gcs.get_filepath_or_buffer(
filepath_or_buffer, encoding=encoding, compression=compression, mode=mode
)
file_obj = fsspec.open(
filepath_or_buffer, mode=mode or "rb", **storage_options
).open()
# TODO: both fsspec and pandas handle compression and encoding
Contributor:

What would resolve the TODO here? To not handle compression or encoding in pandas? Can you update the comment to indicate that?

Contributor Author:

Given that pandas must still handle compression and encoding for local and http, that code will not be deprecated. Therefore, I think it's fine that we don't advertise the fact that fsspec can do that part too, and open everything on the backend as "rb"/"wb", uncompressed. The TODO would be resolved if at some point we decided that fsspec should handle all file ops, which is not likely in the near term.

Contributor:

Agreed, so I think the TODO can be removed.

return file_obj, encoding, compression, True

if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
return _expand_user(filepath_or_buffer), None, compression, False
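
A note on the double open() a few lines up: fsspec.open returns an OpenFile container rather than a ready file handle, and calling .open() on it yields the concrete file-like object that the pandas readers expect. A standalone sketch against fsspec's built-in in-memory filesystem:

import fsspec

of = fsspec.open("memory://demo/out.bin", mode="wb")  # OpenFile, not yet a buffer
f = of.open()  # concrete binary file object backed by the memory filesystem
f.write(b"hello")
f.close()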
18 changes: 0 additions & 18 deletions pandas/io/gcs.py

This file was deleted.

26 changes: 7 additions & 19 deletions pandas/io/parquet.py
@@ -8,7 +8,7 @@

from pandas import DataFrame, get_option

from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
from pandas.io.common import get_filepath_or_buffer, is_fsspec_url


def get_engine(engine: str) -> "BaseImpl":
@@ -157,13 +157,13 @@ def write(
if partition_cols is not None:
kwargs["file_scheme"] = "hive"

if is_s3_url(path) or is_gcs_url(path):
if is_fsspec_url(path):
import fsspec

# if path is s3:// or gs:// we need to open the file in 'wb' mode.
# TODO: Support 'ab'

path, _, _, _ = get_filepath_or_buffer(path, mode="wb")
# And pass the opened file to the fastparquet internal impl.
kwargs["open_with"] = lambda path, _: path
kwargs["open_with"] = lambda path, _: fsspec.open(path, "wb").open()
else:
path, _, _, _ = get_filepath_or_buffer(path)

@@ -178,20 +178,8 @@
)

def read(self, path, columns=None, **kwargs):
if is_s3_url(path):
from pandas.io.s3 import get_file_and_filesystem

# When path is s3:// an S3File is returned.
# We need to retain the original path(str) while also
# pass the S3File().open function to fsatparquet impl.
s3, filesystem = get_file_and_filesystem(path)
try:
parquet_file = self.api.ParquetFile(path, open_with=filesystem.open)
finally:
s3.close()
else:
path, _, _, _ = get_filepath_or_buffer(path)
parquet_file = self.api.ParquetFile(path)
path, _, _, _ = get_filepath_or_buffer(path)
parquet_file = self.api.ParquetFile(path)

return parquet_file.to_pandas(columns=columns, **kwargs)

49 changes: 0 additions & 49 deletions pandas/io/s3.py

This file was deleted.

70 changes: 70 additions & 0 deletions pandas/tests/io/test_fsspec.py
@@ -0,0 +1,70 @@
import gc

import numpy as np
import pytest

from pandas import DataFrame, date_range, read_csv
import pandas._testing as tm
from pandas.util import _test_decorators as td

from pandas.io.common import is_fsspec_url

Contributor:

everything in this module is safe if fsspec is not available?

Contributor:

ok to simply skip the module

Contributor Author:

you mean fsspec = pytest.importorskip("fsspec")?
test_is_fsspec_url should be moved to test_common in that case.

df1 = DataFrame(
{
"int": [1, 3],
"float": [2.0, np.nan],
"str": ["t", "s"],
"dt": date_range("2018-06-18", periods=2),
}
)
text = df1.to_csv(index=False).encode()


@pytest.fixture
@td.skip_if_installed("fsspec")
Contributor:

Why is this skipped if fsspec is installed?

Contributor Author:

Ah, should be the opposite - but why did the tests run for me??

def cleared_fs():
import fsspec

memfs = fsspec.filesystem("memory")
try:
yield memfs
finally:
memfs.store.clear()


def test_is_fsspec_url():
assert is_fsspec_url("gcs://pandas/somethingelse.com")
assert is_fsspec_url("gs://pandas/somethingelse.com")
assert not is_fsspec_url("random:pandas/somethingelse.com")


def test_read_csv(cleared_fs):
from fsspec.implementations.memory import MemoryFile

cleared_fs.store["test/test.csv"] = MemoryFile(data=text)
df2 = read_csv("memory://test/test.csv", parse_dates=["dt"])

tm.assert_frame_equal(df1, df2)


def test_to_csv(cleared_fs):
df1.to_csv("memory://test/test.csv", index=True)
gc.collect() # pandas does not explicitly close file buffers
Contributor:

(why) is this necessary?

Contributor Author:

I'll double check - maybe this was only needed by the previous version of fsspec.

df2 = read_csv("memory://test/test.csv", parse_dates=["dt"], index_col=0)

tm.assert_frame_equal(df1, df2)


@td.skip_if_no("fastparquet")
def test_to_parquet_new_file(monkeypatch):
"""Regression test for writing to a not-yet-existent GCS Parquet file."""
df1.to_parquet(
"memory://test/test.csv", index=True, engine="fastparquet", compression=None
)


@td.skip_if_installed("fsspec")
def test_not_present_exception():
with pytest.raises(ImportError) as e:
read_csv("memory://test/test.csv")
assert "fsspec library is required" in str(e.value)