Skip to content

Commit 270c964

Browse files
committed
~ Add non-suspend stateIn operator with the initial value
* Move StateInTest * Add `out T` projection to StateFlow type
1 parent 7612c4a commit 270c964

File tree

3 files changed

+105
-26
lines changed

3 files changed

+105
-26
lines changed

kotlinx-coroutines-core/common/src/flow/StateFlow.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ import kotlin.native.concurrent.*
147147
* cost, where `N` is the number of active collectors.
148148
*/
149149
@ExperimentalCoroutinesApi
150-
public interface StateFlow<T> : Flow<T> {
150+
public interface StateFlow<out T> : Flow<T> {
151151
/**
152152
* The current value of this state flow.
153153
*/

kotlinx-coroutines-core/common/src/flow/operators/State.kt

+55-25
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,38 @@ import kotlin.jvm.*
1616
* **Preview feature**: The design of `stateIn` operator is tentative and could change.
1717
*/
1818
@FlowPreview
19-
public interface StateFlowJob<T> : StateFlow<T>, Job
19+
public interface StateFlowJob<out T> : StateFlow<T>, Job
2020

2121
/**
22-
* Launches a coroutine that collects this [Flow] and emits all the collected values as a resulting [StateFlow].
22+
* Launches a coroutine that collects this [Flow] and emits all the collected values as the resulting [StateFlow].
23+
* The result of this function is both a [StateFlow] and a [Job]. This call effectively turns a cold [Flow] into a
24+
* hot, active [Flow], making the most recently emitted value available for consumption at any time via
25+
* [StateFlow.value]. This function returns immediately. The state flow it creates is initially set
26+
* to the specified default [value]. As an alternative a version of `stateIn` without a default value can be used,
27+
* which suspend until the source flow emits a first value.
28+
*
29+
* The resulting coroutine can be cancelled by [cancelling][CoroutineScope.cancel] the [scope] in which it is
30+
* launched or by [cancelling][Job.cancel] the resulting [Job].
31+
*
32+
* Errors in the source flow are not propagated to the [scope] but [close][MutableStateFlow.close] the resulting
33+
* [StateFlow] or are rethrown to the caller of `stateIn` if they happen before emission of the first value.
34+
*
35+
* **Preview feature**: The design of `stateIn` operator is tentative and could change.
36+
*/
37+
@FlowPreview
38+
public fun <T> Flow<T>.stateIn(scope: CoroutineScope, value: T): StateFlowJob<T> {
39+
val state = StateFlow(value)
40+
val job = scope.launchStateJob(this, null, state)
41+
return StateFlowJobImpl(state, job)
42+
}
43+
44+
/**
45+
* Launches a coroutine that collects this [Flow] and emits all the collected values as the resulting [StateFlow].
2346
* The result of this function is both a [StateFlow] and a [Job]. This call effectively turns a cold [Flow] into a
2447
* hot, active [Flow], making the most recently emitted value available for consumption at any time via
2548
* [StateFlow.value]. This call suspends until the source flow the emits first value and
26-
* throws [NoSuchElementException] if the flow was empty.
49+
* throws [NoSuchElementException] if the flow was empty. As an alternative, a version of `stateIn` with a default
50+
* value can be used, which does not suspend.
2751
*
2852
* The resulting coroutine can be cancelled by [cancelling][CoroutineScope.cancel] the [scope] in which it is
2953
* launched or by [cancelling][Job.cancel] the resulting [Job].
@@ -35,32 +59,38 @@ public interface StateFlowJob<T> : StateFlow<T>, Job
3559
*/
3660
@FlowPreview
3761
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlowJob<T> {
38-
val result = CompletableDeferred<StateFlowJob<T>>()
39-
scope.launch {
40-
var state: MutableStateFlow<T>? = null
41-
var exception: Throwable? = null
42-
try {
43-
collect { value ->
44-
// Update state flow if initialized
45-
state?.let { it.value = value } ?: run {
46-
// Or create state on the first value if state was not created yet (first value)
47-
state = StateFlow(value).also {
48-
// resume stateIn call with initialized StateFlow and current job
49-
result.complete(StateFlowJobImpl(it, coroutineContext[Job]!!))
50-
}
62+
val deferredResult = CompletableDeferred<StateFlowJob<T>>()
63+
scope.launchStateJob(this, deferredResult, null)
64+
return deferredResult.await() // tail call
65+
}
66+
67+
private fun <T> CoroutineScope.launchStateJob(
68+
flow: Flow<T>,
69+
deferredResult: CompletableDeferred<StateFlowJob<T>>?,
70+
initialState: MutableStateFlow<T>?
71+
) = launch {
72+
var state: MutableStateFlow<T>? = initialState
73+
var exception: Throwable? = null
74+
try {
75+
flow.collect { value ->
76+
// Update state flow if initialized
77+
state?.let { it.value = value } ?: run {
78+
// Or create state on the first value if state was not created yet (first value)
79+
state = StateFlow(value).also {
80+
// resume stateIn call with initialized StateFlow and current job
81+
deferredResult?.complete(StateFlowJobImpl(it, coroutineContext[Job]!!))
5182
}
5283
}
53-
} catch (e: Throwable) {
54-
// Ignore cancellation exception -- it is a normal way to stop the flow
55-
if (e !is CancellationException) exception = e
56-
}
57-
// Close the state flow with exception if initialized
58-
state?.apply { close(exception) } ?: run {
59-
// Or complete the deferred exceptionally if the state was not create yet)
60-
result.completeExceptionally(exception ?: NoSuchElementException("Expected at least one element"))
6184
}
85+
} catch (e: Throwable) {
86+
// Ignore cancellation exception -- it is a normal way to stop the flow
87+
if (e !is CancellationException) exception = e
88+
}
89+
// Close the state flow with exception if initialized
90+
state?.apply { close(exception) } ?: run {
91+
// Or complete the deferred exceptionally if the state was not create yet)
92+
deferredResult?.completeExceptionally(exception ?: NoSuchElementException("Expected at least one element"))
6293
}
63-
return result.await() // tail call
6494
}
6595

6696
private class StateFlowJobImpl<T>(state: StateFlow<T>, job: Job) : StateFlowJob<T>, StateFlow<T> by state, Job by job

kotlinx-coroutines-core/common/test/flow/StateInTest.kt renamed to kotlinx-coroutines-core/common/test/flow/operators/StateInTest.kt

+49
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,52 @@ class StateInTest : TestBase() {
1111
}
1212
}
1313

14+
@Test
15+
fun testEmptyFlowNullValue() = runTest {
16+
val state = emptyFlow<Int>().stateIn(this, null)
17+
assertEquals(null, state.value)
18+
assertFalse(state.isClosed)
19+
yield()
20+
assertEquals(null, state.value)
21+
assertTrue(state.isClosed)
22+
}
23+
24+
@Test
25+
fun testEmptyFlowZeroValue() = runTest {
26+
val state = emptyFlow<Int>().stateIn(this, 0)
27+
assertEquals(0, state.value)
28+
assertFalse(state.isClosed)
29+
yield()
30+
assertEquals(0, state.value)
31+
assertTrue(state.isClosed)
32+
}
33+
1434
@Test
1535
fun testFailingFlow() = runTest {
1636
assertFailsWith<TestException> {
1737
flow<Int> { throw TestException() }.stateIn(this)
1838
}
1939
}
2040

41+
@Test
42+
fun testFailingFlowValue() = runTest {
43+
expect(1)
44+
val state = flow<Int> { throw TestException() }.stateIn(this, 42)
45+
assertEquals(42, state.value)
46+
assertFalse(state.isClosed)
47+
yield()
48+
assertEquals(42, state.value)
49+
assertTrue(state.isClosed)
50+
assertFailsWith<TestException> {
51+
expect(2)
52+
state.collect { value ->
53+
assertEquals(42, value)
54+
expect(3)
55+
}
56+
}
57+
finish(4)
58+
}
59+
2160
@Test
2261
fun testOneElementFlow() = runTest {
2362
val state = flowOf("OK").onCompletion { yield() }.stateIn(this)
@@ -28,6 +67,16 @@ class StateInTest : TestBase() {
2867
assertTrue(state.isClosed)
2968
}
3069

70+
@Test
71+
fun testOneElementFlowValue() = runTest {
72+
val state = flowOf("OK").stateIn(this, "INIT")
73+
assertEquals("INIT", state.value)
74+
assertFalse(state.isClosed)
75+
yield()
76+
assertEquals("OK", state.value)
77+
assertTrue(state.isClosed)
78+
}
79+
3180
@Test
3281
fun testStateFlowJobCancellation() = runTest {
3382
val flow = flow<Int> {

0 commit comments

Comments
 (0)