Commit 55c259d

anjsudh authored and Pingviinituutti committed
ENH: Support for partition_cols in to_parquet (pandas-dev#23321)
* closes pandas-dev#23283
1 parent 6f56203 commit 55c259d

File tree

7 files changed: +167 −15 lines

doc/source/io.rst

+37
@@ -4673,6 +4673,43 @@ Passing ``index=True`` will *always* write the index, even if that's not the
 underlying engine's default behavior.

+Partitioning Parquet files
+''''''''''''''''''''''''''
+
+.. versionadded:: 0.24.0
+
+Parquet supports partitioning of data based on the values of one or more columns.
+
+.. ipython:: python
+
+   df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': [0, 1, 0, 1]})
+   df.to_parquet(fname='test', engine='pyarrow',
+                 partition_cols=['a'], compression=None)
+
+The ``fname`` specifies the parent directory to which data will be saved.
+The ``partition_cols`` are the column names by which the dataset will be partitioned.
+Columns are partitioned in the order they are given. The partition splits are
+determined by the unique values in the partition columns.
+The above example creates a partitioned dataset that may look like:
+
+.. code-block:: text
+
+   test
+   ├── a=0
+   │   ├── 0bac803e32dc42ae83fddfd029cbdebc.parquet
+   │   └── ...
+   └── a=1
+       ├── e6ab24a4f45147b49b54a662f0c412a3.parquet
+       └── ...
+
+.. ipython:: python
+   :suppress:
+
+   from shutil import rmtree
+   try:
+       rmtree('test')
+   except Exception:
+       pass
+
 .. _io.sql:

 SQL Queries
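
Note: as a quick complement to the documentation example above (not part of this commit), the partitioned directory can be read back as a single DataFrame. A minimal sketch, assuming a pyarrow version whose ``read_table`` accepts a directory path; the ``test`` directory name matches the example:

    import pandas as pd

    df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': [0, 1, 0, 1]})
    df.to_parquet(fname='test', engine='pyarrow',
                  partition_cols=['a'], compression=None)

    # pyarrow treats the directory as one dataset and reconstructs the
    # partition column 'a' from the a=0/a=1 directory names (typically
    # as a categorical of the partition values)
    roundtripped = pd.read_parquet('test', engine='pyarrow')
    print(roundtripped.sort_index())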

doc/source/whatsnew/v0.24.0.txt

+1
@@ -236,6 +236,7 @@ Other Enhancements
 - New attribute :attr:`__git_version__` will return git commit sha of current build (:issue:`21295`).
 - Compatibility with Matplotlib 3.0 (:issue:`22790`).
 - Added :meth:`Interval.overlaps`, :meth:`IntervalArray.overlaps`, and :meth:`IntervalIndex.overlaps` for determining overlaps between interval-like objects (:issue:`21998`)
+- :func:`~DataFrame.to_parquet` now supports writing a ``DataFrame`` as a directory of parquet files partitioned by a subset of the columns when ``engine = 'pyarrow'`` (:issue:`23283`)
 - :meth:`Timestamp.tz_localize`, :meth:`DatetimeIndex.tz_localize`, and :meth:`Series.tz_localize` have gained the ``nonexistent`` argument for alternative handling of nonexistent times. See :ref:`timeseries.timezone_nonexsistent` (:issue:`8917`)

 .. _whatsnew_0240.api_breaking:

pandas/core/frame.py

+14 −3
@@ -1970,7 +1970,7 @@ def to_feather(self, fname):
         to_feather(self, fname)

     def to_parquet(self, fname, engine='auto', compression='snappy',
-                   index=None, **kwargs):
+                   index=None, partition_cols=None, **kwargs):
         """
         Write a DataFrame to the binary parquet format.

@@ -1984,7 +1984,11 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
         Parameters
         ----------
         fname : str
-            String file path.
+            File path or root directory path. Will be used as the root
+            directory path when writing a partitioned dataset.
+
+            .. versionchanged:: 0.24.0
+
         engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
             Parquet library to use. If 'auto', then the option
             ``io.parquet.engine`` is used. The default ``io.parquet.engine``
@@ -1999,6 +2003,12 @@ def to_parquet(self, fname, engine='auto', compression='snappy',

             .. versionadded:: 0.24.0

+        partition_cols : list, optional, default None
+            Column names by which to partition the dataset.
+            Columns are partitioned in the order they are given.
+
+            .. versionadded:: 0.24.0
+
         **kwargs
             Additional arguments passed to the parquet library. See
             :ref:`pandas io <io.parquet>` for more details.
@@ -2027,7 +2037,8 @@ def to_parquet(self, fname, engine='auto', compression='snappy',
         """
         from pandas.io.parquet import to_parquet
         to_parquet(self, fname, engine,
-                   compression=compression, index=index, **kwargs)
+                   compression=compression, index=index,
+                   partition_cols=partition_cols, **kwargs)

     @Substitution(header='Write out the column names. If a list of strings '
                   'is given, it is assumed to be aliases for the '
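
Note: a minimal caller-side sketch of the change above, run against a build with this commit applied (the output directory names are illustrative, not from the commit). ``DataFrame.to_parquet`` simply forwards ``partition_cols`` to ``pandas.io.parquet.to_parquet``:

    import pandas as pd
    from pandas.io.parquet import to_parquet

    df = pd.DataFrame({'a': [0, 0, 1], 'b': [1, 2, 3]})

    # the method and the io-layer function are equivalent; the method
    # body is a one-line delegation
    df.to_parquet('out_method', engine='pyarrow',
                  partition_cols=['a'], compression=None)
    to_parquet(df, 'out_function', engine='pyarrow',
               partition_cols=['a'], compression=None)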

pandas/io/parquet.py

+41 −12
@@ -101,19 +101,25 @@ def __init__(self):
         self.api = pyarrow

     def write(self, df, path, compression='snappy',
-              coerce_timestamps='ms', index=None, **kwargs):
+              coerce_timestamps='ms', index=None, partition_cols=None,
+              **kwargs):
         self.validate_dataframe(df)
         path, _, _, _ = get_filepath_or_buffer(path, mode='wb')

         if index is None:
             from_pandas_kwargs = {}
         else:
             from_pandas_kwargs = {'preserve_index': index}
-
         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
-        self.api.parquet.write_table(
-            table, path, compression=compression,
-            coerce_timestamps=coerce_timestamps, **kwargs)
+        if partition_cols is not None:
+            # write a partitioned directory of files, one subdirectory
+            # per unique value of the partition columns
+            self.api.parquet.write_to_dataset(
+                table, path, compression=compression,
+                coerce_timestamps=coerce_timestamps,
+                partition_cols=partition_cols, **kwargs)
+        else:
+            self.api.parquet.write_table(
+                table, path, compression=compression,
+                coerce_timestamps=coerce_timestamps, **kwargs)

     def read(self, path, columns=None, **kwargs):
         path, _, _, should_close = get_filepath_or_buffer(path)
@@ -156,12 +162,23 @@ def __init__(self):
         )
         self.api = fastparquet

-    def write(self, df, path, compression='snappy', index=None, **kwargs):
+    def write(self, df, path, compression='snappy', index=None,
+              partition_cols=None, **kwargs):
         self.validate_dataframe(df)
         # thriftpy/protocol/compact.py:339:
         # DeprecationWarning: tostring() is deprecated.
         # Use tobytes() instead.

+        if 'partition_on' in kwargs and partition_cols is not None:
+            raise ValueError("Cannot use both partition_on and "
+                             "partition_cols. Use partition_cols for "
+                             "partitioning data")
+        elif 'partition_on' in kwargs:
+            # accept fastparquet's native keyword for backwards
+            # compatibility
+            partition_cols = kwargs.pop('partition_on')
+
+        if partition_cols is not None:
+            kwargs['file_scheme'] = 'hive'
+
         if is_s3_url(path):
             # path is s3:// so we need to open the s3file in 'wb' mode.
             # TODO: Support 'ab'
@@ -174,7 +191,8 @@ def write(self, df, path, compression='snappy', index=None, **kwargs):

         with catch_warnings(record=True):
             self.api.write(path, df, compression=compression,
-                           write_index=index, **kwargs)
+                           write_index=index, partition_on=partition_cols,
+                           **kwargs)

     def read(self, path, columns=None, **kwargs):
         if is_s3_url(path):
@@ -194,15 +212,18 @@ def read(self, path, columns=None, **kwargs):


 def to_parquet(df, path, engine='auto', compression='snappy', index=None,
-               **kwargs):
+               partition_cols=None, **kwargs):
     """
     Write a DataFrame to the parquet format.

     Parameters
     ----------
-    df : DataFrame
-    path : string
-        File path
+    path : str
+        File path or root directory path. Will be used as the root directory
+        path when writing a partitioned dataset.
+
+        .. versionchanged:: 0.24.0
+
     engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
         Parquet library to use. If 'auto', then the option
         ``io.parquet.engine`` is used. The default ``io.parquet.engine``
@@ -216,11 +237,19 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None,
         engine's default behavior will be used.

         .. versionadded:: 0.24.0
+
+    partition_cols : list, optional, default None
+        Column names by which to partition the dataset.
+        Columns are partitioned in the order they are given.
+
+        .. versionadded:: 0.24.0
+
     kwargs
         Additional keyword arguments passed to the engine
     """
     impl = get_engine(engine)
-    return impl.write(df, path, compression=compression, index=index, **kwargs)
+    return impl.write(df, path, compression=compression, index=index,
+                      partition_cols=partition_cols, **kwargs)


 def read_parquet(path, engine='auto', columns=None, **kwargs):
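
Note: the pyarrow branch above is a thin wrapper over pyarrow's own dataset writer. A minimal sketch of the equivalent direct pyarrow calls (the paths are illustrative, not from the commit):

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    df = pd.DataFrame({'a': [0, 0, 1], 'b': [1, 2, 3]})
    table = pa.Table.from_pandas(df)

    # partition_cols given: a directory tree with one subdirectory per
    # unique value, e.g. partitioned_root/a=0/<uuid>.parquet
    pq.write_to_dataset(table, 'partitioned_root', partition_cols=['a'])

    # no partition_cols: a single file, the pre-0.24.0 behavior
    pq.write_table(table, 'single_file.parquet')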

pandas/tests/io/test_parquet.py

+47
@@ -1,4 +1,5 @@
 """ test parquet compat """
+import os

 import pytest
 import datetime
@@ -454,6 +455,18 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa):
         check_round_trip(df_compat, pa,
                          path='s3://pandas-test/pyarrow.parquet')

+    def test_partition_cols_supported(self, pa, df_full):
+        # GH #23283
+        partition_cols = ['bool', 'int']
+        df = df_full
+        with tm.ensure_clean_dir() as path:
+            df.to_parquet(path, partition_cols=partition_cols,
+                          compression=None)
+            import pyarrow.parquet as pq
+            dataset = pq.ParquetDataset(path, validate_schema=False)
+            assert len(dataset.partitions.partition_names) == 2
+            assert dataset.partitions.partition_names == set(partition_cols)
+

 class TestParquetFastParquet(Base):

@@ -519,3 +532,37 @@ def test_s3_roundtrip(self, df_compat, s3_resource, fp):
         # GH #19134
         check_round_trip(df_compat, fp,
                          path='s3://pandas-test/fastparquet.parquet')
+
+    def test_partition_cols_supported(self, fp, df_full):
+        # GH #23283
+        partition_cols = ['bool', 'int']
+        df = df_full
+        with tm.ensure_clean_dir() as path:
+            df.to_parquet(path, engine="fastparquet",
+                          partition_cols=partition_cols, compression=None)
+            assert os.path.exists(path)
+            import fastparquet
+            actual_partition_cols = fastparquet.ParquetFile(path, False).cats
+            assert len(actual_partition_cols) == 2
+
+    def test_partition_on_supported(self, fp, df_full):
+        # GH #23283
+        partition_cols = ['bool', 'int']
+        df = df_full
+        with tm.ensure_clean_dir() as path:
+            df.to_parquet(path, engine="fastparquet", compression=None,
+                          partition_on=partition_cols)
+            assert os.path.exists(path)
+            import fastparquet
+            actual_partition_cols = fastparquet.ParquetFile(path, False).cats
+            assert len(actual_partition_cols) == 2
+
+    def test_error_on_using_partition_cols_and_partition_on(self, fp, df_full):
+        # GH #23283
+        partition_cols = ['bool', 'int']
+        df = df_full
+        with pytest.raises(ValueError):
+            with tm.ensure_clean_dir() as path:
+                df.to_parquet(path, engine="fastparquet", compression=None,
+                              partition_on=partition_cols,
+                              partition_cols=partition_cols)
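
Note: the three fastparquet tests above pin down the back-compat rules for fastparquet's native ``partition_on`` keyword. A minimal sketch of that behavior from the caller's side, on a build with this change applied (directory names are illustrative):

    import pandas as pd

    df = pd.DataFrame({'bool': [True, False], 'int': [1, 2]})

    # fastparquet's own spelling still works: write() translates
    # partition_on into partition_cols internally
    df.to_parquet('legacy_dir', engine='fastparquet',
                  partition_on=['bool'], compression=None)

    # supplying both spellings is ambiguous and raises ValueError
    try:
        df.to_parquet('conflict_dir', engine='fastparquet',
                      partition_on=['bool'], partition_cols=['bool'],
                      compression=None)
    except ValueError as err:
        print(err)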

pandas/tests/util/test_testing.py

+7
@@ -876,3 +876,10 @@ def test_datapath_missing(datapath, request):
     )

     assert result == expected
+
+
+def test_create_temp_directory():
+    with tm.ensure_clean_dir() as path:
+        assert os.path.exists(path)
+        assert os.path.isdir(path)
+    assert not os.path.exists(path)

pandas/util/testing.py

+20
@@ -6,6 +6,7 @@
 import locale
 import os
 import re
+from shutil import rmtree
 import string
 import subprocess
 import sys
@@ -761,6 +762,25 @@ def ensure_clean(filename=None, return_filelike=False):
             print("Exception on removing file: {error}".format(error=e))


+@contextmanager
+def ensure_clean_dir():
+    """
+    Get a temporary directory path and ensure it is removed on close.
+
+    Yields
+    ------
+    Temporary directory path
+    """
+    directory_name = tempfile.mkdtemp(suffix='')
+    try:
+        yield directory_name
+    finally:
+        try:
+            rmtree(directory_name)
+        except Exception:
+            pass
+
+
 # -----------------------------------------------------------------------------
 # Comparators
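
Note: ``ensure_clean_dir`` deliberately swallows cleanup errors. For comparison (not part of the commit), a minimal Python 3 sketch of the standard-library equivalent, which propagates removal errors instead of ignoring them:

    import os
    import tempfile

    # cleans up on exit much like tm.ensure_clean_dir()
    with tempfile.TemporaryDirectory() as path:
        assert os.path.isdir(path)
    assert not os.path.exists(path)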
