From ab263040ca83524b3646407b34843a8dd19a0442 Mon Sep 17 00:00:00 2001
From: Cornelius Riemenschneider
Date: Fri, 10 Nov 2017 16:41:23 +0100
Subject: [PATCH 1/8] Pass kwargs from read_parquet() to the underlying
 engines.

This makes it possible, for example, to specify filters for predicate
pushdown to fastparquet.
---
 doc/source/whatsnew/v0.21.1.txt |  1 +
 pandas/io/parquet.py            | 10 +++++-----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/doc/source/whatsnew/v0.21.1.txt b/doc/source/whatsnew/v0.21.1.txt
index 185f08514641f..e9e133fd21e28 100644
--- a/doc/source/whatsnew/v0.21.1.txt
+++ b/doc/source/whatsnew/v0.21.1.txt
@@ -85,6 +85,7 @@ I/O
 - Bug in :func:`read_csv` for handling null values in index columns when specifying ``na_filter=False`` (:issue:`5239`)
 - Bug in :meth:`DataFrame.to_csv` when the table had ``MultiIndex`` columns, and a list of strings was passed in for ``header`` (:issue:`5539`)
 - :func:`read_parquet` now allows to specify the columns to read from a parquet file (:issue:`18154`)
+- :func:`read_parquet` now allows to specify kwargs which are passed to the respective engine (:issue:`TODO`)
 
 Plotting
 ^^^^^^^^
diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index ef95e32cc241e..8ad4660589250 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -76,9 +76,9 @@ def write(self, df, path, compression='snappy',
             table, path, compression=compression,
             coerce_timestamps=coerce_timestamps, **kwargs)
 
-    def read(self, path, columns=None):
+    def read(self, path, columns=None, **kwargs):
         path, _, _ = get_filepath_or_buffer(path)
-        return self.api.parquet.read_table(path, columns=columns).to_pandas()
+        return self.api.parquet.read_table(path, columns=columns, **kwargs).to_pandas()
 
 
 class FastParquetImpl(object):
@@ -115,9 +115,9 @@ def write(self, df, path, compression='snappy', **kwargs):
         self.api.write(path, df,
                        compression=compression, **kwargs)
 
-    def read(self, path, columns=None):
+    def read(self, path, columns=None, **kwargs):
         path, _, _ = get_filepath_or_buffer(path)
-        return self.api.ParquetFile(path).to_pandas(columns=columns)
+        return self.api.ParquetFile(path).to_pandas(columns=columns, **kwargs)
 
 
 def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
@@ -205,4 +205,4 @@ def read_parquet(path, engine='auto', columns=None, **kwargs):
     """
 
     impl = get_engine(engine)
-    return impl.read(path, columns=columns)
+    return impl.read(path, columns=columns, **kwargs)
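
A minimal sketch of the usage this patch enables (the file name and
values are illustrative; `filters` is fastparquet's predicate-pushdown
keyword, as named in the commit message):

    import pandas as pd

    # Engine-specific keyword arguments are now forwarded to the engine,
    # so fastparquet can skip row groups whose column statistics rule
    # out a match for the predicate.
    result = pd.read_parquet('data.parquet', engine='fastparquet',
                             filters=[('a', '==', 0)])
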
From 66e34fc6d68500deb85f9e71e73f24b9795f31e2 Mon Sep 17 00:00:00 2001
From: Cornelius Riemenschneider
Date: Fri, 10 Nov 2017 16:49:58 +0100
Subject: [PATCH 2/8] Add PR number to whatsnew file.

---
 doc/source/whatsnew/v0.21.1.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/doc/source/whatsnew/v0.21.1.txt b/doc/source/whatsnew/v0.21.1.txt
index e9e133fd21e28..68ddce145403f 100644
--- a/doc/source/whatsnew/v0.21.1.txt
+++ b/doc/source/whatsnew/v0.21.1.txt
@@ -85,7 +85,7 @@ I/O
 - Bug in :func:`read_csv` for handling null values in index columns when specifying ``na_filter=False`` (:issue:`5239`)
 - Bug in :meth:`DataFrame.to_csv` when the table had ``MultiIndex`` columns, and a list of strings was passed in for ``header`` (:issue:`5539`)
 - :func:`read_parquet` now allows to specify the columns to read from a parquet file (:issue:`18154`)
-- :func:`read_parquet` now allows to specify kwargs which are passed to the respective engine (:issue:`TODO`)
+- :func:`read_parquet` now allows to specify kwargs which are passed to the respective engine (:issue:`18216`)
 
 Plotting
 ^^^^^^^^

From 248a334b14378e0e8c75fba3804a73378973b2c5 Mon Sep 17 00:00:00 2001
From: Cornelius Riemenschneider
Date: Fri, 10 Nov 2017 16:50:33 +0100
Subject: [PATCH 3/8] Fix PEP8 issue.

---
 pandas/io/parquet.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 8ad4660589250..2da70d3352309 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -78,7 +78,8 @@ def write(self, df, path, compression='snappy',
 
     def read(self, path, columns=None, **kwargs):
         path, _, _ = get_filepath_or_buffer(path)
-        return self.api.parquet.read_table(path, columns=columns, **kwargs).to_pandas()
+        return self.api.parquet.read_table(path, columns=columns,
+                                           **kwargs).to_pandas()
 
 
 class FastParquetImpl(object):

From 1c045a54fb1fc31a09d1d99b3ac42414dbd917b7 Mon Sep 17 00:00:00 2001
From: Cornelius Riemenschneider
Date: Mon, 13 Nov 2017 13:00:29 +0100
Subject: [PATCH 4/8] Fix tests that incorrectly called read_parquet with a
 compression parameter (which only makes sense for writes).
---
 pandas/tests/io/test_parquet.py | 42 ++++++++++++++++++++-------------
 1 file changed, 25 insertions(+), 17 deletions(-)

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 9a4edf38e2ef4..273d97287dca9 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -105,7 +105,7 @@ def test_options_py(df_compat, pa):
         with pd.option_context('io.parquet.engine', 'pyarrow'):
             df.to_parquet(path)
 
-            result = read_parquet(path, compression=None)
+            result = read_parquet(path)
             tm.assert_frame_equal(result, df)
 
 
@@ -118,7 +118,7 @@ def test_options_fp(df_compat, fp):
         with pd.option_context('io.parquet.engine', 'fastparquet'):
             df.to_parquet(path, compression=None)
 
-            result = read_parquet(path, compression=None)
+            result = read_parquet(path)
             tm.assert_frame_equal(result, df)
 
 
@@ -130,7 +130,7 @@ def test_options_auto(df_compat, fp, pa):
         with pd.option_context('io.parquet.engine', 'auto'):
             df.to_parquet(path)
 
-            result = read_parquet(path, compression=None)
+            result = read_parquet(path)
             tm.assert_frame_equal(result, df)
 
 
@@ -162,7 +162,7 @@ def test_cross_engine_pa_fp(df_cross_compat, pa, fp):
     with tm.ensure_clean() as path:
         df.to_parquet(path, engine=pa, compression=None)
 
-        result = read_parquet(path, engine=fp, compression=None)
+        result = read_parquet(path, engine=fp)
         tm.assert_frame_equal(result, df)
 
 
@@ -174,7 +174,7 @@ def test_cross_engine_fp_pa(df_cross_compat, pa, fp):
     with tm.ensure_clean() as path:
         df.to_parquet(path, engine=fp, compression=None)
 
-        result = read_parquet(path, engine=pa, compression=None)
+        result = read_parquet(path, engine=pa)
         tm.assert_frame_equal(result, df)
 
 
@@ -188,10 +188,13 @@ def check_error_on_write(self, df, engine, exc):
            with tm.ensure_clean() as path:
                to_parquet(df, path, engine, compression=None)
 
-    def check_round_trip(self, df, engine, expected=None, **kwargs):
-
+    def check_round_trip(self, df, engine, expected=None,
+                         extra_write_kwargs=None, **kwargs):
+        write_kwargs = kwargs.copy()
+        if extra_write_kwargs is not None:
+            write_kwargs.update(extra_write_kwargs)
         with tm.ensure_clean() as path:
-            df.to_parquet(path, engine, **kwargs)
+            df.to_parquet(path, engine, **write_kwargs)
             result = read_parquet(path, engine, **kwargs)
 
             if expected is None:
@@ -199,7 +202,7 @@ def check_round_trip(self, df, engine, expected=None, **kwargs):
             tm.assert_frame_equal(result, expected)
 
             # repeat
-            to_parquet(df, path, engine, **kwargs)
+            to_parquet(df, path, engine, **write_kwargs)
             result = pd.read_parquet(path, engine, **kwargs)
 
             if expected is None:
@@ -222,7 +225,8 @@ def test_columns_dtypes(self, engine):
 
         # unicode
         df.columns = [u'foo', u'bar']
-        self.check_round_trip(df, engine, compression=None)
+        self.check_round_trip(df, engine,
+                              extra_write_kwargs={'compression': None})
 
     def test_columns_dtypes_invalid(self, engine):
 
@@ -246,7 +250,8 @@ def test_columns_dtypes_invalid(self, engine):
     def test_write_with_index(self, engine):
 
         df = pd.DataFrame({'A': [1, 2, 3]})
-        self.check_round_trip(df, engine, compression=None)
+        self.check_round_trip(df, engine,
+                              extra_write_kwargs={'compression': None})
 
         # non-default index
         for index in [[2, 3, 4],
@@ -280,7 +285,8 @@ def test_compression(self, engine, compression):
             pytest.importorskip('brotli')
 
         df = pd.DataFrame({'A': [1, 2, 3]})
-        self.check_round_trip(df, engine, compression=compression)
+        self.check_round_trip(df, engine,
+                              extra_write_kwargs={'compression': compression})
 
     def test_read_columns(self, engine):
         # GH18154
@@ -289,7 +295,8 @@ def test_read_columns(self, engine):
 
         expected = pd.DataFrame({'string': list('abc')})
         self.check_round_trip(df, engine, expected=expected,
-                              compression=None, columns=["string"])
+                              extra_write_kwargs={'compression': None},
+                              columns=["string"])
 
 
 class TestParquetPyArrow(Base):
@@ -377,7 +384,7 @@ def test_basic(self, fp):
             'timedelta': pd.timedelta_range('1 day', periods=3),
         })
 
-        self.check_round_trip(df, fp, compression=None)
+        self.check_round_trip(df, fp, extra_write_kwargs={'compression': None})
 
     @pytest.mark.skip(reason="not supported")
     def test_duplicate_columns(self, fp):
@@ -390,7 +397,8 @@ def test_duplicate_columns(self, fp):
     def test_bool_with_none(self, fp):
         df = pd.DataFrame({'a': [True, None, False]})
         expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16')
-        self.check_round_trip(df, fp, expected=expected, compression=None)
+        self.check_round_trip(df, fp, expected=expected,
+                              extra_write_kwargs={'compression': None})
 
     def test_unsupported(self, fp):
 
@@ -406,7 +414,7 @@ def test_categorical(self, fp):
         if LooseVersion(fastparquet.__version__) < LooseVersion("0.1.3"):
             pytest.skip("CategoricalDtype not supported for older fp")
         df = pd.DataFrame({'a': pd.Categorical(list('abc'))})
-        self.check_round_trip(df, fp, compression=None)
+        self.check_round_trip(df, fp, extra_write_kwargs={'compression': None})
 
     def test_datetime_tz(self, fp):
         # doesn't preserve tz
@@ -416,4 +424,4 @@ def test_datetime_tz(self, fp):
         # warns on the coercion
         with catch_warnings(record=True):
             self.check_round_trip(df, fp, df.astype('datetime64[ns]'),
-                                  compression=None)
+                                  extra_write_kwargs={'compression': None})

From 61748d878b42d3edfc0fb65b52dc522b128f814b Mon Sep 17 00:00:00 2001
From: Cornelius Riemenschneider
Date: Mon, 13 Nov 2017 18:20:11 +0100
Subject: [PATCH 5/8] Actually pass kwargs from to_parquet to fastparquet.

---
 pandas/io/parquet.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py
index 2da70d3352309..4a13d2c9db944 100644
--- a/pandas/io/parquet.py
+++ b/pandas/io/parquet.py
@@ -176,7 +176,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
     if df.columns.inferred_type not in valid_types:
         raise ValueError("parquet must have string column names")
 
-    return impl.write(df, path, compression=compression)
+    return impl.write(df, path, compression=compression, **kwargs)
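
A matching write-side sketch (illustrative file name; `row_group_offsets`
is a fastparquet-specific write option, exercised by the test added in
the next patch):

    import pandas as pd

    df = pd.DataFrame({'a': [0, 1, 2]})

    # With this fix, engine-specific write options reach fastparquet:
    # row_group_offsets=1 writes one row group per row, giving the
    # read-side `filters` argument statistics to prune against.
    df.to_parquet('data.parquet', engine='fastparquet',
                  compression=None, row_group_offsets=1)
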
From 243172d80097e52eb032aab10f3967dd8598db5e Mon Sep 17 00:00:00 2001
From: Cornelius Riemenschneider
Date: Mon, 13 Nov 2017 18:20:29 +0100
Subject: [PATCH 6/8] Test filtering the row groups for fastparquet.

---
 pandas/tests/io/test_parquet.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 273d97287dca9..7386c44eb94bf 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -425,3 +425,12 @@ def test_datetime_tz(self, fp):
         with catch_warnings(record=True):
             self.check_round_trip(df, fp, df.astype('datetime64[ns]'),
                                   extra_write_kwargs={'compression': None})
+
+    def test_filter_row_groups(self, fp):
+        d = {'a': list(range(0, 3))}
+        df = pd.DataFrame(d)
+        with tm.ensure_clean() as path:
+            df.to_parquet(path, fp, compression=None,
+                          row_group_offsets=1)
+            result = read_parquet(path, fp, filters=[('a', '==', 0)])
+            assert len(result) == 1

From 6187a7c0be9cb6082068289a3e94812313108556 Mon Sep 17 00:00:00 2001
From: Cornelius Riemenschneider
Date: Tue, 14 Nov 2017 14:19:17 +0100
Subject: [PATCH 7/8] Refactor check_round_trip test method.

---
 pandas/tests/io/test_parquet.py | 33 ++++++++++++++++-----------------
 1 file changed, 16 insertions(+), 17 deletions(-)

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 7386c44eb94bf..96c5cd3c2b6e4 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -189,13 +189,14 @@ def check_error_on_write(self, df, engine, exc):
                to_parquet(df, path, engine, compression=None)
 
     def check_round_trip(self, df, engine, expected=None,
-                         extra_write_kwargs=None, **kwargs):
-        write_kwargs = kwargs.copy()
-        if extra_write_kwargs is not None:
-            write_kwargs.update(extra_write_kwargs)
+                         write_kwargs=None, read_kwargs=None):
+        if write_kwargs is None:
+            write_kwargs = {}
+        if read_kwargs is None:
+            read_kwargs = {}
         with tm.ensure_clean() as path:
             df.to_parquet(path, engine, **write_kwargs)
-            result = read_parquet(path, engine, **kwargs)
+            result = read_parquet(path, engine, **read_kwargs)
 
             if expected is None:
                expected = df
@@ -203,7 +204,7 @@ def check_round_trip(self, df, engine, expected=None,
 
             # repeat
             to_parquet(df, path, engine, **write_kwargs)
-            result = pd.read_parquet(path, engine, **kwargs)
+            result = pd.read_parquet(path, engine, **read_kwargs)
 
             if expected is None:
                expected = df
@@ -225,8 +226,7 @@ def test_columns_dtypes(self, engine):
 
         # unicode
         df.columns = [u'foo', u'bar']
-        self.check_round_trip(df, engine,
-                              extra_write_kwargs={'compression': None})
+        self.check_round_trip(df, engine, write_kwargs={'compression': None})
 
     def test_columns_dtypes_invalid(self, engine):
 
@@ -250,8 +250,7 @@ def test_columns_dtypes_invalid(self, engine):
     def test_write_with_index(self, engine):
 
         df = pd.DataFrame({'A': [1, 2, 3]})
-        self.check_round_trip(df, engine,
-                              extra_write_kwargs={'compression': None})
+        self.check_round_trip(df, engine, write_kwargs={'compression': None})
 
         # non-default index
         for index in [[2, 3, 4],
@@ -286,7 +285,7 @@ def test_compression(self, engine, compression):
 
         df = pd.DataFrame({'A': [1, 2, 3]})
         self.check_round_trip(df, engine,
-                              extra_write_kwargs={'compression': compression})
+                              write_kwargs={'compression': compression})
 
     def test_read_columns(self, engine):
         # GH18154
@@ -295,8 +294,8 @@ def test_read_columns(self, engine):
 
         expected = pd.DataFrame({'string': list('abc')})
         self.check_round_trip(df, engine, expected=expected,
-                              extra_write_kwargs={'compression': None},
-                              columns=["string"])
+                              write_kwargs={'compression': None},
+                              read_kwargs = {'columns': ['string']})
 
 
 class TestParquetPyArrow(Base):
@@ -384,7 +383,7 @@ def test_basic(self, fp):
             'timedelta': pd.timedelta_range('1 day', periods=3),
         })
 
-        self.check_round_trip(df, fp, extra_write_kwargs={'compression': None})
+        self.check_round_trip(df, fp, write_kwargs={'compression': None})
 
     @pytest.mark.skip(reason="not supported")
     def test_duplicate_columns(self, fp):
@@ -398,7 +397,7 @@ def test_bool_with_none(self, fp):
         df = pd.DataFrame({'a': [True, None, False]})
         expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16')
         self.check_round_trip(df, fp, expected=expected,
-                              extra_write_kwargs={'compression': None})
+                              write_kwargs={'compression': None})
 
     def test_unsupported(self, fp):
 
@@ -414,7 +413,7 @@ def test_categorical(self, fp):
         if LooseVersion(fastparquet.__version__) < LooseVersion("0.1.3"):
             pytest.skip("CategoricalDtype not supported for older fp")
         df = pd.DataFrame({'a': pd.Categorical(list('abc'))})
-        self.check_round_trip(df, fp, extra_write_kwargs={'compression': None})
+        self.check_round_trip(df, fp, write_kwargs={'compression': None})
 
     def test_datetime_tz(self, fp):
         # doesn't preserve tz
@@ -424,7 +423,7 @@ def test_datetime_tz(self, fp):
         # warns on the coercion
         with catch_warnings(record=True):
             self.check_round_trip(df, fp, df.astype('datetime64[ns]'),
-                                  extra_write_kwargs={'compression': None})
+                                      write_kwargs={'compression': None})
 
     def test_filter_row_groups(self, fp):
         d = {'a': list(range(0, 3))}

From edbd937dc3e642fd497315bb86e929985a4e5bb5 Mon Sep 17 00:00:00 2001
From: Cornelius Riemenschneider
Date: Tue, 14 Nov 2017 14:47:08 +0100
Subject: [PATCH 8/8] Fix linting errors.

---
 pandas/tests/io/test_parquet.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py
index 96c5cd3c2b6e4..e7bcff22371b7 100644
--- a/pandas/tests/io/test_parquet.py
+++ b/pandas/tests/io/test_parquet.py
@@ -295,7 +295,7 @@ def test_read_columns(self, engine):
         expected = pd.DataFrame({'string': list('abc')})
         self.check_round_trip(df, engine, expected=expected,
                               write_kwargs={'compression': None},
-                              read_kwargs = {'columns': ['string']})
+                              read_kwargs={'columns': ['string']})
 
 
 class TestParquetPyArrow(Base):
@@ -423,7 +423,7 @@ def test_datetime_tz(self, fp):
         # warns on the coercion
         with catch_warnings(record=True):
             self.check_round_trip(df, fp, df.astype('datetime64[ns]'),
-                                      write_kwargs={'compression': None})
+                                  write_kwargs={'compression': None})
 
     def test_filter_row_groups(self, fp):
         d = {'a': list(range(0, 3))}
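
For reference, a sketch of a test written against the helper's final
form (hypothetical test name; the pattern mirrors test_read_columns
above), separating write-side engine options from read-side ones:

    def test_read_columns_subset(self, engine):
        # Hypothetical example: write-side engine options travel in
        # write_kwargs, read-side options (here a column projection)
        # in read_kwargs.
        df = pd.DataFrame({'string': list('abc'),
                           'int': list(range(1, 4))})
        expected = pd.DataFrame({'int': list(range(1, 4))})
        self.check_round_trip(df, engine, expected=expected,
                              write_kwargs={'compression': None},
                              read_kwargs={'columns': ['int']})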