.asPublisher().asFlow() slowness introduced in 1.3.4 #1860

Closed
josephlbarnett opened this issue Mar 11, 2020 · 2 comments

@josephlbarnett

We have a test that consumes a flow which calls Thread.sleep() to simulate potentially blocking calls between emit() calls. The test awaits the first emission before triggering further steps. With coroutines 1.3.3 that await finishes relatively quickly (~150 ms), but with coroutines 1.3.4 it takes ~1.4 seconds. The slowdown appears to come from using flow.asPublisher().asFlow() (we know this is not recommended, but have not yet removed the pattern from our code), and it can be avoided by adding .flowOn(Dispatchers.IO) before converting to a Publisher.

A minimal test case that reproduces the issue is here: https://gist.github.com/josephlbarnett/1ebd20d5c3cb3684238a6d4d6f81574c. Unmodified, the await() call is slow. Adding the flowOn() or removing the Flow -> Publisher -> Flow conversion makes the await() call fast.

( Possibly caused by the fix for #1825 )

@qwwdfsad qwwdfsad self-assigned this Mar 12, 2020
@qwwdfsad qwwdfsad added the flow label Mar 12, 2020
@qwwdfsad
Collaborator

Thanks for the self-contained repro!

This is again a problem with the .asPublisher().asFlow() chain: all meaningful context information is lost during this transition, so we can't really help here.

The "issue" was introduced in #1766 by adding request batching, so the fix is straightforward: replace flow.asPublisher().asFlow() with flow.asPublisher().asFlow().buffer(1).
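As a rough illustration of the buffer(1) workaround, here is a minimal sketch. It is not the code from the linked gist: slowSource() is a hypothetical stand-in for a flow that blocks between emissions, and the element count and timing harness are assumptions made for the example. It needs kotlinx-coroutines-core and kotlinx-coroutines-reactive on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical source (not from the gist): blocks between emissions,
// mimicking the Thread.sleep() pattern described in the report.
fun slowSource(): Flow<Int> = flow {
    repeat(100) { i ->
        Thread.sleep(20) // simulated blocking call
        emit(i)
    }
}

fun main(): Unit = runBlocking {
    val elapsed = measureTimeMillis {
        slowSource()
            .asPublisher()
            .asFlow()
            .buffer(1) // suggested workaround: limit the buffer so elements are not requested in large batches
            .first()   // wait only for the first emission, as the test does
    }
    println("first element after $elapsed ms")
}
```

Removing the buffer(1) line and running on 1.3.4 should give a point of comparison; the actual timings will depend on the machine and library version.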

When using flow { emit(computation()) }.asPublisher(), computation() runs on the same thread as the corresponding Subscriber.onNext, with all the restrictions that implies.
One of these restrictions is "do not block in non-blocking threads", and calling Thread.sleep(20) is exactly that prohibited blocking. All blocking and heavy-lifting operations should be moved to a separate dispatcher/scheduler; it is hard to operate correctly (e.g. without visible slowdowns) when such contracts are broken.
Both flow.flowOn(Dispatchers.IO).asPublisher() and flow.asPublisher().subscribeOn(Schedulers.io()) will do the trick.
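A minimal sketch of the flowOn(Dispatchers.IO) variant follows. The blocking body and element count are assumptions for illustration, not code from the report. The subscribeOn(Schedulers.io()) variant is not shown because it depends on a reactive library such as Reactor or RxJava rather than on a plain Publisher.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    // Illustrative flow: the blocking work is moved to Dispatchers.IO
    // before the conversion to a Publisher.
    val publisher = flow<Int> {
        repeat(100) { i ->
            Thread.sleep(20) // blocking call, now executed on an IO thread
            emit(i)
        }
    }
        .flowOn(Dispatchers.IO)
        .asPublisher()

    // Convert back to a Flow and await only the first emission.
    val first = publisher.asFlow().first()
    println("first = $first")
}
```

The point of the design is that flowOn changes the context of everything upstream of it, so the thread that delivers Subscriber.onNext is no longer the one that sleeps.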

@josephlbarnett
Author

Makes sense. Our production code is at least already doing flowOn(Dispatchers.IO), so we have updated our test to do the same. Thanks for the explanation!
