Skip to content

SharedFlow, buffer and produceIn fusion bug. #2817

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
Dominaezzz opened this issue Jul 10, 2021 · 3 comments
Open

SharedFlow, buffer and produceIn fusion bug. #2817

Dominaezzz opened this issue Jul 10, 2021 · 3 comments

Comments

@Dominaezzz
Copy link
Contributor

val stream = MutableSharedFlow<Unit>()
stream
//  .onEach {} // workaround
    .buffer(0)
    .produceIn(GlobalScope)

yield() // To make sure subscription has started.

var i = 0
while (true) {
    stream.emit(Unit)
    println("Emitted $i")
    i++
}

Without the workaround, there are 64 emissions before suspension happens. There should be at most 1 emissions.

The bug is either in SharedFlow fusion or in this function that produceIn uses.

@linean
Copy link

linean commented Sep 9, 2021

I have similar issue when mixing channelFlow, buffer and shareIn. When I remove shareIn or use flow builder I can see only one emission. Otherwise send does not suspend.

val flow = channelFlow { // if I use flow{} builder buffer works as expected
    var i = 0
    repeat(100) {
        send("$i")
        println("send: ${++i}")
    }
}

val job = flow
    .buffer(capacity = 0)
    .shareIn(GlobalScope, SharingStarted.WhileSubscribed(), 0) // if I remove shareIn buffer works as expected
    .onEach { delay(1000) }
    .take(1)
    .launchIn(GlobalScope)

runBlocking {
    job.join()
}

@linean
Copy link

linean commented Sep 10, 2021

I did more testing and maybe I just don't understand how buffer + shareIn + WhileSubscribed should behave 🤔
Docs says:

buffer(0).shareIn(scope, started, 0) - ... Effectively, it configures sequential processing between the upstream emitter and subscribers ...

but based on tests processing is not sequential

@DelicateCoroutinesApi
@ExperimentalCoroutinesApi
class FlowBufferTest {

    @Test
    fun `channelFlow with no buffer and shareIn suspends until collector is ready`() {
        var sendCount = 0

        channelFlow {
            while (true) {
                send(Unit)
                sendCount++
            }
        }.buffer(capacity = 0)
            .shareIn(GlobalScope, SharingStarted.WhileSubscribed(), 0)
            .takeOne()

        assertEquals(1, sendCount)
    }

    @Test
    fun `channelFlow with no buffer suspends until collector is ready`() {
        var sendCount = 0

        channelFlow {
            while (true) {
                send(Unit)
                sendCount++
            }
        }.buffer(capacity = 0)
            .takeOne()

        assertEquals(1, sendCount)
    }

    @Test
    fun `flow with no buffer suspends until collector is ready`() {
        var sendCount = 0

        flow {
            while (true) {
                emit(Unit)
                sendCount++
            }
        }.buffer(capacity = 0)
            .takeOne()

        assertEquals(1, sendCount)
    }

    @Test
    fun `flow with no buffer and shareIn suspends until collector is ready`() {
        var sendCount = 0

        flow {
            while (true) {
                emit(Unit)
                sendCount++
            }
        }.buffer(capacity = 0)
            .shareIn(GlobalScope, SharingStarted.WhileSubscribed(), 0)
            .takeOne()

        assertEquals(1, sendCount)
    }

    private fun Flow<*>.takeOne() = runBlocking {
        onEach { delay(500) }
            .take(1)
            .collect()
    }
}

@ajmalk
Copy link

ajmalk commented Dec 2, 2022

I just hit this bug too. These phantom buffers are huge annoyance and I really wish the default behavior was no buffer. Bugs in the implementation like this makes it even worse because we can't even trust that there won't be a buffer even if we explicitly set the buffer to zero.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants