
Add DataFrame.persist, and notes on execution model #307


Merged · 12 commits · Nov 10, 2023
23 changes: 23 additions & 0 deletions spec/API_specification/dataframe_api/dataframe_object.py
@@ -64,6 +64,11 @@ def dataframe(self) -> SupportsDataFrameAPI:
def shape(self) -> tuple[int, int]:
"""
Return number of rows and number of columns.

Notes
-----
To be guaranteed to run across all implementations, :meth:`maybe_execute` should
be called at some point before calling this method.
Contributor:

Does this mean that all operations are potentially (e.g. in a polars-based implementation) eager after a call to maybe_execute?

Contributor Author:

That's right, though `Column` would still be backed by an expression (which is lazy); the parent dataframe would be eager. You can try this out with https://github.com/data-apis/dataframe-api-compat

"""
...

@@ -928,6 +933,9 @@ def to_array(self, dtype: DType) -> Any:
may choose to return a numpy array (for numpy prior to 2.0), with the
understanding that consuming libraries would then use the
``array-api-compat`` package to convert it to a Standard-compliant array.

To be guaranteed to run across all implementations, :meth:`maybe_execute` should
be called at some point before calling this method.
"""

def join(
@@ -972,3 +980,18 @@ def join(
present in both `self` and `other`.
"""
...

def maybe_execute(self) -> Self:
"""
Hint that execution may be triggered, depending on the implementation.

This is intended as a hint, rather than as a directive. Implementations
which do not separate lazy vs eager execution may ignore this method and
treat it as a no-op. Likewise for implementations which support automated
execution.

.. note::
This method may force execution. If necessary, it should be called
at most once per dataframe, and as late as possible in the pipeline.
Contributor:

Why "at most once" rather than "as few times as possible"?

Contributor Author:

if you're using it multiple times, then you're potentially re-executing things

Contributor:

Sure, but there are reasonable cases where that would be what you want, are there not? For example, you may want to collect a dataframe, filter it further in a lazy manner, and then collect it again.

Contributor Author:

sure but why would you collect it before filtering?

Contributor:

It is a bit of a constructed example, but maybe you want to do computations on the entire dataframe and also on some subset of it. It would make sense to collect just prior to the first computation on the entire frame so that whatever came before it doesn't have to be recomputed when doing the computation on the subset.

Contributor Author:

something like

```python
df: DataFrame
df = df.persist()
sub_df_1 = df.filter(df.col('a') > 0)
sub_df_2 = df.filter(df.col('a') <= 0)
features_1 = []
for column_name in sub_df_1.column_names:
    if sub_df_1.col(column_name).std() > 0:
        features_1.append(column_name)
features_2 = []
for column_name in sub_df_2.column_names:
    if sub_df_2.col(column_name).std() > 0:
        features_2.append(column_name)
```

?

You'd still just be calling it once per dataframe - could you show an example of where you'd want to call it twice for the same dataframe?

"""
...
83 changes: 83 additions & 0 deletions spec/design_topics/execution_model.md
@@ -0,0 +1,83 @@
# Execution model

The vast majority of the Dataframe API is designed to be agnostic of the
underlying execution model.

However, there are some methods which, depending on the implementation, may
not be supported.

For example, let's consider the following:
```python
df: DataFrame
features = []
for column_name in df.column_names:
    if df.col(column_name).std() > 0:
        features.append(column_name)
return features
```
If `df` is a lazy dataframe, then `df.col(column_name).std() > 0` returns
a (ducktyped) boolean scalar. No issues so far. The problem is:
what happens when `if df.col(column_name).std() > 0` is evaluated?

Under the hood, Python calls `(df.col(column_name).std() > 0).__bool__()` in
order to extract a Python boolean. This is a problem for lazy implementations,
as laziness must be broken in order to evaluate the expression.
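To make the failure mode concrete, here is a minimal, hypothetical sketch (the `LazyScalar` class is invented purely for illustration and is not part of any real implementation) of why a lazy scalar cannot support `__bool__` without either executing or raising:

```python
class LazyScalar:
    """Toy stand-in for a lazy implementation's scalar."""

    def __init__(self, compute):
        self._compute = compute  # zero-argument callable producing the value

    def __bool__(self):
        # A strict lazy implementation raises here, mirroring Dask's error.
        raise TypeError(
            "Cannot convert a lazy scalar to bool; call .compute() first."
        )

    def compute(self):
        # Materialize the deferred computation.
        return self._compute()


scalar = LazyScalar(lambda: 2.5 > 0)
try:
    if scalar:  # implicitly calls __bool__
        pass
except TypeError as exc:
    print(exc)
print(scalar.compute())  # True
```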

Dask and Polars both require that `.compute` (resp. `.collect`) be called beforehand
for such an operation to be executed:
```python
In [1]: import pandas as pd

In [2]: import dask.dataframe as dd

In [3]: pandas_df = pd.DataFrame({"x": [1, 2, 3], "y": 1})

In [4]: df = dd.from_pandas(pandas_df, npartitions=2)

In [5]: scalar = df.x.std() > 0

In [6]: if scalar:
   ...:     print('scalar is positive')
   ...:
---------------------------------------------------------------------------
[...]

TypeError: Trying to convert dd.Scalar<gt-bbc3..., dtype=bool> to a boolean value. Because Dask objects are lazily evaluated, they cannot be converted to a boolean value or used in boolean conditions like if statements. Try calling .compute() to force computation prior to converting to a boolean value or using in a conditional statement.
```

The Dataframe API has a `DataFrame.maybe_execute` method for addressing the above. We can
use it to rewrite the code above as follows:
```python
df: DataFrame
df = df.maybe_execute()
features = []
for column_name in df.column_names:
    if df.col(column_name).std() > 0:
        features.append(column_name)
return features
```

Note that `maybe_execute` is to be interpreted as a hint, rather than as a directive -
the implementation itself may decide
whether to force execution at this step, or whether to defer it until later.
For example, a dataframe which can convert to a lazy array could decide to ignore
`maybe_execute` when evaluating `DataFrame.to_array` but to respect it when evaluating
`float(Column.std())`.
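As a rough sketch of the two extremes the hint permits (both classes below are hypothetical, invented only for illustration): an eager backend may treat `maybe_execute` as a no-op, while a lazy backend may use it as a materialization point:

```python
class EagerFrame:
    """Toy eager backend: data is already materialized."""

    def __init__(self, data):
        self.data = data

    def maybe_execute(self):
        return self  # nothing to do: the hint is a no-op


class ToyLazyFrame:
    """Toy lazy backend: wraps a deferred computation."""

    def __init__(self, thunk):
        self._thunk = thunk  # zero-argument callable producing the data

    def maybe_execute(self):
        # This backend chooses to honour the hint and execute now.
        return EagerFrame(self._thunk())


lazy = ToyLazyFrame(lambda: {"a": [1, 2, 3]})
materialized = lazy.maybe_execute()
print(materialized.data)  # {'a': [1, 2, 3]}
```

Either behaviour (and anything in between, such as marking the frame for later collection) is Standard-compliant, which is exactly why callers must not rely on execution happening at this point.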
Contributor:

So, maybe_evaluate may do:

  • Nothing at all
  • Nothing at this point but allows later collections when the backend thinks it is expedient
  • An immediate collection

What happens to subsequent calls to df assuming that a collection did take place? Are they eager or lazy?

```python
df: DataFrame
column_name: str
df = df.maybe_execute()
col = df.col(column_name)
filtered_col = col.filter(col > 42)  # is this computation eager now?
filtered_col.std()
```

Contributor Author (@MarcoGorelli, Nov 6, 2023):

is this computation eager now?

It's implementation-dependent. It can stay lazy

What really matters is when you do

bool(filtered_col.std())

(which you might trigger via `if filtered_col.std() > 0`) - at that point:

  • if maybe_execute wasn't called previously, this is unsupported by the standard and may vary across implementations
  • if maybe_execute was called, then libraries supporting eager evaluation should return a result

Contributor:

When would you want the first option rather than an implicit default for the latter behavior? It seems rather obvious that bool(filtered_col.std()) needs to materialize something so it is hardly a surprise to the user at this point. Sure, the user may want to strategically place a maybe_execute earlier for performance reasons, but why introduce undefined behavior if they don't?

Contributor Author:

you don't need to introduce undefined behaviour, I just mean that it's undefined by the Dataframe API - the Standard makes no guarantee of what will happen there


Operations which require `DataFrame.maybe_execute` to have been called at some prior
point are:
- `DataFrame.to_array`
- `DataFrame.shape`
- calling `bool`, `int`, or `float` on a scalar

Note how `DataFrame.maybe_execute` is called only once, and as late as possible.
Conversely, the "wrong" way to execute the above would be:

```python
df: DataFrame
features = []
for column_name in df.column_names:
    # Do NOT do this!
    if df.maybe_execute().col(column_name).std() > 0:
        features.append(column_name)
return features
```
as that will potentially re-trigger the same execution multiple times.
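A small hypothetical sketch of that cost (the counting class is invented for illustration): if a backend honours the hint by executing, a per-iteration `maybe_execute` re-runs the pipeline once per iteration, whereas a single up-front call would run it once:

```python
class CountingLazyFrame:
    """Toy lazy backend that counts how often it runs its pipeline."""

    executions = 0

    def __init__(self, column_names):
        self.column_names = column_names

    def maybe_execute(self):
        # This backend honours the hint by (re-)running the whole pipeline.
        CountingLazyFrame.executions += 1
        return self


df = CountingLazyFrame(["a", "b", "c"])

# "Wrong" pattern: the hint inside the loop re-triggers execution each time.
for _ in df.column_names:
    df.maybe_execute()

print(CountingLazyFrame.executions)  # 3 executions instead of 1
```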
Contributor:

But there are no guarantees here, are there? Given that maybe_execute still allows for deferred execution the backend may still re-trigger the same execution multiple times.

Contributor Author:

yes you're right, that's an issue with the suggestion I put in #307 (comment)

not sure what to suggest, will think about this

Contributor Author:

Looks like there's two cases that really need addressing:

  • bool(scalar) requires computation in all cases
  • to_array only requires computation in some cases

1 change: 1 addition & 0 deletions spec/design_topics/index.rst
@@ -8,3 +8,4 @@ Design topics & constraints
backwards_compatibility
data_interchange
python_builtin_types
execution_model