From fdb3d7c8cfe8ebb5f7cdd61c0246b40d08d7977a Mon Sep 17 00:00:00 2001
From: Yuan Chuan Kee
Date: Wed, 17 Jun 2020 14:51:49 +0800
Subject: [PATCH 01/12] Pass filesystem to parquet.write_table and parquet.write_to_dataset

---
 pandas/io/parquet.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index de9a14c82b3cb..29d262712e2d4 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -111,12 +111,17 @@ def write(
                 table,
                 path,
                 compression=compression,
+                filesystem=get_fs_for_path(path),
                 partition_cols=partition_cols,
                 **kwargs,
             )
         else:
             self.api.parquet.write_table(
-                table, file_obj_or_path, compression=compression, **kwargs
+                table,
+                file_obj_or_path,
+                compression=compression,
+                filesystem=get_fs_for_path(path),
+                **kwargs
             )
         if should_close:
             file_obj_or_path.close()

From a39e9009d7dceb0cf05cef96d841f0d3890b43fa Mon Sep 17 00:00:00 2001
From: Yuan Chuan Kee
Date: Wed, 17 Jun 2020 15:26:34 +0800
Subject: [PATCH 02/12] Fixed style issues

---
 pandas/io/parquet.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 29d262712e2d4..b37189eab6189 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -118,10 +118,10 @@ def write(
         else:
             self.api.parquet.write_table(
                 table,
-                file_obj_or_path, 
+                file_obj_or_path,
                 compression=compression,
                 filesystem=get_fs_for_path(path),
-                **kwargs
+                **kwargs,
             )
         if should_close:
             file_obj_or_path.close()

From b30aebb8784f6ae39ce5353087df3ba5cebaec26 Mon Sep 17 00:00:00 2001
From: Yuan Chuan Kee
Date: Wed, 17 Jun 2020 16:04:44 +0800
Subject: [PATCH 03/12] Simple refactor

---
 pandas/io/parquet.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index b37189eab6189..da8c28bcf4d8f 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -104,6 +104,8 @@ def write(
             from_pandas_kwargs["preserve_index"] = index

         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
+
+        fs = get_fs_for_path(path)
         # write_to_dataset does not support a file-like object when
         # a directory path is used, so just pass the path string.
         if partition_cols is not None:
@@ -111,7 +113,7 @@ def write(
                 table,
                 path,
                 compression=compression,
-                filesystem=get_fs_for_path(path),
+                filesystem=fs,
                 partition_cols=partition_cols,
                 **kwargs,
             )
@@ -120,7 +122,7 @@ def write(
                 table,
                 file_obj_or_path,
                 compression=compression,
-                filesystem=get_fs_for_path(path),
+                filesystem=fs,
                 **kwargs,
             )
         if should_close:

From eacd613c29b1fc47f03e06a8a48ddf3864d2773d Mon Sep 17 00:00:00 2001
From: Yuan Chuan Kee
Date: Wed, 17 Jun 2020 16:09:07 +0800
Subject: [PATCH 04/12] Style fix

---
 pandas/io/parquet.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index da8c28bcf4d8f..2087ef19cf864 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -104,7 +104,7 @@ def write(
             from_pandas_kwargs["preserve_index"] = index

         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
-
+
         fs = get_fs_for_path(path)
         # write_to_dataset does not support a file-like object when
         # a directory path is used, so just pass the path string.
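For context, a minimal usage sketch of what patches 01-04 enable: the pyarrow writer infers the remote filesystem from the path via get_fs_for_path, so a partitioned write to S3 no longer needs an explicit filesystem object. The bucket name mirrors the test fixture used later in the series; s3fs and configured AWS credentials are assumptions, not part of the patch.

```python
import pandas as pd

df = pd.DataFrame({"A": [1, 1, 2], "B": ["x", "y", "z"]})

# With these patches, PyArrowImpl.write calls get_fs_for_path(path) and
# forwards the inferred S3 filesystem to pyarrow's write_to_dataset,
# so no explicit filesystem argument is needed here.
# "pandas-test" is a placeholder bucket; s3fs and AWS credentials are assumed.
df.to_parquet(
    "s3://pandas-test/parquet_dir",
    engine="pyarrow",
    partition_cols=["A"],
    compression=None,
)
```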
From 38a50fcefa1a22cf4728580160acedbb51ba063a Mon Sep 17 00:00:00 2001
From: Yuan Chuan Kee
Date: Wed, 17 Jun 2020 21:56:23 +0800
Subject: [PATCH 05/12] Handle user providing filesystem in kwargs

---
 pandas/io/parquet.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 2087ef19cf864..ed30352b9448c 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -105,7 +105,7 @@ def write(

         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

-        fs = get_fs_for_path(path)
+        fs = kwargs.get('filesystem', get_fs_for_path(path))
         # write_to_dataset does not support a file-like object when
         # a directory path is used, so just pass the path string.
         if partition_cols is not None:

From 7bd1ef78628d97e4ba21734e6bd4a32c78ef0b0d Mon Sep 17 00:00:00 2001
From: Yuan Chuan Kee
Date: Wed, 17 Jun 2020 22:10:04 +0800
Subject: [PATCH 06/12] Update whatsnew

---
 doc/source/whatsnew/v1.1.0.rst | 1 +
 1 file changed, 1 insertion(+)

diff --git a/doc/source/whatsnew/v1.1.0.rst b/doc/source/whatsnew/v1.1.0.rst
index f216418c3a8b0..4c8bf62490d82 100644
--- a/doc/source/whatsnew/v1.1.0.rst
+++ b/doc/source/whatsnew/v1.1.0.rst
@@ -991,6 +991,7 @@ I/O
 - Bug in :meth:`~SQLDatabase.execute` was raising a ``ProgrammingError`` for some DB-API drivers when the SQL statement contained the `%` character and no parameters were present (:issue:`34211`)
 - Bug in :meth:`~pandas.io.stata.StataReader` which resulted in categorical variables with difference dtypes when reading data using an iterator. (:issue:`31544`)
 - :meth:`HDFStore.keys` has now an optional `include` parameter that allows the retrieval of all native HDF5 table names (:issue:`29916`)
+- `pandas.io.parquet.PyArrowImpl` now infers `filesystem` using the provided `path` if `filesystem` is not provided via `kwargs`. (:issue:`34841`)

 Plotting
 ^^^^^^^^

From 24004206365d545c066cd56f19478ee8c95df2cb Mon Sep 17 00:00:00 2001
From: Yuan Chuan Kee
Date: Wed, 17 Jun 2020 22:37:16 +0800
Subject: [PATCH 07/12] Remove filesystem from kwargs if it exists

---
 pandas/io/parquet.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index ed30352b9448c..732413a0b26fb 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -105,7 +105,8 @@ def write(

         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

-        fs = kwargs.get('filesystem', get_fs_for_path(path))
+        fs = kwargs.pop('filesystem', get_fs_for_path(path))
+
         # write_to_dataset does not support a file-like object when
         # a directory path is used, so just pass the path string.
         if partition_cols is not None:
@@ -122,7 +123,6 @@ def write(
                 table,
                 file_obj_or_path,
                 compression=compression,
-                filesystem=fs,
                 **kwargs,
             )
         if should_close:
             file_obj_or_path.close()

From 5ce4ffd50a304ea3f210d46581969e9005c5c277 Mon Sep 17 00:00:00 2001
From: Yuan Chuan Kee
Date: Wed, 17 Jun 2020 22:51:09 +0800
Subject: [PATCH 08/12] Add tests without the filesystem in the kwargs

---
 pandas/tests/io/test_parquet.py | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index efd34c58d7d19..0f8b001a95a72 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -568,6 +568,26 @@ def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
             repeat=1,
         )

+    @td.skip_if_no("s3fs")
+    @pytest.mark.parametrize("partition_col", [["A"], []])
+    def test_s3_roundtrip_for_dir_infer_fs(self, df_compat, s3_resource, pa,
+                                           partition_col):
+        expected_df = df_compat.copy()
+        if partition_col:
+            expected_df[partition_col] = expected_df[partition_col].astype("category")
+        check_round_trip(
+            df_compat,
+            pa,
+            expected=expected_df,
+            path="s3://pandas-test/parquet_dir",
+            write_kwargs={
+                "partition_cols": partition_col,
+                "compression": None,
+            },
+            check_like=True,
+            repeat=1,
+        )
+
     @tm.network
     @td.skip_if_no("pyarrow")
     def test_parquet_read_from_url(self, df_compat):

From 649ce2bd49fd6d8ecd7b867bfa8f232253ec890d Mon Sep 17 00:00:00 2001
From: Yuan Chuan Kee
Date: Wed, 17 Jun 2020 22:54:41 +0800
Subject: [PATCH 09/12] Remove whitespace

---
 pandas/tests/io/test_parquet.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 0f8b001a95a72..ccb5001fa061e 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -587,7 +587,7 @@ def test_s3_roundtrip_for_dir_infer_fs(self, df_compat, s3_resource, pa,
             check_like=True,
             repeat=1,
         )
-
+
     @tm.network
     @td.skip_if_no("pyarrow")
     def test_parquet_read_from_url(self, df_compat):

From ff444f49027f66da9e33fe83d7895b29579f8961 Mon Sep 17 00:00:00 2001
From: Yuan Chuan Kee
Date: Wed, 17 Jun 2020 23:02:17 +0800
Subject: [PATCH 10/12] Really remove the whitespace

---
 pandas/tests/io/test_parquet.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index ccb5001fa061e..1c5ed9c8e9b2d 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -587,7 +587,7 @@ def test_s3_roundtrip_for_dir_infer_fs(self, df_compat, s3_resource, pa,
             check_like=True,
             repeat=1,
         )
-
+
     @tm.network
     @td.skip_if_no("pyarrow")
     def test_parquet_read_from_url(self, df_compat):

From 720b6c089730b18b5f566760bec42b18a7660fa1 Mon Sep 17 00:00:00 2001
From: Yuan Chuan Kee
Date: Wed, 17 Jun 2020 23:16:20 +0800
Subject: [PATCH 11/12] Apply black

---
 pandas/io/parquet.py            |  7 ++-----
 pandas/tests/io/test_parquet.py | 10 ++++------
 2 files changed, 6 insertions(+), 11 deletions(-)

diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 732413a0b26fb..530bb6e326e94 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -105,7 +105,7 @@ def write(

         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)

-        fs = kwargs.pop('filesystem', get_fs_for_path(path))
+        fs = kwargs.pop("filesystem", get_fs_for_path(path))

         # write_to_dataset does not support a file-like object when
         # a directory path is used, so just pass the path string.
@@ -120,10 +120,7 @@ def write(
             )
         else:
             self.api.parquet.write_table(
-                table,
-                file_obj_or_path,
-                compression=compression,
-                **kwargs,
+                table, file_obj_or_path, compression=compression, **kwargs,
             )
         if should_close:
             file_obj_or_path.close()

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 1c5ed9c8e9b2d..3eacb76b5131c 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -570,8 +570,9 @@ def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):

     @td.skip_if_no("s3fs")
     @pytest.mark.parametrize("partition_col", [["A"], []])
-    def test_s3_roundtrip_for_dir_infer_fs(self, df_compat, s3_resource, pa,
-                                           partition_col):
+    def test_s3_roundtrip_for_dir_infer_fs(
+        self, df_compat, s3_resource, pa, partition_col
+    ):
         expected_df = df_compat.copy()
         if partition_col:
             expected_df[partition_col] = expected_df[partition_col].astype("category")
@@ -580,10 +581,7 @@ def test_s3_roundtrip_for_dir_infer_fs(self, df_compat, s3_resource, pa,
             pa,
             expected=expected_df,
             path="s3://pandas-test/parquet_dir",
-            write_kwargs={
-                "partition_cols": partition_col,
-                "compression": None,
-            },
+            write_kwargs={"partition_cols": partition_col, "compression": None,},
             check_like=True,
             repeat=1,
         )

From a918c6ec2adc4938e284d55b6dbd57d92eeeb619 Mon Sep 17 00:00:00 2001
From: Yuan Chuan Kee
Date: Wed, 17 Jun 2020 23:27:29 +0800
Subject: [PATCH 12/12] Remove comma for PEP8

---
 pandas/tests/io/test_parquet.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 3eacb76b5131c..b6ed8e8b3652a 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -581,7 +581,7 @@ def test_s3_roundtrip_for_dir_infer_fs(
             pa,
             expected=expected_df,
             path="s3://pandas-test/parquet_dir",
-            write_kwargs={"partition_cols": partition_col, "compression": None,},
+            write_kwargs={"partition_cols": partition_col, "compression": None},
             check_like=True,
             repeat=1,
         )
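As a usage sketch of the kwargs handling introduced in patches 05-07 (not part of the series itself): a filesystem supplied explicitly by the caller takes precedence over the inferred one, because the writer pops "filesystem" from kwargs before falling back to get_fs_for_path. The s3fs object, bucket, and credentials setup below are illustrative assumptions.

```python
import pandas as pd
import s3fs

df = pd.DataFrame({"A": [1, 2], "B": ["x", "y"]})

# An explicit filesystem passed through to_parquet's **kwargs is popped by
# PyArrowImpl.write and used instead of the one inferred from the path,
# so it is not forwarded twice to pyarrow.
# The bucket and anon=False setup are illustrative assumptions.
fs = s3fs.S3FileSystem(anon=False)
df.to_parquet(
    "s3://pandas-test/parquet_dir",
    engine="pyarrow",
    partition_cols=["A"],
    filesystem=fs,
)
```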