Convert RxJava Flowable to Flow and elements seem to be delayed or skipped to collect when onErrorReturnItem called. #1766

Closed
bennyhuo opened this issue Jan 18, 2020 · 5 comments
@bennyhuo

Hi, I use asFlow to convert an RxJava Flowable to a Flow.

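// Imports assume RxJava 2 (io.reactivex) and kotlinx-coroutines-reactive.
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.reactive.asFlow

suspend fun main() {
    Flowable.create<Int>({ emitter ->
        repeat(5) {
            emitter.onNext(it)
            println("emit: $it")
            Thread.sleep(100) // blocks the thread the source is subscribed on
        }
    }, BackpressureStrategy.BUFFER)
        .onErrorReturnItem(-1)
        .subscribeOn(Schedulers.io())
        .asFlow()
        .collect {
            println("collect: $it")
        }
}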

The producer is slower than the consumer, but the result is strange:

emit: 0
collect: 0
emit: 1
emit: 2
emit: 3
emit: 4
collect: 1
collect: 2
collect: 3
collect: 4

If I use BackpressureStrategy.LATEST, the consumer only gets 0 and 4 (the first and the last):

emit: 0
collect: 0
emit: 1
emit: 2
emit: 3
emit: 4
collect: 4

I finally figured out that it may be related to the onErrorReturnItem(-1) call. If I comment it out, the result is as expected:

emit: 0
collect: 0
collect: 1
emit: 1
collect: 2
emit: 2
collect: 3
emit: 3
collect: 4
emit: 4
@qwwdfsad qwwdfsad self-assigned this Jan 22, 2020
@elizarov
Contributor

Note that it also reproduces with other operators (like map) when you put them between Flowable.create { ... } and subscribeOn(...).

@qwwdfsad
Collaborator

qwwdfsad commented Jan 22, 2020

As a temporary workaround, please use asFlow().buffer(5) instead of just asFlow()
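
For reference, a minimal sketch of how that workaround might be applied to the snippet from the report. It assumes RxJava 2 (io.reactivex) and kotlinx-coroutines-reactive on the classpath. The names slowSource and collectWithBuffer are hypothetical helpers, and an onComplete() call (not present in the original report) is added so that the example terminates.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: the pipeline from the report, plus onComplete()
// so that the stream finishes and toList() can return.
fun slowSource(): Flowable<Int> =
    Flowable.create<Int>({ emitter ->
        repeat(5) {
            emitter.onNext(it)
            Thread.sleep(100) // blocking inside create, as in the report
        }
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER)
        .onErrorReturnItem(-1)
        .subscribeOn(Schedulers.io())

// Hypothetical helper: collects the stream with the suggested buffer(5).
fun collectWithBuffer(): List<Int> = runBlocking {
    slowSource()
        .asFlow()
        .buffer(5) // the suggested workaround: buffer on the Flow side
        .toList()
}

fun main() {
    println(collectWithBuffer())
}
```

With BackpressureStrategy.BUFFER no element is dropped, so the sketch should collect all five values; the interleaving of emits and collects still depends on timing.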

@akarnokd

Since you are using blocking inside create, you should use .subscribeOn(Schedulers.io(), false) to avoid requests piling up, as documented here.
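
A sketch of what that change might look like in the original snippet, assuming RxJava 2.2 or later (where the subscribeOn(Scheduler, boolean) overload is available) and kotlinx-coroutines-reactive. The function name collectWithDirectRequests is hypothetical, and onComplete() is added only so that the example terminates.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: same pipeline as in the report, but with
// requestOn = false so that downstream requests are not scheduled on
// the io worker that is busy running the blocking create body.
fun collectWithDirectRequests(): List<Int> = runBlocking {
    Flowable.create<Int>({ emitter ->
        repeat(5) {
            emitter.onNext(it)
            Thread.sleep(100) // blocking inside create, as in the report
        }
        emitter.onComplete() // added so the stream finishes
    }, BackpressureStrategy.BUFFER)
        .onErrorReturnItem(-1)
        .subscribeOn(Schedulers.io(), false) // second argument is requestOn
        .asFlow()
        .toList()
}

fun main() {
    println(collectWithDirectRequests())
}
```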

@qwwdfsad qwwdfsad removed the bug label Jan 23, 2020
@qwwdfsad
Collaborator

This is not a bug, but an intended behaviour of RxJava.
As a solution, you can either use .subscribeOn(Schedulers.io(), false) (thanks @akarnokd for the suggestion!) or a larger buffer.

E.g. you can observe the same behaviour using only Rx:

.observeOn(Schedulers.io(), false, 1) // 1 is the buffer size; the default is larger than the number of blocking emits (5)
.doOnEach { println("Notification") }
.subscribe()

Meanwhile, we will add buffering by default to the asFlow integration. This will stop it from requesting values one by one (which also improves performance), align it better with Reactive primitives (which usually buffer by default), and mask problems like this in trivial cases.

qwwdfsad added a commit that referenced this issue Jan 27, 2020
…nts one by one in a default configuration

Also, partially masks #1766
qwwdfsad added a commit that referenced this issue Jan 28, 2020
…nts one by one in a default configuration

Also, partially masks #1766
@qwwdfsad
Collaborator

Behaves as expected, additional buffering was introduced in 1.3.4
