From 6d85e9044bd093f23f05e6ac8de6559ac91ef5db Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sun, 7 Jun 2020 16:31:33 +0100 Subject: [PATCH 1/3] Revert backport of #33632 --- pandas/io/common.py | 27 ---------------------- pandas/io/gcs.py | 6 +---- pandas/io/parquet.py | 31 ++++++++++++------------- pandas/io/s3.py | 8 ++----- pandas/tests/io/test_parquet.py | 40 ++++++--------------------------- 5 files changed, 24 insertions(+), 88 deletions(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index eaf4bcf203796..9617965915aa5 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -141,33 +141,6 @@ def urlopen(*args, **kwargs): return urllib.request.urlopen(*args, **kwargs) -def get_fs_for_path(filepath: str): - """ - Get appropriate filesystem given a filepath. - Supports s3fs, gcs and local file system. - - Parameters - ---------- - filepath : str - File path. e.g s3://bucket/object, /local/path, gcs://pandas/obj - - Returns - ------- - s3fs.S3FileSystem, gcsfs.GCSFileSystem, None - Appropriate FileSystem to use. None for local filesystem. - """ - if is_s3_url(filepath): - from pandas.io import s3 - - return s3.get_fs() - elif is_gcs_url(filepath): - from pandas.io import gcs - - return gcs.get_fs() - else: - return None - - def get_filepath_or_buffer( filepath_or_buffer: FilePathOrBuffer, encoding: Optional[str] = None, diff --git a/pandas/io/gcs.py b/pandas/io/gcs.py index d2d8fc2d2139f..1f5e0faedc6d2 100644 --- a/pandas/io/gcs.py +++ b/pandas/io/gcs.py @@ -6,10 +6,6 @@ ) -def get_fs(): - return gcsfs.GCSFileSystem() - - def get_filepath_or_buffer( filepath_or_buffer, encoding=None, compression=None, mode=None ): @@ -17,6 +13,6 @@ def get_filepath_or_buffer( if mode is None: mode = "rb" - fs = get_fs() + fs = gcsfs.GCSFileSystem() filepath_or_buffer = fs.open(filepath_or_buffer, mode) return filepath_or_buffer, None, compression, True diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 9c94c913e35cd..ff6e186947ebe 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -8,12 +8,7 @@ from pandas import DataFrame, get_option -from pandas.io.common import ( - get_filepath_or_buffer, - get_fs_for_path, - is_gcs_url, - is_s3_url, -) +from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url def get_engine(engine: str) -> "BaseImpl": @@ -97,15 +92,13 @@ def write( **kwargs, ): self.validate_dataframe(df) - file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb") + path, _, _, should_close = get_filepath_or_buffer(path, mode="wb") from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)} if index is not None: from_pandas_kwargs["preserve_index"] = index table = self.api.Table.from_pandas(df, **from_pandas_kwargs) - # write_to_dataset does not support a file-like object when - # a dircetory path is used, so just pass the path string. if partition_cols is not None: self.api.parquet.write_to_dataset( table, @@ -118,20 +111,24 @@ def write( else: self.api.parquet.write_table( table, - file_obj_or_path, + path, compression=compression, coerce_timestamps=coerce_timestamps, **kwargs, ) if should_close: - file_obj_or_path.close() + path.close() def read(self, path, columns=None, **kwargs): - parquet_ds = self.api.parquet.ParquetDataset( - path, filesystem=get_fs_for_path(path), **kwargs - ) - kwargs["columns"] = columns - result = parquet_ds.read_pandas(**kwargs).to_pandas() + path, _, _, should_close = get_filepath_or_buffer(path) + + kwargs["use_pandas_metadata"] = True + result = self.api.parquet.read_table( + path, columns=columns, **kwargs + ).to_pandas() + if should_close: + path.close() + return result @@ -286,7 +283,7 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs): A file URL can also be a path to a directory that contains multiple partitioned parquet files. Both pyarrow and fastparquet support paths to directories as well as file URLs. A directory path could be: - ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir`` + ``file://localhost/path/to/tables`` If you want to pass in a path object, pandas accepts any ``os.PathLike``. diff --git a/pandas/io/s3.py b/pandas/io/s3.py index 329c861d2386a..976c319f89d47 100644 --- a/pandas/io/s3.py +++ b/pandas/io/s3.py @@ -16,10 +16,6 @@ def _strip_schema(url): return result.netloc + result.path -def get_fs(): - return s3fs.S3FileSystem(anon=False) - - def get_file_and_filesystem( filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None ) -> Tuple[IO, Any]: @@ -28,7 +24,7 @@ def get_file_and_filesystem( if mode is None: mode = "rb" - fs = get_fs() + fs = s3fs.S3FileSystem(anon=False) try: file = fs.open(_strip_schema(filepath_or_buffer), mode) except (FileNotFoundError, NoCredentialsError): @@ -38,7 +34,7 @@ def get_file_and_filesystem( # aren't valid for that bucket. # A NoCredentialsError is raised if you don't have creds # for that bucket. - fs = get_fs() + fs = s3fs.S3FileSystem(anon=True) file = fs.open(_strip_schema(filepath_or_buffer), mode) return file, fs diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index d1bdf1209a737..f8a6aba1b387c 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -1,6 +1,7 @@ """ test parquet compat """ import datetime from distutils.version import LooseVersion +import locale import os from warnings import catch_warnings @@ -129,7 +130,6 @@ def check_round_trip( read_kwargs=None, expected=None, check_names=True, - check_like=False, repeat=2, ): """Verify parquet serializer and deserializer produce the same results. @@ -149,8 +149,6 @@ def check_round_trip( Expected deserialization result, otherwise will be equal to `df` check_names: list of str, optional Closed set of column names to be compared - check_like: bool, optional - If True, ignore the order of index & columns. repeat: int, optional How many times to repeat the test """ @@ -171,9 +169,7 @@ def compare(repeat): with catch_warnings(record=True): actual = read_parquet(path, **read_kwargs) - tm.assert_frame_equal( - expected, actual, check_names=check_names, check_like=check_like - ) + tm.assert_frame_equal(expected, actual, check_names=check_names) if path is None: with tm.ensure_clean() as path: @@ -489,37 +485,15 @@ def test_categorical(self, pa): expected = df.astype(object) check_round_trip(df, pa, expected=expected) + # GH#33077 2020-03-27 + @pytest.mark.xfail( + locale.getlocale()[0] == "zh_CN", + reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'", + ) def test_s3_roundtrip(self, df_compat, s3_resource, pa): # GH #19134 check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet") - @td.skip_if_no("s3fs") - @pytest.mark.parametrize("partition_col", [["A"], []]) - def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col): - from pandas.io.s3 import get_fs as get_s3_fs - - # GH #26388 - # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716 - # As per pyarrow partitioned columns become 'categorical' dtypes - # and are added to back of dataframe on read - - expected_df = df_compat.copy() - if partition_col: - expected_df[partition_col] = expected_df[partition_col].astype("category") - check_round_trip( - df_compat, - pa, - expected=expected_df, - path="s3://pandas-test/parquet_dir", - write_kwargs={ - "partition_cols": partition_col, - "compression": None, - "filesystem": get_s3_fs(), - }, - check_like=True, - repeat=1, - ) - def test_partition_cols_supported(self, pa, df_full): # GH #23283 partition_cols = ["bool", "int"] From 83f30e4d9e589cdd72e5e8a3bc0201421d348b04 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Mon, 8 Jun 2020 23:57:10 +0100 Subject: [PATCH 2/3] Update test_parquet.py Also skip for it_IT locale --- pandas/tests/io/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index f8a6aba1b387c..85a580bab8e5e 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -487,7 +487,7 @@ def test_categorical(self, pa): # GH#33077 2020-03-27 @pytest.mark.xfail( - locale.getlocale()[0] == "zh_CN", + locale.getlocale()[0] in ["zh_CN", "it_IT"] reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'", ) def test_s3_roundtrip(self, df_compat, s3_resource, pa): From 2e6f56a2bff9440798f5a473e1a850ac6cc2197e Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Tue, 9 Jun 2020 00:33:19 +0100 Subject: [PATCH 3/3] Update test_parquet.py --- pandas/tests/io/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 85a580bab8e5e..70a05b93c9cc3 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -487,7 +487,7 @@ def test_categorical(self, pa): # GH#33077 2020-03-27 @pytest.mark.xfail( - locale.getlocale()[0] in ["zh_CN", "it_IT"] + locale.getlocale()[0] in ["zh_CN", "it_IT"], reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'", ) def test_s3_roundtrip(self, df_compat, s3_resource, pa):