-
-
Notifications
You must be signed in to change notification settings - Fork 18.4k
ENH: add fsspec support #34266
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ENH: add fsspec support #34266
Changes from 2 commits
94e717f
fd7e072
302ba13
9e6d3b2
4564c8d
0654537
8d45cbb
006e736
724ebd8
9da1689
a595411
6dd1e92
6e13df7
3262063
4bc2411
68644ab
32bc586
037ef2c
c3c3075
85d6452
263dd3b
d0afbc3
6a587a5
b2992c1
9c03745
7982e7b
946297b
145306e
06e5a3a
8f3854c
50c08c8
9b20dc6
eb90fe8
b3e2cd2
4977a00
29a9785
565031b
606ce11
60b80a6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -134,20 +134,6 @@ def stringify_path( | |||||||
return _expand_user(filepath_or_buffer) | ||||||||
|
||||||||
|
||||||||
def is_s3_url(url) -> bool: | ||||||||
"""Check for an s3, s3n, or s3a url""" | ||||||||
if not isinstance(url, str): | ||||||||
return False | ||||||||
return parse_url(url).scheme in ["s3", "s3n", "s3a"] | ||||||||
|
||||||||
|
||||||||
def is_gcs_url(url) -> bool: | ||||||||
"""Check for a gcs url""" | ||||||||
if not isinstance(url, str): | ||||||||
return False | ||||||||
return parse_url(url).scheme in ["gcs", "gs"] | ||||||||
|
||||||||
|
||||||||
def urlopen(*args, **kwargs): | ||||||||
""" | ||||||||
Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of | ||||||||
|
@@ -158,11 +144,25 @@ def urlopen(*args, **kwargs): | |||||||
return urllib.request.urlopen(*args, **kwargs) | ||||||||
|
||||||||
|
||||||||
def is_fsspec_url(url: FilePathOrBuffer) -> bool: | ||||||||
""" | ||||||||
Returns true if fsspec is installed and the given URL looks like | ||||||||
something fsspec can handle | ||||||||
jreback marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
""" | ||||||||
try: | ||||||||
import fsspec # noqa: F401 | ||||||||
|
||||||||
return isinstance(url, str) and ("::" in url or "://" in url) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this also check if there's an fsspec-compatible implementation for that protocol? What's the behavior / error message if you have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fsspec gives specific error messages for known protocols that are not available due to missing dependency (s3fs would be such a one), and a different message is the protocol is completely unknown. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does that error surface to the user? Or does pandas swallow it somewhere along the way? |
||||||||
except ImportError: | ||||||||
return False | ||||||||
|
||||||||
|
||||||||
def get_filepath_or_buffer( | ||||||||
filepath_or_buffer: FilePathOrBuffer, | ||||||||
encoding: Optional[str] = None, | ||||||||
compression: Optional[str] = None, | ||||||||
mode: Optional[str] = None, | ||||||||
**storage_options, | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a type? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does |
||||||||
): | ||||||||
""" | ||||||||
If the filepath_or_buffer is a url, translate and return the buffer. | ||||||||
|
@@ -175,6 +175,7 @@ def get_filepath_or_buffer( | |||||||
compression : {{'gzip', 'bz2', 'zip', 'xz', None}}, optional | ||||||||
encoding : the encoding to use to decode bytes, default is 'utf-8' | ||||||||
mode : str, optional | ||||||||
storage_options: passed on to fsspec, if using it | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add a versionadded tag (1.1) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just FYI, this isn't in the public API. At some point we'll add this keyword to all of our IO routines, which would benefit from the versionadded. But that can be a separate PR. |
||||||||
|
||||||||
Returns | ||||||||
------- | ||||||||
|
@@ -185,6 +186,7 @@ def get_filepath_or_buffer( | |||||||
filepath_or_buffer = stringify_path(filepath_or_buffer) | ||||||||
|
||||||||
if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer): | ||||||||
# TODO: fsspec can also handle HTTP via requests, but leaving this unchanged | ||||||||
req = urlopen(filepath_or_buffer) | ||||||||
content_encoding = req.headers.get("Content-Encoding", None) | ||||||||
if content_encoding == "gzip": | ||||||||
|
@@ -194,19 +196,14 @@ def get_filepath_or_buffer( | |||||||
req.close() | ||||||||
return reader, encoding, compression, True | ||||||||
|
||||||||
if is_s3_url(filepath_or_buffer): | ||||||||
from pandas.io import s3 | ||||||||
|
||||||||
return s3.get_filepath_or_buffer( | ||||||||
filepath_or_buffer, encoding=encoding, compression=compression, mode=mode | ||||||||
) | ||||||||
if is_fsspec_url(filepath_or_buffer): | ||||||||
import fsspec | ||||||||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
|
||||||||
if is_gcs_url(filepath_or_buffer): | ||||||||
from pandas.io import gcs | ||||||||
|
||||||||
return gcs.get_filepath_or_buffer( | ||||||||
filepath_or_buffer, encoding=encoding, compression=compression, mode=mode | ||||||||
) | ||||||||
file_obj = fsspec.open( | ||||||||
filepath_or_buffer, mode=mode or "rb", **storage_options | ||||||||
).open() | ||||||||
# TODO: both fsspec and pandas handle compression and encoding | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What would resolve the TODO here? To not handle compression or encoding in pandas? Can you update the comment to indicate that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given that pandas must still handle compression and encoding for local and http, that code will not be deprecated. Therefore, I think it's fine that we don't advertise the fact that fsspec can do that part too, and open everything on the backend as "rb"/"wb", uncompressed. The TODO would be resolved if at some point we decided that fsspec should handle all file ops, which is not likely in the near term. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, so I think the TODO can be removed. |
||||||||
return file_obj, encoding, compression, True | ||||||||
|
||||||||
if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)): | ||||||||
return _expand_user(filepath_or_buffer), None, compression, False | ||||||||
|
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import gc | ||
|
||
import numpy as np | ||
import pytest | ||
|
||
from pandas import DataFrame, date_range, read_csv | ||
import pandas._testing as tm | ||
from pandas.util import _test_decorators as td | ||
|
||
from pandas.io.common import is_fsspec_url | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. everything in this module is safe if fsspec is not available? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok to simply skip the module There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you mean |
||
df1 = DataFrame( | ||
{ | ||
"int": [1, 3], | ||
"float": [2.0, np.nan], | ||
"str": ["t", "s"], | ||
"dt": date_range("2018-06-18", periods=2), | ||
} | ||
) | ||
text = df1.to_csv(index=False).encode() | ||
TomAugspurger marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
@pytest.fixture | ||
@td.skip_if_installed("fsspec") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this skipped if fsspec is installed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, should be the opposite - but why did the tests run for me?? |
||
def cleared_fs(): | ||
import fsspec | ||
|
||
memfs = fsspec.filesystem("memory") | ||
try: | ||
yield memfs | ||
finally: | ||
memfs.store.clear() | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
def test_is_fsspec_url(): | ||
assert is_fsspec_url("gcs://pandas/somethingelse.com") | ||
assert is_fsspec_url("gs://pandas/somethingelse.com") | ||
assert not is_fsspec_url("random:pandas/somethingelse.com") | ||
|
||
|
||
def test_read_csv(cleared_fs): | ||
from fsspec.implementations.memory import MemoryFile | ||
|
||
cleared_fs.store["test/test.csv"] = MemoryFile(data=text) | ||
df2 = read_csv("memory://test/test.csv", parse_dates=["dt"]) | ||
|
||
tm.assert_frame_equal(df1, df2) | ||
|
||
|
||
def test_to_csv(cleared_fs): | ||
df1.to_csv("memory://test/test.csv", index=True) | ||
gc.collect() # pandas does not explicitly close file buffers | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (why) is this necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll double check - maybe this was only need by the previous version of fsspec |
||
df2 = read_csv("memory://test/test.csv", parse_dates=["dt"], index_col=0) | ||
|
||
tm.assert_frame_equal(df1, df2) | ||
|
||
|
||
@td.skip_if_no("fastparquet") | ||
def test_to_parquet_new_file(monkeypatch): | ||
"""Regression test for writing to a not-yet-existent GCS Parquet file.""" | ||
df1.to_parquet( | ||
"memory://test/test.csv", index=True, engine="fastparquet", compression=None | ||
) | ||
|
||
|
||
@td.skip_if_installed("fsspec") | ||
def test_not_present_exception(): | ||
with pytest.raises(ImportError) as e: | ||
read_csv("memory://test/test.csv") | ||
assert "fsspec library is required" in str(e.value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think explicitly list fsspec here, if we're import it.
What should we set the minimum supported version to? We'll add this to a few places in the library (
compat._optional
, docs, ...).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can update this, and the minimum versions?