
IO: Fix parquet read from s3 directory #33632


Merged: 27 commits, Apr 26, 2020
Changes from 8 commits
Commits (27)
aa94fe7
parquet init
alimcmaster1 Apr 18, 2020
a30c71a
Doc Str
alimcmaster1 Apr 18, 2020
b2747eb
Simplify read
alimcmaster1 Apr 18, 2020
a51757a
Fix writer with partition
alimcmaster1 Apr 18, 2020
968f3b6
Test case
alimcmaster1 Apr 18, 2020
789f4ca
Clean up test case
alimcmaster1 Apr 18, 2020
040763e
Add whatsnew
alimcmaster1 Apr 18, 2020
40f5889
Clean ups
alimcmaster1 Apr 18, 2020
753d647
Clean ups
alimcmaster1 Apr 18, 2020
e4dcdc3
Update whatsnew
alimcmaster1 Apr 18, 2020
bb21431
Add skip if no
alimcmaster1 Apr 18, 2020
fb38932
Fix import
alimcmaster1 Apr 18, 2020
c29befd
Removed fixed xfail
alimcmaster1 Apr 18, 2020
4f78fc5
remove import
alimcmaster1 Apr 18, 2020
4b2828b
Merge master
alimcmaster1 Apr 21, 2020
463c2ea
Merge remote-tracking branch 'upstream/master' into mcmali-parquet
alimcmaster1 Apr 21, 2020
dabfe58
Add further test case
alimcmaster1 Apr 21, 2020
dea95f3
Update parquet.py
alimcmaster1 Apr 25, 2020
ae76e42
Update parquet.py
alimcmaster1 Apr 25, 2020
4b48326
Add whatsnew 2
alimcmaster1 Apr 26, 2020
211c36e
Rename var
alimcmaster1 Apr 26, 2020
4897a32
Improve get_fs_for_path docstring
alimcmaster1 Apr 26, 2020
bba4040
Merge remote-tracking branch 'origin/mcmali-parquet' into mcmali-parquet
alimcmaster1 Apr 26, 2020
5bc6327
Add doc example
alimcmaster1 Apr 26, 2020
ca89c21
Make whatsnew clearer
alimcmaster1 Apr 26, 2020
0df818e
Merge remote-tracking branch 'upstream/master' into mcmali-parquet
alimcmaster1 Apr 26, 2020
2a1a85c
Merge remote-tracking branch 'upstream/master' into mcmali-parquet
alimcmaster1 Apr 26, 2020
3 changes: 3 additions & 0 deletions doc/source/whatsnew/v1.0.3.rst
@@ -23,6 +23,9 @@ Fixed regressions
Bug fixes
~~~~~~~~~

**I/O**
- :func:`read_parquet` now supports an s3 directory (:issue:`26388`)

Contributors
~~~~~~~~~~~~

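For context, a minimal usage sketch of what this whatsnew entry enables: reading a partitioned parquet directory from s3 as a single DataFrame. The bucket and prefix below are hypothetical, and pyarrow plus s3fs with valid credentials are assumed.

import pandas as pd

# Hypothetical bucket/prefix; the directory typically holds several parquet
# files, e.g. written with partition_cols or by Spark/pyarrow.
df = pd.read_parquet("s3://my-bucket/path/to/parquet_dir")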
19 changes: 18 additions & 1 deletion pandas/io/common.py
@@ -158,6 +158,23 @@ def urlopen(*args, **kwargs):
return urllib.request.urlopen(*args, **kwargs)


def get_fs_for_path(filepath):
Contributor:
can you type this (and the return annotation)?

Member Author:
Left the return type off for now since it includes optional dependencies,
e.g. Union[s3fs.S3FileSystem, gcsfs.GCSFileSystem, None].

Can add imports to the TYPE_CHECKING block at the top if that's appropriate?

"""
Get appropriate filesystem given a filepath.
Support s3fs, gcs and local disk fs
Contributor:
can you make this a full docstring with Parameters / Returns?

Member Author:
Sure, done :)

"""
if is_s3_url(filepath):
from pandas.io import s3

return s3.get_fs()
elif is_gcs_url(filepath):
from pandas.io import gcs

return gcs.get_fs()
else:
return None


def get_filepath_or_buffer(
filepath_or_buffer: FilePathOrBuffer,
encoding: Optional[str] = None,
@@ -192,7 +209,7 @@ def get_filepath_or_buffer(
compression = "gzip"
reader = BytesIO(req.read())
req.close()
return reader, encoding, compression, True
return reader, encoding, compression, True, None

if is_s3_url(filepath_or_buffer):
from pandas.io import s3
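A rough sketch of how this helper dispatches on the URL scheme (internal API, shown for illustration only; s3fs and gcsfs are optional dependencies):

from pandas.io.common import get_fs_for_path

get_fs_for_path("s3://pandas-test/parquet_dir")   # s3fs.S3FileSystem instance
get_fs_for_path("gcs://some-bucket/parquet_dir")  # gcsfs.GCSFileSystem instance
get_fs_for_path("/tmp/local/parquet_dir")         # None: local paths need no special filesystem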
6 changes: 5 additions & 1 deletion pandas/io/gcs.py
@@ -6,13 +6,17 @@
)


def get_fs():
return gcsfs.GCSFileSystem()


def get_filepath_or_buffer(
filepath_or_buffer, encoding=None, compression=None, mode=None
):

if mode is None:
mode = "rb"

fs = gcsfs.GCSFileSystem()
fs = get_fs()
filepath_or_buffer = fs.open(filepath_or_buffer, mode)
return filepath_or_buffer, None, compression, True
28 changes: 15 additions & 13 deletions pandas/io/parquet.py
@@ -8,7 +8,12 @@

from pandas import DataFrame, get_option

from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
from pandas.io.common import (
get_filepath_or_buffer,
is_gcs_url,
is_s3_url,
get_fs_for_path,
)


def get_engine(engine: str) -> "BaseImpl":
@@ -92,8 +97,7 @@ def write(
**kwargs,
):
self.validate_dataframe(df)
path, _, _, _ = get_filepath_or_buffer(path, mode="wb")

file_obj, _, _, _ = get_filepath_or_buffer(path, mode="wb")
Member Author:
@jorisvandenbossche think we can clean up the write method here to get rid of get_filepath_or_buffer, similar to what I've done below for read. Will address in a different PR.

from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
if index is not None:
from_pandas_kwargs["preserve_index"] = index
@@ -108,18 +112,16 @@ def write(
**kwargs,
)
else:
self.api.parquet.write_table(table, path, compression=compression, **kwargs)
self.api.parquet.write_table(
table, file_obj, compression=compression, **kwargs
)

def read(self, path, columns=None, **kwargs):
path, _, _, should_close = get_filepath_or_buffer(path)

kwargs["use_pandas_metadata"] = True
result = self.api.parquet.read_table(
path, columns=columns, **kwargs
).to_pandas()
if should_close:
path.close()

parquet_ds = self.api.parquet.ParquetDataset(


@alimcmaster1

This change breaks clients that pass a file-like object for path. ParquetDataset doesn't provide the same file-like object handling that the original get_filepath_or_buffer did.

Here's the call stack I'm seeing:

.tox/test/lib/python3.7/site-packages/pandas/io/parquet.py:315: in read_parquet
    return impl.read(path, columns=columns, **kwargs)
.tox/test/lib/python3.7/site-packages/pandas/io/parquet.py:131: in read
    path, filesystem=get_fs_for_path(path), **kwargs
.tox/test/lib/python3.7/site-packages/pyarrow/parquet.py:1162: in __init__
    self.paths = _parse_uri(path_or_paths)
.tox/test/lib/python3.7/site-packages/pyarrow/parquet.py:47: in _parse_uri
    path = _stringify_path(path)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _


I filed bug report #34467
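A minimal sketch of the failure mode described in this comment, assuming the pyarrow engine: string paths still work, but a file-like object now reaches pyarrow's _stringify_path and fails.

import io

import pandas as pd

df = pd.DataFrame({"A": [1, 2]})
buf = io.BytesIO()
df.to_parquet(buf)
buf.seek(0)

# Previously this round-tripped fine; with ParquetDataset the buffer cannot be
# converted to a string path and an error is raised (tracked in #34467).
pd.read_parquet(buf)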

path, filesystem=get_fs_for_path(path), **kwargs
Member:
Is this filesystem=get_fs_for_path(path) needed? What happens if you just pass the path? (which I assume has e.g. an s3://.. in it?)

Member Author:
pyarrow seems to only allow a file path as opposed to a dir path. Removing the filesystem arg here throws:

            for path in path_or_paths:
                if not fs.isfile(path):
                    raise IOError('Passed non-file path: {0}'
>                                 .format(path))
E                   OSError: Passed non-file path: s3://pandas-test/parquet_dir

../../../.conda/envs/pandas-dev/lib/python3.7/site-packages/pyarrow/parquet.py:1229: OSError

To repro, see the test case test_s3_roundtrip_for_dir I wrote below.

Member:
Ah, OK. I see now in pyarrow that apparently string URIs with "s3://..." are not supported (while "hdfs://" is supported). That's something we should fix on the pyarrow side as well. But of course until then this is fine.

)
kwargs["columns"] = columns
result = parquet_ds.read_pandas(**kwargs).to_pandas()
return result


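For readers following this thread, this is roughly what the new read path does for an s3 directory. It is a sketch only, using a hypothetical bucket and calling the internal helper directly.

import pyarrow.parquet as pq

from pandas.io.common import get_fs_for_path

path = "s3://pandas-test/parquet_dir"

# Passing the filesystem explicitly lets ParquetDataset treat the s3 prefix as
# a directory of parquet files instead of rejecting it as a non-file path.
dataset = pq.ParquetDataset(path, filesystem=get_fs_for_path(path))
df = dataset.read_pandas().to_pandas()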
8 changes: 6 additions & 2 deletions pandas/io/s3.py
@@ -16,6 +16,10 @@ def _strip_schema(url):
return result.netloc + result.path


def get_fs():
return s3fs.S3FileSystem(anon=False)


def get_file_and_filesystem(
filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
) -> Tuple[IO, Any]:
@@ -24,7 +28,7 @@ def get_file_and_filesystem(
if mode is None:
mode = "rb"

fs = s3fs.S3FileSystem(anon=False)
fs = get_fs()
try:
file = fs.open(_strip_schema(filepath_or_buffer), mode)
except (FileNotFoundError, NoCredentialsError):
@@ -34,7 +38,7 @@ def get_file_and_filesystem(
# aren't valid for that bucket.
# A NoCredentialsError is raised if you don't have creds
# for that bucket.
fs = s3fs.S3FileSystem(anon=True)
fs = get_fs()
file = fs.open(_strip_schema(filepath_or_buffer), mode)
return file, fs

31 changes: 30 additions & 1 deletion pandas/tests/io/test_parquet.py
@@ -20,6 +20,7 @@
read_parquet,
to_parquet,
)
from pandas.io.s3 import get_fs as get_s3_fs

try:
import pyarrow # noqa
@@ -131,6 +132,7 @@ def check_round_trip(
read_kwargs=None,
expected=None,
check_names=True,
check_like=False,
repeat=2,
):
"""Verify parquet serializer and deserializer produce the same results.
@@ -150,6 +152,8 @@
Expected deserialization result, otherwise will be equal to `df`
check_names: list of str, optional
Closed set of column names to be compared
check_like: bool, optional
If True, ignore the order of index & columns.
repeat: int, optional
How many times to repeat the test
"""
@@ -169,7 +173,9 @@ def compare(repeat):
with catch_warnings(record=True):
actual = read_parquet(path, **read_kwargs)

tm.assert_frame_equal(expected, actual, check_names=check_names)
tm.assert_frame_equal(
expected, actual, check_names=check_names, check_like=check_like
)

if path is None:
with tm.ensure_clean() as path:
@@ -537,6 +543,29 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa):
# GH #19134
check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")

def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa):
# GH #26388
# https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716
# As per pyarrow partitioned columns become 'categorical' dtypes
# and are added to back of dataframe on read

partition_col = "A"
expected_df = df_compat.copy()
expected_df[partition_col] = expected_df[partition_col].astype("category")
check_round_trip(
df_compat,
pa,
expected=expected_df,
path="s3://pandas-test/parquet_dir",
write_kwargs={
"partition_cols": partition_col,
"compression": None,
"filesystem": get_s3_fs(),
},
check_like=True,
repeat=1,
)

def test_partition_cols_supported(self, pa, df_full):
# GH #23283
partition_cols = ["bool", "int"]
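A hedged local illustration of what this test asserts: after a partitioned round trip the partition column comes back as a categorical appended at the end, which is why check_like=True is needed. The column names and target directory below are made up.

import pandas as pd

df = pd.DataFrame({"A": [1, 1, 2], "B": ["foo", "bar", "baz"]})
df.to_parquet("parquet_dir", partition_cols=["A"], compression=None)

result = pd.read_parquet("parquet_dir")
# result.columns is now Index(['B', 'A']) and result['A'].dtype is 'category'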