
Commit 81371a4

Revert backport of #33632: Parquet & s3 I/O changes (#34632)
Also skip for it_IT locale
1 parent 471abcc commit 81371a4

File tree

5 files changed: +24, -88 lines


pandas/io/common.py (-27 lines)

@@ -141,33 +141,6 @@ def urlopen(*args, **kwargs):
     return urllib.request.urlopen(*args, **kwargs)


-def get_fs_for_path(filepath: str):
-    """
-    Get appropriate filesystem given a filepath.
-    Supports s3fs, gcs and local file system.
-
-    Parameters
-    ----------
-    filepath : str
-        File path. e.g s3://bucket/object, /local/path, gcs://pandas/obj
-
-    Returns
-    -------
-    s3fs.S3FileSystem, gcsfs.GCSFileSystem, None
-        Appropriate FileSystem to use. None for local filesystem.
-    """
-    if is_s3_url(filepath):
-        from pandas.io import s3
-
-        return s3.get_fs()
-    elif is_gcs_url(filepath):
-        from pandas.io import gcs
-
-        return gcs.get_fs()
-    else:
-        return None
-
-
 def get_filepath_or_buffer(
     filepath_or_buffer: FilePathOrBuffer,
     encoding: Optional[str] = None,
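For context on what this hunk removes: `get_fs_for_path` dispatched on the URL scheme via `is_s3_url`/`is_gcs_url`. A minimal standalone sketch of that dispatch, assuming scheme checks equivalent to the ones in `pandas.io.common` (the function bodies here are illustrative, not the library's exact code):

    from urllib.parse import urlparse


    def is_s3_url(url) -> bool:
        # Check the parsed scheme; non-string input simply yields False.
        try:
            return urlparse(url).scheme in ("s3", "s3n", "s3a")
        except (TypeError, AttributeError):
            return False


    def is_gcs_url(url) -> bool:
        try:
            return urlparse(url).scheme in ("gcs", "gs")
        except (TypeError, AttributeError):
            return False


    # The removed helper returned a concrete filesystem for remote schemes
    # and None for local paths:
    print(is_s3_url("s3://bucket/object"))  # True  -> s3fs.S3FileSystem
    print(is_gcs_url("/local/path"))        # False -> None (local filesystem)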

pandas/io/gcs.py (+1, -5 lines)

@@ -6,17 +6,13 @@
 )


-def get_fs():
-    return gcsfs.GCSFileSystem()
-
-
 def get_filepath_or_buffer(
     filepath_or_buffer, encoding=None, compression=None, mode=None
 ):

     if mode is None:
         mode = "rb"

-    fs = get_fs()
+    fs = gcsfs.GCSFileSystem()
     filepath_or_buffer = fs.open(filepath_or_buffer, mode)
     return filepath_or_buffer, None, compression, True
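After the revert, the GCS handler constructs the filesystem inline rather than through a shared `get_fs()` helper. A usage sketch of the inlined call, assuming `gcsfs` is installed and default credentials resolve (the bucket and object names are hypothetical):

    import gcsfs

    fs = gcsfs.GCSFileSystem()  # default credential resolution
    with fs.open("gs://pandas-test/example.parquet", "rb") as f:  # hypothetical object
        header = f.read(4)  # parquet files begin with the magic bytes b"PAR1"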

pandas/io/parquet.py (+14, -17 lines)

@@ -8,12 +8,7 @@

 from pandas import DataFrame, get_option

-from pandas.io.common import (
-    get_filepath_or_buffer,
-    get_fs_for_path,
-    is_gcs_url,
-    is_s3_url,
-)
+from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url


 def get_engine(engine: str) -> "BaseImpl":

@@ -97,15 +92,13 @@ def write(
         **kwargs,
     ):
         self.validate_dataframe(df)
-        file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
+        path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")

         from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
         if index is not None:
             from_pandas_kwargs["preserve_index"] = index

         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
-        # write_to_dataset does not support a file-like object when
-        # a dircetory path is used, so just pass the path string.
         if partition_cols is not None:
             self.api.parquet.write_to_dataset(
                 table,

@@ -118,20 +111,24 @@ def write(
         else:
             self.api.parquet.write_table(
                 table,
-                file_obj_or_path,
+                path,
                 compression=compression,
                 coerce_timestamps=coerce_timestamps,
                 **kwargs,
             )
         if should_close:
-            file_obj_or_path.close()
+            path.close()

     def read(self, path, columns=None, **kwargs):
-        parquet_ds = self.api.parquet.ParquetDataset(
-            path, filesystem=get_fs_for_path(path), **kwargs
-        )
-        kwargs["columns"] = columns
-        result = parquet_ds.read_pandas(**kwargs).to_pandas()
+        path, _, _, should_close = get_filepath_or_buffer(path)
+
+        kwargs["use_pandas_metadata"] = True
+        result = self.api.parquet.read_table(
+            path, columns=columns, **kwargs
+        ).to_pandas()
+        if should_close:
+            path.close()
+
         return result


@@ -286,7 +283,7 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
     A file URL can also be a path to a directory that contains multiple
     partitioned parquet files. Both pyarrow and fastparquet support
     paths to directories as well as file URLs. A directory path could be:
-    ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``
+    ``file://localhost/path/to/tables``

     If you want to pass in a path object, pandas accepts any
     ``os.PathLike``.
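The restored `read` goes through `get_filepath_or_buffer` and then `pyarrow.parquet.read_table`, instead of `ParquetDataset` with an explicit filesystem. A minimal sketch of the restored flow against a local file (the path is hypothetical; assumes `pyarrow` is installed):

    import pyarrow.parquet as pq

    # Restored flow, sketched: open the target first, then hand the handle
    # to read_table with pandas metadata enabled.
    with open("/tmp/example.parquet", "rb") as f:  # hypothetical local file
        table = pq.read_table(f, use_pandas_metadata=True)
    df = table.to_pandas()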

pandas/io/s3.py (+2, -6 lines)

@@ -16,10 +16,6 @@ def _strip_schema(url):
     return result.netloc + result.path


-def get_fs():
-    return s3fs.S3FileSystem(anon=False)
-
-
 def get_file_and_filesystem(
     filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
 ) -> Tuple[IO, Any]:

@@ -28,7 +24,7 @@ def get_file_and_filesystem(
     if mode is None:
         mode = "rb"

-    fs = get_fs()
+    fs = s3fs.S3FileSystem(anon=False)
     try:
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     except (FileNotFoundError, NoCredentialsError):

@@ -38,7 +34,7 @@
         # aren't valid for that bucket.
         # A NoCredentialsError is raised if you don't have creds
         # for that bucket.
-        fs = get_fs()
+        fs = s3fs.S3FileSystem(anon=True)
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     return file, fs
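The restored logic retries anonymously when credentialed access fails, which is what keeps public buckets readable without credentials. A standalone sketch of the same two-step fallback (assumes `s3fs` and `botocore` are installed; the bucket path in the usage comment is hypothetical):

    import s3fs
    from botocore.exceptions import NoCredentialsError


    def open_s3(path: str, mode: str = "rb"):
        fs = s3fs.S3FileSystem(anon=False)
        try:
            return fs.open(path, mode)
        except (FileNotFoundError, NoCredentialsError):
            # Public buckets can raise FileNotFoundError when the creds
            # aren't valid for that bucket, and NoCredentialsError when
            # no creds exist at all; retry anonymously.
            fs = s3fs.S3FileSystem(anon=True)
            return fs.open(path, mode)


    # f = open_s3("pandas-test/pyarrow.parquet")  # hypothetical public object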

pandas/tests/io/test_parquet.py (+7, -33 lines)

@@ -1,6 +1,7 @@
 """ test parquet compat """
 import datetime
 from distutils.version import LooseVersion
+import locale
 import os
 from warnings import catch_warnings


@@ -129,7 +130,6 @@ def check_round_trip(
     read_kwargs=None,
     expected=None,
     check_names=True,
-    check_like=False,
     repeat=2,
 ):
     """Verify parquet serializer and deserializer produce the same results.

@@ -149,8 +149,6 @@
         Expected deserialization result, otherwise will be equal to `df`
     check_names: list of str, optional
         Closed set of column names to be compared
-    check_like: bool, optional
-        If True, ignore the order of index & columns.
     repeat: int, optional
         How many times to repeat the test
     """

@@ -171,9 +169,7 @@ def compare(repeat):
         with catch_warnings(record=True):
             actual = read_parquet(path, **read_kwargs)

-            tm.assert_frame_equal(
-                expected, actual, check_names=check_names, check_like=check_like
-            )
+            tm.assert_frame_equal(expected, actual, check_names=check_names)

     if path is None:
         with tm.ensure_clean() as path:

@@ -489,37 +485,15 @@ def test_categorical(self, pa):
         expected = df.astype(object)
         check_round_trip(df, pa, expected=expected)

+    # GH#33077 2020-03-27
+    @pytest.mark.xfail(
+        locale.getlocale()[0] in ["zh_CN", "it_IT"],
+        reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'",
+    )
     def test_s3_roundtrip(self, df_compat, s3_resource, pa):
         # GH #19134
         check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")

-    @td.skip_if_no("s3fs")
-    @pytest.mark.parametrize("partition_col", [["A"], []])
-    def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
-        from pandas.io.s3 import get_fs as get_s3_fs
-
-        # GH #26388
-        # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716
-        # As per pyarrow partitioned columns become 'categorical' dtypes
-        # and are added to back of dataframe on read
-
-        expected_df = df_compat.copy()
-        if partition_col:
-            expected_df[partition_col] = expected_df[partition_col].astype("category")
-        check_round_trip(
-            df_compat,
-            pa,
-            expected=expected_df,
-            path="s3://pandas-test/parquet_dir",
-            write_kwargs={
-                "partition_cols": partition_col,
-                "compression": None,
-                "filesystem": get_s3_fs(),
-            },
-            check_like=True,
-            repeat=1,
-        )
-
     def test_partition_cols_supported(self, pa, df_full):
         # GH #23283
         partition_cols = ["bool", "int"]
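The added marker evaluates its condition once, when the module is imported, so the test only xfails when the suite runs under an affected locale. The pattern in isolation (module and test names are illustrative):

    import locale

    import pytest


    @pytest.mark.xfail(
        locale.getlocale()[0] in ["zh_CN", "it_IT"],
        reason="dateutil cannot parse localized HTTP date headers",
    )
    def test_locale_sensitive_roundtrip():
        assert True  # placeholder; the real test does an S3 parquet round trip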
