-
-
Notifications
You must be signed in to change notification settings - Fork 18.5k
Pass kwargs from read_parquet() to the underlying engines. #18216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
ab26304
66e34fc
248a334
1c045a5
61748d8
243172d
6187a7c
edbd937
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -105,7 +105,7 @@ def test_options_py(df_compat, pa): | |
with pd.option_context('io.parquet.engine', 'pyarrow'): | ||
df.to_parquet(path) | ||
|
||
result = read_parquet(path, compression=None) | ||
result = read_parquet(path) | ||
tm.assert_frame_equal(result, df) | ||
|
||
|
||
|
@@ -118,7 +118,7 @@ def test_options_fp(df_compat, fp): | |
with pd.option_context('io.parquet.engine', 'fastparquet'): | ||
df.to_parquet(path, compression=None) | ||
|
||
result = read_parquet(path, compression=None) | ||
result = read_parquet(path) | ||
tm.assert_frame_equal(result, df) | ||
|
||
|
||
|
@@ -130,7 +130,7 @@ def test_options_auto(df_compat, fp, pa): | |
with pd.option_context('io.parquet.engine', 'auto'): | ||
df.to_parquet(path) | ||
|
||
result = read_parquet(path, compression=None) | ||
result = read_parquet(path) | ||
tm.assert_frame_equal(result, df) | ||
|
||
|
||
|
@@ -162,7 +162,7 @@ def test_cross_engine_pa_fp(df_cross_compat, pa, fp): | |
with tm.ensure_clean() as path: | ||
df.to_parquet(path, engine=pa, compression=None) | ||
|
||
result = read_parquet(path, engine=fp, compression=None) | ||
result = read_parquet(path, engine=fp) | ||
tm.assert_frame_equal(result, df) | ||
|
||
|
||
|
@@ -174,7 +174,7 @@ def test_cross_engine_fp_pa(df_cross_compat, pa, fp): | |
with tm.ensure_clean() as path: | ||
df.to_parquet(path, engine=fp, compression=None) | ||
|
||
result = read_parquet(path, engine=pa, compression=None) | ||
result = read_parquet(path, engine=pa) | ||
tm.assert_frame_equal(result, df) | ||
|
||
|
||
|
@@ -188,18 +188,21 @@ def check_error_on_write(self, df, engine, exc): | |
with tm.ensure_clean() as path: | ||
to_parquet(df, path, engine, compression=None) | ||
|
||
def check_round_trip(self, df, engine, expected=None, **kwargs): | ||
|
||
def check_round_trip(self, df, engine, expected=None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would refactor this helper function to have the following signature: def check_round_trip(self, df, engine, expected=None, write_kwargs=None, read_kwargs=None) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, this is definitly the way to go. |
||
extra_write_kwargs=None, **kwargs): | ||
write_kwargs = kwargs.copy() | ||
if extra_write_kwargs is not None: | ||
write_kwargs.update(extra_write_kwargs) | ||
with tm.ensure_clean() as path: | ||
df.to_parquet(path, engine, **kwargs) | ||
df.to_parquet(path, engine, **write_kwargs) | ||
result = read_parquet(path, engine, **kwargs) | ||
|
||
if expected is None: | ||
expected = df | ||
tm.assert_frame_equal(result, expected) | ||
|
||
# repeat | ||
to_parquet(df, path, engine, **kwargs) | ||
to_parquet(df, path, engine, **write_kwargs) | ||
result = pd.read_parquet(path, engine, **kwargs) | ||
|
||
if expected is None: | ||
|
@@ -222,7 +225,8 @@ def test_columns_dtypes(self, engine): | |
|
||
# unicode | ||
df.columns = [u'foo', u'bar'] | ||
self.check_round_trip(df, engine, compression=None) | ||
self.check_round_trip(df, engine, | ||
extra_write_kwargs={'compression': None}) | ||
|
||
def test_columns_dtypes_invalid(self, engine): | ||
|
||
|
@@ -246,7 +250,8 @@ def test_columns_dtypes_invalid(self, engine): | |
def test_write_with_index(self, engine): | ||
|
||
df = pd.DataFrame({'A': [1, 2, 3]}) | ||
self.check_round_trip(df, engine, compression=None) | ||
self.check_round_trip(df, engine, | ||
extra_write_kwargs={'compression': None}) | ||
|
||
# non-default index | ||
for index in [[2, 3, 4], | ||
|
@@ -280,7 +285,8 @@ def test_compression(self, engine, compression): | |
pytest.importorskip('brotli') | ||
|
||
df = pd.DataFrame({'A': [1, 2, 3]}) | ||
self.check_round_trip(df, engine, compression=compression) | ||
self.check_round_trip(df, engine, | ||
extra_write_kwargs={'compression': compression}) | ||
|
||
def test_read_columns(self, engine): | ||
# GH18154 | ||
|
@@ -289,7 +295,8 @@ def test_read_columns(self, engine): | |
|
||
expected = pd.DataFrame({'string': list('abc')}) | ||
self.check_round_trip(df, engine, expected=expected, | ||
compression=None, columns=["string"]) | ||
extra_write_kwargs={'compression': None}, | ||
columns=["string"]) | ||
|
||
|
||
class TestParquetPyArrow(Base): | ||
|
@@ -377,7 +384,7 @@ def test_basic(self, fp): | |
'timedelta': pd.timedelta_range('1 day', periods=3), | ||
}) | ||
|
||
self.check_round_trip(df, fp, compression=None) | ||
self.check_round_trip(df, fp, extra_write_kwargs={'compression': None}) | ||
|
||
@pytest.mark.skip(reason="not supported") | ||
def test_duplicate_columns(self, fp): | ||
|
@@ -390,7 +397,8 @@ def test_duplicate_columns(self, fp): | |
def test_bool_with_none(self, fp): | ||
df = pd.DataFrame({'a': [True, None, False]}) | ||
expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16') | ||
self.check_round_trip(df, fp, expected=expected, compression=None) | ||
self.check_round_trip(df, fp, expected=expected, | ||
extra_write_kwargs={'compression': None}) | ||
|
||
def test_unsupported(self, fp): | ||
|
||
|
@@ -406,7 +414,7 @@ def test_categorical(self, fp): | |
if LooseVersion(fastparquet.__version__) < LooseVersion("0.1.3"): | ||
pytest.skip("CategoricalDtype not supported for older fp") | ||
df = pd.DataFrame({'a': pd.Categorical(list('abc'))}) | ||
self.check_round_trip(df, fp, compression=None) | ||
self.check_round_trip(df, fp, extra_write_kwargs={'compression': None}) | ||
|
||
def test_datetime_tz(self, fp): | ||
# doesn't preserve tz | ||
|
@@ -416,4 +424,13 @@ def test_datetime_tz(self, fp): | |
# warns on the coercion | ||
with catch_warnings(record=True): | ||
self.check_round_trip(df, fp, df.astype('datetime64[ns]'), | ||
compression=None) | ||
extra_write_kwargs={'compression': None}) | ||
|
||
def test_filter_row_groups(self, fp): | ||
d = {'a': list(range(0, 3))} | ||
df = pd.DataFrame(d) | ||
with tm.ensure_clean() as path: | ||
df.to_parquet(path, fp, compression=None, | ||
row_group_offsets=1) | ||
result = read_parquet(path, fp, filters=[('a', '==', 0)]) | ||
assert len(result) == 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a reason you are removing the kw?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, compression only exists for writes: you specify the compression algorithm when writing the data, and on read you have to un-compress with whatever algorithm was used when the file was written.
Before my patch, the kw was silently dropped; now it causes exceptions, because neither backend uses it.