Skip to content

Commit fd5a58b

Browse files
authored
Document that SharedFlow.collect subscribes by its first suspension (#3885)
1 parent 748ace5 commit fd5a58b

File tree

5 files changed

+45
-0
lines changed

5 files changed

+45
-0
lines changed

kotlinx-coroutines-core/common/src/flow/SharedFlow.kt

+10
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,16 @@ public interface SharedFlow<out T> : Flow<T> {
137137
* **A shared flow never completes**. A call to [Flow.collect] or any other terminal operator
138138
* on a shared flow never completes normally.
139139
*
140+
* It is guaranteed that, by the time the first suspension happens, [collect] has already subscribed to the
141+
* [SharedFlow] and is eligible for receiving emissions. In particular, the following code will always print `1`:
142+
* ```
143+
* val flow = MutableSharedFlow<Int>()
144+
* launch(start = CoroutineStart.UNDISPATCHED) {
145+
* flow.collect { println(1) }
146+
* }
147+
* flow.emit(1)
148+
* ```
149+
*
140150
* @see [Flow.collect] for implementation and inheritance details.
141151
*/
142152
override suspend fun collect(collector: FlowCollector<T>): Nothing

kotlinx-coroutines-core/common/test/flow/sharing/ShareInTest.kt

+5
Original file line numberDiff line numberDiff line change
@@ -236,4 +236,9 @@ class ShareInTest : TestBase() {
236236
assertEquals(239, shared.first())
237237
j.cancel()
238238
}
239+
240+
@Test
241+
fun testSubscriptionByFirstSuspensionInSharedFlow() = runTest {
242+
testSubscriptionByFirstSuspensionInCollect(flowOf(1).stateIn(this@runTest), emit = { })
243+
}
239244
}

kotlinx-coroutines-core/common/test/flow/sharing/SharedFlowTest.kt

+20
Original file line numberDiff line numberDiff line change
@@ -818,4 +818,24 @@ class SharedFlowTest : TestBase() {
818818
j2.cancelAndJoin()
819819
assertEquals(0, flow.subscriptionCount.first())
820820
}
821+
822+
@Test
823+
fun testSubscriptionByFirstSuspensionInSharedFlow() = runTest {
824+
testSubscriptionByFirstSuspensionInCollect(MutableSharedFlow()) { emit(it) }
825+
}
826+
}
827+
828+
/**
829+
* Check that, by the time [SharedFlow.collect] suspends for the first time, its subscription is already active.
830+
*/
831+
inline fun<T: Flow<Int>> CoroutineScope.testSubscriptionByFirstSuspensionInCollect(flow: T, emit: T.(Int) -> Unit) {
832+
var received = 0
833+
val job = launch(start = CoroutineStart.UNDISPATCHED) {
834+
flow.collect {
835+
received = it
836+
}
837+
}
838+
flow.emit(1)
839+
assertEquals(1, received)
840+
job.cancel()
821841
}

kotlinx-coroutines-core/common/test/flow/sharing/StateFlowTest.kt

+5
Original file line numberDiff line numberDiff line change
@@ -181,4 +181,9 @@ class StateFlowTest : TestBase() {
181181
state.update { it + 3 }
182182
assertEquals(5, state.value)
183183
}
184+
185+
@Test
186+
fun testSubscriptionByFirstSuspensionInStateFlow() = runTest {
187+
testSubscriptionByFirstSuspensionInCollect(MutableStateFlow(0)) { value = it; yield() }
188+
}
184189
}

kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt

+5
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,9 @@ class StateInTest : TestBase() {
8686
assertFailsWith<TestException> { flow.stateIn(CoroutineScope(currentCoroutineContext() + Job() + ceh)) }
8787
finish(3)
8888
}
89+
90+
@Test
91+
fun testSubscriptionByFirstSuspensionInStateFlow() = runTest {
92+
testSubscriptionByFirstSuspensionInCollect(flowOf(1).stateIn(this@runTest)) { }
93+
}
8994
}

0 commit comments

Comments
 (0)