Skip to content

MutableSharedFlow rendezvous #2818

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
Dominaezzz opened this issue Jul 10, 2021 · 0 comments
Open

MutableSharedFlow rendezvous #2818

Dominaezzz opened this issue Jul 10, 2021 · 0 comments

Comments

@Dominaezzz
Copy link
Contributor

I expected this bit of code to deadlock. (The onEach is a workaround for #2817)

val stream = MutableSharedFlow<Unit>()
stream/* .onEach {} */.buffer(Channel.RENDEZVOUS).produceIn(this)
yield()
stream.emit(Unit)
// stream.emit(Unit)

This did not deadlock until I commented out the second emission.

Is this by design? The docs don't seem to suggest this.

[emit][MutableSharedFlow.emit] call to such a shared flow suspends until all subscribers receive the emitted value

I haven't received anything from the channel so this shouldn't happen.

I need this behaviour so I need to decide between using a workaround or a re-design?
If this won't be fixed then I'll redesign but if it will be fixed/implemented then I justify using a workaround.

Mini investigation

So API aside, I had a look at the code and the culprit seems to be in SharedFlow.collect.

while (true) {
var newValue: Any?
while (true) {
newValue = tryTakeValue(slot) // attempt no-suspend fast path first
if (newValue !== NO_VALUE) break
awaitValue(slot) // await signal that the new value is available
}
collectorJob?.ensureActive()
collector.emit(newValue as T)
}
.

tryTakeValue/awaitValue "pops" the value out of the buffer for the current subscriber, (which effectively counts as the receipt the docs talked about) then the value is emitted to the collector.

Looking at tryTakeValue.

private fun tryTakeValue(slot: SharedFlowSlot): Any? {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val value = synchronized(this) {
val index = tryPeekLocked(slot)
if (index < 0) {
NO_VALUE
} else {
val oldIndex = slot.index
val newValue = getPeekedValueLockedAt(index)
slot.index = index + 1 // points to the next index after peeked one
resumes = updateCollectorIndexLocked(oldIndex)
newValue
}
}
for (resume in resumes) resume?.resume(Unit)
return value
}

In theory, that could be changed to a two step process, one step to peek the value in the buffer (i.e. getPeekedValueLockedAt), then the value is emitted and another step to pop/release the value in the buffer (i.e. slot index increment and updateCollectorIndexLocked).

It means locking twice instead of once but it's probably best to measure than listen to whatever I was about hypothesise here lol.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants