From 41c2828a22a932b9237e4e302961b57c9e50bc9d Mon Sep 17 00:00:00 2001 From: Anjana S Date: Fri, 26 Oct 2018 23:09:41 +0530 Subject: [PATCH 01/15] closes #23283 --- doc/source/whatsnew/v0.24.0.txt | 1 + pandas/io/parquet.py | 14 ++++++++++---- pandas/tests/io/test_parquet.py | 20 ++++++++++++++++++++ 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/doc/source/whatsnew/v0.24.0.txt b/doc/source/whatsnew/v0.24.0.txt index 768868d585721..7afc134ddbf33 100644 --- a/doc/source/whatsnew/v0.24.0.txt +++ b/doc/source/whatsnew/v0.24.0.txt @@ -213,6 +213,7 @@ Other Enhancements - New attribute :attr:`__git_version__` will return git commit sha of current build (:issue:`21295`). - Compatibility with Matplotlib 3.0 (:issue:`22790`). - Added :meth:`Interval.overlaps`, :meth:`IntervalArray.overlaps`, and :meth:`IntervalIndex.overlaps` for determining overlaps between interval-like objects (:issue:`21998`) +- :func:`~DataFrame.to_parquet` now supports writing a DataFrame as a directory of parquet files partitioned by a subset of the columns. (:issue:`23283`). - :meth:`Timestamp.tz_localize`, :meth:`DatetimeIndex.tz_localize`, and :meth:`Series.tz_localize` have gained the ``nonexistent`` argument for alternative handling of nonexistent times. See :ref:`timeseries.timezone_nonexsistent` (:issue:`8917`) .. _whatsnew_0240.api_breaking: diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index aef1d84a19bc7..bdac2a2338630 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -125,9 +125,14 @@ def write(self, df, path, compression='snappy', else: table = self.api.Table.from_pandas(df, **from_pandas_kwargs) - self.api.parquet.write_table( - table, path, compression=compression, - coerce_timestamps=coerce_timestamps, **kwargs) + if 'partition_cols' in kwargs: + self.api.parquet.write_to_dataset( + table, path, compression=compression, + coerce_timestamps=coerce_timestamps, **kwargs) + else: + self.api.parquet.write_table( + table, path, compression=compression, + coerce_timestamps=coerce_timestamps, **kwargs) def read(self, path, columns=None, **kwargs): path, _, _, should_close = get_filepath_or_buffer(path) @@ -252,7 +257,8 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None, ---------- df : DataFrame path : string - File path + File path ( Will be used as `root_path` if + `partition_cols` is provided as parameter for 'pyarrow' engine). engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' Parquet library to use. If 'auto', then the option ``io.parquet.engine`` is used. 
The default ``io.parquet.engine`` diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 4c58d8ce29d8b..016a3eb843411 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -1,6 +1,8 @@ """ test parquet compat """ import pytest +import tempfile +import shutil import datetime from distutils.version import LooseVersion from warnings import catch_warnings @@ -478,6 +480,24 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa): check_round_trip(df_compat, pa, path='s3://pandas-test/pyarrow.parquet') + def test_partition_cols_supported(self, pa_ge_070, df_full): + partition_cols = ['bool', 'int'] + df = df_full + path = tempfile.mkdtemp() + df.to_parquet(path, partition_cols=partition_cols, + compression=None) + import pyarrow.parquet as pq + dataset = pq.ParquetDataset(path, validate_schema=False) + assert len(dataset.pieces) == 2 + assert len(dataset.partitions.partition_names) == 2 + assert dataset.partitions.partition_names == set(partition_cols) + shutil.rmtree(path) + + def test_ignore_partition_cols_lt_070(self, pa_lt_070, df_full): + partition_cols = ['bool', 'int'] + pa = pa_lt_070 + df = df_full + check_round_trip(df, pa, write_kwargs={'partition_cols': partition_cols}) class TestParquetFastParquet(Base): From 0d9f8786671431934be42f30295c66fcd5440ea4 Mon Sep 17 00:00:00 2001 From: Anjana S Date: Sat, 27 Oct 2018 13:19:00 +0530 Subject: [PATCH 02/15] Fix linting issue --- pandas/tests/io/test_parquet.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 016a3eb843411..9f3566c73c058 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -488,7 +488,6 @@ def test_partition_cols_supported(self, pa_ge_070, df_full): compression=None) import pyarrow.parquet as pq dataset = pq.ParquetDataset(path, validate_schema=False) - assert len(dataset.pieces) == 2 assert len(dataset.partitions.partition_names) == 2 assert dataset.partitions.partition_names == set(partition_cols) shutil.rmtree(path) @@ -497,7 +496,9 @@ def test_ignore_partition_cols_lt_070(self, pa_lt_070, df_full): partition_cols = ['bool', 'int'] pa = pa_lt_070 df = df_full - check_round_trip(df, pa, write_kwargs={'partition_cols': partition_cols}) + check_round_trip(df, pa, + write_kwargs={'partition_cols': partition_cols}) + class TestParquetFastParquet(Base): From 16366815b4a9fc0ee51b01c53cceff5395f711c3 Mon Sep 17 00:00:00 2001 From: Anjana S Date: Mon, 29 Oct 2018 16:13:09 +0530 Subject: [PATCH 03/15] Update documentation using with to clean directory support for fastparquet --- doc/source/io.rst | 2 ++ pandas/core/frame.py | 14 ++++++++++--- pandas/io/parquet.py | 35 +++++++++++++++++++++---------- pandas/tests/io/test_parquet.py | 32 +++++++++++++++++++--------- pandas/tests/util/test_testing.py | 9 ++++++++ pandas/util/testing.py | 17 +++++++++++++++ 6 files changed, 85 insertions(+), 24 deletions(-) diff --git a/doc/source/io.rst b/doc/source/io.rst index 56da4dbea8706..470015e35c092 100644 --- a/doc/source/io.rst +++ b/doc/source/io.rst @@ -4574,6 +4574,8 @@ Several caveats. * Categorical dtypes can be serialized to parquet, but will de-serialize as ``object`` dtype. * Non supported types include ``Period`` and actual Python object types. These will raise a helpful error message on an attempt at serialization. 
+* ``partition_cols`` will be used for partitioning the dataset, where the dataset will be written to multiple + files in the path specified. Therefore, the path specified, must be a directory path. You can specify an ``engine`` to direct the serialization. This can be one of ``pyarrow``, or ``fastparquet``, or ``auto``. If the engine is NOT specified, then the ``pd.options.io.parquet.engine`` option is checked; if this is also ``auto``, diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 61721ce4c82e7..8e80ff7594ad6 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -1970,7 +1970,7 @@ def to_feather(self, fname): to_feather(self, fname) def to_parquet(self, fname, engine='auto', compression='snappy', - index=None, **kwargs): + index=None, partition_cols=None, **kwargs): """ Write a DataFrame to the binary parquet format. @@ -1984,7 +1984,8 @@ def to_parquet(self, fname, engine='auto', compression='snappy', Parameters ---------- fname : str - String file path. + File path or Root Directory path. Will be used as Root Directory + path while writing a partitioned dataset. engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' Parquet library to use. If 'auto', then the option ``io.parquet.engine`` is used. The default ``io.parquet.engine`` @@ -1998,6 +1999,12 @@ def to_parquet(self, fname, engine='auto', compression='snappy', the behavior depends on the chosen engine. .. versionadded:: 0.24.0 + partition_cols : list, optional, default None + Column names by which to partition the dataset + Columns are partitioned in the order they are given + The behaviour applies only to pyarrow >= 0.7.0 and fastparquet + For other versions, this argument will be ignored. + .. versionadded:: 0.24.0 **kwargs Additional arguments passed to the parquet library. See @@ -2027,7 +2034,8 @@ def to_parquet(self, fname, engine='auto', compression='snappy', """ from pandas.io.parquet import to_parquet to_parquet(self, fname, engine, - compression=compression, index=index, **kwargs) + compression=compression, index=index, + partition_cols=partition_cols, **kwargs) @Substitution(header='Write out the column names. If a list of strings ' 'is given, it is assumed to be aliases for the ' diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index bdac2a2338630..291bd8fd942b1 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -104,7 +104,8 @@ def __init__(self): self.api = pyarrow def write(self, df, path, compression='snappy', - coerce_timestamps='ms', index=None, **kwargs): + coerce_timestamps='ms', index=None, partition_cols=None, + **kwargs): self.validate_dataframe(df) # Only validate the index if we're writing it. @@ -125,10 +126,11 @@ def write(self, df, path, compression='snappy', else: table = self.api.Table.from_pandas(df, **from_pandas_kwargs) - if 'partition_cols' in kwargs: + if partition_cols is not None: self.api.parquet.write_to_dataset( table, path, compression=compression, - coerce_timestamps=coerce_timestamps, **kwargs) + coerce_timestamps=coerce_timestamps, + partition_cols=partition_cols, **kwargs) else: self.api.parquet.write_table( table, path, compression=compression, @@ -211,12 +213,16 @@ def __init__(self): ) self.api = fastparquet - def write(self, df, path, compression='snappy', index=None, **kwargs): + def write(self, df, path, compression='snappy', index=None, + partition_cols=None, **kwargs): self.validate_dataframe(df) # thriftpy/protocol/compact.py:339: # DeprecationWarning: tostring() is deprecated. # Use tobytes() instead. 
+ if partition_cols is not None: + kwargs['file_scheme'] = 'hive' + if is_s3_url(path): # path is s3:// so we need to open the s3file in 'wb' mode. # TODO: Support 'ab' @@ -229,7 +235,8 @@ def write(self, df, path, compression='snappy', index=None, **kwargs): with catch_warnings(record=True): self.api.write(path, df, compression=compression, - write_index=index, **kwargs) + write_index=index, partition_on=partition_cols, + **kwargs) def read(self, path, columns=None, **kwargs): if is_s3_url(path): @@ -249,16 +256,15 @@ def read(self, path, columns=None, **kwargs): def to_parquet(df, path, engine='auto', compression='snappy', index=None, - **kwargs): + partition_cols=None, **kwargs): """ Write a DataFrame to the parquet format. Parameters ---------- - df : DataFrame - path : string - File path ( Will be used as `root_path` if - `partition_cols` is provided as parameter for 'pyarrow' engine). + path : str + File path or Root Directory path. Will be used as Root Directory path + while writing a partitioned dataset. engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' Parquet library to use. If 'auto', then the option ``io.parquet.engine`` is used. The default ``io.parquet.engine`` @@ -272,11 +278,18 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None, engine's default behavior will be used. .. versionadded 0.24.0 + partition_cols : list, optional + Column names by which to partition the dataset + Columns are partitioned in the order they are given + The behaviour applies only to pyarrow >= 0.7.0 and fastparquet + For other versions, this argument will be ignored. + .. versionadded:: 0.24.0 kwargs Additional keyword arguments passed to the engine """ impl = get_engine(engine) - return impl.write(df, path, compression=compression, index=index, **kwargs) + return impl.write(df, path, compression=compression, index=index, + partition_cols=partition_cols, **kwargs) def read_parquet(path, engine='auto', columns=None, **kwargs): diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 9f3566c73c058..84db3ee1dd21a 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -1,8 +1,7 @@ """ test parquet compat """ +import os import pytest -import tempfile -import shutil import datetime from distutils.version import LooseVersion from warnings import catch_warnings @@ -481,18 +480,19 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa): path='s3://pandas-test/pyarrow.parquet') def test_partition_cols_supported(self, pa_ge_070, df_full): + # GH #23283 partition_cols = ['bool', 'int'] df = df_full - path = tempfile.mkdtemp() - df.to_parquet(path, partition_cols=partition_cols, - compression=None) - import pyarrow.parquet as pq - dataset = pq.ParquetDataset(path, validate_schema=False) - assert len(dataset.partitions.partition_names) == 2 - assert dataset.partitions.partition_names == set(partition_cols) - shutil.rmtree(path) + with tm.ensure_clean_dir() as path: + df.to_parquet(path, partition_cols=partition_cols, + compression=None) + import pyarrow.parquet as pq + dataset = pq.ParquetDataset(path, validate_schema=False) + assert len(dataset.partitions.partition_names) == 2 + assert dataset.partitions.partition_names == set(partition_cols) def test_ignore_partition_cols_lt_070(self, pa_lt_070, df_full): + # GH #23283 partition_cols = ['bool', 'int'] pa = pa_lt_070 df = df_full @@ -564,3 +564,15 @@ def test_s3_roundtrip(self, df_compat, s3_resource, fp): # GH #19134 check_round_trip(df_compat, fp, 
path='s3://pandas-test/fastparquet.parquet') + + def test_partition_cols_supported(self, fp, df_full): + # GH #23283 + partition_cols = ['bool', 'int'] + df = df_full + with tm.ensure_clean_dir() as path: + df.to_parquet(path, partition_cols=partition_cols, + compression=None) + assert os.path.exists(path) + import fastparquet + actual_partition_cols = fastparquet.ParquetFile(path, False).cats + assert len(actual_partition_cols) == 2 diff --git a/pandas/tests/util/test_testing.py b/pandas/tests/util/test_testing.py index d968005a25006..b1b7c704f752b 100644 --- a/pandas/tests/util/test_testing.py +++ b/pandas/tests/util/test_testing.py @@ -875,3 +875,12 @@ def test_datapath_missing(datapath, request): ) assert result == expected + + +def test_create_temp_directory(): + temppath = '' + with tm.ensure_clean_dir() as path: + assert os.path.exists(path) + assert os.path.isdir(path) + temppath = path + assert not os.path.exists(temppath) diff --git a/pandas/util/testing.py b/pandas/util/testing.py index 44163479dfd27..72ee44c757b2c 100644 --- a/pandas/util/testing.py +++ b/pandas/util/testing.py @@ -772,6 +772,23 @@ def ensure_clean(filename=None, return_filelike=False): print("Exception on removing file: {error}".format(error=e)) +@contextmanager +def ensure_clean_dir(): + """ + Get a temporary directory path and agrees to remove on close. + + Yields + ---------- + Temporary directory path + """ + directory_name = tempfile.mkdtemp(suffix='') + try: + yield directory_name + finally: + import shutil + shutil.rmtree(directory_name) + + # ----------------------------------------------------------------------------- # Comparators From 14a25806c1527b60956bbb3c0e4691087c610abd Mon Sep 17 00:00:00 2001 From: Anjana S Date: Mon, 29 Oct 2018 18:40:01 +0530 Subject: [PATCH 04/15] Documentation changes Throws value error for older versions of pyarrow and if partitions_on is used in fastparquet --- pandas/io/parquet.py | 7 +++++++ pandas/tests/io/test_parquet.py | 14 ++++++++++++-- pandas/util/testing.py | 9 ++++++--- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 291bd8fd942b1..58c22486d745d 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -113,6 +113,10 @@ def write(self, df, path, compression='snappy', self._validate_write_lt_070(df) path, _, _, _ = get_filepath_or_buffer(path, mode='wb') + if partition_cols is not None and self._pyarrow_lt_070: + raise ValueError("Partitioning of parquet files are only " + "supported with pyarrow >= 0.7.0") + if index is None: from_pandas_kwargs = {} else: @@ -220,6 +224,9 @@ def write(self, df, path, compression='snappy', index=None, # DeprecationWarning: tostring() is deprecated. # Use tobytes() instead. 
+ if 'partition_on' in kwargs: + raise ValueError("must use 'partition_cols' instead of 'partition_on'") + if partition_cols is not None: kwargs['file_scheme'] = 'hive' diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 84db3ee1dd21a..8d8173cde8da9 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -496,8 +496,10 @@ def test_ignore_partition_cols_lt_070(self, pa_lt_070, df_full): partition_cols = ['bool', 'int'] pa = pa_lt_070 df = df_full - check_round_trip(df, pa, - write_kwargs={'partition_cols': partition_cols}) + with tm.ensure_clean_dir() as path: + df.to_parquet(path, partition_cols=partition_cols, + compression=None) + pytest.raises(ValueError) class TestParquetFastParquet(Base): @@ -576,3 +578,11 @@ def test_partition_cols_supported(self, fp, df_full): import fastparquet actual_partition_cols = fastparquet.ParquetFile(path, False).cats assert len(actual_partition_cols) == 2 + + def test_partition_on_not_supported(self, fp, df_full): + # GH #23283 + partition_cols = ['bool', 'int'] + df = df_full + with pytest.raises(ValueError): + df.to_parquet("", compression=None, + partition_on=partition_cols) diff --git a/pandas/util/testing.py b/pandas/util/testing.py index 72ee44c757b2c..7812c16078578 100644 --- a/pandas/util/testing.py +++ b/pandas/util/testing.py @@ -778,15 +778,18 @@ def ensure_clean_dir(): Get a temporary directory path and agrees to remove on close. Yields - ---------- + ------ Temporary directory path """ directory_name = tempfile.mkdtemp(suffix='') try: yield directory_name finally: - import shutil - shutil.rmtree(directory_name) + try: + import shutil + shutil.rmtree(directory_name) + except Exception as e: + print("Exception on removing folder: {error}".format(error=e)) # ----------------------------------------------------------------------------- From 7bc337b759e410692c49bbd7b38e41922d34d162 Mon Sep 17 00:00:00 2001 From: Anjana S Date: Mon, 29 Oct 2018 22:24:48 +0530 Subject: [PATCH 05/15] not raise error when partion_on is used in fastparquet add a blank line before the versionadded --- pandas/core/frame.py | 1 + pandas/io/parquet.py | 3 ++- pandas/tests/io/test_parquet.py | 27 +++++++++++++++------------ 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 8e80ff7594ad6..78111939ccdcd 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -2004,6 +2004,7 @@ def to_parquet(self, fname, engine='auto', compression='snappy', Columns are partitioned in the order they are given The behaviour applies only to pyarrow >= 0.7.0 and fastparquet For other versions, this argument will be ignored. + .. versionadded:: 0.24.0 **kwargs diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 58c22486d745d..03dd5ae9f6522 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -225,7 +225,8 @@ def write(self, df, path, compression='snappy', index=None, # Use tobytes() instead. 
if 'partition_on' in kwargs: - raise ValueError("must use 'partition_cols' instead of 'partition_on'") + partition_cols = kwargs['partition_on'] + del kwargs['partition_on'] if partition_cols is not None: kwargs['file_scheme'] = 'hive' diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 8d8173cde8da9..30ae535a40a88 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -491,15 +491,14 @@ def test_partition_cols_supported(self, pa_ge_070, df_full): assert len(dataset.partitions.partition_names) == 2 assert dataset.partitions.partition_names == set(partition_cols) - def test_ignore_partition_cols_lt_070(self, pa_lt_070, df_full): + def test_partition_cols_not_supported_pa_lt_70(self, pa_lt_070, df_full): # GH #23283 partition_cols = ['bool', 'int'] - pa = pa_lt_070 df = df_full - with tm.ensure_clean_dir() as path: - df.to_parquet(path, partition_cols=partition_cols, - compression=None) - pytest.raises(ValueError) + with pytest.raises(ValueError): + with tm.ensure_clean_dir() as path: + df.to_parquet(path, partition_cols=partition_cols, + compression=None) class TestParquetFastParquet(Base): @@ -572,17 +571,21 @@ def test_partition_cols_supported(self, fp, df_full): partition_cols = ['bool', 'int'] df = df_full with tm.ensure_clean_dir() as path: - df.to_parquet(path, partition_cols=partition_cols, - compression=None) + df.to_parquet(path, engine="fastparquet", + partition_cols=partition_cols, compression=None) assert os.path.exists(path) import fastparquet actual_partition_cols = fastparquet.ParquetFile(path, False).cats assert len(actual_partition_cols) == 2 - def test_partition_on_not_supported(self, fp, df_full): + def test_partition_on_supported(self, fp, df_full): # GH #23283 partition_cols = ['bool', 'int'] df = df_full - with pytest.raises(ValueError): - df.to_parquet("", compression=None, - partition_on=partition_cols) + with tm.ensure_clean_dir() as path: + df.to_parquet(path, engine="fastparquet", compression=None, + partition_on=partition_cols) + assert os.path.exists(path) + import fastparquet + actual_partition_cols = fastparquet.ParquetFile(path, False).cats + assert len(actual_partition_cols) == 2 From 6670adf1e78986c27c974751c4940555485173a7 Mon Sep 17 00:00:00 2001 From: Anjana S Date: Sun, 4 Nov 2018 10:55:24 +0530 Subject: [PATCH 06/15] using pop for array element deletion passing exception during folder removal removal of additional variable --- pandas/io/parquet.py | 3 +-- pandas/tests/util/test_testing.py | 4 +--- pandas/util/testing.py | 6 +++--- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 03dd5ae9f6522..648ee4fbd9e7f 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -225,8 +225,7 @@ def write(self, df, path, compression='snappy', index=None, # Use tobytes() instead. 
if 'partition_on' in kwargs: - partition_cols = kwargs['partition_on'] - del kwargs['partition_on'] + partition_cols = kwargs.pop('partition_on') if partition_cols is not None: kwargs['file_scheme'] = 'hive' diff --git a/pandas/tests/util/test_testing.py b/pandas/tests/util/test_testing.py index b1b7c704f752b..668059793dd80 100644 --- a/pandas/tests/util/test_testing.py +++ b/pandas/tests/util/test_testing.py @@ -878,9 +878,7 @@ def test_datapath_missing(datapath, request): def test_create_temp_directory(): - temppath = '' with tm.ensure_clean_dir() as path: assert os.path.exists(path) assert os.path.isdir(path) - temppath = path - assert not os.path.exists(temppath) + assert not os.path.exists(path) diff --git a/pandas/util/testing.py b/pandas/util/testing.py index 7812c16078578..11e8837119e82 100644 --- a/pandas/util/testing.py +++ b/pandas/util/testing.py @@ -51,6 +51,7 @@ from pandas._libs import testing as _testing from pandas.io.common import urlopen +from shutil import rmtree N = 30 @@ -786,10 +787,9 @@ def ensure_clean_dir(): yield directory_name finally: try: - import shutil - shutil.rmtree(directory_name) + rmtree(directory_name) except Exception as e: - print("Exception on removing folder: {error}".format(error=e)) + pass # ----------------------------------------------------------------------------- From 112d6e92bcc02c2bb7e4cc85f0ba0ec9309a5ff4 Mon Sep 17 00:00:00 2001 From: Anjana S Date: Sun, 4 Nov 2018 10:55:24 +0530 Subject: [PATCH 07/15] using pop for array element deletion passing exception during folder removal removal of additional variable --- pandas/util/testing.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pandas/util/testing.py b/pandas/util/testing.py index c566b0d715ada..21e5dca5d0ba6 100644 --- a/pandas/util/testing.py +++ b/pandas/util/testing.py @@ -35,6 +35,7 @@ from pandas.core.algorithms import take_1d from pandas.core.arrays import ExtensionArray, IntervalArray, PeriodArray import pandas.core.common as com +from shutil import rmtree from pandas.io.common import urlopen from pandas.io.formats.printing import pprint_thing From 441f879b5d3a0d94f65c619dac7fd0173a7c0d4a Mon Sep 17 00:00:00 2001 From: Anjana S Date: Sun, 4 Nov 2018 14:23:50 +0530 Subject: [PATCH 08/15] fix order of imports --- pandas/util/testing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pandas/util/testing.py b/pandas/util/testing.py index 21e5dca5d0ba6..b5aad1638b456 100644 --- a/pandas/util/testing.py +++ b/pandas/util/testing.py @@ -6,6 +6,7 @@ import locale import os import re +from shutil import rmtree import string import subprocess import sys @@ -27,6 +28,7 @@ is_datetimelike_v_object, is_extension_array_dtype, is_interval_dtype, is_list_like, is_number, is_sequence, needs_i8_conversion) from pandas.core.dtypes.missing import array_equivalent + import pandas as pd from pandas import ( Categorical, CategoricalIndex, DataFrame, DatetimeIndex, Index, @@ -35,11 +37,9 @@ from pandas.core.algorithms import take_1d from pandas.core.arrays import ExtensionArray, IntervalArray, PeriodArray import pandas.core.common as com -from shutil import rmtree from pandas.io.common import urlopen from pandas.io.formats.printing import pprint_thing -from shutil import rmtree N = 30 K = 4 From a5164b874487beb2b5d3246cd8fb8e530732a0de Mon Sep 17 00:00:00 2001 From: Anjana S Date: Mon, 5 Nov 2018 16:48:58 +0530 Subject: [PATCH 09/15] fix failing codecheck --- pandas/util/testing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/util/testing.py 
b/pandas/util/testing.py index b5aad1638b456..f0dcfda2f52ad 100644 --- a/pandas/util/testing.py +++ b/pandas/util/testing.py @@ -775,7 +775,7 @@ def ensure_clean_dir(): finally: try: rmtree(directory_name) - except Exception as e: + except Exception: pass From 1f0978f57506ef71079080b3e57419229283694b Mon Sep 17 00:00:00 2001 From: Anjana S Date: Mon, 5 Nov 2018 18:12:02 +0530 Subject: [PATCH 10/15] Updated documentation Fix"Should raise error on using partition_cols and partition_on together" --- doc/source/io.rst | 29 +++++++++++++++++++++++++++-- doc/source/whatsnew/v0.24.0.txt | 2 +- pandas/core/frame.py | 4 ++-- pandas/io/parquet.py | 11 ++++++++--- pandas/tests/io/test_parquet.py | 10 ++++++++++ 5 files changed, 48 insertions(+), 8 deletions(-) diff --git a/doc/source/io.rst b/doc/source/io.rst index 17cdd296f77ef..b57fdfad551b4 100644 --- a/doc/source/io.rst +++ b/doc/source/io.rst @@ -4574,8 +4574,6 @@ Several caveats. * Categorical dtypes can be serialized to parquet, but will de-serialize as ``object`` dtype. * Non supported types include ``Period`` and actual Python object types. These will raise a helpful error message on an attempt at serialization. -* ``partition_cols`` will be used for partitioning the dataset, where the dataset will be written to multiple - files in the path specified. Therefore, the path specified, must be a directory path. You can specify an ``engine`` to direct the serialization. This can be one of ``pyarrow``, or ``fastparquet``, or ``auto``. If the engine is NOT specified, then the ``pd.options.io.parquet.engine`` option is checked; if this is also ``auto``, @@ -4670,6 +4668,33 @@ Passing ``index=True`` will *always* write the index, even if that's not the underlying engine's default behavior. +Partitioning Parquet files +'''''''''''''''''''''''''' + +Parquet supports partitioning of data based on the values of one or more columns. + +.. ipython:: python + + df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': [0, 1, 0, 1]}) + df.to_parquet(fname='test', engine='pyarrow', partition_cols=['a'], compression=None) + +The `fname` specifies the parent directory to which data will be saved. +The `partition_cols` are the column names by which the dataset will be partitioned. +Columns are partitioned in the order they are given. The partition splits are +determined by the unique values in the partition columns. +The above example creates a partitioned dataset that may look like: + +:: + + test/ + a=0/ + 0bac803e32dc42ae83fddfd029cbdebc.parquet + ... + a=1/ + e6ab24a4f45147b49b54a662f0c412a3.parquet + ... + + .. _io.sql: SQL Queries diff --git a/doc/source/whatsnew/v0.24.0.txt b/doc/source/whatsnew/v0.24.0.txt index 1475c749dc21d..7fc038f2d60ad 100644 --- a/doc/source/whatsnew/v0.24.0.txt +++ b/doc/source/whatsnew/v0.24.0.txt @@ -235,7 +235,7 @@ Other Enhancements - New attribute :attr:`__git_version__` will return git commit sha of current build (:issue:`21295`). - Compatibility with Matplotlib 3.0 (:issue:`22790`). - Added :meth:`Interval.overlaps`, :meth:`IntervalArray.overlaps`, and :meth:`IntervalIndex.overlaps` for determining overlaps between interval-like objects (:issue:`21998`) -- :func:`~DataFrame.to_parquet` now supports writing a DataFrame as a directory of parquet files partitioned by a subset of the columns. (:issue:`23283`). +- With the pyarrow engine, :func:`~DataFrame.to_parquet` now supports writing a DataFrame as a directory of parquet files partitioned by a subset of the columns. (:issue:`23283`). 
- :meth:`Timestamp.tz_localize`, :meth:`DatetimeIndex.tz_localize`, and :meth:`Series.tz_localize` have gained the ``nonexistent`` argument for alternative handling of nonexistent times. See :ref:`timeseries.timezone_nonexsistent` (:issue:`8917`) .. _whatsnew_0240.api_breaking: diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 023f26f421e8b..64968b8a3eb17 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -2002,8 +2002,8 @@ def to_parquet(self, fname, engine='auto', compression='snappy', partition_cols : list, optional, default None Column names by which to partition the dataset Columns are partitioned in the order they are given - The behaviour applies only to pyarrow >= 0.7.0 and fastparquet - For other versions, this argument will be ignored. + The behaviour applies only to pyarrow >= 0.7.0 and fastparquet. + Raises a ValueError for other versions. .. versionadded:: 0.24.0 diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 73163868858e0..e991dfc246147 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -227,7 +227,12 @@ def write(self, df, path, compression='snappy', index=None, # Use tobytes() instead. if 'partition_on' in kwargs: - partition_cols = kwargs.pop('partition_on') + if partition_cols is None: + partition_cols = kwargs.pop('partition_on') + else: + raise ValueError("Cannot use both partition_on and " + "partition_cols. Use partition_cols for " + "partitioning data") if partition_cols is not None: kwargs['file_scheme'] = 'hive' @@ -290,8 +295,8 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None, partition_cols : list, optional Column names by which to partition the dataset Columns are partitioned in the order they are given - The behaviour applies only to pyarrow >= 0.7.0 and fastparquet - For other versions, this argument will be ignored. + The behaviour applies only to pyarrow >= 0.7.0 and fastparquet. + Raises a ValueError for other versions. .. 
versionadded:: 0.24.0 kwargs Additional keyword arguments passed to the engine diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 30ae535a40a88..ab9ab4ee5e005 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -589,3 +589,13 @@ def test_partition_on_supported(self, fp, df_full): import fastparquet actual_partition_cols = fastparquet.ParquetFile(path, False).cats assert len(actual_partition_cols) == 2 + + def test_error_on_using_partition_cols_and_partition_on(self, fp, df_full): + # GH #23283 + partition_cols = ['bool', 'int'] + df = df_full + with pytest.raises(ValueError): + with tm.ensure_clean_dir() as path: + df.to_parquet(path, engine="fastparquet", compression=None, + partition_on=partition_cols, + partition_cols=partition_cols) From ee7707fdf9836b9fc7988c12c2f7b94fb1944b32 Mon Sep 17 00:00:00 2001 From: Anjana S Date: Tue, 6 Nov 2018 09:22:06 +0530 Subject: [PATCH 11/15] Removed < 0.7.0 documentation for pyarrow in partition support code --- pandas/core/frame.py | 2 -- pandas/io/parquet.py | 15 +++++---------- pandas/tests/io/test_parquet.py | 11 +---------- 3 files changed, 6 insertions(+), 22 deletions(-) diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 64968b8a3eb17..fe4243035e82f 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -2002,8 +2002,6 @@ def to_parquet(self, fname, engine='auto', compression='snappy', partition_cols : list, optional, default None Column names by which to partition the dataset Columns are partitioned in the order they are given - The behaviour applies only to pyarrow >= 0.7.0 and fastparquet. - Raises a ValueError for other versions. .. versionadded:: 0.24.0 diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index f3d6f4052bd1c..e9683cf340408 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -106,10 +106,6 @@ def write(self, df, path, compression='snappy', self.validate_dataframe(df) path, _, _, _ = get_filepath_or_buffer(path, mode='wb') - if partition_cols is not None and self._pyarrow_lt_070: - raise ValueError("Partitioning of parquet files are only " - "supported with pyarrow >= 0.7.0") - if index is None: from_pandas_kwargs = {} else: @@ -239,12 +235,11 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None, engine's default behavior will be used. .. versionadded 0.24.0 - partition_cols : list, optional - Column names by which to partition the dataset - Columns are partitioned in the order they are given - The behaviour applies only to pyarrow >= 0.7.0 and fastparquet. - Raises a ValueError for other versions. - .. versionadded:: 0.24.0 + partition_cols : list, optional, default None + Column names by which to partition the dataset + Columns are partitioned in the order they are given + + .. 
versionadded:: 0.24.0 kwargs Additional keyword arguments passed to the engine """ diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index a872323084c91..6024fccb15c76 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -455,7 +455,7 @@ def test_s3_roundtrip(self, df_compat, s3_resource, pa): check_round_trip(df_compat, pa, path='s3://pandas-test/pyarrow.parquet') - def test_partition_cols_supported(self, pa_ge_070, df_full): + def test_partition_cols_supported(self, pa, df_full): # GH #23283 partition_cols = ['bool', 'int'] df = df_full @@ -467,15 +467,6 @@ def test_partition_cols_supported(self, pa_ge_070, df_full): assert len(dataset.partitions.partition_names) == 2 assert dataset.partitions.partition_names == set(partition_cols) - def test_partition_cols_not_supported_pa_lt_70(self, pa_lt_070, df_full): - # GH #23283 - partition_cols = ['bool', 'int'] - df = df_full - with pytest.raises(ValueError): - with tm.ensure_clean_dir() as path: - df.to_parquet(path, partition_cols=partition_cols, - compression=None) - class TestParquetFastParquet(Base): From 79f16152d5db169d33f425a80b8f8b420376a271 Mon Sep 17 00:00:00 2001 From: Anjana S Date: Tue, 6 Nov 2018 11:05:34 +0530 Subject: [PATCH 12/15] documentation changes for version change --- doc/source/io.rst | 19 +++++++++++-------- doc/source/whatsnew/v0.24.0.txt | 2 +- pandas/core/frame.py | 2 ++ pandas/io/parquet.py | 2 ++ 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/doc/source/io.rst b/doc/source/io.rst index b57fdfad551b4..e721889b995f0 100644 --- a/doc/source/io.rst +++ b/doc/source/io.rst @@ -4671,9 +4671,12 @@ underlying engine's default behavior. Partitioning Parquet files '''''''''''''''''''''''''' +.. versionadded:: 0.24.0 + Parquet supports partitioning of data based on the values of one or more columns. .. ipython:: python + :suppress: df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': [0, 1, 0, 1]}) df.to_parquet(fname='test', engine='pyarrow', partition_cols=['a'], compression=None) @@ -4684,15 +4687,15 @@ Columns are partitioned in the order they are given. The partition splits are determined by the unique values in the partition columns. The above example creates a partitioned dataset that may look like: -:: +.. code-block:: text - test/ - a=0/ - 0bac803e32dc42ae83fddfd029cbdebc.parquet - ... - a=1/ - e6ab24a4f45147b49b54a662f0c412a3.parquet - ... + test + ├── a=0 + │ ├── 0bac803e32dc42ae83fddfd029cbdebc.parquet + │ └── ... + └── a=1 + ├── e6ab24a4f45147b49b54a662f0c412a3.parquet + └── ... .. _io.sql: diff --git a/doc/source/whatsnew/v0.24.0.txt b/doc/source/whatsnew/v0.24.0.txt index 09d4695368b27..1e17fee2b7049 100644 --- a/doc/source/whatsnew/v0.24.0.txt +++ b/doc/source/whatsnew/v0.24.0.txt @@ -235,7 +235,7 @@ Other Enhancements - New attribute :attr:`__git_version__` will return git commit sha of current build (:issue:`21295`). - Compatibility with Matplotlib 3.0 (:issue:`22790`). - Added :meth:`Interval.overlaps`, :meth:`IntervalArray.overlaps`, and :meth:`IntervalIndex.overlaps` for determining overlaps between interval-like objects (:issue:`21998`) -- With the pyarrow engine, :func:`~DataFrame.to_parquet` now supports writing a DataFrame as a directory of parquet files partitioned by a subset of the columns. (:issue:`23283`). +- :func:`~DataFrame.to_parquet` now supports writing a ``DataFrame`` as a directory of parquet files partitioned by a subset of the columns, when ``engine = 'pyarrow'``. (:issue:`23283`). 
- :meth:`Timestamp.tz_localize`, :meth:`DatetimeIndex.tz_localize`, and :meth:`Series.tz_localize` have gained the ``nonexistent`` argument for alternative handling of nonexistent times. See :ref:`timeseries.timezone_nonexsistent` (:issue:`8917`) .. _whatsnew_0240.api_breaking: diff --git a/pandas/core/frame.py b/pandas/core/frame.py index fe4243035e82f..cb8a690a6e191 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -1986,6 +1986,8 @@ def to_parquet(self, fname, engine='auto', compression='snappy', fname : str File path or Root Directory path. Will be used as Root Directory path while writing a partitioned dataset. + + .. versionchanged:: 0.24.0 engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' Parquet library to use. If 'auto', then the option ``io.parquet.engine`` is used. The default ``io.parquet.engine`` diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index e9683cf340408..6b31af4d1f64f 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -222,6 +222,8 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None, path : str File path or Root Directory path. Will be used as Root Directory path while writing a partitioned dataset. + + .. versionchanged:: 0.24.0 engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' Parquet library to use. If 'auto', then the option ``io.parquet.engine`` is used. The default ``io.parquet.engine`` From 514c5c01a56d2f85052cd4ae3a5da5ad8b301fc7 Mon Sep 17 00:00:00 2001 From: Anjana S Date: Tue, 6 Nov 2018 20:03:44 +0530 Subject: [PATCH 13/15] Cleanup file in while generating doc --- doc/source/io.rst | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/doc/source/io.rst b/doc/source/io.rst index e721889b995f0..13828200f61cd 100644 --- a/doc/source/io.rst +++ b/doc/source/io.rst @@ -4676,7 +4676,6 @@ Partitioning Parquet files Parquet supports partitioning of data based on the values of one or more columns. .. ipython:: python - :suppress: df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': [0, 1, 0, 1]}) df.to_parquet(fname='test', engine='pyarrow', partition_cols=['a'], compression=None) @@ -4697,6 +4696,14 @@ The above example creates a partitioned dataset that may look like: ├── e6ab24a4f45147b49b54a662f0c412a3.parquet └── ... +.. ipython:: python + :suppress: + + from shutil import rmtree + try: + rmtree('test') + except Exception: + pass .. _io.sql: From eb86de0be8502a04f52eb491ec09d339920be453 Mon Sep 17 00:00:00 2001 From: Anjana S Date: Fri, 9 Nov 2018 00:04:28 +0530 Subject: [PATCH 14/15] Text changes, Style changes --- doc/source/whatsnew/v0.24.0.txt | 2 +- pandas/io/parquet.py | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/doc/source/whatsnew/v0.24.0.txt b/doc/source/whatsnew/v0.24.0.txt index 1e17fee2b7049..efb850418f0aa 100644 --- a/doc/source/whatsnew/v0.24.0.txt +++ b/doc/source/whatsnew/v0.24.0.txt @@ -235,7 +235,7 @@ Other Enhancements - New attribute :attr:`__git_version__` will return git commit sha of current build (:issue:`21295`). - Compatibility with Matplotlib 3.0 (:issue:`22790`). - Added :meth:`Interval.overlaps`, :meth:`IntervalArray.overlaps`, and :meth:`IntervalIndex.overlaps` for determining overlaps between interval-like objects (:issue:`21998`) -- :func:`~DataFrame.to_parquet` now supports writing a ``DataFrame`` as a directory of parquet files partitioned by a subset of the columns, when ``engine = 'pyarrow'``. (:issue:`23283`). 
+- :func:`~DataFrame.to_parquet` now supports writing a ``DataFrame`` as a directory of parquet files partitioned by a subset of the columns when ``engine = 'pyarrow'`` (:issue:`23283`) - :meth:`Timestamp.tz_localize`, :meth:`DatetimeIndex.tz_localize`, and :meth:`Series.tz_localize` have gained the ``nonexistent`` argument for alternative handling of nonexistent times. See :ref:`timeseries.timezone_nonexsistent` (:issue:`8917`) .. _whatsnew_0240.api_breaking: diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index 6b31af4d1f64f..f24f2068e1374 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -169,13 +169,12 @@ def write(self, df, path, compression='snappy', index=None, # DeprecationWarning: tostring() is deprecated. # Use tobytes() instead. - if 'partition_on' in kwargs: - if partition_cols is None: - partition_cols = kwargs.pop('partition_on') - else: - raise ValueError("Cannot use both partition_on and " - "partition_cols. Use partition_cols for " - "partitioning data") + if 'partition_on' in kwargs and partition_cols is not None: + raise ValueError("Cannot use both partition_on and " + "partition_cols. Use partition_cols for " + "partitioning data") + elif 'partition_on' in kwargs: + partition_cols = kwargs.pop('partition_on') if partition_cols is not None: kwargs['file_scheme'] = 'hive' From 8b455473733352e10c321d4f08b9fadc286b4c84 Mon Sep 17 00:00:00 2001 From: Anjana S Date: Sat, 10 Nov 2018 11:00:41 +0530 Subject: [PATCH 15/15] added empty line after versionadded --- pandas/core/frame.py | 2 ++ pandas/io/parquet.py | 3 +++ 2 files changed, 5 insertions(+) diff --git a/pandas/core/frame.py b/pandas/core/frame.py index cb8a690a6e191..8f96eb73aeb74 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -1988,6 +1988,7 @@ def to_parquet(self, fname, engine='auto', compression='snappy', path while writing a partitioned dataset. .. versionchanged:: 0.24.0 + engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' Parquet library to use. If 'auto', then the option ``io.parquet.engine`` is used. The default ``io.parquet.engine`` @@ -2001,6 +2002,7 @@ def to_parquet(self, fname, engine='auto', compression='snappy', the behavior depends on the chosen engine. .. versionadded:: 0.24.0 + partition_cols : list, optional, default None Column names by which to partition the dataset Columns are partitioned in the order they are given diff --git a/pandas/io/parquet.py b/pandas/io/parquet.py index f24f2068e1374..3d72b1ec3a47f 100644 --- a/pandas/io/parquet.py +++ b/pandas/io/parquet.py @@ -223,6 +223,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None, while writing a partitioned dataset. .. versionchanged:: 0.24.0 + engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto' Parquet library to use. If 'auto', then the option ``io.parquet.engine`` is used. The default ``io.parquet.engine`` @@ -236,11 +237,13 @@ def to_parquet(df, path, engine='auto', compression='snappy', index=None, engine's default behavior will be used. .. versionadded 0.24.0 + partition_cols : list, optional, default None Column names by which to partition the dataset Columns are partitioned in the order they are given .. versionadded:: 0.24.0 + kwargs Additional keyword arguments passed to the engine """
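
Taken together, the end state of this series can be exercised with a short script such as the one below. This is a minimal sketch, not part of the patches: it assumes the series is applied and that pyarrow >= 0.7.0 and fastparquet are installed; the ``test`` / ``test_fp`` output directories and the toy frame are illustrative, adapted from the io.rst example and the new tests above.

import pandas as pd
import pyarrow.parquet as pq

df = pd.DataFrame({'a': [0, 0, 1, 1], 'b': [0, 1, 0, 1]})

# pyarrow engine: the path is treated as the root directory of a
# partitioned dataset, e.g. test/a=0/<uuid>.parquet and
# test/a=1/<uuid>.parquet (one subdirectory per unique value of 'a').
df.to_parquet('test', engine='pyarrow', partition_cols=['a'],
              compression=None)

# fastparquet engine: partition_cols is forwarded as partition_on with
# file_scheme='hive', producing the same hive-style directory layout.
df.to_parquet('test_fp', engine='fastparquet', partition_cols=['a'],
              compression=None)

# Inspect the partitioned dataset the same way the new pyarrow test does.
dataset = pq.ParquetDataset('test', validate_schema=False)
print(dataset.partitions.partition_names)  # {'a'}

As in the tests, passing both ``partition_on`` and ``partition_cols`` to the fastparquet writer raises a ``ValueError``, while ``partition_on`` alone is still accepted and mapped onto ``partition_cols``.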