Skip to content

Commit 03b81f4

Browse files
Handle cancelled scope and empty flow in Flow.stateIn (#4327)
Fixes #4322
1 parent 09223c6 commit 03b81f4

File tree

2 files changed

+28
-4
lines changed

2 files changed

+28
-4
lines changed

kotlinx-coroutines-core/common/src/flow/operators/Share.kt

+8-4
Original file line numberDiff line numberDiff line change
@@ -317,29 +317,33 @@ public fun <T> Flow<T>.stateIn(
317317
* with multiple downstream subscribers. See the [StateFlow] documentation for the general concepts of state flows.
318318
*
319319
* @param scope the coroutine scope in which sharing is started.
320+
* @throws NoSuchElementException if the upstream flow does not emit any value.
320321
*/
321322
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T> {
322323
val config = configureSharing(1)
323-
val result = CompletableDeferred<StateFlow<T>>()
324+
val result = CompletableDeferred<Result<StateFlow<T>>>(scope.coroutineContext[Job])
324325
scope.launchSharingDeferred(config.context, config.upstream, result)
325-
return result.await()
326+
return result.await().getOrThrow()
326327
}
327328

328329
private fun <T> CoroutineScope.launchSharingDeferred(
329330
context: CoroutineContext,
330331
upstream: Flow<T>,
331-
result: CompletableDeferred<StateFlow<T>>
332+
result: CompletableDeferred<Result<StateFlow<T>>>,
332333
) {
333334
launch(context) {
334335
try {
335336
var state: MutableStateFlow<T>? = null
336337
upstream.collect { value ->
337338
state?.let { it.value = value } ?: run {
338339
state = MutableStateFlow(value).also {
339-
result.complete(ReadonlyStateFlow(it, coroutineContext.job))
340+
result.complete(Result.success(ReadonlyStateFlow(it, coroutineContext.job)))
340341
}
341342
}
342343
}
344+
if (state == null) {
345+
result.complete(Result.failure(NoSuchElementException("Flow is empty")))
346+
}
343347
} catch (e: Throwable) {
344348
// Notify the waiter that the flow has failed
345349
result.completeExceptionally(e)

kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt

+20
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package kotlinx.coroutines.flow
33
import kotlinx.coroutines.testing.*
44
import kotlinx.coroutines.*
55
import kotlinx.coroutines.channels.*
6+
import kotlin.coroutines.*
67
import kotlin.test.*
78

89
/**
@@ -88,4 +89,23 @@ class StateInTest : TestBase() {
8889
fun testSubscriptionByFirstSuspensionInStateFlow() = runTest {
8990
testSubscriptionByFirstSuspensionInCollect(flowOf(1).stateIn(this@runTest)) { }
9091
}
92+
93+
@Test
94+
fun testRethrowsCEOnCancelledScope() = runTest {
95+
val cancelledScope = CoroutineScope(EmptyCoroutineContext).apply { cancel("CancelMessageToken") }
96+
val flow = flowOf(1, 2, 3)
97+
assertFailsWith<CancellationException>("CancelMessageToken") {
98+
flow.stateIn(cancelledScope)
99+
}
100+
}
101+
102+
@Test
103+
fun testThrowsNoSuchElementExceptionOnEmptyFlow() = runTest {
104+
val flow = emptyFlow<Any>()
105+
assertFailsWith<NoSuchElementException> {
106+
flow.stateIn(this)
107+
}
108+
// Ensure that the collecting scope is not cancelled by the NoSuchElementException
109+
assertEquals(true, coroutineContext[Job]?.isActive)
110+
}
91111
}

0 commit comments

Comments
 (0)