Commit 11f633d

Enable non-categorical hive-partition columns in read_parquet (dask#10353)
Parent: 67e6489
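
This change lets read_parquet return hive-partition columns with their declared schema dtype instead of forcing them to categorical, when the caller supplies an explicit (non-dictionary) partitioning schema. A minimal usage sketch based on the test added below; the path and column names are illustrative:

import pandas as pd
import pyarrow as pa
import dask.dataframe as dd

# Write a small hive-partitioned dataset on column "b".
df = pd.DataFrame({"a": range(100), "b": ["cat", "dog"] * 50})
dd.from_pandas(df, npartitions=2).to_parquet(
    "data/", partition_on=["b"], write_index=False, engine="pyarrow"
)

# An explicit string-typed partitioning schema now keeps "b" non-categorical.
ddf = dd.read_parquet(
    "data/",
    dataset={
        "partitioning": {"flavor": "hive", "schema": pa.schema([("b", pa.string())])}
    },
    engine="pyarrow",
)
assert ddf["b"].dtype != "category"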

2 files changed: +57 -15 lines

dask/dataframe/io/parquet/arrow.py

+28 -15
@@ -652,7 +652,7 @@ def read_partition(
         # to categorical manually for integer types.
         if partitions and isinstance(partitions, list):
             for partition in partitions:
-                if df[partition.name].dtype.name != "category":
+                if len(partition.keys) and df[partition.name].dtype.name != "category":
                     # We read directly from fragments, so the partition
                     # columns are already in our dataframe. We just
                     # need to convert non-categorical types.
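
The new len(partition.keys) guard treats an empty key set as "no dictionary of partition values known", so such columns keep whatever dtype the schema gave them. A minimal sketch of the idea; the namedtuple is a simplified stand-in for dask's PartitionObj and the astype call is illustrative:

from collections import namedtuple

import pandas as pd

PartitionObj = namedtuple("PartitionObj", ["name", "keys"])  # simplified stand-in

df = pd.DataFrame({"b": ["cat", "dog"] * 2})

for partition in (
    PartitionObj("b", pd.Index(["cat", "dog"])),       # dictionary-encoded: convert
    PartitionObj("b", pd.Series([], dtype="object")),  # non-categorical: skip
):
    if len(partition.keys) and df[partition.name].dtype.name != "category":
        df[partition.name] = df[partition.name].astype(
            pd.CategoricalDtype(categories=partition.keys)
        )

assert df["b"].dtype == "category"  # converted by the first, skipped by the second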
@@ -1123,15 +1123,23 @@ def _collect_dataset_info(
         # names of partitioned columns.
         #
         partition_obj, partition_names = [], []
-        if (
-            ds.partitioning
-            and ds.partitioning.dictionaries
-            and all(arr is not None for arr in ds.partitioning.dictionaries)
-        ):
+        if ds.partitioning and ds.partitioning.schema:
             partition_names = list(ds.partitioning.schema.names)
             for i, name in enumerate(partition_names):
+                dictionary = (
+                    ds.partitioning.dictionaries[i]
+                    if ds.partitioning.dictionaries
+                    else None
+                )
                 partition_obj.append(
-                    PartitionObj(name, ds.partitioning.dictionaries[i].to_pandas())
+                    PartitionObj(
+                        name,
+                        (
+                            pd.Series([], dtype="object")
+                            if dictionary is None
+                            else dictionary.to_pandas()
+                        ),
+                    )
                 )

         # Check the `aggregate_files` setting
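
The branch above exists because pyarrow only reports Partitioning.dictionaries when the partitioning was discovered from the data (or the dictionaries were passed explicitly); a partitioning built from a plain schema has none. A small sketch, assuming a recent pyarrow (exact output may vary by version):

import pyarrow as pa
from pyarrow.dataset import partitioning

# Explicit, non-dictionary hive schema: no per-field key dictionaries,
# which _collect_dataset_info now maps to an empty pandas Series of keys.
hive = partitioning(pa.schema([("b", pa.string())]), flavor="hive")
print(hive.schema)        # b: string
print(hive.dictionaries)  # expected: None (no dictionary encoding)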
@@ -1229,7 +1237,7 @@ def _create_dd_meta(cls, dataset_info):
         _partitions = [p for p in partitions if p not in physical_column_names]
         if not _partitions:
             partitions = []
-            dataset_info["partitions"] = None
+            dataset_info["partitions"] = []
             dataset_info["partition_keys"] = {}
             dataset_info["partition_names"] = partitions
         elif len(_partitions) != len(partitions):
@@ -1266,6 +1274,8 @@ def _create_dd_meta(cls, dataset_info):
         if partition_obj:
             # Update meta dtypes for partitioned columns
             for partition in partition_obj:
+                if not len(partition.keys):
+                    continue
                 if isinstance(index, list) and partition.name == index[0]:
                     # Index from directory structure
                     meta.index = pd.CategoricalIndex(
@@ -1766,14 +1776,17 @@ def _read_table(
         for partition in partitions:
             if partition.name not in arrow_table.schema.names:
                 # We read from file paths, so the partition
-                # columns are NOT in our table yet.
+                # columns may NOT be in our table yet.
                 cat = keys_dict.get(partition.name, None)
-                cat_ind = np.full(
-                    len(arrow_table), partition.keys.get_loc(cat), dtype="i4"
-                )
-                arr = pa.DictionaryArray.from_arrays(
-                    cat_ind, pa.array(partition.keys)
-                )
+                if not len(partition.keys):
+                    arr = pa.array(np.full(len(arrow_table), cat))
+                else:
+                    cat_ind = np.full(
+                        len(arrow_table), partition.keys.get_loc(cat), dtype="i4"
+                    )
+                    arr = pa.DictionaryArray.from_arrays(
+                        cat_ind, pa.array(partition.keys)
+                    )
                 arrow_table = arrow_table.append_column(partition.name, arr)

         return arrow_table
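
A standalone sketch of the two branches in _read_table above, showing how the appended partition column differs; the table and values are illustrative:

import numpy as np
import pandas as pd
import pyarrow as pa

table = pa.table({"a": list(range(4))})
keys = pd.Index(["cat", "dog"])  # known partition values (dictionary case)

# Dictionary branch: the whole column is one repeated, dictionary-encoded key.
cat_ind = np.full(len(table), keys.get_loc("dog"), dtype="i4")
dict_arr = pa.DictionaryArray.from_arrays(cat_ind, pa.array(keys))

# Non-categorical branch (empty keys): a plain array of the raw value.
plain_arr = pa.array(np.full(len(table), "dog"))

print(table.append_column("b", dict_arr).schema)   # ..., b: dictionary<values=string, ...>
print(table.append_column("b", plain_arr).schema)  # ..., b: string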

dask/dataframe/io/tests/test_parquet.py

+29
@@ -4934,3 +4934,32 @@ def test_dtype_backend_categoricals(tmp_path):
     pdf = pd.read_parquet(outdir, engine="pyarrow", dtype_backend="pyarrow")
     # Set sort_results=False because of pandas bug up to 2.0.1
     assert_eq(ddf, pdf, sort_results=PANDAS_GT_202)
+
+
+@PYARROW_MARK
+@pytest.mark.parametrize("filters", [None, [[("b", "==", "dog")]]])
+def test_non_categorical_partitioning_pyarrow(tmpdir, filters):
+    from pyarrow.dataset import partitioning as pd_partitioning
+
+    df1 = pd.DataFrame({"a": range(100), "b": ["cat", "dog"] * 50})
+    ddf1 = dd.from_pandas(df1, npartitions=2)
+    ddf1.to_parquet(
+        path=tmpdir, partition_on=["b"], write_index=False, engine="pyarrow"
+    )
+
+    schema = pa.schema([("b", pa.string())])
+    partitioning = dict(flavor="hive", schema=schema)
+    ddf = dd.read_parquet(
+        tmpdir,
+        dataset={"partitioning": partitioning},
+        filters=filters,
+        engine="pyarrow",
+    )
+    pdf = pd.read_parquet(
+        tmpdir,
+        partitioning=pd_partitioning(**partitioning),
+        filters=filters,
+        engine="pyarrow",
+    )
+    assert_eq(ddf, pdf, check_index=False)
+    assert ddf["b"].dtype != "category"
