diff --git a/doc/source/whatsnew/v0.21.1.txt b/doc/source/whatsnew/v0.21.1.txt index 185f08514641f..68ddce145403f 100644 --- a/doc/source/whatsnew/v0.21.1.txt +++ b/doc/source/whatsnew/v0.21.1.txt @@ -85,6 +85,7 @@ I/O - Bug in :func:`read_csv` for handling null values in index columns when specifying ``na_filter=False`` (:issue:`5239`) - Bug in :meth:`DataFrame.to_csv` when the table had ``MultiIndex`` columns, and a list of strings was passed in for ``header`` (:issue:`5539`) - :func:`read_parquet` now allows to specify the columns to read from a parquet file (:issue:`18154`) +- :func:`read_parquet` now allows specifying kwargs that are passed to the respective engine (:issue:`18216`) Plotting ^^^^^^^^ diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index ef95e32cc241e..4a13d2c9db944 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -76,9 +76,10 @@ def write(self, df, path, compression='snappy', table, path, compression=compression, coerce_timestamps=coerce_timestamps, **kwargs) - def read(self, path, columns=None): + def read(self, path, columns=None, **kwargs): path, _, _ = get_filepath_or_buffer(path) - return self.api.parquet.read_table(path, columns=columns).to_pandas() + return self.api.parquet.read_table(path, columns=columns, + **kwargs).to_pandas() class FastParquetImpl(object): @@ -115,9 +116,9 @@ def write(self, df, path, compression='snappy', **kwargs): self.api.write(path, df, compression=compression, **kwargs) - def read(self, path, columns=None): + def read(self, path, columns=None, **kwargs): path, _, _ = get_filepath_or_buffer(path) - return self.api.ParquetFile(path).to_pandas(columns=columns) + return self.api.ParquetFile(path).to_pandas(columns=columns, **kwargs) def to_parquet(df, path, engine='auto', compression='snappy', **kwargs): @@ -175,7 +176,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs): if df.columns.inferred_type not in valid_types: raise ValueError("parquet must have string 
column names") - return impl.write(df, path, compression=compression) + return impl.write(df, path, compression=compression, **kwargs) def read_parquet(path, engine='auto', columns=None, **kwargs): @@ -205,4 +206,4 @@ def read_parquet(path, engine='auto', columns=None, **kwargs): """ impl = get_engine(engine) - return impl.read(path, columns=columns) + return impl.read(path, columns=columns, **kwargs) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 9a4edf38e2ef4..e7bcff22371b7 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -105,7 +105,7 @@ def test_options_py(df_compat, pa): with pd.option_context('io.parquet.engine', 'pyarrow'): df.to_parquet(path) - result = read_parquet(path, compression=None) + result = read_parquet(path) tm.assert_frame_equal(result, df) @@ -118,7 +118,7 @@ def test_options_fp(df_compat, fp): with pd.option_context('io.parquet.engine', 'fastparquet'): df.to_parquet(path, compression=None) - result = read_parquet(path, compression=None) + result = read_parquet(path) tm.assert_frame_equal(result, df) @@ -130,7 +130,7 @@ def test_options_auto(df_compat, fp, pa): with pd.option_context('io.parquet.engine', 'auto'): df.to_parquet(path) - result = read_parquet(path, compression=None) + result = read_parquet(path) tm.assert_frame_equal(result, df) @@ -162,7 +162,7 @@ def test_cross_engine_pa_fp(df_cross_compat, pa, fp): with tm.ensure_clean() as path: df.to_parquet(path, engine=pa, compression=None) - result = read_parquet(path, engine=fp, compression=None) + result = read_parquet(path, engine=fp) tm.assert_frame_equal(result, df) @@ -174,7 +174,7 @@ def test_cross_engine_fp_pa(df_cross_compat, pa, fp): with tm.ensure_clean() as path: df.to_parquet(path, engine=fp, compression=None) - result = read_parquet(path, engine=pa, compression=None) + result = read_parquet(path, engine=pa) tm.assert_frame_equal(result, df) @@ -188,19 +188,23 @@ def check_error_on_write(self, df, 
engine, exc): with tm.ensure_clean() as path: to_parquet(df, path, engine, compression=None) - def check_round_trip(self, df, engine, expected=None, **kwargs): - + def check_round_trip(self, df, engine, expected=None, + write_kwargs=None, read_kwargs=None): + if write_kwargs is None: + write_kwargs = {} + if read_kwargs is None: + read_kwargs = {} with tm.ensure_clean() as path: - df.to_parquet(path, engine, **kwargs) - result = read_parquet(path, engine, **kwargs) + df.to_parquet(path, engine, **write_kwargs) + result = read_parquet(path, engine, **read_kwargs) if expected is None: expected = df tm.assert_frame_equal(result, expected) # repeat - to_parquet(df, path, engine, **kwargs) - result = pd.read_parquet(path, engine, **kwargs) + to_parquet(df, path, engine, **write_kwargs) + result = pd.read_parquet(path, engine, **read_kwargs) if expected is None: expected = df @@ -222,7 +226,7 @@ def test_columns_dtypes(self, engine): # unicode df.columns = [u'foo', u'bar'] - self.check_round_trip(df, engine, compression=None) + self.check_round_trip(df, engine, write_kwargs={'compression': None}) def test_columns_dtypes_invalid(self, engine): @@ -246,7 +250,7 @@ def test_columns_dtypes_invalid(self, engine): def test_write_with_index(self, engine): df = pd.DataFrame({'A': [1, 2, 3]}) - self.check_round_trip(df, engine, compression=None) + self.check_round_trip(df, engine, write_kwargs={'compression': None}) # non-default index for index in [[2, 3, 4], @@ -280,7 +284,8 @@ def test_compression(self, engine, compression): pytest.importorskip('brotli') df = pd.DataFrame({'A': [1, 2, 3]}) - self.check_round_trip(df, engine, compression=compression) + self.check_round_trip(df, engine, + write_kwargs={'compression': compression}) def test_read_columns(self, engine): # GH18154 @@ -289,7 +294,8 @@ def test_read_columns(self, engine): expected = pd.DataFrame({'string': list('abc')}) self.check_round_trip(df, engine, expected=expected, - compression=None, columns=["string"]) + 
write_kwargs={'compression': None}, + read_kwargs={'columns': ['string']}) class TestParquetPyArrow(Base): @@ -377,7 +383,7 @@ def test_basic(self, fp): 'timedelta': pd.timedelta_range('1 day', periods=3), }) - self.check_round_trip(df, fp, compression=None) + self.check_round_trip(df, fp, write_kwargs={'compression': None}) @pytest.mark.skip(reason="not supported") def test_duplicate_columns(self, fp): @@ -390,7 +396,8 @@ def test_duplicate_columns(self, fp): def test_bool_with_none(self, fp): df = pd.DataFrame({'a': [True, None, False]}) expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16') - self.check_round_trip(df, fp, expected=expected, compression=None) + self.check_round_trip(df, fp, expected=expected, + write_kwargs={'compression': None}) def test_unsupported(self, fp): @@ -406,7 +413,7 @@ def test_categorical(self, fp): if LooseVersion(fastparquet.__version__) < LooseVersion("0.1.3"): pytest.skip("CategoricalDtype not supported for older fp") df = pd.DataFrame({'a': pd.Categorical(list('abc'))}) - self.check_round_trip(df, fp, compression=None) + self.check_round_trip(df, fp, write_kwargs={'compression': None}) def test_datetime_tz(self, fp): # doesn't preserve tz @@ -416,4 +423,13 @@ def test_datetime_tz(self, fp): # warns on the coercion with catch_warnings(record=True): self.check_round_trip(df, fp, df.astype('datetime64[ns]'), - compression=None) + write_kwargs={'compression': None}) + + def test_filter_row_groups(self, fp): + d = {'a': list(range(0, 3))} + df = pd.DataFrame(d) + with tm.ensure_clean() as path: + df.to_parquet(path, fp, compression=None, + row_group_offsets=1) + result = read_parquet(path, fp, filters=[('a', '==', 0)]) + assert len(result) == 1