ENH: add fsspec support #34266
Conversation
I see some parquet changes here, will need a bit of work...
We'll want a whatsnew in 1.1.0.rst. You can add a subsection to enhancements for this.
environment.yml
Outdated
@@ -98,7 +98,7 @@ dependencies:
   - pyqt>=5.9.2  # pandas.read_clipboard
   - pytables>=3.4.2  # pandas.read_hdf, DataFrame.to_hdf
-  - s3fs  # pandas.read_csv... when using 's3://...' path
+  - s3fs  # pandas.read_csv... when using 's3://...' path (also brings in fsspec)
I think we should explicitly list fsspec here, if we're importing it.
What should we set the minimum supported version to? We'll add this to a few places in the library (compat._optional, docs, ...).
Can update this, and the minimum versions?
pandas/io/common.py
Outdated
try:
    import fsspec  # noqa: F401

return isinstance(url, str) and ("::" in url or "://" in url)
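For context, the check in question is a pure string heuristic, so it can be exercised without any filesystem backend installed; a minimal sketch mirroring the line above:

```python
def is_fsspec_url(url) -> bool:
    # Hand the URL to fsspec when it names an explicit protocol
    # ("s3://...") or uses fsspec's chained syntax ("filecache::s3://...").
    return isinstance(url, str) and ("::" in url or "://" in url)

print(is_fsspec_url("s3://pandas-test/test.csv"))   # True
print(is_fsspec_url("filecache::s3://bucket/key"))  # True
print(is_fsspec_url("some/local/path.csv"))         # False
```

Note that, as written, the heuristic also matches http:// URLs, which comes up later in the review.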
Should this also check if there's an fsspec-compatible implementation for that protocol? What's the behavior / error message if you have fsspec installed but not s3fs and do read_csv("s3://...")?
fsspec gives specific error messages for known protocols that are not available due to a missing dependency (s3fs would be such a one), and a different message if the protocol is completely unknown.
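A quick sketch of the unknown-protocol case (assuming fsspec is installed; the exact message text is fsspec's, not pandas'):

```python
import fsspec

# A protocol fsspec's registry has never heard of raises ValueError;
# a known protocol whose backend is missing (e.g. s3 without s3fs)
# instead raises ImportError naming the package to install.
try:
    fsspec.filesystem("no-such-protocol")
except ValueError as err:
    print("unknown protocol:", err)
```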
Does that error surface to the user? Or does pandas swallow it somewhere along the way?
pandas/io/common.py
Outdated
def get_filepath_or_buffer(
    filepath_or_buffer: FilePathOrBuffer,
    encoding: Optional[str] = None,
    compression: Optional[str] = None,
    mode: Optional[str] = None,
    **storage_options,
Add a type? Dict[str, Any], I think?
Does Dict[str, Any] work?
pandas/io/common.py
Outdated
@@ -175,6 +175,7 @@ def get_filepath_or_buffer(
    compression : {{'gzip', 'bz2', 'zip', 'xz', None}}, optional
    encoding : the encoding to use to decode bytes, default is 'utf-8'
    mode : str, optional
+   storage_options: passed on to fsspec, if using it
- storage_options: passed on to fsspec, if using it
+ **storage_options : dict, optional
+     passed on to fsspec.open, if using it.
add a versionadded tag (1.1)
Just FYI, this isn't in the public API.
At some point we'll add this keyword to all of our IO routines, which would benefit from the versionadded. But that can be a separate PR.
pandas/tests/io/test_fsspec.py
Outdated
def test_to_csv(cleared_fs):
    df1.to_csv("memory://test/test.csv", index=True)
    gc.collect()  # pandas does not explicitly close file buffers
(why) is this necessary?
I'll double check - maybe this was only needed by the previous version of fsspec
pandas/tests/io/test_fsspec.py
Outdated
@pytest.fixture
@td.skip_if_installed("fsspec")
Why is this skipped if fsspec is installed?
Ah, should be the opposite - but why did the tests run for me??
pandas/tests/io/test_s3.py
Outdated
@@ -1,25 +0,0 @@
from io import BytesIO
Again: ideally we keep the old tests, aside from the is_s3_url one.
Hello @martindurant! Thanks for updating this PR. We checked the lines you've touched for PEP 8 issues, and found: There are currently no PEP 8 issues detected in this Pull Request. Cheers! 🍻 Comment last updated at 2020-06-19 18:15:56 UTC
(NB: some failures are due to v0.7.4 of fsspec only being on conda-forge)
but I will need some help with
(i.e., I know that, given the input, the output is str and not None, but how do I tell mypy that)
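On the mypy question (a sketch, not the PR's actual code): two common ways to narrow Optional[str] to str are an assert, which mypy understands as a type guard, or typing.cast:

```python
from typing import Optional, cast

def narrow_with_assert(value: Optional[str]) -> str:
    assert value is not None  # mypy narrows Optional[str] to str here
    return value

def narrow_with_cast(value: Optional[str]) -> str:
    # cast does no runtime check; it only informs the type checker
    return cast(str, value)

print(narrow_with_assert("s3://bucket/key"))  # s3://bucket/key
```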
pandas/io/common.py
Outdated
@@ -175,6 +175,7 @@ def get_filepath_or_buffer(
    compression : {{'gzip', 'bz2', 'zip', 'xz', None}}, optional
    encoding : the encoding to use to decode bytes, default is 'utf-8'
    mode : str, optional
+   storage_options: passed on to fsspec, if using it
add a versionadded tag (1.1)
pandas/tests/io/test_fsspec.py
Outdated
from pandas.util import _test_decorators as td

from pandas.io.common import is_fsspec_url
everything in this module is safe if fsspec is not available?
ok to simply skip the module
You mean fsspec = pytest.importorskip("fsspec")? test_is_fsspec_url should be moved to test_common in that case.
Linting error: https://github.com/pandas-dev/pandas/pull/34266/checks?check_run_id=697447968#step:11:10
Can you run mypy on this locally?
CI failures at https://dev.azure.com/pandas-dev/pandas/_build/results?buildId=35892&view=logs&j=98c84d08-34d8-513e-80be-2c581992dd5a&t=f75024c1-aff0-57fb-4f0e-35923b654e09&l=208. Do we need to specify a minimum version of fsspec?
cc @gfyoung if you have a chance to review.
pandas/io/common.py
Outdated
@@ -175,6 +175,7 @@ def get_filepath_or_buffer(
    compression : {{'gzip', 'bz2', 'zip', 'xz', None}}, optional
    encoding : the encoding to use to decode bytes, default is 'utf-8'
    mode : str, optional
+   storage_options: passed on to fsspec, if using it
Just FYI, this isn't in the public API.
At some point we'll add this keyword to all of our IO routines, which would benefit from the versionadded. But that can be a separate PR.
pandas/io/common.py
Outdated
file_obj = fsspec.open(
    filepath_or_buffer, mode=mode or "rb", **storage_options
).open()
# TODO: both fsspec and pandas handle compression and encoding
What would resolve the TODO here? To not handle compression or encoding in pandas? Can you update the comment to indicate that?
Given that pandas must still handle compression and encoding for local and http, that code will not be deprecated. Therefore, I think it's fine that we don't advertise the fact that fsspec can do that part too, and open everything on the backend as "rb"/"wb", uncompressed. The TODO would be resolved if at some point we decided that fsspec should handle all file ops, which is not likely in the near term.
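To illustrate the division of labor described above (a sketch assuming fsspec is installed): the backend file is opened uncompressed in binary mode, and compression/encoding handling stays in the pandas layer. fsspec's in-memory filesystem makes this easy to see:

```python
import fsspec

# Open in raw binary mode, as pandas does ("rb"/"wb"), leaving
# compression and text encoding to the caller.
with fsspec.open("memory://demo/data.csv", mode="wb") as f:
    f.write(b"a,b\n1,2\n")

with fsspec.open("memory://demo/data.csv", mode="rb") as f:
    raw = f.read()

print(raw)  # b'a,b\n1,2\n'
```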
Agreed, so I think the TODO can be removed.
environment.yml
Outdated
@@ -98,7 +98,7 @@ dependencies:
   - pyqt>=5.9.2  # pandas.read_clipboard
   - pytables>=3.4.2  # pandas.read_hdf, DataFrame.to_hdf
-  - s3fs  # pandas.read_csv... when using 's3://...' path
+  - s3fs  # pandas.read_csv... when using 's3://...' path (also brings in fsspec)
Can update this, and the minimum versions?
pandas/io/common.py
Outdated
def get_filepath_or_buffer(
    filepath_or_buffer: FilePathOrBuffer,
    encoding: Optional[str] = None,
    compression: Optional[str] = None,
    mode: Optional[str] = None,
    **storage_options,
Does Dict[str, Any] work?
pandas/io/parquet.py
Outdated
@@ -107,6 +102,11 @@ def write(
    # write_to_dataset does not support a file-like object when
    # a directory path is used, so just pass the path string.
    if partition_cols is not None:
+       if is_fsspec_url(path) and "filesystem" not in kwargs:
Can you leave a comment explaining this "filesystem" not in kwargs check? It's not obvious to me why it's needed.
In fsspec, you can specify the exact protocol you would like beyond that inferred from the URL. Given that we don't pass storage_options through yet, perhaps this gives more flexibility than required and I can remove it.
Sorry, edit on that: this is the filesystem parameter (i.e., an actual instance) to pyarrow. I have no idea if people might currently be using that.
Ah, you're saying the user could pass a filesystem like df.to_parquet(..., filesystem=filesystem). That certainly seems possible. Could you ensure that we have a test for that?
I need help with this. The return of
These runs are using an older fsspec from conda defaults. I can ask the conda team to update the version, but we should be certain that this PR doesn't require any further changes in fsspec.
Can you add a new subsection to "Enhancements" in doc/source/whatsnew/v1.1.0.rst?
pandas/io/common.py
Outdated
file_obj = fsspec.open(
    filepath_or_buffer, mode=mode or "rb", **storage_options
).open()
# TODO: both fsspec and pandas handle compression and encoding
Agreed, so I think the TODO can be removed.
pandas/io/parquet.py
Outdated
@@ -107,6 +102,11 @@ def write(
    # write_to_dataset does not support a file-like object when
    # a directory path is used, so just pass the path string.
    if partition_cols is not None:
+       if is_fsspec_url(path) and "filesystem" not in kwargs:
Ah, you're saying the user could pass a filesystem like df.to_parquet(..., filesystem=filesystem). That certainly seems possible. Could you ensure that we have a test for that?
Added test with filesystem=. Added whatsnew. Changed imports, updated comments.
doc/source/whatsnew/v1.1.0.rst
Outdated
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For reading and writing to filesystems other than local and reading from HTTP(S),
the optional dependency ``fsspec`` will be used to dispatch operations. This will give unchanged
Link to the original issue at the end of the first sentence.
Also not fixed yet.
Note: fsspec 0.7.4 will be up on conda defaults soon. At that point I'll remove the WIP tag.
I don't know that we'll need to wait for fsspec to be updated on defaults. You can update the various ci/deps/*.yml files to include fsspec>=0.7.4 wherever we include s3fs or gcsfs already. That should force it to use conda-forge till defaults is updated. Can you also run scripts/generate_pip_deps_from_conda.py?
doc/source/whatsnew/v1.1.0.rst
Outdated
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For reading and writing to filesystems other than local and reading from HTTP(S),
the optional dependency ``fsspec`` will be used to dispatch operations. This will give unchanged
Also not fixed yet.
Those two points were fixed on my local branch, not yet pushed.
I think one of the test failures in https://dev.azure.com/pandas-dev/pandas/_build/results?buildId=37143&view=logs&j=bef1c175-2c1b-51ae-044a-2437c76fc339&t=770e7bb1-09f5-5ebf-b63b-578d2906aac9 is real. Should we not be using fsspec when reading from a URL?
In order not to change current behaviour, http(s) is excepted in get_filepath_or_buffer, because it was not handled by fsspec previously.
@jorisvandenbossche are you good with this now that #34500 is in (once the merge issue in the whatsnew is sorted out)?
- self.api.parquet.write_table(
-     table, file_obj_or_path, compression=compression, **kwargs
- )
+ self.api.parquet.write_table(table, path, compression=compression, **kwargs)
It might have been discussed before in this PR, but why this change from file_obj_or_path to path? Because with this change, file_obj_or_path is basically never used?
You know what, this can be made simpler...
The reason is, pyarrow can take a filesystem+path and do the right thing, and for the partitioning case, this is the only way. I think we can deal without opening the file in the pandas code at all. I'll push that suggestion, and add comments for what the blocks do to be clearer.
The new code above certainly looks fine, I am only wondering if there were things that get_filepath_or_buffer was doing before that we might now be missing. It gets a buffer for URLs, but I don't think this actually works for writing. In addition, it also seems to do an os.path.expanduser on strings. I suppose this is not tested (and I'm not sure many people would rely on that), but it might lead to a regression.
You want to add expanduser to the function? I wouldn't bother going through get_filepath_or_buffer just for this, and as you say, it isn't tested or documented anywhere. In fact, a test may be a little tricky for the various OSs; I'm not sure you can guarantee that "~/afile" -> "${HOME}/afile".
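For reference, os.path.expanduser only rewrites a leading ~ and leaves other paths untouched (and returns the input unchanged when no home directory can be determined), which is what makes an exact cross-OS assertion awkward:

```python
import os

print(os.path.expanduser("~/afile"))     # home dir + "/afile" on most setups
print(os.path.expanduser("/tmp/afile"))  # no leading "~": unchanged
```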
Confirmed that it works on master
In [2]: df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
In [3]: df.to_parquet("~/test-4.parquet")
and raises on this branch.
In [3]: df.to_parquet("~/test-5.parquet")
---------------------------------------------------------------------------
FileNotFoundError Traceback (most recent call last)
<ipython-input-3-c7dcbbaae894> in <module>
----> 1 df.to_parquet("~/test-5.parquet")
~/sandbox/pandas/pandas/util/_decorators.py in wrapper(*args, **kwargs)
197 else:
198 kwargs[new_arg_name] = new_arg_value
--> 199 return func(*args, **kwargs)
200
201 return cast(F, wrapper)
~/sandbox/pandas/pandas/core/frame.py in to_parquet(self, path, engine, compression, index, partition_cols, **kwargs)
2323 index=index,
2324 partition_cols=partition_cols,
-> 2325 **kwargs,
2326 )
2327
~/sandbox/pandas/pandas/io/parquet.py in to_parquet(df, path, engine, compression, index, partition_cols, **kwargs)
266 index=index,
267 partition_cols=partition_cols,
--> 268 **kwargs,
269 )
270
~/sandbox/pandas/pandas/io/parquet.py in write(self, df, path, compression, index, partition_cols, **kwargs)
118 else:
119 # write to single output file
--> 120 self.api.parquet.write_table(table, path, compression=compression, **kwargs)
121
122 def read(self, path, columns=None, **kwargs):
~/Envs/pandas-dev/lib/python3.7/site-packages/pyarrow/parquet.py in write_table(table, where, row_group_size, version, use_dictionary, compression, write_statistics, use_deprecated_int96_timestamps, coerce_timestamps, allow_truncated_timestamps, data_page_size, flavor, filesystem, compression_level, **kwargs)
1341 use_deprecated_int96_timestamps=use_int96,
1342 compression_level=compression_level,
-> 1343 **kwargs) as writer:
1344 writer.write_table(table, row_group_size=row_group_size)
1345 except Exception:
~/Envs/pandas-dev/lib/python3.7/site-packages/pyarrow/parquet.py in __init__(self, where, schema, filesystem, flavor, version, use_dictionary, compression, write_statistics, use_deprecated_int96_timestamps, compression_level, **options)
434 filesystem, path = resolve_filesystem_and_path(where, filesystem)
435 if filesystem is not None:
--> 436 sink = self.file_handle = filesystem.open(path, 'wb')
437 else:
438 sink = where
~/Envs/pandas-dev/lib/python3.7/site-packages/pyarrow/filesystem.py in open(self, path, mode)
242 """
243 path = _stringify_path(path)
--> 244 return open(path, mode=mode)
245
246 @property
FileNotFoundError: [Errno 2] No such file or directory: '~/test-5.parquet'
pandas consistently expands ~ to the homedir, so we'll need to ensure we don't regress on this.
but how do we test this reliably?
We should have similar tests for csv.
We test the common utility:
pandas/pandas/tests/io/test_common.py
Lines 55 to 68 in 80ba4c4
def test_expand_user(self):
    filename = "~/sometest"
    expanded_name = icom._expand_user(filename)
    assert expanded_name != filename
    assert os.path.isabs(expanded_name)
    assert os.path.expanduser(filename) == expanded_name

def test_expand_user_normal_path(self):
    filename = "/somefolder/sometest"
    expanded_name = icom._expand_user(filename)
    assert expanded_name == filename
    assert os.path.expanduser(filename) == expanded_name
and then probably suppose that all IO methods use those common utilities.
A little weak...
I'll put in something. By the way, I expect that expanding the home dir didn't work before either when partitioning.
parquet_ds = self.api.parquet.ParquetDataset(
    path, filesystem=get_fs_for_path(path), **kwargs
)
if is_fsspec_url(path) and "filesystem" not in kwargs:
Can you check this comment?
(I will try if I can write a test that would catch it)
    obj.key for obj in s3_resource.Bucket("pandas-test").objects.all()
)
timeout = 5
while True:
Hmm, this is concerning. Do you know what's causing it? I would think that everything is synchronous.
I don't know why... S3 is not supposed to be immediately consistent; maybe botocore caches, or moto doesn't update its index immediately, or something like that.
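The retry loop in that test can be factored into a small polling helper (a sketch; names hypothetical), the usual pattern for waiting on an eventually-consistent store:

```python
import time

def wait_for(predicate, timeout=5.0, interval=0.05):
    # Poll predicate() until it returns truthy or the deadline passes.
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() > deadline:
            return False
        time.sleep(interval)

# Simulate a listing that only becomes consistent after a few polls.
state = {"polls": 0}
def listing_ready():
    state["polls"] += 1
    return state["polls"] >= 3

print(wait_for(listing_ready, timeout=2.0))  # True
```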
(travis is passing, but has not updated in the list above, at least for me)
@jorisvandenbossche, anything else?
parquet_ds = self.api.parquet.ParquetDataset(
    path, filesystem=get_fs_for_path(path), **kwargs
)
if is_fsspec_url(path) and "filesystem" not in kwargs:
ping for this one
else:
    fs = kwargs.pop("filesystem", None)
    should_close = False
    path = _expand_user(path)
No blocking comment (this could also be a follow-up), but for my understanding (and it could maybe use a code comment): what was the reason again that this whole is_fsspec_url(path): .. else: .. block cannot be handled by get_filepath_or_buffer? (which would e.g. handle the _expand_user as well)
That we are not getting an open file, we are passing a path and filesystem to arrow for it to handle. This is important for the partitioning case, but works for the simple case too. Wasn't it your suggestion?
Wasn't it your suggestion?
I don't think so, since this was already there from the beginning of the PR, before my first review ;) Not that it matters much.
Anyway, to answer my own question (I think): on second look, this is simply because get_filepath_or_buffer doesn't return a filesystem (which is indeed needed here, as you indicated). I was just confused about that for a moment.
Thanks!
Phew :)
Thanks!
Adapt a mock for the changed implementation at pandas, see pandas-dev/pandas#34266. Closes #51.
Supersedes #33549
closes #33452
black pandas
git diff upstream/master -u -- "*.py" | flake8 --diff