Commit 1d30b31

Bring I/O support to Dask for everything supported (#955)
* Brings I/O support to Dask for everything already supported on Ray. Resolves #940.
* Refactors the code to be modular so that Dask can reuse the implementation originally intended for Ray. Builds on #754.
* Removes a large amount of duplicate logic in the Column Store family of readers.
* Removes unnecessary classes and instead creates anonymous classes that mix in the necessary components of the readers and call `.read` on the anonymous class (see the sketch below).
* An interesting performance issue came up with `HDFStore` and `read_hdf` (related to pandas-dev/pandas#12236):
  * With Ray, multiple workers can read hdf5 files, but doing so is about 4x slower than defaulting to pandas.
  * Dask cannot read the hdf5 files in parallel without seg-faulting.
* Dask and Ray now support the same I/O.
* Performance testing showed that Dask improves when `n_workers` is set to the number of CPUs on the machine; #954 was opened to track that tuning.
1 parent 898889c commit 1d30b31
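
The anonymous-class mix-in described above is not itself shown in this diff; the snippet below is only a rough sketch of how an engine might compose a pandas parser with one of the new base readers. The `EngineIOMixin` name, and the assumption that it supplies `deploy`, `materialize`, `frame_partition_cls`, `frame_cls`, and `query_compiler_cls`, are hypothetical.

from modin.backends.pandas.parsers import PandasParquetParser
from modin.engines.base.io import ParquetReader


def make_parquet_reader(EngineIOMixin):
    # type() builds an anonymous class; through the MRO it picks up
    # PandasParquetParser.parse, ParquetReader.read/build_query_compiler,
    # and the engine mix-in's deploy/materialize plus frame/query-compiler classes.
    return type("", (EngineIOMixin, PandasParquetParser, ParquetReader), {})


# Hypothetical usage inside an engine's io module:
#   reader_cls = make_parquet_reader(DaskIOMixin)
#   query_compiler = reader_cls.read(path, engine="pyarrow", columns=None)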

File tree: 15 files changed, +466 -618 lines


modin/backends/pandas/parsers.py (+56)
@@ -123,3 +123,59 @@ def parse(fname, **kwargs):
            pandas_df.dtypes,
            partition_columns,
        ]


class PandasParquetParser(PandasParser):
    @staticmethod
    def parse(fname, **kwargs):
        import pyarrow.parquet as pq

        num_splits = kwargs.pop("num_splits", None)
        columns = kwargs.get("columns", None)
        if num_splits is None:
            return pandas.read_parquet(fname, **kwargs)
        kwargs["use_pandas_metadata"] = True
        df = pq.read_table(fname, **kwargs).to_pandas()
        if columns is not None:
            df = df[columns]
        # Append the length of the index here to build it externally
        return _split_result_for_readers(0, num_splits, df) + [len(df.index), df.dtypes]


class PandasHDFParser(PandasParser):  # pragma: no cover
    @staticmethod
    def parse(fname, **kwargs):
        kwargs["key"] = kwargs.pop("_key", None)
        num_splits = kwargs.pop("num_splits", None)
        if num_splits is None:
            return pandas.read_hdf(fname, **kwargs)
        df = pandas.read_hdf(fname, **kwargs)
        # Append the length of the index here to build it externally
        return _split_result_for_readers(0, num_splits, df) + [len(df.index), df.dtypes]


class PandasFeatherParser(PandasParser):
    @staticmethod
    def parse(fname, **kwargs):
        from pyarrow import feather

        num_splits = kwargs.pop("num_splits", None)
        if num_splits is None:
            return pandas.read_feather(fname, **kwargs)
        df = feather.read_feather(fname, **kwargs)
        # Append the length of the index here to build it externally
        return _split_result_for_readers(0, num_splits, df) + [len(df.index), df.dtypes]


class PandasSQLParser(PandasParser):
    @staticmethod
    def parse(sql, con, index_col, **kwargs):
        num_splits = kwargs.pop("num_splits", None)
        if num_splits is None:
            return pandas.read_sql(sql, con, index_col=index_col, **kwargs)
        df = pandas.read_sql(sql, con, index_col=index_col, **kwargs)
        if index_col is None:
            index = len(df)
        else:
            index = df.index
        return _split_result_for_readers(1, num_splits, df) + [index, df.dtypes]
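
Each `parse` above returns the data splits from `_split_result_for_readers` plus two metadata objects (the index length or index, and the dtypes) so the reader can rebuild the frame's index and dtypes without collecting the data. The helper itself lives earlier in `parsers.py` and is not part of this hunk; the sketch below only illustrates the contract it is assumed to satisfy, namely returning exactly `num_splits` frames along the given axis (trailing ones may be empty).

def _split_result_for_readers_sketch(axis, num_splits, df):
    """Hypothetical stand-in for the real helper, not the actual implementation."""
    n = len(df) if axis == 0 else len(df.columns)
    chunk = max(1, -(-n // num_splits))  # ceiling division
    if axis == 0:
        return [df.iloc[i * chunk : (i + 1) * chunk] for i in range(num_splits)]
    return [df.iloc[:, i * chunk : (i + 1) * chunk] for i in range(num_splits)]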

modin/engines/base/io/__init__.py (+15, -1)
@@ -3,5 +3,19 @@
 from modin.engines.base.io.text.json_reader import JSONReader
 from modin.engines.base.io.file_reader import FileReader
 from modin.engines.base.io.text.text_file_reader import TextFileReader
+from modin.engines.base.io.column_stores.parquet_reader import ParquetReader
+from modin.engines.base.io.column_stores.hdf_reader import HDFReader
+from modin.engines.base.io.column_stores.feather_reader import FeatherReader
+from modin.engines.base.io.sql.sql_reader import SQLReader

-__all__ = ["BaseIO", "CSVReader", "JSONReader", "FileReader", "TextFileReader"]
+__all__ = [
+    "BaseIO",
+    "CSVReader",
+    "JSONReader",
+    "FileReader",
+    "TextFileReader",
+    "ParquetReader",
+    "HDFReader",
+    "FeatherReader",
+    "SQLReader",
+]
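
With these readers exported, the Dask and Ray engines go through the same base I/O layer, so the top-level entry points behave the same on either engine. A hypothetical smoke test, assuming the `MODIN_ENGINE` environment variable is how the installed Modin version selects its execution engine and that the listed files exist:

import os

os.environ["MODIN_ENGINE"] = "dask"  # or "ray"; assumed selection mechanism

import modin.pandas as pd

# Each call dispatches through the readers exported from modin.engines.base.io.
df_parquet = pd.read_parquet("data.parquet")
df_feather = pd.read_feather("data.feather")
df_hdf = pd.read_hdf("data.h5", key="df")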

modin/engines/base/io/column_stores/__init__.py

Whitespace-only changes.

modin/engines/base/io/column_stores/column_store_reader.py (new file)
@@ -0,0 +1,102 @@
import numpy as np
import pandas

from modin.data_management.utils import compute_chunksize
from modin.engines.base.io.file_reader import FileReader


class ColumnStoreReader(FileReader):
    @classmethod
    def call_deploy(cls, fname, col_partitions, **kwargs):
        from modin.pandas import DEFAULT_NPARTITIONS

        return np.array(
            [
                cls.deploy(
                    cls.parse,
                    DEFAULT_NPARTITIONS + 2,
                    dict(
                        fname=fname,
                        columns=cols,
                        num_splits=DEFAULT_NPARTITIONS,
                        **kwargs
                    ),
                )
                for cols in col_partitions
            ]
        ).T

    @classmethod
    def build_partition(cls, partition_ids, row_lengths, column_widths):
        return np.array(
            [
                [
                    cls.frame_partition_cls(
                        partition_ids[i][j],
                        length=row_lengths[i],
                        width=column_widths[j],
                    )
                    for j in range(len(partition_ids[i]))
                ]
                for i in range(len(partition_ids))
            ]
        )

    @classmethod
    def build_index(cls, partition_ids):
        from modin.pandas import DEFAULT_NPARTITIONS

        index_len = cls.materialize(partition_ids[-2][0])
        index = pandas.RangeIndex(index_len)
        index_chunksize = compute_chunksize(
            pandas.DataFrame(index=index), DEFAULT_NPARTITIONS, axis=0
        )
        if index_chunksize > index_len:
            row_lengths = [index_len] + [0 for _ in range(DEFAULT_NPARTITIONS - 1)]
        else:
            row_lengths = [
                index_chunksize
                if i != DEFAULT_NPARTITIONS - 1
                else index_len - (index_chunksize * (DEFAULT_NPARTITIONS - 1))
                for i in range(DEFAULT_NPARTITIONS)
            ]
        return index, row_lengths

    @classmethod
    def build_columns(cls, columns):
        from modin.pandas import DEFAULT_NPARTITIONS

        column_splits = (
            len(columns) // DEFAULT_NPARTITIONS
            if len(columns) % DEFAULT_NPARTITIONS == 0
            else len(columns) // DEFAULT_NPARTITIONS + 1
        )
        col_partitions = [
            columns[i : i + column_splits]
            for i in range(0, len(columns), column_splits)
        ]
        column_widths = [len(c) for c in col_partitions]
        return col_partitions, column_widths

    @classmethod
    def build_dtypes(cls, partition_ids, columns):
        # Compute dtypes by concatenating the results from each of the column splits
        # determined above. This creates a pandas Series that contains a dtype for
        # every column.
        dtypes = pandas.concat(cls.materialize(list(partition_ids)), axis=0)
        dtypes.index = columns
        return dtypes

    @classmethod
    def build_query_compiler(cls, path, columns, **kwargs):
        col_partitions, column_widths = cls.build_columns(columns)
        partition_ids = cls.call_deploy(path, col_partitions, **kwargs)
        index, row_lens = cls.build_index(partition_ids)
        remote_parts = cls.build_partition(partition_ids[:-2], row_lens, column_widths)
        dtypes = cls.build_dtypes(partition_ids[-1], columns)
        new_query_compiler = cls.query_compiler_cls(
            cls.frame_cls(
                remote_parts, index, columns, row_lens, column_widths, dtypes=dtypes,
            )
        )
        return new_query_compiler
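
`call_deploy` launches one task per column split and asks each task for `DEFAULT_NPARTITIONS + 2` return objects: the row-wise data splits, then the index length, then the dtypes, which is exactly what the parsers above return. Transposing puts the metadata in the last two rows, which is why `build_partition` takes `partition_ids[:-2]`, `build_index` reads `partition_ids[-2][0]`, and `build_dtypes` concatenates `partition_ids[-1]`. A toy illustration of that layout, using placeholder strings instead of real object references (the `DEFAULT_NPARTITIONS` value is assumed):

import numpy as np

DEFAULT_NPARTITIONS = 4                # assumed value for the example
col_partitions = [["a", "b"], ["c"]]   # two column splits


def fake_deploy(cols):
    # Stand-in for cls.deploy(cls.parse, DEFAULT_NPARTITIONS + 2, ...):
    # row-wise data splits followed by the index length and the dtypes.
    blocks = ["block({}, {})".format(i, cols) for i in range(DEFAULT_NPARTITIONS)]
    return blocks + ["index_len", "dtypes"]


partition_ids = np.array([fake_deploy(c) for c in col_partitions]).T
print(partition_ids.shape)      # (6, 2): one row per split plus 2 metadata rows
print(partition_ids[-2][0])     # 'index_len' -> consumed by build_index()
print(list(partition_ids[-1]))  # dtypes row  -> consumed by build_dtypes()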

modin/engines/base/io/column_stores/feather_reader.py (new file)
@@ -0,0 +1,26 @@
from modin.engines.base.io.column_stores.column_store_reader import ColumnStoreReader


class FeatherReader(ColumnStoreReader):
    @classmethod
    def read(cls, path, columns=None, **kwargs):
        """Read a pandas.DataFrame from Feather format.
        Ray DataFrame only supports pyarrow engine for now.

        Args:
            path: The filepath of the feather file.
                We only support local files for now.
                multi threading is set to True by default
            columns: not supported by pandas api, but can be passed here to read only
                specific columns

        Notes:
            pyarrow feather is used. Please refer to the documentation here
            https://arrow.apache.org/docs/python/api.html#feather-format
        """
        if columns is None:
            from pyarrow.feather import FeatherReader

            fr = FeatherReader(path)
            columns = [fr.get_column_name(i) for i in range(fr.num_columns)]
        return cls.build_query_compiler(path, columns, use_threads=False)

modin/engines/base/io/column_stores/hdf_reader.py (new file)
@@ -0,0 +1,50 @@
import pandas

from modin.engines.base.io.column_stores.column_store_reader import ColumnStoreReader
from modin.error_message import ErrorMessage


class HDFReader(ColumnStoreReader):  # pragma: no cover
    @classmethod
    def _validate_hdf_format(cls, path_or_buf):
        s = pandas.HDFStore(path_or_buf)
        groups = s.groups()
        if len(groups) == 0:
            raise ValueError("No dataset in HDF5 file.")
        candidate_only_group = groups[0]
        format = getattr(candidate_only_group._v_attrs, "table_type", None)
        s.close()
        return format

    @classmethod
    def read(cls, path_or_buf, **kwargs):
        """Load an h5 file from the file path or buffer, returning a DataFrame.

        Args:
            path: string, buffer or path object
                Path to the file to open, or an open :class:`pandas.HDFStore` object.
            kwargs: Pass into pandas.read_hdf function.

        Returns:
            DataFrame constructed from the h5 file.
        """
        if cls._validate_hdf_format(path_or_buf=path_or_buf) is None:
            ErrorMessage.default_to_pandas(
                "File format seems to be `fixed`. For better distribution consider "
                "saving the file in `table` format. df.to_hdf(format=`table`)."
            )
            return cls.single_worker_read(path_or_buf, **kwargs)

        columns = kwargs.pop("columns", None)
        # Have to do this because of Dask's keyword arguments
        kwargs["_key"] = kwargs.pop("key", None)
        if not columns:
            start = kwargs.pop("start", None)
            stop = kwargs.pop("stop", None)
            empty_pd_df = pandas.read_hdf(path_or_buf, start=0, stop=0, **kwargs)
            if start is not None:
                kwargs["start"] = start
            if stop is not None:
                kwargs["stop"] = stop
            columns = empty_pd_df.columns
        return cls.build_query_compiler(path_or_buf, columns, **kwargs)
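
As the message above indicates, only HDF5 data stored in `table` format is distributed; `fixed`-format files (pandas' default) fall back to a single-worker read. A small hypothetical example of producing a file that takes the distributed path (requires PyTables; the file name is illustrative):

import pandas

df = pandas.DataFrame({"a": range(10), "b": range(10)})

# format="table" stores a PyTables Table, which _validate_hdf_format detects
# through the group's `table_type` attribute; the default format="fixed"
# would trigger the default-to-pandas warning instead.
df.to_hdf("example.h5", key="df", format="table", mode="w")

import modin.pandas as pd

distributed_df = pd.read_hdf("example.h5", key="df")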

modin/engines/base/io/column_stores/parquet_reader.py (new file)
@@ -0,0 +1,62 @@
import os

from modin.engines.base.io.column_stores.column_store_reader import ColumnStoreReader
from modin.error_message import ErrorMessage


class ParquetReader(ColumnStoreReader):
    @classmethod
    def read(cls, path, engine, columns, **kwargs):
        """Load a parquet object from the file path, returning a DataFrame.
        Ray DataFrame only supports pyarrow engine for now.

        Args:
            path: The filepath of the parquet file.
                We only support local files for now.
            engine: Ray only supports the pyarrow reader.
                This argument doesn't do anything for now.
            kwargs: Pass into parquet's read_pandas function.

        Notes:
            ParquetFile API is used. Please refer to the documentation here
            https://arrow.apache.org/docs/python/parquet.html
        """
        from pyarrow.parquet import ParquetFile, ParquetDataset
        from modin.pandas.io import PQ_INDEX_REGEX

        if os.path.isdir(path):
            partitioned_columns = set()
            directory = True
            original_path = path
            # We do a tree walk of the path directory because partitioned
            # parquet directories have a unique column at each directory level.
            # Thus, we can use os.walk(), which does a dfs search, to walk
            # through the different columns that the data is partitioned on.
            for (root, dir_names, files) in os.walk(path):
                if dir_names:
                    partitioned_columns.add(dir_names[0].split("=")[0])
                if files:
                    # Metadata files, git files, .DS_Store
                    if files[0][0] == ".":
                        continue
                    path = os.path.join(root, files[0])
                    break
            partitioned_columns = list(partitioned_columns)
            if len(partitioned_columns):
                ErrorMessage.default_to_pandas("Partitioned Columns in Parquet")
                return cls.single_worker_read(
                    original_path, engine=engine, columns=columns, **kwargs
                )
        else:
            directory = False

        if not columns:
            if directory:
                # Path of the sample file that we will read to get the remaining columns
                pd = ParquetDataset(path)
                column_names = pd.schema.names
            else:
                pf = ParquetFile(path)
                column_names = pf.metadata.schema.names
            columns = [name for name in column_names if not PQ_INDEX_REGEX.match(name)]
        return cls.build_query_compiler(path, columns, **kwargs)
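
The directory walk above treats every `name=value` directory level as a partition column and, if any are found, defaults the whole read to pandas. A hypothetical layout that hits that path, written with pandas (assumes a pandas/pyarrow combination that supports `partition_cols`; the file names are illustrative):

import pandas

df = pandas.DataFrame({"year": [2018, 2018, 2019], "value": [1.0, 2.0, 3.0]})

# Produces a hive-style layout such as:
#   sales.parquet/year=2018/<part>.parquet
#   sales.parquet/year=2019/<part>.parquet
# os.walk() then yields dir_names like ["year=2018", ...], so "year" is added
# to partitioned_columns and ParquetReader.read uses single_worker_read.
df.to_parquet("sales.parquet", engine="pyarrow", partition_cols=["year"])

import modin.pandas as pd

sales = pd.read_parquet("sales.parquet")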

modin/engines/base/io/sql/__init__.py

Whitespace-only changes.
