IO: Fix parquet read from s3 directory #33632

Merged

merged 27 commits into master from mcmali-parquet on Apr 26, 2020
Changes from all commits

Commits (27)
aa94fe7
parquet init
alimcmaster1 Apr 18, 2020
a30c71a
Doc Str
alimcmaster1 Apr 18, 2020
b2747eb
Simplify read
alimcmaster1 Apr 18, 2020
a51757a
Fix writer with partition
alimcmaster1 Apr 18, 2020
968f3b6
Test case
alimcmaster1 Apr 18, 2020
789f4ca
Clean up test case
alimcmaster1 Apr 18, 2020
040763e
Add whatsnew
alimcmaster1 Apr 18, 2020
40f5889
Clean ups
alimcmaster1 Apr 18, 2020
753d647
Clean ups
alimcmaster1 Apr 18, 2020
e4dcdc3
Update whatsnew
alimcmaster1 Apr 18, 2020
bb21431
Add skip if no
alimcmaster1 Apr 18, 2020
fb38932
Fix import
alimcmaster1 Apr 18, 2020
c29befd
Removed fixed xfail
alimcmaster1 Apr 18, 2020
4f78fc5
remove import
alimcmaster1 Apr 18, 2020
4b2828b
Merge master
alimcmaster1 Apr 21, 2020
463c2ea
Merge remote-tracking branch 'upstream/master' into mcmali-parquet
alimcmaster1 Apr 21, 2020
dabfe58
Add further test case
alimcmaster1 Apr 21, 2020
dea95f3
Update parquet.py
alimcmaster1 Apr 25, 2020
ae76e42
Update parquet.py
alimcmaster1 Apr 25, 2020
4b48326
Add whatsnew 2
alimcmaster1 Apr 26, 2020
211c36e
Rename var
alimcmaster1 Apr 26, 2020
4897a32
Improve get_fs_for_path docstring
alimcmaster1 Apr 26, 2020
bba4040
Merge remote-tracking branch 'origin/mcmali-parquet' into mcmali-parquet
alimcmaster1 Apr 26, 2020
5bc6327
Add doc example
alimcmaster1 Apr 26, 2020
ca89c21
Make whatsnew clearer
alimcmaster1 Apr 26, 2020
0df818e
Merge remote-tracking branch 'upstream/master' into mcmali-parquet
alimcmaster1 Apr 26, 2020
2a1a85c
Merge remote-tracking branch 'upstream/master' into mcmali-parquet
alimcmaster1 Apr 26, 2020
2 changes: 2 additions & 0 deletions doc/source/whatsnew/v1.1.0.rst
@@ -625,6 +625,8 @@ I/O
- Bug in :meth:`~DataFrame.to_parquet` was not raising ``PermissionError`` when writing to a private s3 bucket with invalid creds. (:issue:`27679`)
- Bug in :meth:`~DataFrame.to_csv` was silently failing when writing to an invalid s3 bucket. (:issue:`32486`)
- Bug in :meth:`~DataFrame.read_feather` was raising an `ArrowIOError` when reading an s3 or http file path (:issue:`29055`)
- Bug in :meth:`read_parquet` was raising a ``FileNotFoundError`` when passed an s3 directory path. (:issue:`26388`)
- Bug in :meth:`~DataFrame.to_parquet` was throwing an ``AttributeError`` when writing a partitioned parquet file to s3 (:issue:`27596`)

Plotting
^^^^^^^^
27 changes: 27 additions & 0 deletions pandas/io/common.py
@@ -150,6 +150,33 @@ def urlopen(*args, **kwargs):
return urllib.request.urlopen(*args, **kwargs)


def get_fs_for_path(filepath: str):
"""
Get appropriate filesystem given a filepath.
Supports s3fs, gcs and local file system.

Parameters
----------
filepath : str
File path. e.g s3://bucket/object, /local/path, gcs://pandas/obj

Returns
-------
s3fs.S3FileSystem, gcsfs.GCSFileSystem, None
Appropriate FileSystem to use. None for local filesystem.
"""
if is_s3_url(filepath):
from pandas.io import s3

return s3.get_fs()
elif is_gcs_url(filepath):
from pandas.io import gcs

return gcs.get_fs()
else:
return None


def get_filepath_or_buffer(
filepath_or_buffer: FilePathOrBuffer,
encoding: Optional[str] = None,
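
A minimal usage sketch of the new helper (paths are illustrative; assumes s3fs and gcsfs are installed and credentials are configured):

from pandas.io.common import get_fs_for_path

get_fs_for_path("s3://bucket/key.parquet")     # s3fs.S3FileSystem
get_fs_for_path("gcs://pandas/obj")            # gcsfs.GCSFileSystem
get_fs_for_path("/local/path/data.parquet")    # None (local filesystem)
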
6 changes: 5 additions & 1 deletion pandas/io/gcs.py
@@ -6,13 +6,17 @@
)


def get_fs():
return gcsfs.GCSFileSystem()


def get_filepath_or_buffer(
filepath_or_buffer, encoding=None, compression=None, mode=None
):

if mode is None:
mode = "rb"

fs = gcsfs.GCSFileSystem()
fs = get_fs()
filepath_or_buffer = fs.open(filepath_or_buffer, mode)
return filepath_or_buffer, None, compression, True
33 changes: 19 additions & 14 deletions pandas/io/parquet.py
@@ -8,7 +8,12 @@

from pandas import DataFrame, get_option

from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
from pandas.io.common import (
get_filepath_or_buffer,
get_fs_for_path,
is_gcs_url,
is_s3_url,
)


def get_engine(engine: str) -> "BaseImpl":
@@ -92,13 +97,15 @@ def write(
**kwargs,
):
self.validate_dataframe(df)
path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")

from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
if index is not None:
from_pandas_kwargs["preserve_index"] = index

table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
# write_to_dataset does not support a file-like object when
# a directory path is used, so just pass the path string.
if partition_cols is not None:
self.api.parquet.write_to_dataset(
table,
@@ -108,20 +115,18 @@ def write(
**kwargs,
)
else:
self.api.parquet.write_table(table, path, compression=compression, **kwargs)
self.api.parquet.write_table(
table, file_obj_or_path, compression=compression, **kwargs
)
if should_close:
path.close()
file_obj_or_path.close()

def read(self, path, columns=None, **kwargs):
path, _, _, should_close = get_filepath_or_buffer(path)

kwargs["use_pandas_metadata"] = True
result = self.api.parquet.read_table(
path, columns=columns, **kwargs
).to_pandas()
if should_close:
path.close()

parquet_ds = self.api.parquet.ParquetDataset(

@alimcmaster1

This change breaks clients that pass a file-like object for path. ParquetDataset doesn't provide the same file-like object handling that the original get_filepath_or_buffer did.

Here's the call stack I'm seeing:

.tox/test/lib/python3.7/site-packages/pandas/io/parquet.py:315: in read_parquet
    return impl.read(path, columns=columns, **kwargs)
.tox/test/lib/python3.7/site-packages/pandas/io/parquet.py:131: in read
    path, filesystem=get_fs_for_path(path), **kwargs
.tox/test/lib/python3.7/site-packages/pyarrow/parquet.py:1162: in __init__
    self.paths = _parse_uri(path_or_paths)
.tox/test/lib/python3.7/site-packages/pyarrow/parquet.py:47: in _parse_uri
    path = _stringify_path(path)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

I filed bug report #34467
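
A minimal, hypothetical repro of the breakage described above (an in-memory buffer rather than a path, using the pyarrow engine):

import io
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})
buf = io.BytesIO()
df.to_parquet(buf)  # to_parquet accepts file-like objects
buf.seek(0)

# Previously get_filepath_or_buffer unwrapped file-like objects before
# read_table; ParquetDataset instead tries to stringify the buffer as a
# path (see the traceback above) and raises.
pd.read_parquet(buf)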

path, filesystem=get_fs_for_path(path), **kwargs
Member

Is this filesystem=get_fs_for_path(path) needed? What happens if you just pass the path? (which I assume has e.g. an s3://.. in it?)

Member Author

pyarrow seems to only allow a file path as opposed to a dir path. Removing the filesystem arg here throws:

            for path in path_or_paths:
                if not fs.isfile(path):
                    raise IOError('Passed non-file path: {0}'
>                                 .format(path))
E                   OSError: Passed non-file path: s3://pandas-test/parquet_dir

../../../.conda/envs/pandas-dev/lib/python3.7/site-packages/pyarrow/parquet.py:1229: OSError

To repro, see the test case test_s3_roundtrip_for_dir I wrote below.

Member

Ah, OK. I see now in pyarrow that apparently string URIs with "s3://..." are not supported (while "hdfs://" is supported). That's something we should fix on the pyarrow side as well. But of course until then this is fine.
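
For context, a hedged sketch of the pyarrow behaviour discussed in this thread (bucket/prefix taken from the test below; assumes s3fs credentials are available):

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem(anon=False)

# Without an explicit filesystem, this pyarrow version rejects the s3 URI:
#   OSError: Passed non-file path: s3://pandas-test/parquet_dir
# pq.ParquetDataset("s3://pandas-test/parquet_dir")

# Passing the s3fs filesystem explicitly, as the change above does, works:
dataset = pq.ParquetDataset("s3://pandas-test/parquet_dir", filesystem=fs)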

)
kwargs["columns"] = columns
result = parquet_ds.read_pandas(**kwargs).to_pandas()
return result


@@ -273,7 +278,7 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
A file URL can also be a path to a directory that contains multiple
partitioned parquet files. Both pyarrow and fastparquet support
paths to directories as well as file URLs. A directory path could be:
``file://localhost/path/to/tables``
``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``

If you want to pass in a path object, pandas accepts any
``os.PathLike``.
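
A hedged, user-facing sketch of the behaviour the docstring above describes (bucket and prefix are illustrative; assumes s3fs is installed and credentials are configured):

import pandas as pd

# With this fix, a directory of parquet files on s3 (for example one written
# with to_parquet and partition_cols) can be read back as a single DataFrame
# instead of raising FileNotFoundError.
df = pd.read_parquet("s3://my-bucket/parquet_dir")
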
8 changes: 6 additions & 2 deletions pandas/io/s3.py
@@ -16,6 +16,10 @@ def _strip_schema(url):
return result.netloc + result.path


def get_fs():
return s3fs.S3FileSystem(anon=False)


def get_file_and_filesystem(
filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
) -> Tuple[IO, Any]:
@@ -24,7 +28,7 @@ def get_file_and_filesystem(
if mode is None:
mode = "rb"

fs = s3fs.S3FileSystem(anon=False)
fs = get_fs()
try:
file = fs.open(_strip_schema(filepath_or_buffer), mode)
except (FileNotFoundError, NoCredentialsError):
@@ -34,7 +38,7 @@ def get_file_and_filesystem(
# aren't valid for that bucket.
# A NoCredentialsError is raised if you don't have creds
# for that bucket.
fs = s3fs.S3FileSystem(anon=True)
fs = get_fs()
file = fs.open(_strip_schema(filepath_or_buffer), mode)
return file, fs

40 changes: 33 additions & 7 deletions pandas/tests/io/test_parquet.py
@@ -1,7 +1,6 @@
""" test parquet compat """
import datetime
from distutils.version import LooseVersion
import locale
import os
from warnings import catch_warnings

@@ -131,6 +130,7 @@ def check_round_trip(
read_kwargs=None,
expected=None,
check_names=True,
check_like=False,
repeat=2,
):
"""Verify parquet serializer and deserializer produce the same results.
@@ -150,6 +150,8 @@
Expected deserialization result, otherwise will be equal to `df`
check_names: list of str, optional
Closed set of column names to be compared
check_like: bool, optional
If True, ignore the order of index & columns.
repeat: int, optional
How many times to repeat the test
"""
@@ -169,7 +171,9 @@ def compare(repeat):
with catch_warnings(record=True):
actual = read_parquet(path, **read_kwargs)

tm.assert_frame_equal(expected, actual, check_names=check_names)
tm.assert_frame_equal(
expected, actual, check_names=check_names, check_like=check_like
)

if path is None:
with tm.ensure_clean() as path:
@@ -532,15 +536,37 @@ def test_categorical(self, pa):
expected = df.astype(object)
check_round_trip(df, pa, expected=expected)

# GH#33077 2020-03-27
@pytest.mark.xfail(
locale.getlocale()[0] == "zh_CN",
reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'",
)
def test_s3_roundtrip(self, df_compat, s3_resource, pa):
# GH #19134
check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")

@td.skip_if_no("s3fs")
@pytest.mark.parametrize("partition_col", [["A"], []])
def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
from pandas.io.s3 import get_fs as get_s3_fs

# GH #26388
# https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716
# As per pyarrow, partitioned columns become 'categorical' dtypes
# and are added to the back of the dataframe on read

expected_df = df_compat.copy()
if partition_col:
expected_df[partition_col] = expected_df[partition_col].astype("category")
check_round_trip(
df_compat,
pa,
expected=expected_df,
path="s3://pandas-test/parquet_dir",
write_kwargs={
"partition_cols": partition_col,
"compression": None,
"filesystem": get_s3_fs(),
},
check_like=True,
repeat=1,
)

def test_partition_cols_supported(self, pa, df_full):
# GH #23283
partition_cols = ["bool", "int"]