-
Notifications
You must be signed in to change notification settings - Fork 1.9k
SharedFlow, buffer and produceIn fusion bug. #2817
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
I have similar issue when mixing val flow = channelFlow { // if I use flow{} builder buffer works as expected
var i = 0
repeat(100) {
send("$i")
println("send: ${++i}")
}
}
val job = flow
.buffer(capacity = 0)
.shareIn(GlobalScope, SharingStarted.WhileSubscribed(), 0) // if I remove shareIn buffer works as expected
.onEach { delay(1000) }
.take(1)
.launchIn(GlobalScope)
runBlocking {
job.join()
} |
I did more testing and maybe I just don't understand how
but based on tests processing is not sequential @DelicateCoroutinesApi
@ExperimentalCoroutinesApi
class FlowBufferTest {
@Test
fun `channelFlow with no buffer and shareIn suspends until collector is ready`() {
var sendCount = 0
channelFlow {
while (true) {
send(Unit)
sendCount++
}
}.buffer(capacity = 0)
.shareIn(GlobalScope, SharingStarted.WhileSubscribed(), 0)
.takeOne()
assertEquals(1, sendCount)
}
@Test
fun `channelFlow with no buffer suspends until collector is ready`() {
var sendCount = 0
channelFlow {
while (true) {
send(Unit)
sendCount++
}
}.buffer(capacity = 0)
.takeOne()
assertEquals(1, sendCount)
}
@Test
fun `flow with no buffer suspends until collector is ready`() {
var sendCount = 0
flow {
while (true) {
emit(Unit)
sendCount++
}
}.buffer(capacity = 0)
.takeOne()
assertEquals(1, sendCount)
}
@Test
fun `flow with no buffer and shareIn suspends until collector is ready`() {
var sendCount = 0
flow {
while (true) {
emit(Unit)
sendCount++
}
}.buffer(capacity = 0)
.shareIn(GlobalScope, SharingStarted.WhileSubscribed(), 0)
.takeOne()
assertEquals(1, sendCount)
}
private fun Flow<*>.takeOne() = runBlocking {
onEach { delay(500) }
.take(1)
.collect()
}
} |
I just hit this bug too. These phantom buffers are huge annoyance and I really wish the default behavior was no buffer. Bugs in the implementation like this makes it even worse because we can't even trust that there won't be a buffer even if we explicitly set the buffer to zero. |
Without the workaround, there are 64 emissions before suspension happens. There should be at most 1 emissions.
The bug is either in
SharedFlow
fusion or in this function thatproduceIn
uses.The text was updated successfully, but these errors were encountered: