Skip to content

Commit 38bf178

Browse files
jorisvandenbosschekszucs
authored andcommitted
ARROW-7413: [Python] Expose and test the partioning discovery
https://issues.apache.org/jira/browse/ARROW-7413 Closes #6151 from jorisvandenbossche/ARROW-7413-partition-discovery and squashes the following commits: 0a2e215 <Joris Van den Bossche> ARROW-7413: Expose and test the partioning discovery Authored-by: Joris Van den Bossche <[email protected]> Signed-off-by: Krisztián Szűcs <[email protected]>
1 parent 831a706 commit 38bf178

File tree

4 files changed

+79
-4
lines changed

4 files changed

+79
-4
lines changed

python/pyarrow/_dataset.pyx

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,16 @@ cdef class PartitionSchemeDiscovery:
128128
def __init__(self):
129129
_forbid_instantiation(self.__class__)
130130

131-
@staticmethod
132-
cdef wrap(const shared_ptr[CPartitionSchemeDiscovery]& sp):
133-
cdef PartitionSchemeDiscovery self
134-
self = PartitionSchemeDiscovery()
131+
cdef init(self, const shared_ptr[CPartitionSchemeDiscovery]& sp):
135132
self.wrapped = sp
136133
self.discovery = sp.get()
134+
135+
@staticmethod
136+
cdef wrap(const shared_ptr[CPartitionSchemeDiscovery]& sp):
137+
cdef PartitionSchemeDiscovery self = PartitionSchemeDiscovery.__new__(
138+
PartitionSchemeDiscovery
139+
)
140+
self.init(sp)
137141
return self
138142

139143
cdef inline shared_ptr[CPartitionSchemeDiscovery] unwrap(self):
@@ -196,6 +200,29 @@ cdef class SchemaPartitionScheme(PartitionScheme):
196200
PartitionScheme.init(self, sp)
197201
self.schema_scheme = <CSchemaPartitionScheme*> sp.get()
198202

203+
@staticmethod
204+
def discover(field_names):
205+
"""
206+
Discover a SchemaPartitionScheme.
207+
208+
Parameters
209+
----------
210+
field_names : list of str
211+
The names to associate with the values from the subdirectory names.
212+
213+
Returns
214+
-------
215+
PartionSchemeDiscovery
216+
To be used in the FileSystemDiscoveryOptions.
217+
"""
218+
cdef:
219+
PartitionSchemeDiscovery discovery
220+
vector[c_string] c_field_names
221+
c_field_names = [tobytes(s) for s in field_names]
222+
discovery = PartitionSchemeDiscovery.wrap(
223+
CSchemaPartitionScheme.MakeDiscovery(c_field_names))
224+
return discovery
225+
199226

200227
cdef class HivePartitionScheme(PartitionScheme):
201228
"""
@@ -244,6 +271,22 @@ cdef class HivePartitionScheme(PartitionScheme):
244271
PartitionScheme.init(self, sp)
245272
self.hive_scheme = <CHivePartitionScheme*> sp.get()
246273

274+
@staticmethod
275+
def discover():
276+
"""
277+
Discover a HivePartitionScheme.
278+
279+
Returns
280+
-------
281+
PartionSchemeDiscovery
282+
To be used in the FileSystemDiscoveryOptions.
283+
"""
284+
cdef:
285+
PartitionSchemeDiscovery discovery
286+
discovery = PartitionSchemeDiscovery.wrap(
287+
CHivePartitionScheme.MakeDiscovery())
288+
return discovery
289+
247290

248291
cdef class FileSystemDiscoveryOptions:
249292
"""

python/pyarrow/dataset.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
OrExpression,
4646
ParquetFileFormat,
4747
PartitionScheme,
48+
PartitionSchemeDiscovery,
4849
ScalarExpression,
4950
Scanner,
5051
ScannerBuilder,

python/pyarrow/includes/libarrow_dataset.pxd

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,10 +301,15 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
301301
cdef cppclass CSchemaPartitionScheme \
302302
"arrow::dataset::SchemaPartitionScheme"(CPartitionScheme):
303303
CSchemaPartitionScheme(shared_ptr[CSchema] schema)
304+
@staticmethod
305+
shared_ptr[CPartitionSchemeDiscovery] MakeDiscovery(
306+
vector[c_string] field_names)
304307

305308
cdef cppclass CHivePartitionScheme \
306309
"arrow::dataset::HivePartitionScheme"(CPartitionScheme):
307310
CHivePartitionScheme(shared_ptr[CSchema] schema)
311+
@staticmethod
312+
shared_ptr[CPartitionSchemeDiscovery] MakeDiscovery()
308313

309314
cdef cppclass CPartitionSchemeOrDiscovery \
310315
"arrow::dataset::PartitionSchemeOrDiscovery":

python/pyarrow/tests/test_dataset.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,3 +352,29 @@ def test_file_system_discovery(mockfs, paths_or_selector):
352352
assert isinstance(table, pa.Table)
353353
assert len(table) == 10
354354
assert table.num_columns == 4
355+
356+
357+
def test_partition_scheme_discovery(mockfs):
358+
paths_or_selector = fs.FileSelector('subdir', recursive=True)
359+
format = ds.ParquetFileFormat()
360+
361+
options = ds.FileSystemDiscoveryOptions('subdir')
362+
schema_discovery = ds.SchemaPartitionScheme.discover(['group', 'key'])
363+
assert isinstance(schema_discovery, ds.PartitionSchemeDiscovery)
364+
options.partition_scheme_discovery = schema_discovery
365+
366+
discovery = ds.FileSystemDataSourceDiscovery(
367+
mockfs, paths_or_selector, format, options
368+
)
369+
inspected_schema = discovery.inspect()
370+
# i64/f64 from data, group/key from "/1/xxx" and "/2/yyy" paths
371+
expected_schema = pa.schema([
372+
("i64", pa.int64()),
373+
("f64", pa.float64()),
374+
("group", pa.int32()),
375+
("key", pa.string()),
376+
])
377+
assert inspected_schema.remove_metadata().equals(expected_schema)
378+
379+
hive_discovery = ds.HivePartitionScheme.discover()
380+
assert isinstance(hive_discovery, ds.PartitionSchemeDiscovery)

0 commit comments

Comments
 (0)