Add example of a sklearn-like pipeline #294
@@ -0,0 +1,127 @@
"""
This is an example of how a (possibly) lazy data frame may be used
in a sklearn-like pipeline.

The example is motivated by the prospect of a fully lazy, ONNX-based
data frame implementation. The concept is that calls to `fit` are
eager: they compute some state that is later meant to be transferred
into the ONNX graph/model. That transfer happens when calling
`transform`. The logic within the `transform` methods is "traced"
lazily, and the resulting lazy object is then exported to ONNX.
"""
from __future__ import annotations

from typing import Any, Self

from dataframe_api.dataframe_object import DataFrame

#: Dummy type alias for a standard-compliant array object
Array = Any


class Scaler:
    """Apply a standardization scaling factor to `column_names`."""

    scalings_: dict[str, float]

    def __init__(self, column_names: list[str]):
        self.column_names = column_names

    def fit(self, df: DataFrame) -> Self:
        """Compute scaling factors from the given data frame.

        Calling this function requires collecting values.
        """
        self.scalings_ = {}
        for column_name in df.column_names:
            if column_name not in self.column_names:
                continue
            self.scalings_[column_name] = df.get_column_by_name(column_name).std()

        return self

    def transform(self, df: DataFrame) -> DataFrame:
        """Apply the "trained" scaling values.

        This function is guaranteed to not collect values.
        """
        df = copy_df(df)
        for column_name in df.column_names:
            if column_name not in self.column_names:
                continue
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kkraus14 just to clarify about scalar issue - should this be written as column = df.get_column_by_name(column_name) / float(self.scalings_[column_name]) instead, so that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, that would force materialization unnecessarily. I.E. how this works in cuDF today is that the Adding There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks for explaining, OK with not using 'float' here Sorry to belabour the point, but the "lazy scalar" here is the part which I'm just not getting A fairly common pattern I used to work with when I did data science was:
E.g.: train a model somewhere and save some weights. Then, say on a device (like a smart watch), make predictions using that model (and the saved weights). By the time you're doing inference ([…]), […]. So how can something calculated during the […]? I think this is what @cbourjau was getting at as well, saying that for […].

Reply: I think that's an implementation detail of the library in question? The output of […].

Reply: The issue I have is that I would typically do:
If the "something triggers materialization" happens during What I want to do is:
The way the code is written in this PR achieves this by doing if hasattr(scalings, 'collect'):
scalings = scalings.collect() during the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think that's the only option. I think we want a It wouldn't be super ergonomic to write it as This solution is kinda circling back to a small part of Marco's lazy/eager design, without the separate classes or expression objects. Other thought on Polars there: it could also choose to do an optimization pass, where e.g. it could figure out for a block of code with multiple There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
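To make the quoted pattern concrete, here is a minimal sketch of a `fit` that explicitly materializes its state so that `transform` never has to; the `collect()` method is hypothetical, taken from the snippet above rather than from the standard:

    class EagerFitScaler:
        """Variant of `Scaler` whose `fit` materializes its state up front."""

        def __init__(self, column_names: list[str]):
            self.column_names = column_names

        def fit(self, df):
            self.scalings_ = {}
            for name in self.column_names:
                scaling = df.get_column_by_name(name).std()
                # Hypothetical API: if the scalar is lazy, collect it now,
                # so `transform` is guaranteed not to trigger a collection.
                if hasattr(scaling, "collect"):
                    scaling = scaling.collect()
                self.scalings_[name] = scaling
            return self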
Reply: btw, yesterday I saw a colleague write this: […]. If I was -1 on automated collection before, now I'm more like -10 😄

Reply: Note that in […], I've added an example of this in #307, as well as a proposal to add […].
Reply: If we have a 1:1 relationship between the dataframe and the array API standard, then we have successfully kicked the can down the road (i.e. tech stack) into the array API standard. Calling […]. The above-described semantics would work well with ONNX as far as the dataframe API is concerned. We are left with the same question of explicit materialization on the array API level, though.
Reply: I'm afraid not, sorry: #307 (comment)
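A sketch of where that question surfaces in this file: converting a column to a standard array is the 1:1 handoff point, so it is where a lazy implementation has to decide whether values get computed. `to_array_object` is the same draft-standard call used in `dataframe_to_dict_of_arrays` below; whether it triggers computation is exactly the open question:

    def column_to_array(df: DataFrame, name: str) -> Array:
        """Hand one column over to the array API world."""
        column = df.get_column_by_name(name)
        # Under the 1:1 mapping discussed above, this call is the boundary
        # between the dataframe API and the array API; a lazy implementation
        # may have to materialize values here.
        return column.to_array_object(column.dtype)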
            df = df.assign(column)

        return df


class FeatureSelector:
    """Limit columns to those seen in training, including their order."""

    columns_: list[str]

    def fit(self, df: DataFrame) -> Self:
        """Record the observed columns and their order.

        This function is guaranteed to not collect values.
        """
        self.columns_ = df.column_names
        return self

    def transform(self, df: DataFrame) -> DataFrame:
        """Select and sort the columns as observed in training.

        This function is guaranteed to not collect values.
        """
        # FIXME: Does this ensure column order?
        return df.select(self.columns_)


class Pipeline:
    """Linear pipeline of transformers."""

    def __init__(self, steps: list[Any]):
        self.steps = steps

    def fit(self, df: DataFrame) -> Self:
        """Call `fit` on each step of the pipeline in sequence.

        Calling this function may trigger a collection.
        """
        for step in self.steps:
            step.fit(df)

        self.steps_ = self.steps
        return self

    def transform(self, df: DataFrame) -> DataFrame:
        """Call `transform` on each step of this pipeline in sequence.

        This function is guaranteed to not trigger a collection.
        """
        for step in self.steps_:
            df = step.transform(df)

        return df


def copy_df(df: DataFrame) -> DataFrame:
    """Create a copy of `df`.

    This is done by converting the data frame into standard arrays and
    assembling them back into a new data frame.
    """
    dfx = df.__dataframe_namespace__()

    dct = dataframe_to_dict_of_arrays(df)
    return dfx.dataframe_from_dict(
        # FIXME: This would require some kind of dtype mapping?
        {
            column_name: dfx.column_from_1d_array(arr, dtype=arr.dtype)
            for column_name, arr in dct.items()
        }
    )


def dataframe_to_dict_of_arrays(df: DataFrame) -> dict[str, Array]:
    """Convert the given data frame into a dictionary of standard arrays."""
    dct = {}
    for column_name in df.column_names:
        column = df.get_column_by_name(column_name)
        dct[column_name] = column.to_array_object(column.dtype)

    return dct
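
For reference, a minimal usage sketch of the classes above (not part of the diff); `train_df` and `test_df` stand in for dataframes obtained from some standard-compliant implementation and are assumptions of this sketch:

    pipeline = Pipeline([
        Scaler(column_names=["height", "weight"]),
        FeatureSelector(),
    ])

    # `fit` is eager: the Scaler collects standard deviations here.
    pipeline.fit(train_df)

    # `transform` is lazy: operations are traced on the (possibly lazy)
    # frame, and the result could then be exported, e.g. to an ONNX graph.
    transformed = pipeline.transform(test_df)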