|
| 1 | +/* |
| 2 | + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
| 3 | + */ |
| 4 | + |
| 5 | +package kotlinx.coroutines.flow |
| 6 | + |
| 7 | +import kotlinx.atomicfu.* |
| 8 | +import kotlinx.coroutines.* |
| 9 | +import kotlinx.coroutines.channels.* |
| 10 | +import kotlinx.coroutines.flow.internal.* |
| 11 | +import kotlinx.coroutines.internal.* |
| 12 | +import kotlin.coroutines.* |
| 13 | +import kotlin.native.concurrent.* |
| 14 | + |
| 15 | +/** |
| 16 | + * A [Flow] that represents a read-only state with a single updatable data [value] that emits updates |
| 17 | + * to the value to its collectors. The current value can be retrieved via [value] property. |
| 18 | + * The flow of future updates to the value can be observed by collecting values from this flow. |
| 19 | + * |
| 20 | + * A [mutable state flow][MutableStateFlow] is created using `MutableStateFlow(value)` constructor function with |
| 21 | + * the initial value. The value of mutable state flow can be updated by setting its [value] property. |
| 22 | + * Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector skips fast updates, |
| 23 | + * but always collects the most recently emitted value. |
| 24 | + * |
| 25 | + * [StateFlow] is useful as a data-model class to represent any kind of state. |
| 26 | + * Derived values can be defined using various operators on the flows, with [combine] operator being especially |
| 27 | + * useful to combine values from multiple state flows using arbitrary functions. |
| 28 | + * |
| 29 | + * For example, the following class encapsulates an integer state and increments its value on each call to `inc`: |
| 30 | + * |
| 31 | + * ``` |
| 32 | + * class CounterModel { |
| 33 | + * private val _counter = MutableStateFlow(0) // private mutable state flow |
| 34 | + * val counter: StateFlow<Int> get() = _counter // publicly exposed as read-only state flow |
| 35 | + * |
| 36 | + * fun inc() { |
| 37 | + * _counter.value++ |
| 38 | + * } |
| 39 | + * } |
| 40 | + * ``` |
| 41 | + * |
| 42 | + * Having two instances of the above `CounterModel` class one can define the sum of their counters like this: |
| 43 | + * |
| 44 | + * ``` |
| 45 | + * val aModel = CounterModel() |
| 46 | + * val bModel = CounterModel() |
| 47 | + * val sumFlow: Flow<Int> = aModel.counter.combine(bModel.counter) { a, b -> a + b } |
| 48 | + * ``` |
| 49 | + * |
| 50 | + * ### Strong equality-based conflation |
| 51 | + * |
| 52 | + * Values in state flow are conflated using [Any.equals] comparison in a similar way to |
| 53 | + * [distinctUntilChanged] operator. It is used to conflate incoming updates |
| 54 | + * to [value][MutableStateFlow.value] in [MutableStateFlow] and to suppress emission of the values to collectors |
| 55 | + * when new value is equal to the previously emitted one. State flow behavior with classes that violate |
| 56 | + * the contract for [Any.equals] is unspecified. |
| 57 | + * |
| 58 | + * ### StateFlow vs ConflatedBroadcastChannel |
| 59 | + * |
| 60 | + * Conceptually state flow is similar to |
| 61 | + * [ConflatedBroadcastChannel][kotlinx.coroutines.channels.ConflatedBroadcastChannel] |
| 62 | + * and is designed to completely replace `ConflatedBroadcastChannel` in the future. |
| 63 | + * It has the following important difference: |
| 64 | + * |
| 65 | + * * `StateFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows |
| 66 | + * for faster, garbage-free implementation, unlike `ConflatedBroadcastChannel` implementation that |
| 67 | + * allocates objects on each emitted value. |
| 68 | + * * `StateFlow` always has a value which can be safely read at any time via [value] property. |
| 69 | + * Unlike `ConflatedBroadcastChannel`, there is no way to create a state flow without a value. |
| 70 | + * * `StateFlow` has a clear separation into a read-only `StateFlow` interface and a [MutableStateFlow]. |
| 71 | + * * `StateFlow` conflation is based on equality like [distinctUntilChanged] operator, |
| 72 | + * unlike conflation in `ConflatedBroadcastChannel` that is based on reference identity. |
| 73 | + * * `StateFlow` cannot be currently closed like `ConflatedBroadcastChannel` and can never represent a failure. |
| 74 | + * This feature might be added in the future if enough compelling use-cases are found. |
| 75 | + * |
| 76 | + * `StateFlow` is designed to better cover typical use-cases of keeping track of state changes in time, taking |
| 77 | + * more pragmatic design choices for the sake of convenience. |
| 78 | + * |
| 79 | + * ### Concurrency |
| 80 | + * |
| 81 | + * All methods of data flow are **thread-safe** and can be safely invoked from concurrent coroutines without |
| 82 | + * external synchronization. |
| 83 | + * |
| 84 | + * ### Operator fusion |
| 85 | + * |
| 86 | + * Application of [flowOn][Flow.flowOn], [conflate][Flow.conflate], |
| 87 | + * [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity, |
| 88 | + * or a [distinctUntilChanged][Flow.distinctUntilChanged] operator has no effect on the state flow. |
| 89 | + * |
| 90 | + * ### Implementation notes |
| 91 | + * |
| 92 | + * State flow implementation is optimized for memory consumption and allocation-freedom. It uses a lock to ensure |
| 93 | + * thread-safety, but suspending collector coroutines are resumed outside of this lock to avoid dead-locks when |
| 94 | + * using unconfined coroutines. Adding new collectors has `O(1)` amortized cost, but updating a [value] has `O(N)` |
| 95 | + * cost, where `N` is the number of active collectors. |
| 96 | + * |
| 97 | + * ### Not stable for inheritance |
| 98 | + * |
| 99 | + * **`StateFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods |
| 100 | + * might be added to this interface in the future, but is stable for use. |
| 101 | + * Use `MutableStateFlow()` constructor function to create an implementation. |
| 102 | + */ |
| 103 | +@ExperimentalCoroutinesApi |
| 104 | +public interface StateFlow<out T> : Flow<T> { |
| 105 | + /** |
| 106 | + * The current value of this state flow. |
| 107 | + */ |
| 108 | + public val value: T |
| 109 | +} |
| 110 | + |
| 111 | +/** |
| 112 | + * A mutable [StateFlow] that provides a setter for [value] and a method to [close] the flow. |
| 113 | + * |
| 114 | + * See [StateFlow] documentation for details. |
| 115 | + * |
| 116 | + * ### Not stable for inheritance |
| 117 | + * |
| 118 | + * **`MutableStateFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods |
| 119 | + * might be added to this interface in the future, but is stable for use. |
| 120 | + * Use `MutableStateFlow()` constructor function to create an implementation. |
| 121 | + */ |
| 122 | +@ExperimentalCoroutinesApi |
| 123 | +public interface MutableStateFlow<T> : StateFlow<T> { |
| 124 | + /** |
| 125 | + * The current value of this state flow. |
| 126 | + * |
| 127 | + * Setting a value that is [equal][Any.equals] to the previous one does nothing. |
| 128 | + */ |
| 129 | + public override var value: T |
| 130 | +} |
| 131 | + |
| 132 | +/** |
| 133 | + * Creates a [MutableStateFlow] with the given initial [value]. |
| 134 | + */ |
| 135 | +@Suppress("FunctionName") |
| 136 | +@ExperimentalCoroutinesApi |
| 137 | +public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL) |
| 138 | + |
| 139 | +// ------------------------------------ Implementation ------------------------------------ |
| 140 | + |
| 141 | +@SharedImmutable |
| 142 | +private val NONE = Symbol("NONE") |
| 143 | + |
| 144 | +@SharedImmutable |
| 145 | +private val PENDING = Symbol("PENDING") |
| 146 | + |
| 147 | +private const val INITIAL_SIZE = 2 // optimized for just a few collectors |
| 148 | + |
| 149 | +// StateFlow slots are allocated for its collectors |
| 150 | +private class StateFlowSlot { |
| 151 | + /** |
| 152 | + * Each slot can have one of the following states: |
| 153 | + * |
| 154 | + * * `null` -- it is not used right now. Can [allocate] to new collector. |
| 155 | + * * `NONE` -- used by a collector, but neither suspended nor has pending value. |
| 156 | + * * `PENDING` -- pending to process new value. |
| 157 | + * * `CancellableContinuationImpl<Unit>` -- suspended waiting for new value. |
| 158 | + * |
| 159 | + * It is important that default `null` value is used, because there can be a race between allocation |
| 160 | + * of a new slot and trying to do [makePending] on this slot. |
| 161 | + */ |
| 162 | + private val _state = atomic<Any?>(null) |
| 163 | + |
| 164 | + fun allocate(): Boolean { |
| 165 | + // No need for atomic check & update here, since allocated happens under StateFlow lock |
| 166 | + if (_state.value != null) return false // not free |
| 167 | + _state.value = NONE // allocated |
| 168 | + return true |
| 169 | + } |
| 170 | + |
| 171 | + fun free() { |
| 172 | + _state.value = null // free now |
| 173 | + } |
| 174 | + |
| 175 | + @Suppress("UNCHECKED_CAST") |
| 176 | + fun makePending() { |
| 177 | + _state.loop { state -> |
| 178 | + when { |
| 179 | + state == null -> return // this slot is free - skip it |
| 180 | + state === PENDING -> return // already pending, nothing to do |
| 181 | + state === NONE -> { // mark as pending |
| 182 | + if (_state.compareAndSet(state, PENDING)) return |
| 183 | + } |
| 184 | + else -> { // must be a suspend continuation state |
| 185 | + // we must still use CAS here since continuation may get cancelled and free the slot at any time |
| 186 | + if (_state.compareAndSet(state, NONE)) { |
| 187 | + (state as CancellableContinuationImpl<Unit>).resume(Unit) |
| 188 | + return |
| 189 | + } |
| 190 | + } |
| 191 | + } |
| 192 | + } |
| 193 | + } |
| 194 | + |
| 195 | + fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state -> |
| 196 | + assert { state !is CancellableContinuationImpl<*> } |
| 197 | + return state === PENDING |
| 198 | + } |
| 199 | + |
| 200 | + @Suppress("UNCHECKED_CAST") |
| 201 | + suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont -> |
| 202 | + assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING |
| 203 | + if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending |
| 204 | + // CAS failed -- the only possible reason is that it is already in pending state now |
| 205 | + assert { _state.value === PENDING } |
| 206 | + cont.resume(Unit) |
| 207 | + } |
| 208 | +} |
| 209 | + |
| 210 | +private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> { |
| 211 | + private val _state = atomic(initialValue) // T | NULL |
| 212 | + private var sequence = 0 // serializes updates, value update is in process when sequence is odd |
| 213 | + private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE) |
| 214 | + private var nSlots = 0 // number of allocated (!free) slots |
| 215 | + private var nextIndex = 0 // oracle for the next free slot index |
| 216 | + |
| 217 | + @Suppress("UNCHECKED_CAST") |
| 218 | + public override var value: T |
| 219 | + get() = NULL.unbox(_state.value) |
| 220 | + set(value) { |
| 221 | + var curSequence = 0 |
| 222 | + var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it |
| 223 | + val newState = value ?: NULL |
| 224 | + synchronized(this) { |
| 225 | + val oldState = _state.value |
| 226 | + if (oldState == newState) return // Don't do anything if value is not changing |
| 227 | + _state.value = newState |
| 228 | + curSequence = sequence |
| 229 | + if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update) |
| 230 | + curSequence++ // make it odd |
| 231 | + sequence = curSequence |
| 232 | + } else { |
| 233 | + // update is already in process, notify it, and return |
| 234 | + sequence = curSequence + 2 // change sequence to notify, keep it odd |
| 235 | + return |
| 236 | + } |
| 237 | + curSlots = slots // read current reference to collectors under lock |
| 238 | + } |
| 239 | + /* |
| 240 | + Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines |
| 241 | + Loop until we're done firing all the changes. This is sort of simple flat combining that |
| 242 | + ensures sequential firing of concurrent updates and avoids the storm of collector resumes |
| 243 | + when updates happen concurrently from many threads. |
| 244 | + */ |
| 245 | + while (true) { |
| 246 | + // Benign race on element read from array |
| 247 | + for (col in curSlots) { |
| 248 | + col?.makePending() |
| 249 | + } |
| 250 | + // check if the value was updated again while we were updating the old one |
| 251 | + synchronized(this) { |
| 252 | + if (sequence == curSequence) { // nothing changed, we are done |
| 253 | + sequence = curSequence + 1 // make sequence even again |
| 254 | + return // done |
| 255 | + } |
| 256 | + // reread everything for the next loop under the lock |
| 257 | + curSequence = sequence |
| 258 | + curSlots = slots |
| 259 | + } |
| 260 | + } |
| 261 | + } |
| 262 | + |
| 263 | + override suspend fun collect(collector: FlowCollector<T>) { |
| 264 | + val slot = allocateSlot() |
| 265 | + var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet) |
| 266 | + try { |
| 267 | + // The loop is arranged so that it starts delivering current value without waiting first |
| 268 | + while (true) { |
| 269 | + // Here the coroutine could have waited for a while to be dispatched, |
| 270 | + // so we use the most recent state here to ensure the best possible conflation of stale values |
| 271 | + val newState = _state.value |
| 272 | + // Conflate value emissions using equality |
| 273 | + if (prevState == null || newState != prevState) { |
| 274 | + collector.emit(NULL.unbox(newState)) |
| 275 | + prevState = newState |
| 276 | + } |
| 277 | + // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot |
| 278 | + if (!slot.takePending()) { // try fast-path without suspending first |
| 279 | + slot.awaitPending() // only suspend for new values when needed |
| 280 | + } |
| 281 | + } |
| 282 | + } finally { |
| 283 | + freeSlot(slot) |
| 284 | + } |
| 285 | + } |
| 286 | + |
| 287 | + private fun allocateSlot(): StateFlowSlot = synchronized(this) { |
| 288 | + val size = slots.size |
| 289 | + if (nSlots >= size) slots = slots.copyOf(2 * size) |
| 290 | + var index = nextIndex |
| 291 | + var slot: StateFlowSlot |
| 292 | + while (true) { |
| 293 | + slot = slots[index] ?: StateFlowSlot().also { slots[index] = it } |
| 294 | + index++ |
| 295 | + if (index >= slots.size) index = 0 |
| 296 | + if (slot.allocate()) break // break when found and allocated free slot |
| 297 | + } |
| 298 | + nextIndex = index |
| 299 | + nSlots++ |
| 300 | + slot |
| 301 | + } |
| 302 | + |
| 303 | + private fun freeSlot(slot: StateFlowSlot): Unit = synchronized(this) { |
| 304 | + slot.free() |
| 305 | + nSlots-- |
| 306 | + } |
| 307 | + |
| 308 | + override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow<T> { |
| 309 | + // context is irrelevant for state flow and it is always conflated |
| 310 | + // so it should not do anything unless buffering is requested |
| 311 | + return when (capacity) { |
| 312 | + Channel.CONFLATED, Channel.RENDEZVOUS -> this |
| 313 | + else -> ChannelFlowOperatorImpl(this, context, capacity) |
| 314 | + } |
| 315 | + } |
| 316 | +} |
0 commit comments