From aa94fe70430e722b92128f1a23f7dbac93030cbb Mon Sep 17 00:00:00 2001
From: alimcmaster1
Date: Sat, 18 Apr 2020 01:49:06 +0100
Subject: [PATCH 01/23] parquet init

---
 pandas/io/common.py  | 12 +++++++++++-
 pandas/io/gcs.py     |  6 +++++-
 pandas/io/parquet.py | 15 ++++++++-------
 pandas/io/s3.py      |  8 ++++++--
 4 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/pandas/io/common.py b/pandas/io/common.py
index ff527de79c387..05412983f4e15 100644
--- a/pandas/io/common.py
+++ b/pandas/io/common.py
@@ -158,6 +158,16 @@ def urlopen(*args, **kwargs):
     return urllib.request.urlopen(*args, **kwargs)


+def get_fs_for_path(filepath):
+    if is_s3_url(filepath):
+        from pandas.io import s3
+        return s3.get_fs()
+    elif is_gcs_url(filepath):
+        from pandas.io import gcs
+        return gcs.get_fs()
+    else:
+        return None
+
 def get_filepath_or_buffer(
     filepath_or_buffer: FilePathOrBuffer,
     encoding: Optional[str] = None,
@@ -192,7 +202,7 @@ def get_filepath_or_buffer(
             compression = "gzip"
         reader = BytesIO(req.read())
         req.close()
-        return reader, encoding, compression, True
+        return reader, encoding, compression, True, None

     if is_s3_url(filepath_or_buffer):
         from pandas.io import s3
diff --git a/pandas/io/gcs.py b/pandas/io/gcs.py
index 1f5e0faedc6d2..d2d8fc2d2139f 100644
--- a/pandas/io/gcs.py
+++ b/pandas/io/gcs.py
@@ -6,6 +6,10 @@
 )


+def get_fs():
+    return gcsfs.GCSFileSystem()
+
+
 def get_filepath_or_buffer(
     filepath_or_buffer, encoding=None, compression=None, mode=None
 ):
@@ -13,6 +17,6 @@ def get_filepath_or_buffer(
     if mode is None:
         mode = "rb"

-    fs = gcsfs.GCSFileSystem()
+    fs = get_fs()
     filepath_or_buffer = fs.open(filepath_or_buffer, mode)
     return filepath_or_buffer, None, compression, True
diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 33747d2a6dd83..29676f36e31a8 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -8,7 +8,7 @@

 from pandas import DataFrame, get_option

-from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
+from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url, get_fs_for_path


 def get_engine(engine: str) -> "BaseImpl":
@@ -111,14 +111,15 @@ def write(
         self.api.parquet.write_table(table, path, compression=compression, **kwargs)

     def read(self, path, columns=None, **kwargs):
-        path, _, _, should_close = get_filepath_or_buffer(path)
-
+        filepath, _, _, should_close = get_filepath_or_buffer(path)
+        print("here")
+        print(path)
+        parquet_ds = self.api.parquet.ParquetDataset(path, filesystem=get_fs_for_path(path), **kwargs)
         kwargs["use_pandas_metadata"] = True
-        result = self.api.parquet.read_table(
-            path, columns=columns, **kwargs
-        ).to_pandas()
+        result = parquet_ds.read(columns=columns, **kwargs).to_pandas()
+
         if should_close:
-            path.close()
+            filepath.close()

         return result

diff --git a/pandas/io/s3.py b/pandas/io/s3.py
index 976c319f89d47..329c861d2386a 100644
--- a/pandas/io/s3.py
+++ b/pandas/io/s3.py
@@ -16,6 +16,10 @@ def _strip_schema(url):
     return result.netloc + result.path


+def get_fs():
+    return s3fs.S3FileSystem(anon=False)
+
+
 def get_file_and_filesystem(
     filepath_or_buffer: FilePathOrBuffer, mode: Optional[str] = None
 ) -> Tuple[IO, Any]:
@@ -24,7 +28,7 @@ def get_file_and_filesystem(
     if mode is None:
         mode = "rb"

-    fs = s3fs.S3FileSystem(anon=False)
+    fs = get_fs()
     try:
         file = fs.open(_strip_schema(filepath_or_buffer), mode)
     except (FileNotFoundError, NoCredentialsError):
@@ -34,7 +38,7 @@ def get_file_and_filesystem(
         # aren't valid for that bucket.
# A NoCredentialsError is raised if you don't have creds # for that bucket. - fs = s3fs.S3FileSystem(anon=True) + fs = get_fs() file = fs.open(_strip_schema(filepath_or_buffer), mode) return file, fs From a30c71a0c776e2180a560a1c5165dd0728848fc2 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 12:54:22 +0100 Subject: [PATCH 02/23] Doc Str --- pandas/io/common.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pandas/io/common.py b/pandas/io/common.py index 05412983f4e15..a7e15a791228d 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -159,6 +159,7 @@ def urlopen(*args, **kwargs): def get_fs_for_path(filepath): + """Get appropriate filesystem given a filepath. Support s3fs, gcs and local disk fs""" if is_s3_url(filepath): from pandas.io import s3 return s3.get_fs() @@ -168,6 +169,7 @@ def get_fs_for_path(filepath): else: return None + def get_filepath_or_buffer( filepath_or_buffer: FilePathOrBuffer, encoding: Optional[str] = None, From b2747eba75640e0b5475723d5c1d119a7ea62eae Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 12:56:56 +0100 Subject: [PATCH 03/23] Simplify read --- pandas/io/parquet.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 29676f36e31a8..b9203c0426180 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -111,16 +111,8 @@ def write( self.api.parquet.write_table(table, path, compression=compression, **kwargs) def read(self, path, columns=None, **kwargs): - filepath, _, _, should_close = get_filepath_or_buffer(path) - print("here") - print(path) parquet_ds = self.api.parquet.ParquetDataset(path, filesystem=get_fs_for_path(path), **kwargs) - kwargs["use_pandas_metadata"] = True - result = parquet_ds.read(columns=columns, **kwargs).to_pandas() - - if should_close: - filepath.close() - + result = parquet_ds.read(columns=columns, use_pandas_metadata=True).to_pandas() return result From a51757ac9c57293f6e3d38408365eb7442ed34d7 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 13:43:23 +0100 Subject: [PATCH 04/23] Fix writer with partition --- pandas/io/parquet.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index b9203c0426180..a2175e1e66865 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -92,8 +92,7 @@ def write( **kwargs, ): self.validate_dataframe(df) - path, _, _, _ = get_filepath_or_buffer(path, mode="wb") - + file_obj, _, _, _ = get_filepath_or_buffer(path, mode="wb") from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)} if index is not None: from_pandas_kwargs["preserve_index"] = index @@ -108,11 +107,12 @@ def write( **kwargs, ) else: - self.api.parquet.write_table(table, path, compression=compression, **kwargs) + self.api.parquet.write_table(table, file_obj, compression=compression, **kwargs) def read(self, path, columns=None, **kwargs): parquet_ds = self.api.parquet.ParquetDataset(path, filesystem=get_fs_for_path(path), **kwargs) - result = parquet_ds.read(columns=columns, use_pandas_metadata=True).to_pandas() + kwargs["columns"] = columns + result = parquet_ds.read_pandas(**kwargs).to_pandas() return result From 968f3b6b6a19f80a192f23c2b98904b628d2330d Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 13:45:05 +0100 Subject: [PATCH 05/23] Test case --- pandas/tests/io/test_parquet.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git 
a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 94cf16c20e6c4..bec1025d08d03 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -20,6 +20,7 @@ read_parquet, to_parquet, ) +from pandas.io.s3 import get_fs as get_s3_fs try: import pyarrow # noqa @@ -130,7 +131,9 @@ def check_round_trip( write_kwargs=None, read_kwargs=None, expected=None, + check_dtype=True, check_names=True, + check_like=False, repeat=2, ): """Verify parquet serializer and deserializer produce the same results. @@ -169,7 +172,8 @@ def compare(repeat): with catch_warnings(record=True): actual = read_parquet(path, **read_kwargs) - tm.assert_frame_equal(expected, actual, check_names=check_names) + tm.assert_frame_equal(expected, actual, check_dtype=check_dtype, + check_names=check_names, check_like=check_like, check_categorical=False) if path is None: with tm.ensure_clean() as path: @@ -537,6 +541,13 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa): # GH #19134 check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet") + def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa): + # GH #19134 + check_round_trip(df_compat, pa, path="s3://pandas-test/parquet_dir", + write_kwargs={"partition_cols": ["A"], + "compression": None, + "filesystem": get_s3_fs()}, check_like=True, check_dtype=False, repeat=1) + def test_partition_cols_supported(self, pa, df_full): # GH #23283 partition_cols = ["bool", "int"] From 789f4cab3be20d847fbb01b9dd59af2efca78b13 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 15:59:36 +0100 Subject: [PATCH 06/23] Clean up test case --- pandas/tests/io/test_parquet.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index bec1025d08d03..0968a60c8c525 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -131,7 +131,6 @@ def check_round_trip( write_kwargs=None, read_kwargs=None, expected=None, - check_dtype=True, check_names=True, check_like=False, repeat=2, @@ -153,6 +152,8 @@ def check_round_trip( Expected deserialization result, otherwise will be equal to `df` check_names: list of str, optional Closed set of column names to be compared + check_like: bool, optional + If True, ignore the order of index & columns. 
repeat: int, optional How many times to repeat the test """ @@ -172,8 +173,7 @@ def compare(repeat): with catch_warnings(record=True): actual = read_parquet(path, **read_kwargs) - tm.assert_frame_equal(expected, actual, check_dtype=check_dtype, - check_names=check_names, check_like=check_like, check_categorical=False) + tm.assert_frame_equal(expected, actual, check_names=check_names, check_like=check_like) if path is None: with tm.ensure_clean() as path: @@ -542,11 +542,18 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa): check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet") def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa): - # GH #19134 - check_round_trip(df_compat, pa, path="s3://pandas-test/parquet_dir", - write_kwargs={"partition_cols": ["A"], + # GH #26388 + # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716 + # As per pyarrow partitioned columns become 'categorical' dtypes + # and are added to back of dataframe on read + + partition_col = "A" + expected_df = df_compat.copy() + expected_df[partition_col] = expected_df[partition_col].astype('category') + check_round_trip(df_compat, pa, expected=expected_df, path="s3://pandas-test/parquet_dir", + write_kwargs={"partition_cols": partition_col, "compression": None, - "filesystem": get_s3_fs()}, check_like=True, check_dtype=False, repeat=1) + "filesystem": get_s3_fs()}, check_like=True, repeat=1) def test_partition_cols_supported(self, pa, df_full): # GH #23283 From 040763edeeab03921060b946113f3c5cb0810901 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 16:55:13 +0100 Subject: [PATCH 07/23] Add whatsnew --- doc/source/whatsnew/v1.0.3.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc/source/whatsnew/v1.0.3.rst b/doc/source/whatsnew/v1.0.3.rst index 26d06433bda0c..d4680354c2367 100644 --- a/doc/source/whatsnew/v1.0.3.rst +++ b/doc/source/whatsnew/v1.0.3.rst @@ -23,6 +23,9 @@ Fixed regressions Bug fixes ~~~~~~~~~ +**I/O** +- :func:`read_parquet` now support an s3 directory (:issue:`26388`) + Contributors ~~~~~~~~~~~~ From 40f58893508a6bb42e3d09237a257ceb321809e7 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 17:00:44 +0100 Subject: [PATCH 08/23] Clean ups --- doc/source/whatsnew/v1.0.3.rst | 2 +- pandas/io/common.py | 7 ++++++- pandas/io/parquet.py | 15 ++++++++++++--- pandas/tests/io/test_parquet.py | 23 +++++++++++++++++------ 4 files changed, 36 insertions(+), 11 deletions(-) diff --git a/doc/source/whatsnew/v1.0.3.rst b/doc/source/whatsnew/v1.0.3.rst index d4680354c2367..ecf14c4c0fa2c 100644 --- a/doc/source/whatsnew/v1.0.3.rst +++ b/doc/source/whatsnew/v1.0.3.rst @@ -24,7 +24,7 @@ Bug fixes ~~~~~~~~~ **I/O** -- :func:`read_parquet` now support an s3 directory (:issue:`26388`) +- :func:`read_parquet` now supports an s3 directory (:issue:`26388`) Contributors ~~~~~~~~~~~~ diff --git a/pandas/io/common.py b/pandas/io/common.py index a7e15a791228d..494a0545ccd81 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -159,12 +159,17 @@ def urlopen(*args, **kwargs): def get_fs_for_path(filepath): - """Get appropriate filesystem given a filepath. Support s3fs, gcs and local disk fs""" + """ + Get appropriate filesystem given a filepath. 
+ Support s3fs, gcs and local disk fs + """ if is_s3_url(filepath): from pandas.io import s3 + return s3.get_fs() elif is_gcs_url(filepath): from pandas.io import gcs + return gcs.get_fs() else: return None diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index a2175e1e66865..e8bc6ce425201 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -8,7 +8,12 @@ from pandas import DataFrame, get_option -from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url, get_fs_for_path +from pandas.io.common import ( + get_filepath_or_buffer, + is_gcs_url, + is_s3_url, + get_fs_for_path, +) def get_engine(engine: str) -> "BaseImpl": @@ -107,10 +112,14 @@ def write( **kwargs, ) else: - self.api.parquet.write_table(table, file_obj, compression=compression, **kwargs) + self.api.parquet.write_table( + table, file_obj, compression=compression, **kwargs + ) def read(self, path, columns=None, **kwargs): - parquet_ds = self.api.parquet.ParquetDataset(path, filesystem=get_fs_for_path(path), **kwargs) + parquet_ds = self.api.parquet.ParquetDataset( + path, filesystem=get_fs_for_path(path), **kwargs + ) kwargs["columns"] = columns result = parquet_ds.read_pandas(**kwargs).to_pandas() return result diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 0968a60c8c525..4f92f02b36038 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -173,7 +173,9 @@ def compare(repeat): with catch_warnings(record=True): actual = read_parquet(path, **read_kwargs) - tm.assert_frame_equal(expected, actual, check_names=check_names, check_like=check_like) + tm.assert_frame_equal( + expected, actual, check_names=check_names, check_like=check_like + ) if path is None: with tm.ensure_clean() as path: @@ -549,11 +551,20 @@ def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa): partition_col = "A" expected_df = df_compat.copy() - expected_df[partition_col] = expected_df[partition_col].astype('category') - check_round_trip(df_compat, pa, expected=expected_df, path="s3://pandas-test/parquet_dir", - write_kwargs={"partition_cols": partition_col, - "compression": None, - "filesystem": get_s3_fs()}, check_like=True, repeat=1) + expected_df[partition_col] = expected_df[partition_col].astype("category") + check_round_trip( + df_compat, + pa, + expected=expected_df, + path="s3://pandas-test/parquet_dir", + write_kwargs={ + "partition_cols": partition_col, + "compression": None, + "filesystem": get_s3_fs(), + }, + check_like=True, + repeat=1, + ) def test_partition_cols_supported(self, pa, df_full): # GH #23283 From 753d647aaad818211265fa12a374a4b8f8329e1e Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 17:03:44 +0100 Subject: [PATCH 09/23] Clean ups --- pandas/io/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index 494a0545ccd81..6f8a36aa2e0cd 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -209,7 +209,7 @@ def get_filepath_or_buffer( compression = "gzip" reader = BytesIO(req.read()) req.close() - return reader, encoding, compression, True, None + return reader, encoding, compression, True if is_s3_url(filepath_or_buffer): from pandas.io import s3 From e4dcdc3e9c70fd24084213ced5a49853b141c3cc Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 17:07:19 +0100 Subject: [PATCH 10/23] Update whatsnew --- doc/source/whatsnew/v1.0.3.rst | 3 --- doc/source/whatsnew/v1.1.0.rst | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff 
--git a/doc/source/whatsnew/v1.0.3.rst b/doc/source/whatsnew/v1.0.3.rst index ecf14c4c0fa2c..26d06433bda0c 100644 --- a/doc/source/whatsnew/v1.0.3.rst +++ b/doc/source/whatsnew/v1.0.3.rst @@ -23,9 +23,6 @@ Fixed regressions Bug fixes ~~~~~~~~~ -**I/O** -- :func:`read_parquet` now supports an s3 directory (:issue:`26388`) - Contributors ~~~~~~~~~~~~ diff --git a/doc/source/whatsnew/v1.1.0.rst b/doc/source/whatsnew/v1.1.0.rst index a797090a83444..a732516cc7592 100644 --- a/doc/source/whatsnew/v1.1.0.rst +++ b/doc/source/whatsnew/v1.1.0.rst @@ -580,6 +580,7 @@ I/O - Bug in :func:`pandas.io.json.json_normalize` where location specified by `record_path` doesn't point to an array. (:issue:`26284`) - :func:`pandas.read_hdf` has a more explicit error message when loading an unsupported HDF file (:issue:`9539`) +- :func:`read_parquet` now supports an s3 directory (:issue:`26388`) Plotting ^^^^^^^^ From bb21431c5cf9c8d95a15e608d127113149baab55 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 19:09:15 +0100 Subject: [PATCH 11/23] Add skip if no --- pandas/io/parquet.py | 2 +- pandas/tests/io/test_parquet.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index e8bc6ce425201..fcedce6710aa5 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -10,9 +10,9 @@ from pandas.io.common import ( get_filepath_or_buffer, + get_fs_for_path, is_gcs_url, is_s3_url, - get_fs_for_path, ) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 4f92f02b36038..3f3bd6f0d463e 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -543,6 +543,7 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa): # GH #19134 check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet") + @td.skip_if_no("s3fs") def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa): # GH #26388 # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716 From fb38932b17999f110f4508f1bb1091c3b8425bf9 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 19:57:45 +0100 Subject: [PATCH 12/23] Fix import --- pandas/tests/io/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 3f3bd6f0d463e..c2405f7d84397 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -20,7 +20,6 @@ read_parquet, to_parquet, ) -from pandas.io.s3 import get_fs as get_s3_fs try: import pyarrow # noqa @@ -545,6 +544,7 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa): @td.skip_if_no("s3fs") def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa): + from pandas.io.s3 import get_fs as get_s3_fs # GH #26388 # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716 # As per pyarrow partitioned columns become 'categorical' dtypes From c29befdd7655bcbd81193f20429a22e7b2ccc6b2 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 21:40:11 +0100 Subject: [PATCH 13/23] Removed fixed xfail --- pandas/tests/io/test_parquet.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index c2405f7d84397..7e5d39a6bec15 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -533,11 +533,6 @@ def test_categorical(self, pa): expected = df.astype(object) check_round_trip(df, 
pa, expected=expected) - # GH#33077 2020-03-27 - @pytest.mark.xfail( - locale.getlocale()[0] == "zh_CN", - reason="dateutil cannot parse e.g. '五, 27 3月 2020 21:45:38 GMT'", - ) def test_s3_roundtrip(self, df_compat, s3_resource, pa): # GH #19134 check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet") @@ -545,6 +540,7 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa): @td.skip_if_no("s3fs") def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa): from pandas.io.s3 import get_fs as get_s3_fs + # GH #26388 # https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_parquet.py#L2716 # As per pyarrow partitioned columns become 'categorical' dtypes From 4f78fc552ace43960ee1f4228a82645b9b6c77d5 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 18 Apr 2020 22:39:30 +0100 Subject: [PATCH 14/23] remove import --- pandas/tests/io/test_parquet.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 7e5d39a6bec15..ebc7deb235b30 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -1,7 +1,6 @@ """ test parquet compat """ import datetime from distutils.version import LooseVersion -import locale import os from warnings import catch_warnings From 4b2828b8bff2be86672e0bc20199158a08e3e786 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Tue, 21 Apr 2020 21:29:21 +0100 Subject: [PATCH 15/23] Merge master --- pandas/tests/io/parser/test_network.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/pandas/tests/io/parser/test_network.py b/pandas/tests/io/parser/test_network.py index b7164477c31f2..b83691cb6fb56 100644 --- a/pandas/tests/io/parser/test_network.py +++ b/pandas/tests/io/parser/test_network.py @@ -54,8 +54,8 @@ def tips_df(datapath): @pytest.mark.usefixtures("s3_resource") @td.skip_if_not_us_locale() class TestS3: + @td.skip_if_no("s3fs") def test_parse_public_s3_bucket(self, tips_df): - pytest.importorskip("s3fs") # more of an integration test due to the not-public contents portion # can probably mock this though. 
@@ -159,7 +159,7 @@ def test_parse_public_s3_bucket_nrows_python(self, tips_df): assert not df.empty tm.assert_frame_equal(tips_df.iloc[:10], df) - def test_s3_fails(self): + def test_read_s3_fails(self): with pytest.raises(IOError): read_csv("s3://nyqpug/asdf.csv") @@ -168,6 +168,23 @@ def test_s3_fails(self): with pytest.raises(IOError): read_csv("s3://cant_get_it/file.csv") + def test_write_s3_csv_fails(self, tips_df): + # GH 32486 + # Attempting to write to an invalid S3 path should raise + with pytest.raises( + FileNotFoundError, match="The specified bucket does not exist" + ): + tips_df.to_csv("s3://an_s3_bucket_data_doesnt_exit/not_real.csv") + + @td.skip_if_no("pyarrow") + def test_write_s3_parquet_fails(self, tips_df): + # GH 27679 + # Attempting to write to an invalid S3 path should raise + with pytest.raises( + FileNotFoundError, match="The specified bucket does not exist" + ): + tips_df.to_parquet("s3://an_s3_bucket_data_doesnt_exit/not_real.parquet") + def test_read_csv_handles_boto_s3_object(self, s3_resource, tips_file): # see gh-16135 From dabfe58d913622cc70896649ffefff1f518b9b7f Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Tue, 21 Apr 2020 23:59:38 +0100 Subject: [PATCH 16/23] Add further test case --- pandas/io/parquet.py | 8 +++++--- pandas/tests/io/test_parquet.py | 7 ++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index e21c9ac2dac55..ae154757dd283 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -97,7 +97,7 @@ def write( **kwargs, ): self.validate_dataframe(df) - path, _, _, should_close = get_filepath_or_buffer(path, mode="wb") + file_obj, _, _, should_close = get_filepath_or_buffer(path, mode="wb") from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)} if index is not None: @@ -113,9 +113,11 @@ def write( **kwargs, ) else: - self.api.parquet.write_table(table, path, compression=compression, **kwargs) + self.api.parquet.write_table( + table, file_obj, compression=compression, **kwargs + ) if should_close: - path.close() + file_obj.close() def read(self, path, columns=None, **kwargs): parquet_ds = self.api.parquet.ParquetDataset( diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index ebc7deb235b30..1cebcb4e22c5c 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -537,7 +537,8 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa): check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet") @td.skip_if_no("s3fs") - def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa): + @pytest.mark.parametrize("partition_col", [["A"], []]) + def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col): from pandas.io.s3 import get_fs as get_s3_fs # GH #26388 @@ -545,9 +546,9 @@ def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa): # As per pyarrow partitioned columns become 'categorical' dtypes # and are added to back of dataframe on read - partition_col = "A" expected_df = df_compat.copy() - expected_df[partition_col] = expected_df[partition_col].astype("category") + if partition_col: + expected_df[partition_col] = expected_df[partition_col].astype("category") check_round_trip( df_compat, pa, From dea95f3a96a04d6e27ec3c0e0ca3bd7cf8ee8422 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 25 Apr 2020 01:25:52 +0100 Subject: [PATCH 17/23] Update parquet.py Add clarifying comment --- pandas/io/parquet.py | 2 ++ 1 file changed, 2 insertions(+) diff --git 
a/pandas/io/parquet.py b/pandas/io/parquet.py index ae154757dd283..a2b19281bf0ba 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -104,6 +104,8 @@ def write( from_pandas_kwargs["preserve_index"] = index table = self.api.Table.from_pandas(df, **from_pandas_kwargs) + # write_to_dataset does not support a file-like object when + # a dircetory path is used, so just pass the path string. if partition_cols is not None: self.api.parquet.write_to_dataset( table, From ae76e42a277b743950f1d3fc6768ddfc10988ec6 Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sat, 25 Apr 2020 01:26:59 +0100 Subject: [PATCH 18/23] Update parquet.py --- pandas/io/parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index a2b19281bf0ba..0198b40408a0c 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -104,7 +104,7 @@ def write( from_pandas_kwargs["preserve_index"] = index table = self.api.Table.from_pandas(df, **from_pandas_kwargs) - # write_to_dataset does not support a file-like object when + # write_to_dataset does not support a file-like object when # a dircetory path is used, so just pass the path string. if partition_cols is not None: self.api.parquet.write_to_dataset( From 4b4832625f42cdac458297dfc62bd0bace772f5d Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sun, 26 Apr 2020 12:52:23 +0100 Subject: [PATCH 19/23] Add whatsnew 2 --- doc/source/whatsnew/v1.1.0.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/source/whatsnew/v1.1.0.rst b/doc/source/whatsnew/v1.1.0.rst index 12de9df88c9de..e54e29c02c01f 100644 --- a/doc/source/whatsnew/v1.1.0.rst +++ b/doc/source/whatsnew/v1.1.0.rst @@ -586,6 +586,7 @@ I/O - Bug in :meth:`~DataFrame.to_parquet` was not raising ``PermissionError`` when writing to a private s3 bucket with invalid creds. (:issue:`27679`) - Bug in :meth:`~DataFrame.to_csv` was silently failing when writing to an invalid s3 bucket. 
(:issue:`32486`) - :func:`read_parquet` now supports an s3 directory (:issue:`26388`) +- Bug in :meth:`~DataFrame.to_parquet` was throwing an ``AttributeError`` when writing a partitioned parquet file to s3 (:issue:`27596`) Plotting ^^^^^^^^ From 211c36e6703723c32d5e8c877b50ef0b58c2243b Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sun, 26 Apr 2020 13:09:13 +0100 Subject: [PATCH 20/23] Rename var --- pandas/io/parquet.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index ae154757dd283..770d484d9dff0 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -97,7 +97,7 @@ def write( **kwargs, ): self.validate_dataframe(df) - file_obj, _, _, should_close = get_filepath_or_buffer(path, mode="wb") + file_obj_or_path, _, _, should_close = get_filepath_or_buffer(path, mode="wb") from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)} if index is not None: @@ -114,10 +114,10 @@ def write( ) else: self.api.parquet.write_table( - table, file_obj, compression=compression, **kwargs + table, file_obj_or_path, compression=compression, **kwargs ) if should_close: - file_obj.close() + file_obj_or_path.close() def read(self, path, columns=None, **kwargs): parquet_ds = self.api.parquet.ParquetDataset( From 4897a3270a087f8fd4d6f2d288eda1f37c91d6fb Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sun, 26 Apr 2020 13:19:11 +0100 Subject: [PATCH 21/23] Improve get_fs_for_path docstring --- pandas/io/common.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pandas/io/common.py b/pandas/io/common.py index 35e66f3f70886..8349acafca1e3 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -150,10 +150,20 @@ def urlopen(*args, **kwargs): return urllib.request.urlopen(*args, **kwargs) -def get_fs_for_path(filepath): +def get_fs_for_path(filepath: str): """ Get appropriate filesystem given a filepath. - Support s3fs, gcs and local disk fs + Supports s3fs, gcs and local file system. + + Parameters + ---------- + filepath : str + File path. e.g s3://bucket/object, /local/path, gcs://pandas/obj + + Returns + ------- + s3fs.S3FileSystem, gcsfs.GCSFileSystem, None + Appropriate FileSystem to use. None for local filesystem. """ if is_s3_url(filepath): from pandas.io import s3 From 5bc63272b430b5d9c8b73aa09bfd569372b1edae Mon Sep 17 00:00:00 2001 From: alimcmaster1 Date: Sun, 26 Apr 2020 21:53:08 +0100 Subject: [PATCH 22/23] Add doc example --- pandas/io/parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index c149f57f3257d..0a9daea105b64 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -278,7 +278,7 @@ def read_parquet(path, engine: str = "auto", columns=None, **kwargs): A file URL can also be a path to a directory that contains multiple partitioned parquet files. Both pyarrow and fastparquet support paths to directories as well as file URLs. A directory path could be: - ``file://localhost/path/to/tables`` + ``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir`` If you want to pass in a path object, pandas accepts any ``os.PathLike``. 
From ca89c216bc23e79c98450908cf18c72694eb31e5 Mon Sep 17 00:00:00 2001
From: alimcmaster1
Date: Sun, 26 Apr 2020 21:53:46 +0100
Subject: [PATCH 23/23] Make whatsnew clearer

---
 doc/source/whatsnew/v1.1.0.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/doc/source/whatsnew/v1.1.0.rst b/doc/source/whatsnew/v1.1.0.rst
index e54e29c02c01f..fc4c4c965774a 100644
--- a/doc/source/whatsnew/v1.1.0.rst
+++ b/doc/source/whatsnew/v1.1.0.rst
@@ -585,7 +585,7 @@ I/O
   unsupported HDF file (:issue:`9539`)
 - Bug in :meth:`~DataFrame.to_parquet` was not raising ``PermissionError`` when writing to a private s3 bucket with invalid creds. (:issue:`27679`)
 - Bug in :meth:`~DataFrame.to_csv` was silently failing when writing to an invalid s3 bucket. (:issue:`32486`)
-- :func:`read_parquet` now supports an s3 directory (:issue:`26388`)
+- Bug in :meth:`read_parquet` was raising a ``FileNotFoundError`` when passed an s3 directory path. (:issue:`26388`)
 - Bug in :meth:`~DataFrame.to_parquet` was throwing an ``AttributeError`` when writing a partitioned parquet file to s3 (:issue:`27596`)

 Plotting
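
Usage note (not part of the patches above): the round trip this series enables can be sketched as below, mirroring the new test_s3_roundtrip_for_dir case. The bucket name is illustrative, S3 credentials are assumed to be configured, and passing an s3fs filesystem explicitly on the write follows the test's keyword arguments rather than being a required part of the API.

import pandas as pd
import s3fs

df = pd.DataFrame({"A": [1, 2], "B": ["foo", "bar"]})

# Write a partitioned dataset under a directory-style S3 key (pyarrow engine).
# "pandas-test" is the bucket name used by the test suite; any writable bucket works.
fs = s3fs.S3FileSystem()
df.to_parquet(
    "s3://pandas-test/parquet_dir",
    engine="pyarrow",
    partition_cols=["A"],
    compression=None,
    filesystem=fs,
)

# Read the whole directory back. As noted in the test, pyarrow returns the
# partition columns as 'category' dtype, appended to the end of the frame.
result = pd.read_parquet("s3://pandas-test/parquet_dir", engine="pyarrow")
print(result.dtypes)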