You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
We have a test that consumes a flow which does a Thread.sleep() to simulate potentially blocking calls between flow emit() calls. The test awaits the first emission before triggering further steps in the test. Using coroutines 1.3.3, that await finishes relatively quickly (~150ms), but using coroutines 1.3.4 it takes ~1.4 seconds. Looking into it, it appears that the issue comes from using flow.asPublisher().asFlow() (we know not recommended, but have not yet removed that pattern from our code), but can be avoided by adding a .flowOn(Dispatchers.IO)before converting to Publisher.
It is again the problem of .asPublisher().asFlow() chain, all meaningful context information is lost during this transition, so we can't really help here.
The "issue" was introduced in #1766 by adding request batching, so the fix is straightforward: replace flow.asPublisher().asFlow() with flow.asPublisher().asFlow().buffer(1).
When using flow { emit(computation()) }.asPublisher(), computation() is computed in the same thread as the corresponding Subscriber.onNext with all the implying restrictions.
One of these restrictions is "do not block in non-blocking threads" and calling Thread.sleep(20) is exactly the prohibited blocking. All blocking and heavy-lifting operations should be moved to the separate dispatcher/scheduler. It is hard to operate correctly (e.g. without visible slowdowns) when such contracts are broken
Both flow.flowOn(Dispatchers.IO).asPublisher() and flow.asPublisher().subscribeOn(Schedulers.io() will do the trick.
We have a test that consumes a flow which does a Thread.sleep() to simulate potentially blocking calls between flow emit() calls. The test awaits the first emission before triggering further steps in the test. Using coroutines 1.3.3, that await finishes relatively quickly (~150ms), but using coroutines 1.3.4 it takes ~1.4 seconds. Looking into it, it appears that the issue comes from using
flow.asPublisher().asFlow()
(we know not recommended, but have not yet removed that pattern from our code), but can be avoided by adding a.flowOn(Dispatchers.IO)
before converting toPublisher
.A minimal testcase that reproduces the issue is here: https://gist.github.com/josephlbarnett/1ebd20d5c3cb3684238a6d4d6f81574c -- unmodified the await() call is slow. By adding the flowOn() or removing the Flow -> Publisher -> Flow conversion, the await() call is fast.
( Possibly caused by the fix for #1825 )
The text was updated successfully, but these errors were encountered: