diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index b4833fead6..1ded0a1750 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -137,6 +137,16 @@ public interface SharedFlow : Flow { * **A shared flow never completes**. A call to [Flow.collect] or any other terminal operator * on a shared flow never completes normally. * + * It is guaranteed that, by the time the first suspension happens, [collect] has already subscribed to the + * [SharedFlow] and is eligible for receiving emissions. In particular, the following code will always print `1`: + * ``` + * val flow = MutableSharedFlow() + * launch(start = CoroutineStart.UNDISPATCHED) { + * flow.collect { println(1) } + * } + * flow.emit(1) + * ``` + * * @see [Flow.collect] for implementation and inheritance details. */ override suspend fun collect(collector: FlowCollector): Nothing diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt index cf83a50b0f..19b70032f3 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt @@ -236,4 +236,9 @@ class ShareInTest : TestBase() { assertEquals(239, shared.first()) j.cancel() } + + @Test + fun testSubscriptionByFirstSuspensionInSharedFlow() = runTest { + testSubscriptionByFirstSuspensionInCollect(flowOf(1).stateIn(this@runTest), emit = { }) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt index 98e04f00e8..bb36c0ef9a 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt @@ -818,4 +818,24 @@ class SharedFlowTest : TestBase() { j2.cancelAndJoin() assertEquals(0, flow.subscriptionCount.first()) } + + @Test + fun testSubscriptionByFirstSuspensionInSharedFlow() = runTest { + testSubscriptionByFirstSuspensionInCollect(MutableSharedFlow()) { emit(it) } + } +} + +/** + * Check that, by the time [SharedFlow.collect] suspends for the first time, its subscription is already active. + */ +inline fun> CoroutineScope.testSubscriptionByFirstSuspensionInCollect(flow: T, emit: T.(Int) -> Unit) { + var received = 0 + val job = launch(start = CoroutineStart.UNDISPATCHED) { + flow.collect { + received = it + } + } + flow.emit(1) + assertEquals(1, received) + job.cancel() } diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt index be4f8c536b..a77f091b4f 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt @@ -181,4 +181,9 @@ class StateFlowTest : TestBase() { state.update { it + 3 } assertEquals(5, state.value) } + + @Test + fun testSubscriptionByFirstSuspensionInStateFlow() = runTest { + testSubscriptionByFirstSuspensionInCollect(MutableStateFlow(0)) { value = it; yield() } + } } diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt index d0e76c461e..a27489bb4a 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt @@ -86,4 +86,9 @@ class StateInTest : TestBase() { assertFailsWith { flow.stateIn(CoroutineScope(currentCoroutineContext() + Job() + ceh)) } finish(3) } + + @Test + fun testSubscriptionByFirstSuspensionInStateFlow() = runTest { + testSubscriptionByFirstSuspensionInCollect(flowOf(1).stateIn(this@runTest)) { } + } }