Skip to content

Commit d66206e

Browse files
committed
DataFlow implementation
DataFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API is simpler than channels APIs, the implementation of DataFlow is simpler. It consumes and allocates less memory, while still providing full deadlock-freedom (even though it is not lock-free internally). Fixes #1082
1 parent 693142c commit d66206e

File tree

6 files changed

+587
-14
lines changed

6 files changed

+587
-14
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+27-3
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,23 @@ public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/
791791
public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
792792
}
793793

794+
public abstract interface class kotlinx/coroutines/flow/DataFlow : kotlinx/coroutines/flow/Flow {
795+
public abstract fun close (Ljava/lang/Throwable;)Z
796+
public abstract fun getValue ()Ljava/lang/Object;
797+
public abstract fun isClosed ()Z
798+
public abstract fun isInitialized ()Z
799+
public abstract fun setValue (Ljava/lang/Object;)V
800+
}
801+
802+
public final class kotlinx/coroutines/flow/DataFlow$DefaultImpls {
803+
public static synthetic fun close$default (Lkotlinx/coroutines/flow/DataFlow;Ljava/lang/Throwable;ILjava/lang/Object;)Z
804+
}
805+
806+
public final class kotlinx/coroutines/flow/DataFlowKt {
807+
public static final fun DataFlow ()Lkotlinx/coroutines/flow/DataFlow;
808+
public static final fun DataFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/DataFlow;
809+
}
810+
794811
public abstract interface class kotlinx/coroutines/flow/Flow {
795812
public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
796813
}
@@ -924,7 +941,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
924941
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
925942
}
926943

