
Commit 22cf0f5

IO: Fix parquet read from s3 directory (#33632)
1 parent 64d544c commit 22cf0f5

6 files changed: +92 -24 lines

doc/source/whatsnew/v1.1.0.rst

+2
@@ -626,6 +626,8 @@ I/O
 - Bug in :meth:`~DataFrame.to_parquet` was not raising ``PermissionError`` when writing to a private s3 bucket with invalid creds. (:issue:`27679`)
 - Bug in :meth:`~DataFrame.to_csv` was silently failing when writing to an invalid s3 bucket. (:issue:`32486`)
 - Bug in :meth:`~DataFrame.read_feather` was raising an `ArrowIOError` when reading an s3 or http file path (:issue:`29055`)
+- Bug in :meth:`read_parquet` was raising a ``FileNotFoundError`` when passed an s3 directory path. (:issue:`26388`)
+- Bug in :meth:`~DataFrame.to_parquet` was throwing an ``AttributeError`` when writing a partitioned parquet file to s3 (:issue:`27596`)
 
 Plotting
 ^^^^^^^^
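
For context, a minimal sketch of the behaviour these two entries describe, assuming s3fs is installed and credentials are configured; the bucket and directory names are hypothetical, and the filesystem keyword mirrors the new test added below (it is forwarded through to pyarrow's write_to_dataset):

    import pandas as pd
    from pandas.io.s3 import get_fs  # internal helper added in this commit

    df = pd.DataFrame({"A": [1, 1, 2], "B": ["x", "y", "z"]})

    # GH 27596: writing a partitioned parquet dataset to s3 previously raised AttributeError.
    df.to_parquet(
        "s3://my-bucket/parquet_dir",
        partition_cols=["A"],
        compression=None,
        filesystem=get_fs(),
    )

    # GH 26388: reading the directory path back previously raised FileNotFoundError.
    result = pd.read_parquet("s3://my-bucket/parquet_dir")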

pandas/io/common.py

+27
@@ -150,6 +150,33 @@ def urlopen(*args, **kwargs):
     return urllib.request.urlopen(*args, **kwargs)
 
 
+def get_fs_for_path(filepath: str):
+    """
+    Get appropriate filesystem given a filepath.
+    Supports s3fs, gcs and local file system.
+
+    Parameters
+    ----------
+    filepath : str
+        File path. e.g s3://bucket/object, /local/path, gcs://pandas/obj
+
+    Returns
+    -------
+    s3fs.S3FileSystem, gcsfs.GCSFileSystem, None
+        Appropriate FileSystem to use. None for local filesystem.
+    """
+    if is_s3_url(filepath):
+        from pandas.io import s3
+
+        return s3.get_fs()
+    elif is_gcs_url(filepath):
+        from pandas.io import gcs
+
+        return gcs.get_fs()
+    else:
+        return None
+
+
 def get_filepath_or_buffer(
     filepath_or_buffer: FilePathOrBuffer,
     encoding: Optional[str] = None,
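
A rough usage sketch of the new helper (private pandas API; s3fs or gcsfs must be installed for the corresponding branch, and the paths are hypothetical):

    from pandas.io.common import get_fs_for_path

    get_fs_for_path("s3://pandas-test/parquet_dir")  # s3fs.S3FileSystem
    get_fs_for_path("gcs://pandas/obj")              # gcsfs.GCSFileSystem
    get_fs_for_path("/tmp/local.parquet")            # None -> local filesystem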

pandas/io/gcs.py

+5 -1

@@ -6,13 +6,17 @@
 )
 
 
+def get_fs():
+    return gcsfs.GCSFileSystem()
+
+
 def get_filepath_or_buffer(
     filepath_or_buffer, encoding=None, compression=None, mode=None
 ):
 
     if mode is None:
         mode = "rb"
 
-    fs = gcsfs.GCSFileSystem()
+    fs = get_fs()
     filepath_or_buffer = fs.open(filepath_or_buffer, mode)
     return filepath_or_buffer, None, compression, True

pandas/io/parquet.py

+19 -14

@@ -8,7 +8,12 @@
 
 from pandas import DataFrame, get_option
 
-from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
+from pandas.io.common import (
+    get_filepath_or_buffer,
+    get_fs_for_path,
+    is_gcs_url,
+    is_s3_url,
+)
 
 
 def get_engine(engine: str) -> "BaseImpl":
@@ -92,13 +97,15 @@ def write(
         **kwargs,
     ):
         self.validate_dataframe(df)
-        path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
+        file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
 
         from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
         if index is not None:
             from_pandas_kwargs["preserve_index"] = index
 
         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
+        # write_to_dataset does not support a file-like object when
+        # a directory path is used, so just pass the path string.
         if partition_cols is not None:
             self.api.parquet.write_to_dataset(
                 table,
@@ -108,20 +115,18 @@ def write(
                 **kwargs,
             )
         else:
-            self.api.parquet.write_table(table, path, compression=compression, **kwargs)
+            self.api.parquet.write_table(
+                table, file_obj_or_path, compression=compression, **kwargs
+            )
         if should_close:
-            path.close()
+            file_obj_or_path.close()
 
     def read(self, path, columns=None, **kwargs):
-        path, _, _, should_close = get_filepath_or_buffer(path)
-
-        kwargs["use_pandas_metadata"] = True
-        result = self.api.parquet.read_table(
-            path, columns=columns, **kwargs
-        ).to_pandas()
-        if should_close:
-            path.close()
-
+        parquet_ds = self.api.parquet.ParquetDataset(
+            path, filesystem=get_fs_for_path(path), **kwargs
+        )
+        kwargs["columns"] = columns
+        result = parquet_ds.read_pandas(**kwargs).to_pandas()
         return result
 
 
@@ -273,7 +278,7 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
     A file URL can also be a path to a directory that contains multiple
     partitioned parquet files. Both pyarrow and fastparquet support
     paths to directories as well as file URLs. A directory path could be:
-    ``file://localhost/path/to/tables``
+    ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``
 
     If you want to pass in a path object, pandas accepts any
     ``os.PathLike``.
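
In effect, the pyarrow read path now hands the raw path plus an explicit filesystem to ParquetDataset instead of opening a single file object itself; roughly equivalent to this sketch (hypothetical bucket path, pyarrow and s3fs assumed installed):

    import pyarrow.parquet as pq
    from pandas.io.common import get_fs_for_path

    path = "s3://pandas-test/parquet_dir"  # a single file or a directory of partitions
    dataset = pq.ParquetDataset(path, filesystem=get_fs_for_path(path))
    result = dataset.read_pandas(columns=["A"]).to_pandas()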

pandas/io/s3.py

+6 -2

@@ -16,6 +16,10 @@ def _strip_schema(url):
     return result.netloc + result.path
 
 
+def get_fs():
+    return s3fs.S3FileSystem(anon=False)
+
+
 def get_file_and_filesystem(
     filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
 ) -> Tuple[IO, Any]:
@@ -24,7 +28,7 @@ def get_file_and_filesystem(
     if mode is None:
         mode = "rb"
 
-    fs = s3fs.S3FileSystem(anon=False)
+    fs = get_fs()
     try:
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     except (FileNotFoundError, NoCredentialsError):
@@ -34,7 +38,7 @@ def get_file_and_filesystem(
         # aren't valid for that bucket.
         # A NoCredentialsError is raised if you don't have creds
         # for that bucket.
-        fs = s3fs.S3FileSystem(anon=True)
+        fs = get_fs()
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     return file, fs

pandas/tests/io/test_parquet.py

+33 -7

@@ -1,7 +1,6 @@
 """ test parquet compat """
 import datetime
 from distutils.version import LooseVersion
-import locale
 import os
 from warnings import catch_warnings
 
@@ -131,6 +130,7 @@ def check_round_trip(
     read_kwargs=None,
     expected=None,
     check_names=True,
+    check_like=False,
     repeat=2,
 ):
     """Verify parquet serializer and deserializer produce the same results.
@@ -150,6 +150,8 @@ def check_round_trip(
         Expected deserialization result, otherwise will be equal to `df`
     check_names: list of str, optional
        Closed set of column names to be compared
+    check_like: bool, optional
+        If True, ignore the order of index & columns.
     repeat: int, optional
         How many times to repeat the test
     """
@@ -169,7 +171,9 @@ def compare(repeat):
             with catch_warnings(record=True):
                 actual = read_parquet(path, **read_kwargs)
 
-                tm.assert_frame_equal(expected, actual, check_names=check_names)
+                tm.assert_frame_equal(
+                    expected, actual, check_names=check_names, check_like=check_like
+                )
 
     if path is None:
         with tm.ensure_clean() as path:
@@ -532,15 +536,37 @@ def test_categorical(self, pa):
         expected = df.astype(object)
         check_round_trip(df, pa, expected=expected)
 
-    # GH#33077 2020-03-27
-    @pytest.mark.xfail(
-        locale.getlocale()[0] == "zh_CN",
-        reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'",
-    )
     def test_s3_roundtrip(self, df_compat, s3_resource, pa):
         # GH #19134
         check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")
 
+    @td.skip_if_no("s3fs")
+    @pytest.mark.parametrize("partition_col", [["A"], []])
+    def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
+        from pandas.io.s3 import get_fs as get_s3_fs
+
+        # GH #26388
+        # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716
+        # As per pyarrow partitioned columns become 'categorical' dtypes
+        # and are added to back of dataframe on read
+
+        expected_df = df_compat.copy()
+        if partition_col:
+            expected_df[partition_col] = expected_df[partition_col].astype("category")
+        check_round_trip(
+            df_compat,
+            pa,
+            expected=expected_df,
+            path="s3://pandas-test/parquet_dir",
+            write_kwargs={
+                "partition_cols": partition_col,
+                "compression": None,
+                "filesystem": get_s3_fs(),
+            },
+            check_like=True,
+            repeat=1,
+        )
+
     def test_partition_cols_supported(self, pa, df_full):
         # GH #23283
         partition_cols = ["bool", "int"]
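
To run just the new round-trip test locally (assuming the optional s3 test dependencies used by the s3_resource fixture, such as s3fs and moto, are installed), something like:

    pytest pandas/tests/io/test_parquet.py -k "s3_roundtrip_for_dir"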
