ENH: parallel support in .apply #13111


Closed
jreback opened this issue May 7, 2016 · 8 comments
Labels
API Design Enhancement Performance Memory or execution speed performance

Comments

@jreback
Contributor

jreback commented May 7, 2016

xref #5751

Questions from SO.
mrocklin's nice example of using .apply

So here is an example of how to do a parallel apply using dask. This could be baked into .apply() in pandas by the following signature enhancement:

current:

DataFrame.apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, 
                args=(), **kwds)

proposed:

DataFrame.apply(self, func, axis=0, broadcast=False, raw=False, reduce=None, 
                engine=None, chunksize=None, args=(), **kwds)

where engine='dask' (or numba at some point) are possibilities.
chunksize would map directly to npartitions and default to the number of cores if not specified.
Further, engine could be allowed to be a meta object like Dask(scheduler='multiprocessing') to support other options one would commonly pass (chunksize could also move inside that object instead of being a separate keyword).
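A minimal sketch of how such a dispatch could look. This is illustrative only, assuming the proposed signature; apply_with_engine is a hypothetical name, not a real pandas API:

```python
import os

import pandas as pd

def apply_with_engine(df, func, axis=0, engine=None, chunksize=None, **kwds):
    # Hypothetical dispatch sketch for the proposed engine= keyword.
    if engine is None:
        # fall back to the regular (serial) pandas apply
        return df.apply(func, axis=axis, **kwds)
    if engine == 'dask':
        import dask.dataframe as dd
        # chunksize maps directly to npartitions, defaulting to core count
        npartitions = chunksize or os.cpu_count()
        ddf = dd.from_pandas(df, npartitions=npartitions, sort=False)
        return ddf.apply(func, axis=axis).compute()
    raise ValueError("unknown engine: %r" % engine)
```

With engine=None this is just DataFrame.apply; an engine string (or meta object) would select the parallel backend.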

impl and timings:

from functools import partial
import pandas as pd
import dask
import dask.dataframe as dd
from dask import threaded, multiprocessing
from time import sleep

pd.__version__
dask.__version__

def make_frame(N):
    return pd.DataFrame({'A' : range(N)})
def slow_func(x):
    sleep(0.5)
    return x
df = make_frame(40)

# reg apply
def f1(df):
    return df.apply(slow_func, axis=1)
# dask apply
def f2(df, get):
    ddf = dd.from_pandas(df, npartitions=8, sort=False)
    return ddf.apply(slow_func, columns=df.columns, axis=1).compute(get=get)

f1 = partial(f1, df)
f2_threaded = partial(f2, df, threaded.get)
f2_multi = partial(f2, df, multiprocessing.get)

result1 = f1()
result2 = f2_threaded()
result3 = f2_multi()

In [18]: result1.equals(result2)
Out[18]: True

In [19]: result1.equals(result3)
Out[19]: True

In [20]: %timeit -n 1 -r 1 f1()
1 loop, best of 1: 20.6 s per loop

In [21]: %timeit -n 1 -r 1 f2_threaded()
1 loop, best of 1: 3.03 s per loop

In [22]: %timeit -n 1 -r 1 f2_multi()
1 loop, best of 1: 3.07 s per loop

Now for some caveats.

People often want to parallelize a poor implementation. Generally you should proceed through the following steps first:

  • get your problem correct; optimizing incorrect results is useless
  • profile profile profile. This is always the first thing to do
  • use built-in pandas / numpy vectorized routines
  • use cython or numba on the user defined function
  • .apply is always the last choice
  • if it's still not enough, try parallelization

You always want to make code simpler, not more complex. It's hard to know a priori where bottlenecks are. People think .apply is some magical thing; it's NOT, it's JUST A FOR LOOP. The problem is that people tend to throw in the kitchen sink, which is just a terrible idea.
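To make the "just a for loop" point concrete, here is a small illustrative comparison (not from the thread): the vectorized expression computes the same result as .apply without a per-element Python call:

```python
import numpy as np
import pandas as pd

s = pd.Series(np.arange(100_000))

# .apply calls a Python function once per element -- it is just a for loop
via_apply = s.apply(lambda x: x * 2 + 1)

# the vectorized form does the same work in a single C-level pass
vectorized = s * 2 + 1

assert via_apply.equals(vectorized)
```

On a frame this size the vectorized version is typically orders of magnitude faster, which is why it sits above .apply in the checklist.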

Ok my 2c about optimizing things.

In order for parallelization to actually matter, the function you are computing should take a non-trivial amount of time relative to things like:

  • iteration costs of the loop
  • serialization time (esp if using multi-processing / distributed computing)
  • does the function release the GIL? If not, threading will probably not help much
  • development resources (your time)

If these criteria are met, then sure give it a try.
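As a sketch of those criteria: when the per-row function releases the GIL (time.sleep here stands in for I/O or GIL-releasing native code), even a plain stdlib thread pool over row chunks gives a dask-like speedup. chunked_apply is an illustrative helper I've assumed for this sketch, not a pandas or dask API:

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep

import pandas as pd

def slow_func(row):
    sleep(0.01)  # stands in for I/O or native code that releases the GIL
    return row

def chunked_apply(df, func, nchunks=8):
    # illustrative helper: split the frame into row chunks and apply to
    # each chunk in a thread pool -- roughly what dask does with partitions
    step = -(-len(df) // nchunks)  # ceiling division
    chunks = [df.iloc[i:i + step] for i in range(0, len(df), step)]
    with ThreadPoolExecutor(max_workers=nchunks) as ex:
        parts = list(ex.map(lambda chunk: chunk.apply(func, axis=1), chunks))
    return pd.concat(parts)

df = pd.DataFrame({'A': range(40)})
result = chunked_apply(df, slow_func)
assert result.equals(df.apply(slow_func, axis=1))
```

If slow_func instead held the GIL in pure-Python compute, the threaded version would show little or no speedup, which is exactly the GIL criterion above.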

I think providing pandas a first-class way to parallelize things, even though people will sometimes use it naively, is probably not a bad thing.

Further extensions to this are: to_dask() (returning a dask.dataframe to the user directly), and engine='dask' syntax for .groupby().

@jreback jreback added this to the 0.18.2 milestone May 7, 2016
@jreback
Contributor Author

jreback commented May 7, 2016

cc @mcg1969

@mcg1969

mcg1969 commented May 7, 2016

I too worry about premature parallelization, but I'm not sure how you prevent that. I think I'd rather try and find ways to encourage Numba compilation (and if there are missing Numba features preventing that from being effective, address them). At least then you could engage target=parallel and get multicore apply.

@mrocklin
Contributor

mrocklin commented May 7, 2016

As a reminder, here is the recipe for a simple convert-apply-convert with minimal overhead (for dask > 0.8.2):
out = dd.from_pandas(df, npartitions=..., sort=False, name='x').apply(udf).compute()

@jorisvandenbossche jorisvandenbossche modified the milestones: Next Major Release, 0.19.0 Aug 13, 2016
@rtkaleta
Contributor

rtkaleta commented Mar 3, 2017

For those also wondering, the dd above comes from import dask.dataframe as dd.

You must also pass axis=1 in the call to apply because at present dask only supports applying functions row-wise. Consider specifying an appropriate meta too.

@enlighter

What kind of effort would be required exactly to implement this feature?

I think if someone gives a fairly detailed answer to that question, it should help all concerned people in the community.

@TomAugspurger
Contributor

@enlighter does the dask-based solution work for you? Properly supporting this in pandas would be a decent amount of effort. Dask already takes care of much of that.

@jreback
Contributor Author

jreback commented Jan 1, 2020

Closing this for now. If a specific solution is wanted by the community, a new issue can be opened for that.

@jreback jreback closed this as completed Jan 1, 2020
@jreback jreback modified the milestones: Contributions Welcome, No action Jan 1, 2020