927-
public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow {
944+
public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/internal/FusibleFlow {
928945
public final field capacity I
929946
public final field context Lkotlin/coroutines/CoroutineContext;
930947
public fun <init> (Lkotlin/coroutines/CoroutineContext;I)V
@@ -933,10 +950,17 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
933950
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
934951
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
935952
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
953+
public fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow;
936954
public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
937955
public fun toString ()Ljava/lang/String;
938-
public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
939-
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
956+
}
957+
958+
public abstract interface class kotlinx/coroutines/flow/internal/FusibleFlow : kotlinx/coroutines/flow/Flow {
959+
public abstract fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow;
960+
}
961+
962+
public final class kotlinx/coroutines/flow/internal/FusibleFlow$DefaultImpls {
963+
public static synthetic fun fuse$default (Lkotlinx/coroutines/flow/internal/FusibleFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/FusibleFlow;
940964
}
941965

942966
public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.channels.*
10+
import kotlinx.coroutines.flow.internal.*
11+
import kotlinx.coroutines.internal.*
12+
import kotlin.coroutines.*
13+
import kotlin.jvm.*
14+
15+
/**
16+
* A [Flow] that contains a single updatable data [value].
17+
*
18+
* A data flow can be created with `DataFlow()` constructor function either without an initial value
19+
* or with an initial value. Data value can be updated by setting its [value] property.
20+
* Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector will skip fast updates,
21+
* but always collects the most recently emitted value.
22+
*
23+
* The name reflects the fact that [DataFlow] represents a cell in a "data flow programming" model
24+
* (think of an electronic spreadsheet). Dependent values can be defined by using various operators on the
25+
* flows, with [combineLatest] being especially useful to combine multiple data flows using arbitrary functions.
26+
*
27+
* A data flow can be [closed][close] and all its collectors will complete.
28+
*
29+
* A read-only interface to data flow is a [Flow]. It is supposed to be collected by the consumers of the data.
30+
* The ability to read the current [value] is provided only for convenience of the code that updates the data.
31+
* For example, the following class encapsulates an integer data flow and increments its value on each call to `inc`:
32+
*
33+
* ```
34+
* class CounterModel {
35+
* private val _counter = DataFlow(0) // private data flow
36+
* val counter: Flow<Int> get() = _counter // publicly exposed as a flow
37+
*
38+
* fun inc() {
39+
* _counter.value++
40+
* }
41+
* }
42+
* ```
43+
*
44+
* Having two instances of the above `CounterModel` class one can define the sum of their counters like this:
45+
*
46+
* ```
47+
* val aModel = CounterModel()
48+
* val bModel = CounterModel()
49+
* val sum = aModel.counter.combineLatest(bModel.counter) { a, b -> a + b }
50+
* ```
51+
*
52+
* Conceptually data flow is similar to
53+
* [ConflatedBroadcastChannel][kotlinx.coroutines.channels.ConflatedBroadcastChannel]
54+
* but it is simpler, because it does not have to implement all the channel APIs.
55+
*
56+
* All methods of data flow are **thread-safe** and can be safely invoked from concurrent coroutines without
57+
* external synchronization.
58+
*
59+
* ### Operator fusion
60+
*
61+
* Application of [flowOn], [conflate], and
62+
* [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity
63+
* has no effect on the data flow.
64+
*
65+
* ### Implementation notes
66+
*
67+
* Data flow implementation is optimized for memory consumption and allocation-freedom. It uses a lock to ensure
68+
* thread-safety, but suspending collector coroutines are resumed outside of this lock to avoid deadlocks when
69+
* using unconfined coroutines. Adding new collectors has `O(1)` amortized cost, but updating a [value] has `O(N)`
70+
* cost, where `N` is the number of collectors.
71+
*/
72+
@FlowPreview
73+
public interface DataFlow<T> : Flow<T> {
74+
/**
75+
* Returns `true` if this data flow is initialized.
76+
*
77+
* A data flow is initialized when its [value] was set at least once.
78+
* A [closed][isClosed] data flow is considered to be initialized.
79+
*/
80+
public val isInitialized: Boolean
81+
82+
/**
83+
* Returns `true` if this data flow is closed.
84+
*
85+
* See [close].
86+
*/
87+
public val isClosed: Boolean
88+
89+
/**
90+
* Value of this data flow.
91+
*
92+
* Setting a value that is [equal][Any.equals] to the previous one does nothing.
93+
*
94+
* Getting a value from an [uninitialized][isInitialized] flow produces [IllegalStateException].
95+
* Getting a value from or setting a value to a [closed][isClosed] flow produces [IllegalStateException].
96+
*/
97+
public var value: T
98+
99+
/**
100+
* Closes this data flow.
101+
* This is an idempotent operation -- subsequent invocations of this function have no effect and return `false`.
102+
* Conceptually, it sets a value to a special "close token" and all active and future collectors of
103+
* this [Flow] complete either normally (when [cause] is null) or with the corresponding exception that
104+
* was specified as a [cause].
105+
*/
106+
public fun close(cause: Throwable? = null): Boolean
107+
}
108+
109+
/**
110+
* Creates an uninitialized [DataFlow].
111+
*/
112+
@Suppress("FunctionName")
113+
@FlowPreview
114+
public fun <T> DataFlow(): DataFlow<T> = DataFlowImpl(NONE)
115+
116+
/**
117+
* Creates a [DataFlow] with a given initial [value].
118+
*
119+
* This is a shorthand for `DataFlow().apply { this.value = value }`.
120+
*/
121+
@Suppress("FunctionName")
122+
@FlowPreview
123+
public fun <T> DataFlow(value: T): DataFlow<T> = DataFlowImpl(value ?: NULL)
124+
125+
@SharedImmutable
126+
private val NONE = Symbol("NONE")
127+
128+
private const val INITIAL_SIZE = 2 // optimized for just a few collectors
129+
130+
// DataFlow slots are allocated for its collectors
131+
private class DataFlowSlot {
132+
/**
133+
* Each slot can have one of the following states:
134+
*
135+
* * `null` -- it is not used right now. Can be [allocated][allocate] to a new collector.
136+
* * `NONE` -- used by a collector, but neither suspended nor has pending value.
137+
* * `T` | `Closed` -- pending value or closed token. [NULL] is used for `null` values.
138+
* * `CancellableContinuationImpl<T>` -- suspended waiting for value.
139+
*
140+
* It is important that default `null` value is used, because there can be a race between allocation
141+
* of a new slot and trying to do [updateValue] on this slot.
142+
*/
143+
private val _state = atomic<Any?>(null)
144+
private var fresh = false // true when this slot is fresh -- was allocated while update was in process
145+
146+
val isFree: Boolean
147+
get() = _state.value === null
148+
149+
fun allocate(fresh: Boolean, value: Any) {
150+
this.fresh = fresh // write fresh before volatile write to state
151+
val old = _state.getAndSet(value)
152+
assert { old == null }
153+
}
154+
155+
fun free() {
156+
_state.value = null
157+
}
158+
159+
@Suppress("UNCHECKED_CAST")
160+
fun updateValue(value: Any, updateFresh: Boolean, updateStale: Boolean): Unit =
161+
_state.loop { state ->
162+
if (state == null) return // this slot is free - skip it
163+
val wasFresh = fresh // read after volatile read of state
164+
// if we see non-null (allocated) state, then wasFresh correctly reflects what's going on
165+
val update = if (wasFresh) updateFresh else updateStale
166+
if (!update) return // do not update this slot
167+
if (state is CancellableContinuationImpl<*>) {
168+
// this slot is suspended -- resume it
169+
if (_state.compareAndSet(state, NONE)) {
170+
fresh = false // not fresh anymore
171+
(state as CancellableContinuationImpl<Any?>).resume(value)
172+
return
173+
}
174+
} else if (_state.compareAndSet(state, value)) {
175+
// this slot contains previous value (or NONE) -- save new value
176+
fresh = false // not fresh anymore
177+
return
178+
}
179+
}
180+
181+
fun takeValue(): Any = _state.getAndSet(NONE)!!.also {
182+
assert { it !is CancellableContinuationImpl<*> }
183+
}
184+
185+
@Suppress("UNCHECKED_CAST")
186+
suspend fun takeValueSuspend(): Any = suspendCancellableCoroutine sc@ { cont ->
187+
val value = _state.getAndUpdate { state ->
188+
assert { state !is CancellableContinuationImpl<*> }
189+
if (state === NONE) cont else NONE
190+
}
191+
if (value !== NONE) {
192+
// there was a value -- resume now
193+
cont.resume(value!!)
194+
}
195+
// Note: we don't need cont.invokeOnCancellation. free will be called on cancellation anyway
196+
}
197+
}
198+
199+
private class DataFlowImpl<T>(initialValue: Any) : SynchronizedObject(), DataFlow<T>, FusibleFlow<T> {
200+
private val _value = atomic(initialValue) // NONE | T!! | NULL | Closed
201+
private var sequence = 0 // serializes updates, value update is in process when sequence is odd
202+
private var hasFresh = false // true when we have freshly allocated slots during update
203+
private var slots = arrayOfNulls<DataFlowSlot?>(INITIAL_SIZE)
204+
private var nSlots = 0 // number of allocated (!free) slots
205+
private var nextIndex = 0 // oracle for the next free slot index
206+
207+
public override val isInitialized: Boolean
208+
get() = _value.value != NONE
209+
210+
override val isClosed: Boolean
211+
get() = _value.value is Closed
212+
213+
@Suppress("UNCHECKED_CAST")
214+
public override var value: T
215+
get() {
216+
val value = _value.value
217+
check(value !== NONE) { "DataFlow is not initialized" }
218+
check(value !is Closed) { "DataFlow is closed" }
219+
return NULL.unbox(value)
220+
}
221+
set(value) {
222+
require(value !is CancellableContinuationImpl<*>) // just in case
223+
check(update(value ?: NULL)) { "DataFlow is closed" }
224+
}
225+
226+
// Update returns false when the data flow was already closed
227+
private fun update(value: Any): Boolean {
228+
var curSequence = 0
229+
var curSlots: Array<DataFlowSlot?> = this.slots // benign race, we will not use it
230+
var curValue = value
231+
var updateFresh = false
232+
var updateStale = true // update all stale values on first pass
233+
synchronized(this) {
234+
val oldValue = _value.value
235+
if (oldValue is Closed) return false // already closed
236+
if (oldValue == value) return true // Don't do anything if value is not changing
237+
_value.value = value
238+
curSequence = sequence
239+
if (curSequence and 1 == 0) { // even value, quiescent state
240+
curSequence++ // make it odd
241+
sequence = curSequence
242+
} else {
243+
// update is already in process, notify it, and return
244+
sequence = curSequence + 2 // change sequence to notify, keep it odd
245+
return true
246+
}
247+
curSlots = slots // read current reference to collectors under lock
248+
}
249+
// Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
250+
// Loop until we're done firing all the changes (this is sort of simple flat combining)
251+
while (true) {
252+
// Benign race on element read from array
253+
for (col in curSlots) {
254+
col?.updateValue(curValue, updateFresh, updateStale)
255+
}
256+
// check if the value was updated again while we were updating the old one
257+
synchronized(this) {
258+
updateFresh = hasFresh // see if we need a loop to update fresh values
259+
updateStale = sequence != curSequence // see if we need a loop to update stale values again
260+
hasFresh = false // reset fresh flag for next update
261+
if (!updateFresh && !updateStale) { // nothing changed
262+
sequence = curSequence + 1 // make sequence even again
263+
return true // done
264+
}
265+
// reread everything for the next loop under the lock
266+
curSequence = sequence
267+
curSlots = slots
268+
curValue = _value.value
269+
}
270+
}
271+
}
272+
273+
override fun close(cause: Throwable?): Boolean = update(Closed(cause))
274+
275+
@Suppress("UNCHECKED_CAST")
276+
override suspend fun collect(collector: FlowCollector<T>) {
277+
val slot = allocateSlot()
278+
try {
279+
while (true) {
280+
var value = slot.takeValue()
281+
// Note: if takeValueSuspend is cancelled, then it bails out of this loop and calls freeSlot
282+
if (value === NONE) {
283+
value = slot.takeValueSuspend()
284+
assert { value !== NONE }
285+
}
286+
if (value is Closed) {
287+
value.cause?.let { throw it }
288+
break
289+
}
290+
collector.emit(NULL.unbox(value))
291+
}
292+
} finally {
293+
freeSlot(slot)
294+
}
295+
}
296+
297+
private fun allocateSlot(): DataFlowSlot = synchronized(this) {
298+
val size = slots.size
299+
if (nSlots >= size) slots = slots.copyOf(2 * size)
300+
var index = nextIndex
301+
var slot: DataFlowSlot
302+
while (true) {
303+
slot = slots[index] ?: DataFlowSlot().also { slots[index] = it }
304+
index++
305+
if (index >= slots.size) index = 0
306+
if (slot.isFree) break
307+
}
308+
// We allocate slot with existing value only when the value is not currently being updated.
309+
// Otherwise update may or may not see the newly allocated slot, so we mark it as "fresh"
310+
// and let update loop again and deliver the value to the fresh slots exactly once.
311+
val fresh = sequence and 1 != 0
312+
val value = if (fresh) NONE else _value.value
313+
if (fresh) hasFresh = true // force update to loop again
314+
slot.allocate(fresh, value)
315+
nextIndex = index
316+
nSlots++
317+
slot
318+
}
319+
320+
private fun freeSlot(slot: DataFlowSlot): Unit = synchronized(this) {
321+
slot.free()
322+
nSlots--
323+
}
324+
325+
override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow<T> {
326+
// context is irrelevant for data flow and it is always conflated
327+
// so it should not do anything unless buffering is requested
328+
return when (capacity) {
329+
Channel.CONFLATED, Channel.RENDEZVOUS -> this
330+
else -> ChannelFlowOperatorImpl(this, context, capacity)
331+
}
332+
}
333+
334+
private class Closed(@JvmField val cause: Throwable?)
335+
}

0 commit comments

Comments
 (0)