
Commit 82c5ce2

Backport PR #33645, #33632 and #34087 on branch 1.0.x (#34173)
Co-authored-by: alimcmaster1 <[email protected]>
1 parent 64f206f commit 82c5ce2

9 files changed (+134 -49 lines)

doc/source/whatsnew/v1.0.4.rst (+4)

@@ -31,6 +31,10 @@ Bug fixes
 ~~~~~~~~~
 - Bug in :meth:`SeriesGroupBy.first`, :meth:`SeriesGroupBy.last`, :meth:`SeriesGroupBy.min`, and :meth:`SeriesGroupBy.max` returning floats when applied to nullable Booleans (:issue:`33071`)
 - Bug in :meth:`Rolling.min` and :meth:`Rolling.max`: Growing memory usage after multiple calls when using a fixed window (:issue:`30726`)
+- Bug in :meth:`~DataFrame.to_parquet` was not raising ``PermissionError`` when writing to a private s3 bucket with invalid creds. (:issue:`27679`)
+- Bug in :meth:`~DataFrame.to_csv` was silently failing when writing to an invalid s3 bucket. (:issue:`32486`)
+- Bug in :meth:`read_parquet` was raising a ``FileNotFoundError`` when passed an s3 directory path. (:issue:`26388`)
+- Bug in :meth:`~DataFrame.to_parquet` was throwing an ``AttributeError`` when writing a partitioned parquet file to s3 (:issue:`27596`)
 -
 
 Contributors
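For orientation, a minimal sketch of the user-facing behaviour these notes describe. This is not part of the commit; it assumes pyarrow, s3fs and botocore are installed, credentials are configured, and the bucket names are hypothetical.

import botocore.exceptions
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})

# Writing to a missing or inaccessible bucket now surfaces the underlying
# error instead of failing silently (GH 32486, GH 27679).
try:
    df.to_csv("s3://a-bucket-that-does-not-exist/out.csv")
except (FileNotFoundError, botocore.exceptions.ClientError) as err:
    print("write failed as expected:", err)

# Reading a directory of partitioned parquet files from s3 now works
# (GH 26388); see the pandas/io/parquet.py hunks below.
# partitioned = pd.read_parquet("s3://my-bucket/partition_dir")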

pandas/io/common.py (+27)

@@ -141,6 +141,33 @@ def urlopen(*args, **kwargs):
     return urllib.request.urlopen(*args, **kwargs)
 
 
+def get_fs_for_path(filepath: str):
+    """
+    Get appropriate filesystem given a filepath.
+    Supports s3fs, gcs and local file system.
+
+    Parameters
+    ----------
+    filepath : str
+        File path. e.g. s3://bucket/object, /local/path, gcs://pandas/obj
+
+    Returns
+    -------
+    s3fs.S3FileSystem, gcsfs.GCSFileSystem, None
+        Appropriate FileSystem to use. None for local filesystem.
+    """
+    if is_s3_url(filepath):
+        from pandas.io import s3
+
+        return s3.get_fs()
+    elif is_gcs_url(filepath):
+        from pandas.io import gcs
+
+        return gcs.get_fs()
+    else:
+        return None
+
+
 def get_filepath_or_buffer(
     filepath_or_buffer: FilePathOrBuffer,
     encoding: Optional[str] = None,
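A hedged usage sketch of the new helper. It is an internal pandas.io API; the paths below are illustrative, and the remote branches need s3fs/gcsfs installed.

from pandas.io.common import get_fs_for_path

# Local paths map to None, meaning "use the local filesystem".
print(get_fs_for_path("/tmp/local.parquet"))                # None

# Remote schemes return a ready-to-use filesystem object.
print(type(get_fs_for_path("s3://bucket/key.parquet")))     # s3fs.S3FileSystem
print(type(get_fs_for_path("gcs://bucket/key.parquet")))    # gcsfs.GCSFileSystem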

pandas/io/formats/csvs.py (+3 -1)

@@ -62,7 +62,7 @@ def __init__(
         # Extract compression mode as given, if dict
         compression, self.compression_args = get_compression_method(compression)
 
-        self.path_or_buf, _, _, _ = get_filepath_or_buffer(
+        self.path_or_buf, _, _, self.should_close = get_filepath_or_buffer(
             path_or_buf, encoding=encoding, compression=compression, mode=mode
         )
         self.sep = sep

@@ -224,6 +224,8 @@ def save(self) -> None:
                     f.close()
                     for _fh in handles:
                         _fh.close()
+            elif self.should_close:
+                f.close()
 
     def _save_header(self):
         writer = self.writer
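For context, a minimal sketch of the should_close contract the new lines rely on. This is not pandas' internal code; it assumes s3fs and valid AWS credentials, and the bucket name is hypothetical.

from pandas.io.common import get_filepath_or_buffer

handle, _, _, should_close = get_filepath_or_buffer("s3://my-bucket/out.csv", mode="wb")
try:
    handle.write(b"a,b\n1,2\n")
finally:
    # Close only handles that get_filepath_or_buffer opened itself; buffers
    # passed in by the caller are left open (should_close is False for them).
    if should_close:
        handle.close()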

pandas/io/gcs.py (+5 -1)

@@ -6,13 +6,17 @@
 )
 
 
+def get_fs():
+    return gcsfs.GCSFileSystem()
+
+
 def get_filepath_or_buffer(
     filepath_or_buffer, encoding=None, compression=None, mode=None
 ):
 
     if mode is None:
         mode = "rb"
 
-    fs = gcsfs.GCSFileSystem()
+    fs = get_fs()
     filepath_or_buffer = fs.open(filepath_or_buffer, mode)
     return filepath_or_buffer, None, compression, True

pandas/io/parquet.py (+18 -13)

@@ -8,7 +8,12 @@
 
 from pandas import DataFrame, get_option
 
-from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
+from pandas.io.common import (
+    get_filepath_or_buffer,
+    get_fs_for_path,
+    is_gcs_url,
+    is_s3_url,
+)
 
 
 def get_engine(engine: str) -> "BaseImpl":

@@ -92,13 +97,15 @@ def write(
         **kwargs,
     ):
         self.validate_dataframe(df)
-        path, _, _, _ = get_filepath_or_buffer(path, mode="wb")
+        file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
 
         from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
         if index is not None:
             from_pandas_kwargs["preserve_index"] = index
 
         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
+        # write_to_dataset does not support a file-like object when
+        # a directory path is used, so just pass the path string.
         if partition_cols is not None:
             self.api.parquet.write_to_dataset(
                 table,

@@ -111,22 +118,20 @@ def write(
         else:
             self.api.parquet.write_table(
                 table,
-                path,
+                file_obj_or_path,
                 compression=compression,
                 coerce_timestamps=coerce_timestamps,
                 **kwargs,
             )
-
-    def read(self, path, columns=None, **kwargs):
-        path, _, _, should_close = get_filepath_or_buffer(path)
-
-        kwargs["use_pandas_metadata"] = True
-        result = self.api.parquet.read_table(
-            path, columns=columns, **kwargs
-        ).to_pandas()
         if should_close:
-            path.close()
+            file_obj_or_path.close()
 
+    def read(self, path, columns=None, **kwargs):
+        parquet_ds = self.api.parquet.ParquetDataset(
+            path, filesystem=get_fs_for_path(path), **kwargs
+        )
+        kwargs["columns"] = columns
+        result = parquet_ds.read_pandas(**kwargs).to_pandas()
         return result
 
 

@@ -281,7 +286,7 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
     A file URL can also be a path to a directory that contains multiple
     partitioned parquet files. Both pyarrow and fastparquet support
     paths to directories as well as file URLs. A directory path could be:
-    ``file://localhost/path/to/tables``
+    ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``
 
     If you want to pass in a path object, pandas accepts any
     ``os.PathLike``.
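Taken together, the write and read changes enable the round trip below. This is an illustrative sketch rather than part of the commit; it requires pyarrow and s3fs, valid credentials, and a writable bucket (names are hypothetical). Passing the filesystem explicitly mirrors what the new test does.

import pandas as pd
from pandas.io.s3 import get_fs

df = pd.DataFrame({"A": [1, 1, 2], "B": ["x", "y", "z"]})

# Partitioned write: write_to_dataset now receives the plain path string
# instead of a file-like object, avoiding the AttributeError (GH 27596).
df.to_parquet(
    "s3://my-bucket/partition_dir",
    partition_cols=["A"],
    compression=None,
    filesystem=get_fs(),  # forwarded to pyarrow via **kwargs
)

# Directory read: ParquetDataset is built with the filesystem returned by
# get_fs_for_path, so an s3 directory path no longer raises (GH 26388).
result = pd.read_parquet("s3://my-bucket/partition_dir")
print(result.dtypes)  # the partition column comes back as categorical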

pandas/io/s3.py (+6 -2)

@@ -16,6 +16,10 @@ def _strip_schema(url):
     return result.netloc + result.path
 
 
+def get_fs():
+    return s3fs.S3FileSystem(anon=False)
+
+
 def get_file_and_filesystem(
     filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
 ) -> Tuple[IO, Any]:

@@ -24,7 +28,7 @@ def get_file_and_filesystem(
     if mode is None:
         mode = "rb"
 
-    fs = s3fs.S3FileSystem(anon=False)
+    fs = get_fs()
     try:
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     except (FileNotFoundError, NoCredentialsError):

@@ -34,7 +38,7 @@ def get_file_and_filesystem(
         # aren't valid for that bucket.
        # A NoCredentialsError is raised if you don't have creds
        # for that bucket.
-        fs = s3fs.S3FileSystem(anon=True)
+        fs = get_fs()
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     return file, fs
 
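Factoring the filesystem construction into get_fs() gives the parquet code and the tests a single place to obtain the credentialed filesystem. A small sketch of reusing it (internal API; the bucket is hypothetical and must be accessible with your credentials):

from pandas.io.s3 import get_fs

fs = get_fs()              # s3fs.S3FileSystem(anon=False)
print(fs.ls("my-bucket"))  # list the keys in a bucket you can access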

pandas/tests/io/parser/test_network.py (+29 -2)

@@ -54,8 +54,8 @@ def tips_df(datapath):
 @pytest.mark.usefixtures("s3_resource")
 @td.skip_if_not_us_locale()
 class TestS3:
+    @td.skip_if_no("s3fs")
     def test_parse_public_s3_bucket(self, tips_df):
-        pytest.importorskip("s3fs")
 
         # more of an integration test due to the not-public contents portion
         # can probably mock this though.

@@ -159,7 +159,7 @@ def test_parse_public_s3_bucket_nrows_python(self, tips_df):
         assert not df.empty
         tm.assert_frame_equal(tips_df.iloc[:10], df)
 
-    def test_s3_fails(self):
+    def test_read_s3_fails(self):
         with pytest.raises(IOError):
             read_csv("s3://nyqpug/asdf.csv")
 

@@ -168,6 +168,33 @@ def test_s3_fails(self):
         with pytest.raises(IOError):
             read_csv("s3://cant_get_it/file.csv")
 
+    def test_write_s3_csv_fails(self, tips_df):
+        # GH 32486
+        # Attempting to write to an invalid S3 path should raise
+        import botocore
+
+        # GH 34087
+        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
+        # Catch a ClientError since AWS Service Errors are defined dynamically
+        error = (FileNotFoundError, botocore.exceptions.ClientError)
+
+        with pytest.raises(error, match="The specified bucket does not exist"):
+            tips_df.to_csv("s3://an_s3_bucket_data_doesnt_exit/not_real.csv")
+
+    @td.skip_if_no("pyarrow")
+    def test_write_s3_parquet_fails(self, tips_df):
+        # GH 27679
+        # Attempting to write to an invalid S3 path should raise
+        import botocore
+
+        # GH 34087
+        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
+        # Catch a ClientError since AWS Service Errors are defined dynamically
+        error = (FileNotFoundError, botocore.exceptions.ClientError)
+
+        with pytest.raises(error, match="The specified bucket does not exist"):
+            tips_df.to_parquet("s3://an_s3_bucket_data_doesnt_exit/not_real.parquet")
+
     def test_read_csv_handles_boto_s3_object(self, s3_resource, tips_file):
         # see gh-16135
 
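The ClientError catch in these tests is needed because AWS service exceptions (such as NoSuchBucket) are generated dynamically by botocore and have no stable class to import. A sketch of inspecting the error code instead (hypothetical bucket name; requires s3fs and botocore):

import botocore.exceptions
import pandas as pd

try:
    pd.DataFrame({"a": [1]}).to_csv("s3://an-s3-bucket-that-doesnt-exist/x.csv")
except (FileNotFoundError, botocore.exceptions.ClientError) as err:
    if isinstance(err, botocore.exceptions.ClientError):
        # The response payload carries the service-specific error code.
        print(err.response["Error"]["Code"])  # e.g. "NoSuchBucket"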

pandas/tests/io/test_gcs.py (+9 -23)

@@ -56,7 +56,15 @@ def open(*args):
 
     monkeypatch.setattr("gcsfs.GCSFileSystem", MockGCSFileSystem)
     df1.to_csv("gs://test/test.csv", index=True)
-    df2 = read_csv(StringIO(s.getvalue()), parse_dates=["dt"], index_col=0)
+
+    def mock_get_filepath_or_buffer(*args, **kwargs):
+        return StringIO(df1.to_csv()), None, None, False
+
+    monkeypatch.setattr(
+        "pandas.io.gcs.get_filepath_or_buffer", mock_get_filepath_or_buffer
+    )
+
+    df2 = read_csv("gs://test/test.csv", parse_dates=["dt"], index_col=0)
 
     tm.assert_frame_equal(df1, df2)
 

@@ -86,28 +94,6 @@ def open(self, path, mode="r", *args):
     )
 
 
-@td.skip_if_no("gcsfs")
-def test_gcs_get_filepath_or_buffer(monkeypatch):
-    df1 = DataFrame(
-        {
-            "int": [1, 3],
-            "float": [2.0, np.nan],
-            "str": ["t", "s"],
-            "dt": date_range("2018-06-18", periods=2),
-        }
-    )
-
-    def mock_get_filepath_or_buffer(*args, **kwargs):
-        return (StringIO(df1.to_csv(index=False)), None, None, False)
-
-    monkeypatch.setattr(
-        "pandas.io.gcs.get_filepath_or_buffer", mock_get_filepath_or_buffer
-    )
-    df2 = read_csv("gs://test/test.csv", parse_dates=["dt"])
-
-    tm.assert_frame_equal(df1, df2)
-
-
 @td.skip_if_installed("gcsfs")
 def test_gcs_not_present_exception():
     with pytest.raises(ImportError) as e:

pandas/tests/io/test_parquet.py (+33 -7)

@@ -1,7 +1,6 @@
 """ test parquet compat """
 import datetime
 from distutils.version import LooseVersion
-import locale
 import os
 from warnings import catch_warnings
 

@@ -130,6 +129,7 @@ def check_round_trip(
     read_kwargs=None,
     expected=None,
     check_names=True,
+    check_like=False,
     repeat=2,
 ):
     """Verify parquet serializer and deserializer produce the same results.

@@ -149,6 +149,8 @@
         Expected deserialization result, otherwise will be equal to `df`
     check_names: list of str, optional
         Closed set of column names to be compared
+    check_like: bool, optional
+        If True, ignore the order of index & columns.
     repeat: int, optional
         How many times to repeat the test
     """

@@ -169,7 +171,9 @@ def compare(repeat):
             with catch_warnings(record=True):
                 actual = read_parquet(path, **read_kwargs)
 
-                tm.assert_frame_equal(expected, actual, check_names=check_names)
+                tm.assert_frame_equal(
+                    expected, actual, check_names=check_names, check_like=check_like
+                )
 
     if path is None:
         with tm.ensure_clean() as path:

@@ -485,15 +489,37 @@ def test_categorical(self, pa):
         expected = df.astype(object)
         check_round_trip(df, pa, expected=expected)
 
-    # GH#33077 2020-03-27
-    @pytest.mark.xfail(
-        locale.getlocale()[0] == "zh_CN",
-        reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'",
-    )
     def test_s3_roundtrip(self, df_compat, s3_resource, pa):
         # GH #19134
         check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")
 
+    @td.skip_if_no("s3fs")
+    @pytest.mark.parametrize("partition_col", [["A"], []])
+    def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
+        from pandas.io.s3 import get_fs as get_s3_fs
+
+        # GH #26388
+        # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716
+        # As per pyarrow partitioned columns become 'categorical' dtypes
+        # and are added to back of dataframe on read
+
+        expected_df = df_compat.copy()
+        if partition_col:
+            expected_df[partition_col] = expected_df[partition_col].astype("category")
+        check_round_trip(
+            df_compat,
+            pa,
+            expected=expected_df,
+            path="s3://pandas-test/parquet_dir",
+            write_kwargs={
+                "partition_cols": partition_col,
+                "compression": None,
+                "filesystem": get_s3_fs(),
+            },
+            check_like=True,
+            repeat=1,
+        )
+
     def test_partition_cols_supported(self, pa, df_full):
         # GH #23283
         partition_cols = ["bool", "int"]
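For reference, what check_like=True buys in the new round-trip test: pyarrow appends partition columns at the end on read, so column order differs from the written frame. A small self-contained sketch:

import pandas as pd
import pandas._testing as tm

left = pd.DataFrame({"A": [1, 2], "B": ["x", "y"]})
right = left[["B", "A"]]  # same data, columns reordered

# check_like=True ignores row/column order, so this passes;
# without it the column-order difference would raise.
tm.assert_frame_equal(left, right, check_like=True)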
