I expected this bit of code to deadlock. (The `onEach` is a workaround for #2817.) This did not deadlock until I commented out the second emission.
Is this by design? The docs don't seem to suggest this.
> [emit][MutableSharedFlow.emit] call to such a shared flow suspends until all subscribers receive the emitted value
I haven't received anything from the channel so this shouldn't happen.
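For readers without the original snippet, here is a minimal sketch of the behaviour in question. It is not the code from this report: `countCompletedEmits` and its timeout are made up for illustration, and it assumes a default `MutableSharedFlow` (no replay, no extra buffer) with a single subscriber whose collect block never returns.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: counts how many emit calls return before the timeout.
suspend fun countCompletedEmits(timeoutMs: Long = 500): Int = coroutineScope {
    val flow = MutableSharedFlow<Int>() // replay = 0, extraBufferCapacity = 0
    var completed = 0

    // Subscriber that takes a value and then never finishes handling it.
    val subscriber = launch {
        flow.collect { awaitCancellation() }
    }
    // Wait until the subscriber is actually registered.
    flow.subscriptionCount.first { it == 1 }

    withTimeoutOrNull(timeoutMs) {
        flow.emit(1); completed++ // returns once the subscriber has taken the value
        flow.emit(2); completed++ // stays suspended: the subscriber is still busy
    }
    subscriber.cancel()
    completed
}

fun main() = runBlocking {
    println(countCompletedEmits()) // 1 under the behaviour described above
}
```

So the first `emit` returns as soon as the value is taken out of the buffer, and only a second `emit` is held back by the stuck subscriber.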
I need this behaviour, so I need to decide between using a workaround and a redesign.
If this won't be fixed then I'll redesign, but if it will be fixed/implemented then I can justify using a workaround.
Mini investigation
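So API aside, I had a look at the code and the culprit seems to be in `SharedFlow.collect` (`kotlinx.coroutines/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt`, lines 343 to 352 in 65e1b84):

```kotlin
while (true) {
    var newValue: Any?
    while (true) {
        newValue = tryTakeValue(slot) // attempt no-suspend fast path first
        if (newValue !== NO_VALUE) break
        awaitValue(slot) // await signal that the new value is available
    }
    collectorJob?.ensureActive()
    collector.emit(newValue as T)
}
```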
`tryTakeValue`/`awaitValue` "pops" the value out of the buffer for the current subscriber (which effectively counts as the receipt the docs talk about), and only then is the value emitted to the collector.

Looking at `tryTakeValue` (`kotlinx.coroutines/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt`, lines 593 to 609 in 65e1b84):

```kotlin
private fun tryTakeValue(slot: SharedFlowSlot): Any? {
    var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
    val value = synchronized(this) {
        val index = tryPeekLocked(slot)
        if (index < 0) {
            NO_VALUE
        } else {
            val oldIndex = slot.index
            val newValue = getPeekedValueLockedAt(index)
            slot.index = index + 1 // points to the next index after peeked one
            resumes = updateCollectorIndexLocked(oldIndex)
            newValue
        }
    }
    for (resume in resumes) resume?.resume(Unit)
    return value
}
```
In theory, that could be changed to a two-step process: one step to peek the value in the buffer (i.e. `getPeekedValueLockedAt`), then the value is emitted, and another step to pop/release the value in the buffer (i.e. the slot index increment and `updateCollectorIndexLocked`).
It means locking twice instead of once, but it's probably better to measure than to listen to whatever I was about to hypothesise here lol.
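To make the peek-then-release idea concrete, here is a toy model using only the Kotlin standard library. It is a sketch, not the kotlinx.coroutines implementation: `ToyBuffer`, `take`, `peek`, `release` and `minIndex` are hypothetical names, and there is no suspension or emitter queue, only the index bookkeeping.

```kotlin
// Toy model only: illustrates one-step take versus two-step peek/release.
class ToyBuffer<T : Any>(subscribers: Int) {
    private val lock = Any()
    private val values = ArrayList<T>()
    private val index = IntArray(subscribers) // next index each subscriber reads

    fun add(value: T) {
        synchronized(lock) { values.add(value) }
    }

    // One step: read and advance under a single lock (the current shape).
    fun take(slot: Int): T? = synchronized(lock) {
        val i = index[slot]
        if (i >= values.size) null else { index[slot] = i + 1; values[i] }
    }

    // Step 1: read the value without advancing the subscriber's index.
    fun peek(slot: Int): T? = synchronized(lock) { values.getOrNull(index[slot]) }

    // Step 2: advance the index once the value has been handled.
    fun release(slot: Int) {
        synchronized(lock) { index[slot]++ }
    }

    // Smallest index still needed by any subscriber; an emitter would wait on this.
    fun minIndex(): Int = synchronized(lock) { index.minOrNull() ?: 0 }
}

fun main() {
    val buffer = ToyBuffer<String>(subscribers = 1)
    buffer.add("a")

    val value = buffer.peek(0)  // lock #1
    println(buffer.minIndex())  // 0: the value does not yet count as received
    println("handling $value")  // the collector would run here
    buffer.release(0)           // lock #2
    println(buffer.minIndex())  // 1: only now is the value released
}
```

With `take`, the index moves before the collector has run, which is the window this report is about; with `peek` followed by `release`, it moves afterwards, at the cost of the second lock acquisition mentioned above.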