Skip to content

Commit ca2d6dd

Browse files
elizarovqwwdfsad
authored andcommitted
StateFlow implementation (Kotlin#1974)
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API is simpler than channels APIs, the implementation of StateFlow is simpler. It consumes and allocates less memory, while still providing full deadlock-freedom (even though it is not lock-free internally). Fixes Kotlin#1973 Fixes Kotlin#395 Fixes Kotlin#1816 Co-authored-by: Vsevolod Tolstopyatov <[email protected]>
1 parent 400f74c commit ca2d6dd

File tree

10 files changed

+632
-17
lines changed

10 files changed

+632
-17
lines changed

kotlinx-coroutines-core/api/kotlinx-coroutines-core.api

+29-3
Original file line numberDiff line numberDiff line change
@@ -998,7 +998,26 @@ public final class kotlinx/coroutines/flow/FlowKt {
998998
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
999999
}
10001000

1001-
public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow {
1001+
public final class kotlinx/coroutines/flow/LintKt {
1002+
public static final fun conflate (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
1003+
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow;
1004+
public static final fun flowOn (Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
1005+
}
1006+
1007+
public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotlinx/coroutines/flow/StateFlow {
1008+
public abstract fun getValue ()Ljava/lang/Object;
1009+
public abstract fun setValue (Ljava/lang/Object;)V
1010+
}
1011+
1012+
public abstract interface class kotlinx/coroutines/flow/StateFlow : kotlinx/coroutines/flow/Flow {
1013+
public abstract fun getValue ()Ljava/lang/Object;
1014+
}
1015+
1016+
public final class kotlinx/coroutines/flow/StateFlowKt {
1017+
public static final fun MutableStateFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/MutableStateFlow;
1018+
}
1019+
1020+
public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/internal/FusibleFlow {
10021021
public final field capacity I
10031022
public final field context Lkotlin/coroutines/CoroutineContext;
10041023
public fun <init> (Lkotlin/coroutines/CoroutineContext;I)V
@@ -1007,10 +1026,9 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
10071026
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10081027
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
10091028
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
1029+
public fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow;
10101030
public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
10111031
public fun toString ()Ljava/lang/String;
1012-
public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
1013-
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
10141032
}
10151033

10161034
public final class kotlinx/coroutines/flow/internal/CombineKt {
@@ -1021,6 +1039,14 @@ public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
10211039
public static final fun checkIndexOverflow (I)I
10221040
}
10231041

1042+
public abstract interface class kotlinx/coroutines/flow/internal/FusibleFlow : kotlinx/coroutines/flow/Flow {
1043+
public abstract fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow;
1044+
}
1045+
1046+
public final class kotlinx/coroutines/flow/internal/FusibleFlow$DefaultImpls {
1047+
public static synthetic fun fuse$default (Lkotlinx/coroutines/flow/internal/FusibleFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/FusibleFlow;
1048+
}
1049+
10241050
public final class kotlinx/coroutines/flow/internal/SafeCollector_commonKt {
10251051
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
10261052
}

kotlinx-coroutines-core/common/src/flow/Flow.kt

+6
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,12 @@ import kotlin.coroutines.*
156156
*
157157
* Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with
158158
* reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from `kotlinx-coroutines-reactive` module.
159+
*
160+
* ### Not stable for inheritance
161+
*
162+
* **`Flow` interface is not stable for inheritance in 3rd party libraries**, as new methods
163+
* might be added to this interface in the future, but is stable for use.
164+
* Use `flow { ... }` builder function to create an implementation.
159165
*/
160166
public interface Flow<out T> {
161167
/**

kotlinx-coroutines-core/common/src/flow/Migration.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import kotlin.jvm.*
2222
* Deprecated functions also are moved here when they renamed. The difference is that they have
2323
* a body with their implementation while pure stubs have [noImpl].
2424
*/
25-
private fun noImpl(): Nothing =
25+
internal fun noImpl(): Nothing =
2626
throw UnsupportedOperationException("Not implemented, should not be called")
2727

2828
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.channels.*
10+
import kotlinx.coroutines.flow.internal.*
11+
import kotlinx.coroutines.internal.*
12+
import kotlin.coroutines.*
13+
import kotlin.native.concurrent.*
14+
15+
/**
16+
* A [Flow] that represents a read-only state with a single updatable data [value] that emits updates
17+
* to the value to its collectors. The current value can be retrieved via [value] property.
18+
* The flow of future updates to the value can be observed by collecting values from this flow.
19+
*
20+
* A [mutable state flow][MutableStateFlow] is created using `MutableStateFlow(value)` constructor function with
21+
* the initial value. The value of mutable state flow can be updated by setting its [value] property.
22+
* Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector skips fast updates,
23+
* but always collects the most recently emitted value.
24+
*
25+
* [StateFlow] is useful as a data-model class to represent any kind of state.
26+
* Derived values can be defined using various operators on the flows, with [combine] operator being especially
27+
* useful to combine values from multiple state flows using arbitrary functions.
28+
*
29+
* For example, the following class encapsulates an integer state and increments its value on each call to `inc`:
30+
*
31+
* ```
32+
* class CounterModel {
33+
* private val _counter = MutableStateFlow(0) // private mutable state flow
34+
* val counter: StateFlow<Int> get() = _counter // publicly exposed as read-only state flow
35+
*
36+
* fun inc() {
37+
* _counter.value++
38+
* }
39+
* }
40+
* ```
41+
*
42+
* Having two instances of the above `CounterModel` class one can define the sum of their counters like this:
43+
*
44+
* ```
45+
* val aModel = CounterModel()
46+
* val bModel = CounterModel()
47+
* val sumFlow: Flow<Int> = aModel.counter.combine(bModel.counter) { a, b -> a + b }
48+
* ```
49+
*
50+
* ### Strong equality-based conflation
51+
*
52+
* Values in state flow are conflated using [Any.equals] comparison in a similar way to
53+
* [distinctUntilChanged] operator. It is used to conflate incoming updates
54+
* to [value][MutableStateFlow.value] in [MutableStateFlow] and to suppress emission of the values to collectors
55+
* when new value is equal to the previously emitted one. State flow behavior with classes that violate
56+
* the contract for [Any.equals] is unspecified.
57+
*
58+
* ### StateFlow vs ConflatedBroadcastChannel
59+
*
60+
* Conceptually state flow is similar to
61+
* [ConflatedBroadcastChannel][kotlinx.coroutines.channels.ConflatedBroadcastChannel]
62+
* and is designed to completely replace `ConflatedBroadcastChannel` in the future.
63+
* It has the following important difference:
64+
*
65+
* * `StateFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows
66+
* for faster, garbage-free implementation, unlike `ConflatedBroadcastChannel` implementation that
67+
* allocates objects on each emitted value.
68+
* * `StateFlow` always has a value which can be safely read at any time via [value] property.
69+
* Unlike `ConflatedBroadcastChannel`, there is no way to create a state flow without a value.
70+
* * `StateFlow` has a clear separation into a read-only `StateFlow` interface and a [MutableStateFlow].
71+
* * `StateFlow` conflation is based on equality like [distinctUntilChanged] operator,
72+
* unlike conflation in `ConflatedBroadcastChannel` that is based on reference identity.
73+
* * `StateFlow` cannot be currently closed like `ConflatedBroadcastChannel` and can never represent a failure.
74+
* This feature might be added in the future if enough compelling use-cases are found.
75+
*
76+
* `StateFlow` is designed to better cover typical use-cases of keeping track of state changes in time, taking
77+
* more pragmatic design choices for the sake of convenience.
78+
*
79+
* ### Concurrency
80+
*
81+
* All methods of data flow are **thread-safe** and can be safely invoked from concurrent coroutines without
82+
* external synchronization.
83+
*
84+
* ### Operator fusion
85+
*
86+
* Application of [flowOn][Flow.flowOn], [conflate][Flow.conflate],
87+
* [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
88+
* or a [distinctUntilChanged][Flow.distinctUntilChanged] operator has no effect on the state flow.
89+
*
90+
* ### Implementation notes
91+
*
92+
* State flow implementation is optimized for memory consumption and allocation-freedom. It uses a lock to ensure
93+
* thread-safety, but suspending collector coroutines are resumed outside of this lock to avoid dead-locks when
94+
* using unconfined coroutines. Adding new collectors has `O(1)` amortized cost, but updating a [value] has `O(N)`
95+
* cost, where `N` is the number of active collectors.
96+
*
97+
* ### Not stable for inheritance
98+
*
99+
* **`StateFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
100+
* might be added to this interface in the future, but is stable for use.
101+
* Use `MutableStateFlow()` constructor function to create an implementation.
102+
*/
103+
@ExperimentalCoroutinesApi
104+
public interface StateFlow<out T> : Flow<T> {
105+
/**
106+
* The current value of this state flow.
107+
*/
108+
public val value: T
109+
}
110+
111+
/**
112+
* A mutable [StateFlow] that provides a setter for [value] and a method to [close] the flow.
113+
*
114+
* See [StateFlow] documentation for details.
115+
*
116+
* ### Not stable for inheritance
117+
*
118+
* **`MutableStateFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
119+
* might be added to this interface in the future, but is stable for use.
120+
* Use `MutableStateFlow()` constructor function to create an implementation.
121+
*/
122+
@ExperimentalCoroutinesApi
123+
public interface MutableStateFlow<T> : StateFlow<T> {
124+
/**
125+
* The current value of this state flow.
126+
*
127+
* Setting a value that is [equal][Any.equals] to the previous one does nothing.
128+
*/
129+
public override var value: T
130+
}
131+
132+
/**
133+
* Creates a [MutableStateFlow] with the given initial [value].
134+
*/
135+
@Suppress("FunctionName")
136+
@ExperimentalCoroutinesApi
137+
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
138+
139+
// ------------------------------------ Implementation ------------------------------------
140+
141+
@SharedImmutable
142+
private val NONE = Symbol("NONE")
143+
144+
@SharedImmutable
145+
private val PENDING = Symbol("PENDING")
146+
147+
private const val INITIAL_SIZE = 2 // optimized for just a few collectors
148+
149+
// StateFlow slots are allocated for its collectors
150+
private class StateFlowSlot {
151+
/**
152+
* Each slot can have one of the following states:
153+
*
154+
* * `null` -- it is not used right now. Can [allocate] to new collector.
155+
* * `NONE` -- used by a collector, but neither suspended nor has pending value.
156+
* * `PENDING` -- pending to process new value.
157+
* * `CancellableContinuationImpl<Unit>` -- suspended waiting for new value.
158+
*
159+
* It is important that default `null` value is used, because there can be a race between allocation
160+
* of a new slot and trying to do [makePending] on this slot.
161+
*/
162+
private val _state = atomic<Any?>(null)
163+
164+
fun allocate(): Boolean {
165+
// No need for atomic check & update here, since allocated happens under StateFlow lock
166+
if (_state.value != null) return false // not free
167+
_state.value = NONE // allocated
168+
return true
169+
}
170+
171+
fun free() {
172+
_state.value = null // free now
173+
}
174+
175+
@Suppress("UNCHECKED_CAST")
176+
fun makePending() {
177+
_state.loop { state ->
178+
when {
179+
state == null -> return // this slot is free - skip it
180+
state === PENDING -> return // already pending, nothing to do
181+
state === NONE -> { // mark as pending
182+
if (_state.compareAndSet(state, PENDING)) return
183+
}
184+
else -> { // must be a suspend continuation state
185+
// we must still use CAS here since continuation may get cancelled and free the slot at any time
186+
if (_state.compareAndSet(state, NONE)) {
187+
(state as CancellableContinuationImpl<Unit>).resume(Unit)
188+
return
189+
}
190+
}
191+
}
192+
}
193+
}
194+
195+
fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
196+
assert { state !is CancellableContinuationImpl<*> }
197+
return state === PENDING
198+
}
199+
200+
@Suppress("UNCHECKED_CAST")
201+
suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
202+
assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING
203+
if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending
204+
// CAS failed -- the only possible reason is that it is already in pending state now
205+
assert { _state.value === PENDING }
206+
cont.resume(Unit)
207+
}
208+
}
209+
210+
private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
211+
private val _state = atomic(initialValue) // T | NULL
212+
private var sequence = 0 // serializes updates, value update is in process when sequence is odd
213+
private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
214+
private var nSlots = 0 // number of allocated (!free) slots
215+
private var nextIndex = 0 // oracle for the next free slot index
216+
217+
@Suppress("UNCHECKED_CAST")
218+
public override var value: T
219+
get() = NULL.unbox(_state.value)
220+
set(value) {
221+
var curSequence = 0
222+
var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
223+
val newState = value ?: NULL
224+
synchronized(this) {
225+
val oldState = _state.value
226+
if (oldState == newState) return // Don't do anything if value is not changing
227+
_state.value = newState
228+
curSequence = sequence
229+
if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
230+
curSequence++ // make it odd
231+
sequence = curSequence
232+
} else {
233+
// update is already in process, notify it, and return
234+
sequence = curSequence + 2 // change sequence to notify, keep it odd
235+
return
236+
}
237+
curSlots = slots // read current reference to collectors under lock
238+
}
239+
/*
240+
Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
241+
Loop until we're done firing all the changes. This is sort of simple flat combining that
242+
ensures sequential firing of concurrent updates and avoids the storm of collector resumes
243+
when updates happen concurrently from many threads.
244+
*/
245+
while (true) {
246+
// Benign race on element read from array
247+
for (col in curSlots) {
248+
col?.makePending()
249+
}
250+
// check if the value was updated again while we were updating the old one
251+
synchronized(this) {
252+
if (sequence == curSequence) { // nothing changed, we are done
253+
sequence = curSequence + 1 // make sequence even again
254+
return // done
255+
}
256+
// reread everything for the next loop under the lock
257+
curSequence = sequence
258+
curSlots = slots
259+
}
260+
}
261+
}
262+
263+
override suspend fun collect(collector: FlowCollector<T>) {
264+
val slot = allocateSlot()
265+
var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
266+
try {
267+
// The loop is arranged so that it starts delivering current value without waiting first
268+
while (true) {
269+
// Here the coroutine could have waited for a while to be dispatched,
270+
// so we use the most recent state here to ensure the best possible conflation of stale values
271+
val newState = _state.value
272+
// Conflate value emissions using equality
273+
if (prevState == null || newState != prevState) {
274+
collector.emit(NULL.unbox(newState))
275+
prevState = newState
276+
}
277+
// Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
278+
if (!slot.takePending()) { // try fast-path without suspending first
279+
slot.awaitPending() // only suspend for new values when needed
280+
}
281+
}
282+
} finally {
283+
freeSlot(slot)
284+
}
285+
}
286+
287+
private fun allocateSlot(): StateFlowSlot = synchronized(this) {
288+
val size = slots.size
289+
if (nSlots >= size) slots = slots.copyOf(2 * size)
290+
var index = nextIndex
291+
var slot: StateFlowSlot
292+
while (true) {
293+
slot = slots[index] ?: StateFlowSlot().also { slots[index] = it }
294+
index++
295+
if (index >= slots.size) index = 0
296+
if (slot.allocate()) break // break when found and allocated free slot
297+
}
298+
nextIndex = index
299+
nSlots++
300+
slot
301+
}
302+
303+
private fun freeSlot(slot: StateFlowSlot): Unit = synchronized(this) {
304+
slot.free()
305+
nSlots--
306+
}
307+
308+
override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow<T> {
309+
// context is irrelevant for state flow and it is always conflated
310+
// so it should not do anything unless buffering is requested
311+
return when (capacity) {
312+
Channel.CONFLATED, Channel.RENDEZVOUS -> this
313+
else -> ChannelFlowOperatorImpl(this, context, capacity)
314+
}
315+
}
316+
}

0 commit comments

Comments
 (0)