
Commit e39390e

alimcmaster1 authored and simonjayhawkins committed
IO: Fix parquet read from s3 directory (pandas-dev#33632)
1 parent 965981a commit e39390e

6 files changed: +90 -24 lines changed


doc/source/whatsnew/v1.0.4.rst

+2
@@ -33,6 +33,8 @@ Bug fixes
 - Bug in :meth:`Rolling.min` and :meth:`Rolling.max`: Growing memory usage after multiple calls when using a fixed window (:issue:`30726`)
 - Bug in :meth:`~DataFrame.to_parquet` was not raising ``PermissionError`` when writing to a private s3 bucket with invalid creds. (:issue:`27679`)
 - Bug in :meth:`~DataFrame.to_csv` was silently failing when writing to an invalid s3 bucket. (:issue:`32486`)
+- Bug in :meth:`read_parquet` was raising a ``FileNotFoundError`` when passed an s3 directory path. (:issue:`26388`)
+- Bug in :meth:`~DataFrame.to_parquet` was throwing an ``AttributeError`` when writing a partitioned parquet file to s3 (:issue:`27596`)
 -
 
 Contributors

pandas/io/common.py

+27
@@ -141,6 +141,33 @@ def urlopen(*args, **kwargs):
     return urllib.request.urlopen(*args, **kwargs)
 
 
+def get_fs_for_path(filepath: str):
+    """
+    Get appropriate filesystem given a filepath.
+    Supports s3fs, gcs and local file system.
+
+    Parameters
+    ----------
+    filepath : str
+        File path. e.g s3://bucket/object, /local/path, gcs://pandas/obj
+
+    Returns
+    -------
+    s3fs.S3FileSystem, gcsfs.GCSFileSystem, None
+        Appropriate FileSystem to use. None for local filesystem.
+    """
+    if is_s3_url(filepath):
+        from pandas.io import s3
+
+        return s3.get_fs()
+    elif is_gcs_url(filepath):
+        from pandas.io import gcs
+
+        return gcs.get_fs()
+    else:
+        return None
+
+
 def get_filepath_or_buffer(
     filepath_or_buffer: FilePathOrBuffer,
     encoding: Optional[str] = None,
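For context, a minimal sketch of how the new helper resolves a filesystem from a path (illustrative only; it assumes s3fs and gcsfs are installed, and the bucket names are placeholders):

from pandas.io.common import get_fs_for_path

# s3:// paths resolve to an s3fs.S3FileSystem
s3_fs = get_fs_for_path("s3://my-bucket/dataset/")      # placeholder bucket
# gcs:// paths resolve to a gcsfs.GCSFileSystem
gcs_fs = get_fs_for_path("gcs://my-bucket/dataset/")    # placeholder bucket
# anything else is treated as local and returns None
assert get_fs_for_path("/tmp/dataset.parquet") is None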

pandas/io/gcs.py

+5 -1
@@ -6,13 +6,17 @@
 )
 
 
+def get_fs():
+    return gcsfs.GCSFileSystem()
+
+
 def get_filepath_or_buffer(
     filepath_or_buffer, encoding=None, compression=None, mode=None
 ):
 
     if mode is None:
         mode = "rb"
 
-    fs = gcsfs.GCSFileSystem()
+    fs = get_fs()
     filepath_or_buffer = fs.open(filepath_or_buffer, mode)
     return filepath_or_buffer, None, compression, True

pandas/io/parquet.py

+17 -14
@@ -8,7 +8,12 @@
 
 from pandas import DataFrame, get_option
 
-from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
+from pandas.io.common import (
+    get_filepath_or_buffer,
+    get_fs_for_path,
+    is_gcs_url,
+    is_s3_url,
+)
 
 
 def get_engine(engine: str) -> "BaseImpl":
@@ -92,13 +97,15 @@ def write(
         **kwargs,
     ):
         self.validate_dataframe(df)
-        path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
+        file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
 
         from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
         if index is not None:
             from_pandas_kwargs["preserve_index"] = index
 
         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
+        # write_to_dataset does not support a file-like object when
+        # a directory path is used, so just pass the path string.
         if partition_cols is not None:
             self.api.parquet.write_to_dataset(
                 table,
@@ -111,24 +118,20 @@ def write(
         else:
             self.api.parquet.write_table(
                 table,
-                path,
+                file_obj_or_path,
                 compression=compression,
                 coerce_timestamps=coerce_timestamps,
                 **kwargs,
             )
         if should_close:
-            path.close()
+            file_obj_or_path.close()
 
     def read(self, path, columns=None, **kwargs):
-        path, _, _, should_close = get_filepath_or_buffer(path)
-
-        kwargs["use_pandas_metadata"] = True
-        result = self.api.parquet.read_table(
-            path, columns=columns, **kwargs
-        ).to_pandas()
-        if should_close:
-            path.close()
-
+        parquet_ds = self.api.parquet.ParquetDataset(
+            path, filesystem=get_fs_for_path(path), **kwargs
+        )
+        kwargs["columns"] = columns
+        result = parquet_ds.read_pandas(**kwargs).to_pandas()
         return result
 
 
@@ -283,7 +286,7 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
     A file URL can also be a path to a directory that contains multiple
     partitioned parquet files. Both pyarrow and fastparquet support
     paths to directories as well as file URLs. A directory path could be:
-    ``file://localhost/path/to/tables``
+    ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``
 
     If you want to pass in a path object, pandas accepts any
     ``os.PathLike``.
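Taken together, the write and read changes let a partitioned parquet dataset round-trip through the top-level API using an s3 directory path. A minimal sketch of the intended usage (requires pyarrow and s3fs, valid AWS credentials, and a writable bucket; the bucket and prefix are placeholders):

import pandas as pd

df = pd.DataFrame({"part": ["a", "a", "b"], "value": [1, 2, 3]})

# writes one parquet file per partition under the directory prefix
df.to_parquet("s3://my-bucket/dataset_dir", partition_cols=["part"])

# previously raised FileNotFoundError for a directory path (GH 26388);
# now the whole dataset is read back, with "part" returned as a
# categorical column appended after the data columns
result = pd.read_parquet("s3://my-bucket/dataset_dir")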

pandas/io/s3.py

+6 -2
@@ -16,6 +16,10 @@ def _strip_schema(url):
     return result.netloc + result.path
 
 
+def get_fs():
+    return s3fs.S3FileSystem(anon=False)
+
+
 def get_file_and_filesystem(
     filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
 ) -> Tuple[IO, Any]:
@@ -24,7 +28,7 @@ def get_file_and_filesystem(
     if mode is None:
         mode = "rb"
 
-    fs = s3fs.S3FileSystem(anon=False)
+    fs = get_fs()
     try:
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     except (FileNotFoundError, NoCredentialsError):
@@ -34,7 +38,7 @@ def get_file_and_filesystem(
         # aren't valid for that bucket.
        # A NoCredentialsError is raised if you don't have creds
        # for that bucket.
-        fs = s3fs.S3FileSystem(anon=True)
+        fs = get_fs()
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     return file, fs
 
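A small sketch of why the constructor is factored out: get_fs_for_path (and the new test below) can obtain the same filesystem object pandas itself uses rather than building their own. Illustrative only; the bucket name is a placeholder and valid AWS credentials are assumed:

from pandas.io.s3 import get_fs

fs = get_fs()            # equivalent to s3fs.S3FileSystem(anon=False)
fs.ls("pandas-test")     # placeholder bucket; lists keys if the credentials allow it
# the same object can be handed to pyarrow directly, e.g. as the
# `filesystem` argument of write_to_dataset or ParquetDataset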

pandas/tests/io/test_parquet.py

+33 -7
@@ -1,7 +1,6 @@
 """ test parquet compat """
 import datetime
 from distutils.version import LooseVersion
-import locale
 import os
 from warnings import catch_warnings
 
@@ -130,6 +129,7 @@ def check_round_trip(
     read_kwargs=None,
     expected=None,
     check_names=True,
+    check_like=False,
     repeat=2,
 ):
     """Verify parquet serializer and deserializer produce the same results.
@@ -149,6 +149,8 @@ def check_round_trip(
         Expected deserialization result, otherwise will be equal to `df`
     check_names: list of str, optional
         Closed set of column names to be compared
+    check_like: bool, optional
+        If True, ignore the order of index & columns.
     repeat: int, optional
         How many times to repeat the test
     """
@@ -169,7 +171,9 @@ def compare(repeat):
            with catch_warnings(record=True):
                actual = read_parquet(path, **read_kwargs)

-                tm.assert_frame_equal(expected, actual, check_names=check_names)
+                tm.assert_frame_equal(
+                    expected, actual, check_names=check_names, check_like=check_like
+                )
 
     if path is None:
         with tm.ensure_clean() as path:
@@ -485,15 +489,37 @@ def test_categorical(self, pa):
         expected = df.astype(object)
         check_round_trip(df, pa, expected=expected)
 
-    # GH#33077 2020-03-27
-    @pytest.mark.xfail(
-        locale.getlocale()[0] == "zh_CN",
-        reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'",
-    )
     def test_s3_roundtrip(self, df_compat, s3_resource, pa):
         # GH #19134
         check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")
 
+    @td.skip_if_no("s3fs")
+    @pytest.mark.parametrize("partition_col", [["A"], []])
+    def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
+        from pandas.io.s3 import get_fs as get_s3_fs
+
+        # GH #26388
+        # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716
+        # As per pyarrow partitioned columns become 'categorical' dtypes
+        # and are added to back of dataframe on read
+
+        expected_df = df_compat.copy()
+        if partition_col:
+            expected_df[partition_col] = expected_df[partition_col].astype("category")
+        check_round_trip(
+            df_compat,
+            pa,
+            expected=expected_df,
+            path="s3://pandas-test/parquet_dir",
+            write_kwargs={
+                "partition_cols": partition_col,
+                "compression": None,
+                "filesystem": get_s3_fs(),
+            },
+            check_like=True,
+            repeat=1,
+        )
+
     def test_partition_cols_supported(self, pa, df_full):
         # GH #23283
         partition_cols = ["bool", "int"]
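The new check_like flag matters because pyarrow appends partition columns (as categoricals) to the end of the frame on read, so column order differs from the input. A short illustration of the flag on its own, independent of s3 (pandas._testing is the module this test file already imports as tm):

import pandas as pd
import pandas._testing as tm

left = pd.DataFrame({"A": [1, 2], "B": ["x", "y"]})
right = left[["B", "A"]]          # same data, columns reordered

tm.assert_frame_equal(left, right, check_like=True)   # passes
# tm.assert_frame_equal(left, right)                  # would raise: column order differs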
