Commit 68d8c95

Pass on file_scheme to fastparquet (dask#2714)
* Pass on file_scheme to fastparquet
* test drill cat in dask dataframe
* directly columns
1 parent f015b5f commit 68d8c95
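
For context: fastparquet's ParquetFile objects carry a file_scheme attribute describing how a dataset is laid out on disk (a single file, hive-style key=value directories, or drill-style bare directories), and this commit forwards that value into dask's per-row-group reads so that directory-derived columns such as dir0 survive. A minimal sketch of inspecting it, with the path purely illustrative:

    import fastparquet

    # Illustrative path -- any parquet data file or _metadata file would do.
    pf = fastparquet.ParquetFile('/tmp/drill_data/test_data1/data1.parq')

    # For a single file this is typically 'simple'; multi-file datasets
    # report schemes such as 'hive' or 'drill' depending on the layout.
    print(pf.file_scheme)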

File tree

dask/dataframe/io/parquet.py
dask/dataframe/io/tests/test_parquet.py

2 files changed: +27 −3 lines

dask/dataframe/io/parquet.py

Lines changed: 4 additions & 3 deletions
@@ -103,7 +103,8 @@ def _read_fastparquet(fs, paths, myopen, columns=None, filters=None,
 
     dsk = {(name, i): (_read_parquet_row_group, myopen, pf.row_group_filename(rg),
                        index_col, all_columns, rg, out_type == Series,
-                       categories, pf.schema, pf.cats, pf.dtypes)
+                       categories, pf.schema, pf.cats, pf.dtypes,
+                       pf.file_scheme)
            for i, rg in enumerate(rgs)}
 
     if not dsk:
@@ -131,15 +132,15 @@ def _read_fastparquet(fs, paths, myopen, columns=None, filters=None,
 
 
 def _read_parquet_row_group(open, fn, index, columns, rg, series, categories,
-                            schema, cs, dt, *args):
+                            schema, cs, dt, scheme, *args):
     if not isinstance(columns, (tuple, list)):
         columns = (columns,)
         series = True
     if index and index not in columns:
         columns = columns + type(columns)([index])
     df, views = _pre_allocate(rg.num_rows, columns, categories, index, cs, dt)
     read_row_group_file(fn, rg, columns, categories, schema, cs,
-                        open=open, assign=views)
+                        open=open, assign=views, scheme=scheme)
 
     if series:
         return df[df.columns[0]]
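
Why the scheme matters downstream: with a 'hive' layout the partition value is parsed out of key=value directory names, while with a 'drill' layout the bare directory name itself becomes a synthetic column (dir0, dir1, ...). The toy function below is only an illustration of that distinction, not fastparquet's actual implementation:

    def partition_values(relpath, scheme):
        # Toy illustration: derive partition columns from a file path
        # given relative to the dataset root.
        dirs = relpath.split('/')[:-1]        # drop the file name
        if scheme == 'hive':
            # hive layout: key=value directory names
            return dict(d.split('=', 1) for d in dirs if '=' in d)
        if scheme == 'drill':
            # drill layout: bare directory names become dir0, dir1, ...
            return {'dir%d' % i: d for i, d in enumerate(dirs)}
        return {}

    print(partition_values('test_data1/data1.parq', 'drill'))  # {'dir0': 'test_data1'}
    print(partition_values('year=2017/data1.parq', 'hive'))    # {'year': '2017'}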

dask/dataframe/io/tests/test_parquet.py

Lines changed: 23 additions & 0 deletions
@@ -513,3 +513,26 @@ def test_timestamp96(fn):
     assert pf._schema[1].type == fastparquet.parquet_thrift.Type.INT96
     out = dd.read_parquet(fn).compute()
     assert_eq(out, df)
+
+
+def test_drill_scheme(fn):
+    N = 5
+    df1 = pd.DataFrame({c: np.random.random(N)
+                        for i, c in enumerate(['a', 'b', 'c'])})
+    df2 = pd.DataFrame({c: np.random.random(N)
+                        for i, c in enumerate(['a', 'b', 'c'])})
+    files = []
+    for d in ['test_data1', 'test_data2']:
+        dn = os.path.join(fn, d)
+        if not os.path.exists(dn):
+            os.mkdir(dn)
+        files.append(os.path.join(dn, 'data1.parq'))
+
+    fastparquet.write(files[0], df1)
+    fastparquet.write(files[1], df2)
+
+    df = dd.read_parquet(files)
+    assert 'dir0' in df.columns
+    out = df.compute()
+    assert 'dir0' in out
+    assert (np.unique(out.dir0) == ['test_data1', 'test_data2']).all()
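
As a usage note (not part of the commit): once dir0 is materialised it behaves like an ordinary column, so with the two-directory layout built in the test above one could, for example, select a single directory's rows. Paths here are illustrative:

    import dask.dataframe as dd

    # Same layout as the test: test_data1/data1.parq and test_data2/data1.parq.
    df = dd.read_parquet(['test_data1/data1.parq', 'test_data2/data1.parq'])
    subset = df[df.dir0 == 'test_data1'].compute()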
