ENH: add fsspec support #34266


Merged
Changes from 8 commits (39 commits total)
94e717f
Add remote file io using fsspec.
Apr 14, 2020
fd7e072
Attempt refactor and clean
May 19, 2020
302ba13
Merge branch 'master' into feature/add-fsspec-support
May 20, 2020
9e6d3b2
readd and adapt s3/gcs tests
May 21, 2020
4564c8d
remove gc from test
May 21, 2020
0654537
Simpler is_fsspec
May 21, 2020
8d45cbb
add test
May 21, 2020
006e736
Answered most points
May 28, 2020
724ebd8
Implemented suggestions
May 28, 2020
9da1689
lint
May 28, 2020
a595411
Add versions info
May 29, 2020
6dd1e92
Update some deps
May 29, 2020
6e13df7
issue link syntax
May 29, 2020
3262063
More specific test versions
Jun 2, 2020
4bc2411
Account for alternate S3 protocols, and ignore type error
Jun 2, 2020
68644ab
Add comment to mypy ignore instruction
Jun 2, 2020
32bc586
more mypy
Jun 2, 2020
037ef2c
more black
Jun 2, 2020
c3c3075
Make storage_options a dict rather than swallowing kwargs
Jun 3, 2020
85d6452
More requested changes
Jun 5, 2020
263dd3b
Remove fsspec from locale tests
Jun 10, 2020
d0afbc3
tweak
Jun 10, 2020
6a587a5
Merge branch 'master' into feature/add-fsspec-support
Jun 10, 2020
b2992c1
Merge branch 'master' into feature/add-fsspec-support
Jun 11, 2020
9c03745
requested changes
Jun 11, 2020
7982e7b
add gcsfs to environment.yml
Jun 12, 2020
946297b
rerun deps script
Jun 12, 2020
145306e
Merge branch 'master' into feature/add-fsspec-support
Jun 12, 2020
06e5a3a
account for passed filesystem again
Jun 12, 2020
8f3854c
specify should_close
Jun 12, 2020
50c08c8
lint
Jun 12, 2020
9b20dc6
Except http passed to fsspec in parquet
Jun 12, 2020
eb90fe8
lint
Jun 12, 2020
b3e2cd2
Merge branch 'master' into feature/add-fsspec-support
Jun 16, 2020
4977a00
redo whatsnew
Jun 16, 2020
29a9785
simplify parquet write
Jun 18, 2020
565031b
Retry S3 file probe with timeout, in test_to_s3
Jun 18, 2020
606ce11
expand user in non-fsspec paths for parquet; add test for this
Jun 19, 2020
60b80a6
reorder imports!
Jun 19, 2020
2 changes: 1 addition & 1 deletion environment.yml
@@ -98,7 +98,7 @@ dependencies:

- pyqt>=5.9.2 # pandas.read_clipboard
- pytables>=3.4.3 # pandas.read_hdf, DataFrame.to_hdf
- s3fs # pandas.read_csv... when using 's3://...' path
- s3fs>=0.4.0 # pandas.read_csv... when using 's3://...' path (also brings in fsspec)
- sqlalchemy # pandas.read_sql, DataFrame.to_sql
- xarray # DataFrame.to_xarray
- cftime # Needed for downstream xarray.CFTimeIndex test
64 changes: 15 additions & 49 deletions pandas/io/common.py
@@ -126,20 +126,6 @@ def stringify_path(
return _expand_user(filepath_or_buffer)


def is_s3_url(url) -> bool:
"""Check for an s3, s3n, or s3a url"""
if not isinstance(url, str):
return False
return parse_url(url).scheme in ["s3", "s3n", "s3a"]


def is_gcs_url(url) -> bool:
"""Check for a gcs url"""
if not isinstance(url, str):
return False
return parse_url(url).scheme in ["gcs", "gs"]


def urlopen(*args, **kwargs):
"""
Lazy-import wrapper for stdlib urlopen, as that imports a big chunk of
@@ -150,38 +136,20 @@ def urlopen(*args, **kwargs):
return urllib.request.urlopen(*args, **kwargs)


def get_fs_for_path(filepath: str):
def is_fsspec_url(url: FilePathOrBuffer) -> bool:
"""
Get appropriate filesystem given a filepath.
Supports s3fs, gcs and local file system.

Parameters
----------
filepath : str
File path. e.g s3://bucket/object, /local/path, gcs://pandas/obj

Returns
-------
s3fs.S3FileSystem, gcsfs.GCSFileSystem, None
Appropriate FileSystem to use. None for local filesystem.
Returns true if fsspec is installed and the given URL looks like
something fsspec can handle
"""
if is_s3_url(filepath):
from pandas.io import s3

return s3.get_fs()
elif is_gcs_url(filepath):
from pandas.io import gcs

return gcs.get_fs()
else:
return None
return isinstance(url, str) and ("::" in url or "://" in url)
Contributor: What's the :: for? If it's necessary, can you ensure that we have tests for it?

Contributor Author: That's for compound URLs, e.g., to enable local caching like "simplecache::s3://bucket/path" (or indeed via dask workers).
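For context, a minimal sketch of such a chained URL (the protocol names "simplecache" and "memory" ship with fsspec; the paths are made up):

import fsspec

# Put a small file on fsspec's built-in in-memory filesystem.
with fsspec.open("memory://bucket/data.csv", mode="wb") as f:
    f.write(b"a,b\n1,2\n")

# Chained URL: the part before "::" wraps the part after it,
# here caching the (in-memory) target locally on first access.
with fsspec.open("simplecache::memory://bucket/data.csv", mode="rb") as f:
    print(f.read())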

Contributor: Is there ever one of those that doesn't also include a ://? I'd prefer to keep this check as narrow as possible, just to avoid surprises.

Contributor Author: We do special-case to assume "file://" where there is no protocol, but I'm happy to drop that possibility in this use case.

Contributor: You're saying that something like simplecache::foo/bar.csv will be converted to simplecache::file://foo/bar.csv?

I think for now I'd prefer to avoid that.

Contributor Author: ok



def get_filepath_or_buffer(
filepath_or_buffer: FilePathOrBuffer,
encoding: Optional[str] = None,
compression: Optional[str] = None,
mode: Optional[str] = None,
**storage_options: Dict[str, Any],
Member: Do we want callers to pass a dict or collect additional kwargs here? The docstring implies the former.

Contributor Author: We are not passing anything at all yet, so I don't mind whether it's kwargs or a dict keyword. I imagine that in a user function like read_csv, there would be a storage_options={...} argument, which is the common usage in Dask and Intake.
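To illustrate the call shape under discussion (hypothetical at this point — storage_options is not yet exposed through any pandas reader in this PR):

import pandas as pd

# Hypothetical usage mirroring the Dask/Intake convention; the dict
# would be forwarded to fsspec untouched.
df = pd.read_csv(
    "s3://some-bucket/data.csv",
    storage_options={"anon": True},  # e.g. anonymous S3 access
)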

):
"""
If the filepath_or_buffer is a url, translate and return the buffer.
@@ -194,6 +162,8 @@ def get_filepath_or_buffer(
compression : {{'gzip', 'bz2', 'zip', 'xz', None}}, optional
encoding : the encoding to use to decode bytes, default is 'utf-8'
mode : str, optional
storage_options: dict
passed on to fsspec, if using it; this is not yet accessed by the public API

Returns
-------
@@ -204,6 +174,7 @@
filepath_or_buffer = stringify_path(filepath_or_buffer)

if isinstance(filepath_or_buffer, str) and is_url(filepath_or_buffer):
# TODO: fsspec can also handle HTTP via requests, but leaving this unchanged
req = urlopen(filepath_or_buffer)
content_encoding = req.headers.get("Content-Encoding", None)
if content_encoding == "gzip":
@@ -213,19 +184,14 @@
req.close()
return reader, encoding, compression, True

if is_s3_url(filepath_or_buffer):
from pandas.io import s3

return s3.get_filepath_or_buffer(
filepath_or_buffer, encoding=encoding, compression=compression, mode=mode
)
if is_fsspec_url(filepath_or_buffer):
import fsspec

if is_gcs_url(filepath_or_buffer):
from pandas.io import gcs

return gcs.get_filepath_or_buffer(
filepath_or_buffer, encoding=encoding, compression=compression, mode=mode
)
file_obj = fsspec.open(
filepath_or_buffer, mode=mode or "rb", **storage_options
).open()
# TODO: both fsspec and pandas handle compression and encoding
Contributor: What would resolve the TODO here? To not handle compression or encoding in pandas? Can you update the comment to indicate that?

Contributor Author: Given that pandas must still handle compression and encoding for local and HTTP paths, that code will not be deprecated. Therefore, I think it's fine that we don't advertise the fact that fsspec can do that part too, and open everything on the backend as "rb"/"wb", uncompressed. The TODO would be resolved if at some point we decided that fsspec should handle all file ops, which is not likely in the near term.

Contributor: Agreed, so I think the TODO can be removed.
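A small runnable illustration of that division of labour, using fsspec's in-memory filesystem (paths arbitrary): fsspec only moves raw bytes, and any decompression stays on the pandas side.

import fsspec

# fsspec opens everything as plain "rb"/"wb"; no compression handling here.
with fsspec.open("memory://demo/blob.bin", mode="wb") as f:
    f.write(b"raw bytes; pandas would decompress these if needed")

with fsspec.open("memory://demo/blob.bin", mode="rb") as f:
    data = f.read()  # exactly the bytes written above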

return file_obj, encoding, compression, True

if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)):
return _expand_user(filepath_or_buffer), None, compression, False
22 changes: 0 additions & 22 deletions pandas/io/gcs.py

This file was deleted.

47 changes: 22 additions & 25 deletions pandas/io/parquet.py
@@ -8,12 +8,7 @@

from pandas import DataFrame, get_option

from pandas.io.common import (
get_filepath_or_buffer,
get_fs_for_path,
is_gcs_url,
is_s3_url,
)
from pandas.io.common import get_filepath_or_buffer, is_fsspec_url


def get_engine(engine: str) -> "BaseImpl":
@@ -107,6 +102,11 @@ def write(
# write_to_dataset does not support a file-like object when
# a directory path is used, so just pass the path string.
if partition_cols is not None:
if is_fsspec_url(path) and "filesystem" not in kwargs:
Contributor: Can you leave a comment explaining this "filesystem" not in kwargs check? It's not obvious to me why it's needed.

Contributor Author: In fsspec, you can specify the exact protocol you would like beyond what is inferred from the URL. Given that we don't pass storage_options through yet, perhaps this gives more flexibility than required, and I can remove it.

Contributor Author: Sorry, a correction on that: this is the filesystem parameter (i.e., an actual instance) passed to pyarrow. I have no idea if people might currently be using that.

Contributor: Ah, you're saying the user could pass a filesystem like

df.to_parquet(..., filesystem=filesystem)

That certainly seems possible. Could you ensure that we have a test for that?
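A hedged sketch of what that test could look like (untested; df1 and the td helper follow the test module added later in this PR, and it assumes pyarrow accepts an fsspec filesystem instance here):

@td.skip_if_no("fsspec")
def test_to_parquet_passed_filesystem(tmp_path):
    import fsspec

    # An explicitly passed filesystem should be respected, not replaced
    # by the one inferred from the path.
    fs = fsspec.filesystem("file")
    df1.to_parquet(
        str(tmp_path / "out"),
        engine="pyarrow",
        partition_cols=["str"],
        filesystem=fs,
        compression=None,
    )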

import fsspec.core

fs, path = fsspec.core.url_to_fs(path)
kwargs["filesystem"] = fs
Member: Could this also be useful for the partition_cols is None case using write_table? (That would keep the abilities consistent regardless of the partition_cols keyword.) Also, write_table takes a filesystem keyword, it seems.

Contributor Author: For the non-partitioned case, we pass a file-like object directly.

Contributor Author: We could instead not process the path into a file object, and pass the filesystem in both cases, if preferred.

Member: > For the non-partitioned case, we pass a file-like object directly.

If that's what we do right now, fine to leave it like that in this PR.

self.api.parquet.write_to_dataset(
table,
path,
@@ -122,9 +122,14 @@ def write(
file_obj_or_path.close()

def read(self, path, columns=None, **kwargs):
parquet_ds = self.api.parquet.ParquetDataset(
path, filesystem=get_fs_for_path(path), **kwargs
)
if is_fsspec_url(path) and "filesystem" not in kwargs:
Member: Can you additionally check that use_legacy_dataset=False is not in the kwargs? As long as fsspec/filesystem_spec#295 is not solved, converting a string URI into a path + fsspec filesystem would make that option unusable.

Member: Can you check this comment? (I will see if I can write a test that would catch it.)

Member: Ping for this one.

Member: (I am also fine with doing this as a follow-up myself.)

Contributor Author: I thought you meant that you intended to handle it; and yes please, you are in the best place to check the finer details of the calls to pyarrow.

import fsspec.core

fs, path = fsspec.core.url_to_fs(path)
parquet_ds = self.api.parquet.ParquetDataset(path, filesystem=fs, **kwargs)
else:
parquet_ds = self.api.parquet.ParquetDataset(path, **kwargs)

kwargs["columns"] = columns
result = parquet_ds.read_pandas(**kwargs).to_pandas()
return result
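For readers unfamiliar with the helper used above: fsspec.core.url_to_fs splits a URL into a filesystem instance plus a protocol-less path, e.g. (memory protocol shown; the exact path form can vary by filesystem):

import fsspec.core

fs, path = fsspec.core.url_to_fs("memory://bucket/data.parquet")
print(type(fs).__name__)  # MemoryFileSystem
print(path)               # the URL's path with the protocol stripped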
@@ -164,13 +169,11 @@ def write(
if partition_cols is not None:
kwargs["file_scheme"] = "hive"

if is_s3_url(path) or is_gcs_url(path):
# if path is s3:// or gs:// we need to open the file in 'wb' mode.
# TODO: Support 'ab'
if is_fsspec_url(path):
import fsspec

path, _, _, _ = get_filepath_or_buffer(path, mode="wb")
# And pass the opened file to the fastparquet internal impl.
kwargs["open_with"] = lambda path, _: path
# if filesystem is provided by fsspec, file must be opened in 'wb' mode.
kwargs["open_with"] = lambda path, _: fsspec.open(path, "wb").open()
else:
path, _, _, _ = get_filepath_or_buffer(path)

@@ -185,17 +188,11 @@
)

def read(self, path, columns=None, **kwargs):
if is_s3_url(path):
from pandas.io.s3 import get_file_and_filesystem
if is_fsspec_url(path):
import fsspec

# When path is s3:// an S3File is returned.
# We need to retain the original path(str) while also
# pass the S3File().open function to fastparquet impl.
s3, filesystem = get_file_and_filesystem(path)
try:
parquet_file = self.api.ParquetFile(path, open_with=filesystem.open)
finally:
s3.close()
open_with = lambda path, _: fsspec.open(path, "rb").open()
parquet_file = self.api.ParquetFile(path, open_with=open_with)
else:
path, _, _, _ = get_filepath_or_buffer(path)
parquet_file = self.api.ParquetFile(path)
53 changes: 0 additions & 53 deletions pandas/io/s3.py

This file was deleted.

6 changes: 6 additions & 0 deletions pandas/tests/io/test_common.py
@@ -367,3 +367,9 @@ def test_unknown_engine(self):
df.to_csv(path)
with pytest.raises(ValueError, match="Unknown engine"):
pd.read_csv(path, engine="pyt")


def test_is_fsspec_url():
assert icom.is_fsspec_url("gcs://pandas/somethingelse.com")
assert icom.is_fsspec_url("gs://pandas/somethingelse.com")
assert not icom.is_fsspec_url("random:pandas/somethingelse.com")
98 changes: 98 additions & 0 deletions pandas/tests/io/test_fsspec.py
@@ -0,0 +1,98 @@
import numpy as np
import pytest

from pandas import DataFrame, date_range, read_csv, read_parquet
import pandas._testing as tm
from pandas.util import _test_decorators as td

df1 = DataFrame(
{
"int": [1, 3],
"float": [2.0, np.nan],
"str": ["t", "s"],
"dt": date_range("2018-06-18", periods=2),
}
)
text = df1.to_csv(index=False).encode()


@pytest.fixture
def cleared_fs():
import fsspec

memfs = fsspec.filesystem("memory")
try:
yield memfs
finally:
memfs.store.clear()


@td.skip_if_no("fsspec")
Contributor: Can you pytest.importorskip inside the cleared_fs fixture? Then you don't need to repeat this.
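A sketch of that suggestion — pytest.importorskip returns the imported module, so the per-test skip decorators become redundant:

@pytest.fixture
def cleared_fs():
    fsspec = pytest.importorskip("fsspec")

    memfs = fsspec.filesystem("memory")
    try:
        yield memfs
    finally:
        memfs.store.clear()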

def test_read_csv(cleared_fs):
from fsspec.implementations.memory import MemoryFile

cleared_fs.store["test/test.csv"] = MemoryFile(data=text)
df2 = read_csv("memory://test/test.csv", parse_dates=["dt"])

tm.assert_frame_equal(df1, df2)


@td.skip_if_no("fsspec")
def test_reasonable_error(monkeypatch):
from fsspec.registry import known_implementations
from fsspec import registry

registry.target.clear()
with pytest.raises(ValueError) as e:
read_csv("nosuchprotocol://test/test.csv")
assert "nosuchprotocol" in str(e.value)
err_msg = "test error message"
monkeypatch.setitem(
known_implementations,
"couldexist",
{"class": "unimportable.CouldExist", "err": err_mgs},
)
with pytest.raises(ImportError) as e:
read_csv("couldexist://test/test.csv")
assert err_msg in str(e.value)


@td.skip_if_no("fsspec")
def test_to_csv(cleared_fs):
df1.to_csv("memory://test/test.csv", index=True)
df2 = read_csv("memory://test/test.csv", parse_dates=["dt"], index_col=0)

tm.assert_frame_equal(df1, df2)


@td.skip_if_no("fastparquet")
@td.skip_if_no("fsspec")
def test_to_parquet_new_file(monkeypatch):
"""Regression test for writing to a not-yet-existent GCS Parquet file."""
df1.to_parquet(
"memory://test/test.csv", index=True, engine="fastparquet", compression=None
)


@td.skip_if_no("s3fs")
def test_from_s3_csv(s3_resource, tips_file):
tm.assert_equal(read_csv("s3://pandas-test/tips.csv"), read_csv(tips_file))
# the following are decompressed by pandas, not fsspec
tm.assert_equal(read_csv("s3://pandas-test/tips.csv.gz"), read_csv(tips_file))
tm.assert_equal(read_csv("s3://pandas-test/tips.csv.bz2"), read_csv(tips_file))


@td.skip_if_no("s3fs")
@td.skip_if_no("fastparquet")
def test_s3_parquet(s3_resource):
fn = "s3://pandas-test/test.parquet"
df1.to_parquet(fn, index=False, engine="fastparquet", compression=None)
df2 = read_parquet(fn, engine="fastparquet")
tm.assert_equal(df1, df2)


@td.skip_if_installed("fsspec")
def test_not_present_exception():
with pytest.raises(ImportError) as e:
read_csv("memory://test/test.csv")
assert "fsspec library is required" in str(e.value)