Support for partition_cols in to_parquet #23321


Merged: 19 commits, merged on Nov 10, 2018. Changes shown are from 12 commits.
2 changes: 2 additions & 0 deletions doc/source/io.rst
@@ -4574,6 +4574,8 @@ Several caveats.
* Categorical dtypes can be serialized to parquet, but will de-serialize as ``object`` dtype.
* Non supported types include ``Period`` and actual Python object types. These will raise a helpful error message
on an attempt at serialization.
* ``partition_cols`` will be used for partitioning the dataset, where the dataset will be written to multiple
  files in the path specified. Therefore, the path specified must be a directory path.

Contributor:

The rest of the items in this list feel more like limitations of pandas / these engines. Requiring that path be a directory when partition_cols is set doesn't seem to fit here.

I think this is important / different enough to deserve a new small section below "Handling Indexes", with

  1. A description of what partition_cols requires (list of column names, directory for file path)
  2. A description of why you might want to use partition_cols
  3. A small example.

Contributor Author:

done

You can specify an ``engine`` to direct the serialization. This can be one of ``pyarrow``, or ``fastparquet``, or ``auto``.
If the engine is NOT specified, then the ``pd.options.io.parquet.engine`` option is checked; if this is also ``auto``,
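
A minimal sketch of the small example requested in the thread above (path and column names are illustrative, and pyarrow >= 0.7.0 is assumed):

```python
import pandas as pd

# Made-up frame; 'year' is the column we partition on.
df = pd.DataFrame({'year': [2017, 2017, 2018], 'value': [1.0, 2.0, 3.0]})

# 'dataset' is used as the root directory; one sub-directory is written per
# partition value, e.g. dataset/year=2017/... and dataset/year=2018/...
df.to_parquet('dataset', partition_cols=['year'])
```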
1 change: 1 addition & 0 deletions doc/source/whatsnew/v0.24.0.txt
@@ -235,6 +235,7 @@ Other Enhancements
- New attribute :attr:`__git_version__` will return git commit sha of current build (:issue:`21295`).
- Compatibility with Matplotlib 3.0 (:issue:`22790`).
- Added :meth:`Interval.overlaps`, :meth:`IntervalArray.overlaps`, and :meth:`IntervalIndex.overlaps` for determining overlaps between interval-like objects (:issue:`21998`)
- :func:`~DataFrame.to_parquet` now supports writing a DataFrame as a directory of parquet files partitioned by a subset of the columns. (:issue:`23283`).

Contributor:
Probably best to mention "with the pyarrow engine (this was previously supported with fastparquet)."

Contributor Author:
done

- :meth:`Timestamp.tz_localize`, :meth:`DatetimeIndex.tz_localize`, and :meth:`Series.tz_localize` have gained the ``nonexistent`` argument for alternative handling of nonexistent times. See :ref:`timeseries.timezone_nonexsistent` (:issue:`8917`)

.. _whatsnew_0240.api_breaking:
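
Following the reviewer's note above, a sketch of the same feature routed through the fastparquet engine (names are illustrative; compression=None avoids needing python-snappy):

```python
import pandas as pd

df = pd.DataFrame({'part': ['a', 'a', 'b'], 'x': [1, 2, 3]})

# fastparquet already supported this layout via its partition_on keyword;
# partition_cols now maps onto it, matching the pyarrow engine's behaviour.
df.to_parquet('fp_dataset', engine='fastparquet', partition_cols=['part'],
              compression=None)
```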
15 changes: 12 additions & 3 deletions pandas/core/frame.py
@@ -1970,7 +1970,7 @@ def to_feather(self, fname):
to_feather(self, fname)

def to_parquet(self, fname, engine='auto', compression='snappy',
index=None, **kwargs):
index=None, partition_cols=None, **kwargs):
"""
Write a DataFrame to the binary parquet format.

@@ -1984,7 +1984,8 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
Parameters
----------
fname : str

Contributor:
side issue. we use path elsewhere for IO routines. We should change this as well (out of scope here). would have to deprecate (the name) unfortunately.

Contributor:
we actually use path on the top-level .to_parquet, not sure how this is named this way.

String file path.
File path or Root Directory path. Will be used as Root Directory
path while writing a partitioned dataset.
engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
Parquet library to use. If 'auto', then the option
``io.parquet.engine`` is used. The default ``io.parquet.engine``
Expand All @@ -1997,6 +1998,13 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
If ``False``, they will not be written to the file. If ``None``,
the behavior depends on the chosen engine.

.. versionadded:: 0.24.0
partition_cols : list, optional, default None
Column names by which to partition the dataset
Columns are partitioned in the order they are given
The behaviour applies only to pyarrow >= 0.7.0 and fastparquet
For other versions, this argument will be ignored.

Contributor:
Is it actually ignored for older pyarrows? I would have hoped it would raise when pyarrow gets the unrecognized argument.

Contributor:
Actually it seems like we raise. Could you update this?

Contributor Author:
done


.. versionadded:: 0.24.0

**kwargs
@@ -2027,7 +2035,8 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
"""
from pandas.io.parquet import to_parquet
to_parquet(self, fname, engine,
compression=compression, index=index, **kwargs)
compression=compression, index=index,
partition_cols=partition_cols, **kwargs)

@Substitution(header='Write out the column names. If a list of strings '
'is given, it is assumed to be aliases for the '
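
A sketch of the behaviour discussed in the docstring thread above: on pyarrow older than 0.7.0 the keyword is rejected rather than silently ignored (the old-pyarrow environment here is hypothetical):

```python
import pandas as pd

df = pd.DataFrame({'part': ['a', 'b'], 'x': [1, 2]})

try:
    # With pyarrow < 0.7.0 installed, the pyarrow writer raises instead of
    # silently dropping partition_cols.
    df.to_parquet('dataset', engine='pyarrow', partition_cols=['part'])
except ValueError as err:
    print(err)  # Partitioning of parquet files are only supported with pyarrow >= 0.7.0
```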
48 changes: 37 additions & 11 deletions pandas/io/parquet.py
@@ -106,14 +106,19 @@ def __init__(self):
self.api = pyarrow

def write(self, df, path, compression='snappy',
coerce_timestamps='ms', index=None, **kwargs):
coerce_timestamps='ms', index=None, partition_cols=None,
**kwargs):
self.validate_dataframe(df)

# Only validate the index if we're writing it.
if self._pyarrow_lt_070 and index is not False:
self._validate_write_lt_070(df)
path, _, _, _ = get_filepath_or_buffer(path, mode='wb')

if partition_cols is not None and self._pyarrow_lt_070:
raise ValueError("Partitioning of parquet files are only "
"supported with pyarrow >= 0.7.0")

if index is None:
from_pandas_kwargs = {}
else:
@@ -127,9 +132,15 @@ def write(self, df, path, compression='snappy',

else:
table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
self.api.parquet.write_table(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)
if partition_cols is not None:
self.api.parquet.write_to_dataset(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps,
partition_cols=partition_cols, **kwargs)
else:
self.api.parquet.write_table(
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)

def read(self, path, columns=None, **kwargs):
path, _, _, should_close = get_filepath_or_buffer(path)
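
For context on the write_to_dataset branch above, a minimal sketch of inspecting the resulting layout with pyarrow, mirroring the assertions in the new tests further down (names are illustrative):

```python
import pandas as pd
import pyarrow.parquet as pq

df = pd.DataFrame({'bool': [True, False], 'int': [1, 2], 'x': [0.1, 0.2]})
df.to_parquet('dataset', engine='pyarrow', partition_cols=['bool', 'int'])

dataset = pq.ParquetDataset('dataset', validate_schema=False)
print(dataset.partitions.partition_names)   # {'bool', 'int'}
print(dataset.read().to_pandas().head())    # partition columns are rebuilt from the paths
```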
@@ -208,12 +219,19 @@ def __init__(self):
)
self.api = fastparquet

def write(self, df, path, compression='snappy', index=None, **kwargs):
def write(self, df, path, compression='snappy', index=None,
partition_cols=None, **kwargs):
self.validate_dataframe(df)
# thriftpy/protocol/compact.py:339:
# DeprecationWarning: tostring() is deprecated.
# Use tobytes() instead.

if 'partition_on' in kwargs:
partition_cols = kwargs.pop('partition_on')

if partition_cols is not None:
kwargs['file_scheme'] = 'hive'

if is_s3_url(path):
# path is s3:// so we need to open the s3file in 'wb' mode.
# TODO: Support 'ab'
@@ -226,7 +244,8 @@ def write(self, df, path, compression='snappy', index=None, **kwargs):

with catch_warnings(record=True):
self.api.write(path, df, compression=compression,
write_index=index, **kwargs)
write_index=index, partition_on=partition_cols,
**kwargs)

def read(self, path, columns=None, **kwargs):
if is_s3_url(path):
@@ -246,15 +265,15 @@ def read(self, path, columns=None, **kwargs):


def to_parquet(df, path, engine='auto', compression='snappy', index=None,
**kwargs):
partition_cols=None, **kwargs):
"""
Write a DataFrame to the parquet format.

Parameters
----------
df : DataFrame
path : string
File path
path : str
File path or Root Directory path. Will be used as Root Directory path
while writing a partitioned dataset.
engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
Parquet library to use. If 'auto', then the option
``io.parquet.engine`` is used. The default ``io.parquet.engine``
@@ -268,11 +287,18 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None,
engine's default behavior will be used.

.. versionadded 0.24.0
partition_cols : list, optional
Column names by which to partition the dataset
Columns are partitioned in the order they are given
The behaviour applies only to pyarrow >= 0.7.0 and fastparquet
For other versions, this argument will be ignored.
.. versionadded:: 0.24.0
kwargs
Additional keyword arguments passed to the engine
"""
impl = get_engine(engine)
return impl.write(df, path, compression=compression, index=index, **kwargs)
return impl.write(df, path, compression=compression, index=index,
partition_cols=partition_cols, **kwargs)


def read_parquet(path, engine='auto', columns=None, **kwargs):
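
A sketch of what the fastparquet branch above amounts to: partition_cols is forwarded as fastparquet's partition_on together with file_scheme='hive'. The direct call below is roughly equivalent; names are illustrative:

```python
import os

import fastparquet
import pandas as pd

df = pd.DataFrame({'bool': [True, False], 'int': [1, 2], 'x': [0.1, 0.2]})
os.makedirs('dataset', exist_ok=True)  # write into an existing root directory

# Roughly what FastParquetImpl.write issues when partition_cols is given.
fastparquet.write('dataset', df, file_scheme='hive',
                  partition_on=['bool', 'int'], compression=None)

pf = fastparquet.ParquetFile('dataset')
print(pf.cats)  # partition columns and the values observed on disk
```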
46 changes: 46 additions & 0 deletions pandas/tests/io/test_parquet.py
@@ -1,4 +1,5 @@
""" test parquet compat """
import os

import pytest
import datetime
@@ -478,6 +479,27 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa):
check_round_trip(df_compat, pa,
path='s3://pandas-test/pyarrow.parquet')

def test_partition_cols_supported(self, pa_ge_070, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with tm.ensure_clean_dir() as path:
df.to_parquet(path, partition_cols=partition_cols,
compression=None)
import pyarrow.parquet as pq
dataset = pq.ParquetDataset(path, validate_schema=False)
assert len(dataset.partitions.partition_names) == 2
assert dataset.partitions.partition_names == set(partition_cols)

def test_partition_cols_not_supported_pa_lt_70(self, pa_lt_070, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with pytest.raises(ValueError):
with tm.ensure_clean_dir() as path:
df.to_parquet(path, partition_cols=partition_cols,
compression=None)


class TestParquetFastParquet(Base):

@@ -543,3 +565,27 @@ def test_s3_roundtrip(self, df_compat, s3_resource, fp):
# GH #19134
check_round_trip(df_compat, fp,
path='s3://pandas-test/fastparquet.parquet')

def test_partition_cols_supported(self, fp, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with tm.ensure_clean_dir() as path:
df.to_parquet(path, engine="fastparquet",
partition_cols=partition_cols, compression=None)
assert os.path.exists(path)
import fastparquet
actual_partition_cols = fastparquet.ParquetFile(path, False).cats
assert len(actual_partition_cols) == 2

def test_partition_on_supported(self, fp, df_full):
# GH #23283
partition_cols = ['bool', 'int']
df = df_full
with tm.ensure_clean_dir() as path:
df.to_parquet(path, engine="fastparquet", compression=None,
partition_on=partition_cols)
assert os.path.exists(path)
import fastparquet
actual_partition_cols = fastparquet.ParquetFile(path, False).cats
assert len(actual_partition_cols) == 2
7 changes: 7 additions & 0 deletions pandas/tests/util/test_testing.py
@@ -876,3 +876,10 @@ def test_datapath_missing(datapath, request):
)

assert result == expected


def test_create_temp_directory():
with tm.ensure_clean_dir() as path:
assert os.path.exists(path)
assert os.path.isdir(path)
assert not os.path.exists(path)
20 changes: 20 additions & 0 deletions pandas/util/testing.py
@@ -6,6 +6,7 @@
import locale
import os
import re
from shutil import rmtree
import string
import subprocess
import sys
@@ -759,6 +760,25 @@ def ensure_clean(filename=None, return_filelike=False):
print("Exception on removing file: {error}".format(error=e))


@contextmanager
def ensure_clean_dir():
"""
Get a temporary directory path and agrees to remove on close.

Yields
------
Temporary directory path
"""
directory_name = tempfile.mkdtemp(suffix='')
try:
yield directory_name
finally:
try:
rmtree(directory_name)
except Exception:
pass
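
A quick sketch of how the new helper is meant to be used, mirroring the test added above:

```python
import os

import pandas.util.testing as tm

with tm.ensure_clean_dir() as path:
    # `path` is a fresh temporary directory that exists only inside the block.
    assert os.path.isdir(path)

# On exit the directory is removed (removal errors are swallowed).
assert not os.path.exists(path)
```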


# -----------------------------------------------------------------------------
# Comparators
