-
-
Notifications
You must be signed in to change notification settings - Fork 18.4k
ENH: add fsspec support #34266
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ENH: add fsspec support #34266
Changes from 10 commits
94e717f
fd7e072
302ba13
9e6d3b2
4564c8d
0654537
8d45cbb
006e736
724ebd8
9da1689
a595411
6dd1e92
6e13df7
3262063
4bc2411
68644ab
32bc586
037ef2c
c3c3075
85d6452
263dd3b
d0afbc3
6a587a5
b2992c1
9c03745
7982e7b
946297b
145306e
06e5a3a
8f3854c
50c08c8
9b20dc6
eb90fe8
b3e2cd2
4977a00
29a9785
565031b
606ce11
60b80a6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -197,6 +197,20 @@ For a full example, see: :ref:`timeseries.adjust-the-start-of-the-bins`. | |
|
||
.. _whatsnew_110.enhancements.other: | ||
|
||
fsspec now used for filesystem handling | ||
jorisvandenbossche marked this conversation as resolved.
Show resolved
Hide resolved
|
||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||
|
||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
For reading and writing to filesystems other than local and reading from HTTP(S), | ||
the optional dependency ``fsspec`` will be used to dispatch operations. This will give unchanged | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Link to the original issue at the end of the first sentence. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also not fixed yet. |
||
functionality for S3 and GCS storage, which were already supported, but also add | ||
support for several other storage implementations such as Azure Datalake and Blob, | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
SSH, FTP, dropbox and github. For docs and capabilities, see the `fsspec docs`_. | ||
|
||
In the future, we will implement a way to pass parameters to the invoked | ||
filesystem instances. | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
.. _fsspec docs: https://filesystem-spec.readthedocs.io/en/latest/ | ||
|
||
Other enhancements | ||
^^^^^^^^^^^^^^^^^^ | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ | |
|
||
from pandas._typing import FilePathOrBuffer | ||
from pandas.compat import _get_lzma_file, _import_lzma | ||
from pandas.compat._optional import import_optional_dependency | ||
|
||
from pandas.core.dtypes.common import is_file_like | ||
|
||
|
@@ -126,20 +127,6 @@ def stringify_path( | |
return _expand_user(filepath_or_buffer) | ||
|
||
|
||
def is_s3_url(url) -> bool: | ||
"""Check for an s3, s3n, or s3a url""" | ||
if not isinstance(url, str): | ||
return False | ||
return parse_url(url).scheme in ["s3", "s3n", "s3a"] | ||
|
||
|
||
def is_gcs_url(url) -> bool: | ||
"""Check for a gcs url""" | ||
if not isinstance(url, str): | ||
return False | ||
return parse_url(url).scheme in ["gcs", "gs"] | ||
|
||
|
||
def urlopen(*args, **kwargs): | ||
""" | ||
Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of | ||
|
@@ -150,38 +137,20 @@ def urlopen(*args, **kwargs): | |
return urllib.request.urlopen(*args, **kwargs) | ||
|
||
|
||
def get_fs_for_path(filepath: str): | ||
def is_fsspec_url(url: FilePathOrBuffer) -> bool: | ||
""" | ||
Get appropriate filesystem given a filepath. | ||
Supports s3fs, gcs and local file system. | ||
|
||
Parameters | ||
---------- | ||
filepath : str | ||
File path. e.g s3://bucket/object, /local/path, gcs://pandas/obj | ||
|
||
Returns | ||
------- | ||
s3fs.S3FileSystem, gcsfs.GCSFileSystem, None | ||
Appropriate FileSystem to use. None for local filesystem. | ||
Returns true if fsspec is installed and the given URL looks like | ||
something fsspec can handle | ||
jreback marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
if is_s3_url(filepath): | ||
from pandas.io import s3 | ||
|
||
return s3.get_fs() | ||
elif is_gcs_url(filepath): | ||
from pandas.io import gcs | ||
|
||
return gcs.get_fs() | ||
else: | ||
return None | ||
return isinstance(url, str) and ("::" in url or "://" in url) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's for compound URLs, e.g., to enable local caching like "simplecache::s3://bucket/path" (or indeed via dask workers) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there ever one of those doesn't also include a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do special-case to assume "file://" where there is no protocol, but happy to drop that possibility in this use case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're saying that something like I think for now I'd prefer to avoid that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
|
||
|
||
def get_filepath_or_buffer( | ||
filepath_or_buffer: FilePathOrBuffer, | ||
encoding: Optional[str] = None, | ||
compression: Optional[str] = None, | ||
mode: Optional[str] = None, | ||
**storage_options: Dict[str, Any], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we want callers to pass a dict or collect additional kwargs here, the docstring implies the former. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are not passing anything at all yet, so I don't mind whether it's kwargs or a dict keyword. I imagine in a user function like read_csv, there would be a
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
): | ||
""" | ||
If the filepath_or_buffer is a url, translate and return the buffer. | ||
|
@@ -194,6 +163,8 @@ def get_filepath_or_buffer( | |
compression : {{'gzip', 'bz2', 'zip', 'xz', None}}, optional | ||
encoding : the encoding to use to decode bytes, default is 'utf-8' | ||
mode : str, optional | ||
storage_options: dict | ||
passed on to fsspec, if using it; this is not yet accessed by the public API | ||
|
||
Returns | ||
------- | ||
|
@@ -204,6 +175,7 @@ def get_filepath_or_buffer( | |
filepath_or_buffer = stringify_path(filepath_or_buffer) | ||
|
||
if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer): | ||
# TODO: fsspec can also handle HTTP via requests, but leaving this unchanged | ||
req = urlopen(filepath_or_buffer) | ||
content_encoding = req.headers.get("Content-Encoding", None) | ||
if content_encoding == "gzip": | ||
|
@@ -213,19 +185,13 @@ def get_filepath_or_buffer( | |
req.close() | ||
return reader, encoding, compression, True | ||
|
||
if is_s3_url(filepath_or_buffer): | ||
from pandas.io import s3 | ||
|
||
return s3.get_filepath_or_buffer( | ||
filepath_or_buffer, encoding=encoding, compression=compression, mode=mode | ||
) | ||
if is_fsspec_url(filepath_or_buffer): | ||
fsspec = import_optional_dependency("fsspec") | ||
|
||
if is_gcs_url(filepath_or_buffer): | ||
from pandas.io import gcs | ||
|
||
return gcs.get_filepath_or_buffer( | ||
filepath_or_buffer, encoding=encoding, compression=compression, mode=mode | ||
) | ||
file_obj = fsspec.open( | ||
filepath_or_buffer, mode=mode or "rb", **storage_options | ||
).open() | ||
return file_obj, encoding, compression, True | ||
|
||
if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)): | ||
return _expand_user(filepath_or_buffer), None, compression, False | ||
|
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,12 +8,7 @@ | |
|
||
from pandas import DataFrame, get_option | ||
|
||
from pandas.io.common import ( | ||
get_filepath_or_buffer, | ||
get_fs_for_path, | ||
is_gcs_url, | ||
is_s3_url, | ||
) | ||
from pandas.io.common import get_filepath_or_buffer, is_fsspec_url | ||
|
||
|
||
def get_engine(engine: str) -> "BaseImpl": | ||
|
@@ -107,6 +102,14 @@ def write( | |
# write_to_dataset does not support a file-like object when | ||
# a directory path is used, so just pass the path string. | ||
if partition_cols is not None: | ||
# user may provide filesystem= with an instance, in which case it takes | ||
# priority and fsspec need not analyse the path | ||
if is_fsspec_url(path) and "filesystem" not in kwargs: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you leave a comment explaining this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, edit on that: this is the filesystem parameter (i.e., an actual instance) to pyarrow. I have no idea if people might currently be using that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, you're saying the user could pass a filesystem like
That certainly seems possible. Could you ensure that we have a test for that? |
||
import_optional_dependency("fsspec") | ||
import fsspec.core | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
fs, path = fsspec.core.url_to_fs(path) | ||
kwargs["filesystem"] = fs | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could potentially also be useful for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the non-partitioned case, we pass a file-like object directly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could not process the path into a file object and pass the filesystem in both cases, if preferred. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If that's what we do right now, fine to leave it like that in this PR. |
||
self.api.parquet.write_to_dataset( | ||
table, | ||
path, | ||
|
@@ -122,9 +125,17 @@ def write( | |
file_obj_or_path.close() | ||
|
||
def read(self, path, columns=None, **kwargs): | ||
parquet_ds = self.api.parquet.ParquetDataset( | ||
path, filesystem=get_fs_for_path(path), **kwargs | ||
) | ||
if is_fsspec_url(path) and "filesystem" not in kwargs: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you additionally check that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you check this comment? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ping for this one There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (I am also fine with doing this as a follow-up myself) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought you meant that you intended to handle it; and yes please, you are in the best place to check the finer details of the calls to pyarrow. |
||
import_optional_dependency("fsspec") | ||
import fsspec.core | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
fs, path = fsspec.core.url_to_fs(path) | ||
martindurant marked this conversation as resolved.
Show resolved
Hide resolved
|
||
parquet_ds = self.api.parquet.ParquetDataset(path, filesystem=fs, **kwargs) | ||
else: | ||
parquet_ds = self.api.parquet.ParquetDataset(path, **kwargs) | ||
# this key valid for ParquetDataset but not read_pandas | ||
kwargs.pop("filesystem", None) | ||
|
||
kwargs["columns"] = columns | ||
result = parquet_ds.read_pandas(**kwargs).to_pandas() | ||
return result | ||
|
@@ -164,13 +175,11 @@ def write( | |
if partition_cols is not None: | ||
kwargs["file_scheme"] = "hive" | ||
|
||
if is_s3_url(path) or is_gcs_url(path): | ||
# if path is s3:// or gs:// we need to open the file in 'wb' mode. | ||
# TODO: Support 'ab' | ||
if is_fsspec_url(path): | ||
fsspec = import_optional_dependency("fsspec") | ||
|
||
path, _, _, _ = get_filepath_or_buffer(path, mode="wb") | ||
# And pass the opened file to the fastparquet internal impl. | ||
kwargs["open_with"] = lambda path, _: path | ||
# if filesystem is provided by fsspec, file must be opened in 'wb' mode. | ||
kwargs["open_with"] = lambda path, _: fsspec.open(path, "wb").open() | ||
else: | ||
path, _, _, _ = get_filepath_or_buffer(path) | ||
|
||
|
@@ -185,17 +194,11 @@ def write( | |
) | ||
|
||
def read(self, path, columns=None, **kwargs): | ||
if is_s3_url(path): | ||
from pandas.io.s3 import get_file_and_filesystem | ||
if is_fsspec_url(path): | ||
fsspec = import_optional_dependency("fsspec") | ||
|
||
# When path is s3:// an S3File is returned. | ||
# We need to retain the original path(str) while also | ||
# pass the S3File().open function to fastparquet impl. | ||
s3, filesystem = get_file_and_filesystem(path) | ||
try: | ||
parquet_file = self.api.ParquetFile(path, open_with=filesystem.open) | ||
finally: | ||
s3.close() | ||
open_with = lambda path, _: fsspec.open(path, "rb").open() | ||
parquet_file = self.api.ParquetFile(path, open_with=open_with) | ||
else: | ||
path, _, _, _ = get_filepath_or_buffer(path) | ||
parquet_file = self.api.ParquetFile(path) | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would clarify the note a bit further, as now it sounds you need this for any kind of file operations, something like "for remote filesystems" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, it handles other things too, just not "local" or "http(s)". That would make for a long comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then something like "filesystems other than local or http(s)" isn't too long I would say