
Revert backport of #33632: Parquet & s3 I/O changes #34632


Merged (3 commits, Jun 9, 2020)
27 changes: 0 additions & 27 deletions pandas/io/common.py
@@ -141,33 +141,6 @@ def urlopen(*args, **kwargs):
return urllib.request.urlopen(*args, **kwargs)


def get_fs_for_path(filepath: str):
"""
Get appropriate filesystem given a filepath.
Supports s3fs, gcs and local file system.
Parameters
----------
filepath : str
File path. e.g s3://bucket/object, /local/path, gcs://pandas/obj
Returns
-------
s3fs.S3FileSystem, gcsfs.GCSFileSystem, None
Appropriate FileSystem to use. None for local filesystem.
"""
if is_s3_url(filepath):
from pandas.io import s3

return s3.get_fs()
elif is_gcs_url(filepath):
from pandas.io import gcs

return gcs.get_fs()
else:
return None


def get_filepath_or_buffer(
filepath_or_buffer: FilePathOrBuffer,
encoding: Optional[str] = None,
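For context, the removed get_fs_for_path helper dispatched on the URL scheme to pick a filesystem object, as its docstring above describes. A minimal usage sketch based only on that docstring (the paths are illustrative, and the helper no longer exists after this revert):

from pandas.io.common import get_fs_for_path

fs = get_fs_for_path("s3://bucket/object")   # would return an s3fs.S3FileSystem
fs = get_fs_for_path("gcs://pandas/obj")     # would return a gcsfs.GCSFileSystem
fs = get_fs_for_path("/local/path")          # returns None (local filesystem)
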
6 changes: 1 addition & 5 deletions pandas/io/gcs.py
@@ -6,17 +6,13 @@
)


def get_fs():
return gcsfs.GCSFileSystem()


def get_filepath_or_buffer(
filepath_or_buffer, encoding=None, compression=None, mode=None
):

if mode is None:
mode = "rb"

fs = get_fs()
fs = gcsfs.GCSFileSystem()
filepath_or_buffer = fs.open(filepath_or_buffer, mode)
return filepath_or_buffer, None, compression, True
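The revert drops the get_fs() indirection and constructs the gcsfs filesystem inline again. A minimal sketch of the resulting read path, with a hypothetical bucket and object name:

import gcsfs

fs = gcsfs.GCSFileSystem()
with fs.open("gcs://pandas/obj", "rb") as f:  # hypothetical object path
    data = f.read()
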
31 changes: 14 additions & 17 deletions pandas/io/parquet.py
@@ -8,12 +8,7 @@

from pandas import DataFrame, get_option

from pandas.io.common import (
get_filepath_or_buffer,
get_fs_for_path,
is_gcs_url,
is_s3_url,
)
from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url


def get_engine(engine: str) -> "BaseImpl":
@@ -97,15 +92,13 @@ def write(
**kwargs,
):
self.validate_dataframe(df)
file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")

from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
if index is not None:
from_pandas_kwargs["preserve_index"] = index

table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
# write_to_dataset does not support a file-like object when
# a directory path is used, so just pass the path string.
if partition_cols is not None:
self.api.parquet.write_to_dataset(
table,
@@ -118,20 +111,24 @@
else:
self.api.parquet.write_table(
table,
file_obj_or_path,
path,
compression=compression,
coerce_timestamps=coerce_timestamps,
**kwargs,
)
if should_close:
file_obj_or_path.close()
path.close()

def read(self, path, columns=None, **kwargs):
parquet_ds = self.api.parquet.ParquetDataset(
path, filesystem=get_fs_for_path(path), **kwargs
)
kwargs["columns"] = columns
result = parquet_ds.read_pandas(**kwargs).to_pandas()
path, _, _, should_close = get_filepath_or_buffer(path)

kwargs["use_pandas_metadata"] = True
result = self.api.parquet.read_table(
path, columns=columns, **kwargs
).to_pandas()
if should_close:
path.close()

return result


@@ -286,7 +283,7 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs):
A file URL can also be a path to a directory that contains multiple
partitioned parquet files. Both pyarrow and fastparquet support
paths to directories as well as file URLs. A directory path could be:
``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``
``file://localhost/path/to/tables``
If you want to pass in a path object, pandas accepts any
``os.PathLike``.
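After the revert, the pyarrow read path goes back through get_filepath_or_buffer and pyarrow.parquet.read_table rather than ParquetDataset with an explicit filesystem. A simplified sketch of that flow, assuming a local file path purely for illustration:

import pyarrow.parquet as pq

from pandas.io.common import get_filepath_or_buffer

path, _, _, should_close = get_filepath_or_buffer("example.parquet")  # hypothetical file
table = pq.read_table(path, use_pandas_metadata=True)
result = table.to_pandas()
if should_close:
    path.close()
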
8 changes: 2 additions & 6 deletions pandas/io/s3.py
@@ -16,10 +16,6 @@ def _strip_schema(url):
return result.netloc + result.path


def get_fs():
return s3fs.S3FileSystem(anon=False)


def get_file_and_filesystem(
filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
) -> Tuple[IO, Any]:
@@ -28,7 +24,7 @@ def get_file_and_filesystem(
if mode is None:
mode = "rb"

fs = get_fs()
fs = s3fs.S3FileSystem(anon=False)
try:
file = fs.open(_strip_schema(filepath_or_buffer), mode)
except (FileNotFoundError, NoCredentialsError):
@@ -38,7 +34,7 @@
# aren't valid for that bucket.
# A NoCredentialsError is raised if you don't have creds
# for that bucket.
fs = get_fs()
fs = s3fs.S3FileSystem(anon=True)
file = fs.open(_strip_schema(filepath_or_buffer), mode)
return file, fs

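The retry logic kept here first opens the object with credentials (anon=False) and falls back to anonymous access when that fails, as the comments in the diff explain. A self-contained sketch of the same pattern (not the pandas helper itself):

import s3fs
from botocore.exceptions import NoCredentialsError

def open_s3_object(path: str, mode: str = "rb"):
    fs = s3fs.S3FileSystem(anon=False)
    try:
        return fs.open(path, mode)
    except (FileNotFoundError, NoCredentialsError):
        # Public buckets can raise when the supplied credentials are missing
        # or not valid for that bucket, so retry anonymously.
        fs = s3fs.S3FileSystem(anon=True)
        return fs.open(path, mode)
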
40 changes: 7 additions & 33 deletions pandas/tests/io/test_parquet.py
@@ -1,6 +1,7 @@
""" test parquet compat """
import datetime
from distutils.version import LooseVersion
import locale
import os
from warnings import catch_warnings

@@ -129,7 +130,6 @@ def check_round_trip(
read_kwargs=None,
expected=None,
check_names=True,
check_like=False,
repeat=2,
):
"""Verify parquet serializer and deserializer produce the same results.
@@ -149,8 +149,6 @@
Expected deserialization result, otherwise will be equal to `df`
check_names: list of str, optional
Closed set of column names to be compared
check_like: bool, optional
If True, ignore the order of index & columns.
repeat: int, optional
How many times to repeat the test
"""
@@ -171,9 +169,7 @@ def compare(repeat):
with catch_warnings(record=True):
actual = read_parquet(path, **read_kwargs)

tm.assert_frame_equal(
expected, actual, check_names=check_names, check_like=check_like
)
tm.assert_frame_equal(expected, actual, check_names=check_names)

if path is None:
with tm.ensure_clean() as path:
@@ -489,37 +485,15 @@ def test_categorical(self, pa):
expected = df.astype(object)
check_round_trip(df, pa, expected=expected)

# GH#33077 2020-03-27
@pytest.mark.xfail(
locale.getlocale()[0] in ["zh_CN", "it_IT"],
reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'",
)
def test_s3_roundtrip(self, df_compat, s3_resource, pa):
# GH #19134
check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")

@td.skip_if_no("s3fs")
@pytest.mark.parametrize("partition_col", [["A"], []])
def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
from pandas.io.s3 import get_fs as get_s3_fs

# GH #26388
# https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716
# As per pyarrow partitioned columns become 'categorical' dtypes
# and are added to back of dataframe on read

expected_df = df_compat.copy()
if partition_col:
expected_df[partition_col] = expected_df[partition_col].astype("category")
check_round_trip(
df_compat,
pa,
expected=expected_df,
path="s3://pandas-test/parquet_dir",
write_kwargs={
"partition_cols": partition_col,
"compression": None,
"filesystem": get_s3_fs(),
},
check_like=True,
repeat=1,
)

def test_partition_cols_supported(self, pa, df_full):
# GH #23283
partition_cols = ["bool", "int"]
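For reference, the assertion that check_round_trip performs boils down to writing a frame to parquet, reading it back, and comparing the two. A minimal stand-alone sketch using the pandas testing helpers referenced in the diff (a simplified stand-in, not the test helper itself):

import pandas as pd
import pandas._testing as tm

df = pd.DataFrame({"A": [1, 2, 3], "B": ["x", "y", "z"]})
with tm.ensure_clean() as path:
    df.to_parquet(path, engine="pyarrow")
    result = pd.read_parquet(path, engine="pyarrow")
    tm.assert_frame_equal(df, result, check_names=True)
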