Skip to content

Commit 36a0ab3

Browse files
committed
DataFlow implementation
DataFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API is simpler than channels APIs, the implementation of DataFlow is simpler. It consumes and allocates less memory, while still providing full deadlock-freedom (even though it is not lock-free internally). Fixes #1082
1 parent 693142c commit 36a0ab3

File tree

6 files changed

+589
-14
lines changed

6 files changed

+589
-14
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+27-3
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,23 @@ public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/
791791
public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
792792
}
793793

794+
public abstract interface class kotlinx/coroutines/flow/DataFlow : kotlinx/coroutines/flow/Flow {
795+
public abstract fun close (Ljava/lang/Throwable;)Z
796+
public abstract fun getValue ()Ljava/lang/Object;
797+
public abstract fun isClosed ()Z
798+
public abstract fun isInitialized ()Z
799+
public abstract fun setValue (Ljava/lang/Object;)V
800+
}
801+
802+
public final class kotlinx/coroutines/flow/DataFlow$DefaultImpls {
803+
public static synthetic fun close$default (Lkotlinx/coroutines/flow/DataFlow;Ljava/lang/Throwable;ILjava/lang/Object;)Z
804+
}
805+
806+
public final class kotlinx/coroutines/flow/DataFlowKt {
807+
public static final fun DataFlow ()Lkotlinx/coroutines/flow/DataFlow;
808+
public static final fun DataFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/DataFlow;
809+
}
810+
794811
public abstract interface class kotlinx/coroutines/flow/Flow {
795812
public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
796813
}
@@ -924,7 +941,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
924941
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
925942
}
926943

927-
public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow {
944+
public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/internal/FusibleFlow {
928945
public final field capacity I
929946
public final field context Lkotlin/coroutines/CoroutineContext;
930947
public fun <init> (Lkotlin/coroutines/CoroutineContext;I)V
@@ -933,10 +950,17 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
933950
public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
934951
protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
935952
protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
953+
public fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow;
936954
public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
937955
public fun toString ()Ljava/lang/String;
938-
public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow;
939-
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
956+
}
957+
958+
public abstract interface class kotlinx/coroutines/flow/internal/FusibleFlow : kotlinx/coroutines/flow/Flow {
959+
public abstract fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow;
960+
}
961+
962+
public final class kotlinx/coroutines/flow/internal/FusibleFlow$DefaultImpls {
963+
public static synthetic fun fuse$default (Lkotlinx/coroutines/flow/internal/FusibleFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/FusibleFlow;
940964
}
941965

942966
public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow
6+
7+
import kotlinx.atomicfu.*
8+
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.channels.*
10+
import kotlinx.coroutines.flow.internal.*
11+
import kotlinx.coroutines.internal.*
12+
import kotlin.coroutines.*
13+
import kotlin.jvm.*
14+
15+
/**
16+
* A [Flow] that contains a single updatable data [value].
17+
*
18+
* A data flow can be created with `DataFlow()` constructor function either without an initial value
19+
* or with an initial value. Data value can be updated by setting its [value] property.
20+
* Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector will skip fast updates,
21+
* but always collects the most recently emitted value.
22+
*
23+
* The name reflects the fact that [DataFlow] represent an cell in a "data flow programming" model
24+
* (think of an electronic spreadsheet). Dependent values can be defined by using various operators on the
25+
* flows, with [combineLatest] being especially useful to combine multiple data flows using arbitrary functions.
26+
*
27+
* A data flow can be [closed][close] and all its collectors will complete.
28+
*
29+
* A read-only interface to data flow is a [Flow]. It is supposed to be collected by the consumers of the data.
30+
* The ability to read the current [value] is provided only for convenience of the code that updates the data.
31+
* For example, the following class encapsulates an integer data flow and increments its value on each call to `inc`:
32+
*
33+
* ```
34+
* class CounterModel {
35+
* // private data flow
36+
* private val _counter = DataFlow(0)
37+
* // publicly exposed as a flow
38+
* val counter: Flow<Int> get() = _counter
39+
*
40+
* fun inc() {
41+
* _counter.value++
42+
* }
43+
* }
44+
* ```
45+
*
46+
* Having two instances of the above `CounterModel` class one can define the sum of their counters like this:
47+
*
48+
* ```
49+
* val aModel = CounterModel()
50+
* val bModel = CounterModel()
51+
* val sum = aModel.counter.combineLatest(bModel.counter) { a, b -> a + b }
52+
* ```
53+
*
54+
* Conceptually data flow is similar to
55+
* [ConflatedBroadcastChannel][kotlinx.coroutines.channels.ConflatedBroadcastChannel]
56+
* but it is simpler, because it does not have to implement all the channel APIs.
57+
*
58+
* All methods of data flow are **thread-safe** and can be safely invoked from concurrent coroutines without
59+
* external synchronization.
60+
*
61+
* ### Operator fusion
62+
*
63+
* Application of [flowOn], [conflate], and
64+
* [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity
65+
* has no effect on the data flow.
66+
*
67+
* ### Implementation notes
68+
*
69+
* Data flow implementation is optimized for memory consumption and allocation-freedom. It uses a lock to ensure
70+
* thread-safety, but suspending collector coroutines are resumed outside of this lock to avoid dead-locks when
71+
* using unconfined coroutines. Adding new collectors has `O(1)` amortized code, but updating a [value] has `O(N)`
72+
* cost, where `N` is the number of collectors.
73+
*/
74+
@FlowPreview
75+
public interface DataFlow<T> : Flow<T> {
76+
/**
77+
* Returns `true` if this data flow is initialized.
78+
*
79+
* A data flow is initialized when its [value] was set at least once.
80+
* A [closed][isClosed] data flow is considered to be initialized.
81+
*/
82+
public val isInitialized: Boolean
83+
84+
/**
85+
* Returns `true` if this data flow is closed.
86+
*
87+
* See [close].
88+
*/
89+
public val isClosed: Boolean
90+
91+
/**
92+
* Value of this data flow.
93+
*
94+
* Setting a value that is [equal][Any.equals] to the previous one does nothing.
95+
*
96+
* Getting a value from an [uninitialized][isInitialized] flow produces [IllegalStateException].
97+
* Getting a value from or setting a value to a [closed][isClosed] flow produces [IllegalStateException].
98+
*/
99+
public var value: T
100+
101+
/**
102+
* Closes this data fl9w.
103+
* This is an idempotent operation -- subsequent invocations of this function have no effect and return `false`.
104+
* Conceptually, its sets a value to a special "close token" and all active and future collector from
105+
* this [Flow] complete either normally (when [cause] is null) or with the corresponding exception that
106+
* was specified as a [cause].
107+
*/
108+
public fun close(cause: Throwable? = null): Boolean
109+
}
110+
111+
/**
112+
* Creates an uninitialized [DataFlow].
113+
*/
114+
@Suppress("FunctionName")
115+
@FlowPreview
116+
public fun <T> DataFlow(): DataFlow<T> = DataFlowImpl(NONE)
117+
118+
/**
119+
* Creates a [DataFlow] with a given initial [value].
120+
*
121+
* This is a shorthand for `DataFlow().apply { this.value = value }`.
122+
*/
123+
@Suppress("FunctionName")
124+
@FlowPreview
125+
public fun <T> DataFlow(value: T): DataFlow<T> = DataFlowImpl(value ?: NULL)
126+
127+
@SharedImmutable
128+
private val NONE = Symbol("NONE")
129+
130+
private const val INITIAL_SIZE = 2 // optimized for just a few collectors
131+
132+
// DataFlow slots are allocated for its collectors
133+
private class DataFlowSlot {
134+
/**
135+
* Each slot can have one of the following states:
136+
*
137+
* * `null` -- it is not used right now. Can [allocate] to new collector.
138+
* * `NONE` -- used by a collector, but neither suspended nor has pending value.
139+
* * `T` | `Closed` -- pending value or closed token. [NULL] is used for `null` values.
140+
* * `CancellableContinuationImpl<T>` -- suspended waiting for value.
141+
*
142+
* It is important that default `null` value is used, because there can be a race between allocation
143+
* of a new slot and trying to do [updateValue] on this slot.
144+
*/
145+
private val _state = atomic<Any?>(null)
146+
private var fresh = false // true when this slot is fresh -- was allocated while update was in process
147+
148+
val isFree: Boolean
149+
get() = _state.value === null
150+
151+
fun allocate(fresh: Boolean, value: Any) {
152+
this.fresh = fresh // write fresh before volatile write to state
153+
val old = _state.getAndSet(value)
154+
assert { old == null }
155+
}
156+
157+
fun free() {
158+
_state.value = null
159+
}
160+
161+
@Suppress("UNCHECKED_CAST")
162+
fun updateValue(value: Any, updateFresh: Boolean, updateStale: Boolean): Unit =
163+
_state.loop { state ->
164+
if (state == null) return // this slot is free - skit it
165+
val wasFresh = fresh // read after volatile read of state
166+
// if we see non-null (allocated) state, then wasFresh correctly reflects what's going on
167+
val update = if (wasFresh) updateFresh else updateStale
168+
if (!update) return // do not update this slot
169+
if (state is CancellableContinuationImpl<*>) {
170+
// this slot is suspended -- resume it
171+
if (_state.compareAndSet(state, NONE)) {
172+
fresh = false // not fresh anymore
173+
(state as CancellableContinuationImpl<Any?>).resume(value)
174+
return
175+
}
176+
} else if (_state.compareAndSet(state, value)) {
177+
// this slot contains previous value (or NONE) -- save new value
178+
fresh = false // not fresh anymore
179+
return
180+
}
181+
}
182+
183+
fun takeValue(): Any = _state.getAndSet(NONE)!!.also {
184+
assert { it !is CancellableContinuationImpl<*> }
185+
}
186+
187+
@Suppress("UNCHECKED_CAST")
188+
suspend fun takeValueSuspend(): Any = suspendCancellableCoroutine sc@ { cont ->
189+
val value = _state.getAndUpdate { state ->
190+
assert { state !is CancellableContinuationImpl<*> }
191+
if (state === NONE) cont else NONE
192+
}
193+
if (value !== NONE) {
194+
// there was a value -- resume now
195+
cont.resume(value!!)
196+
}
197+
// Note: we don't need cont.invokeOnCancellation. free will be called on cancellation anyway
198+
}
199+
}
200+
201+
private class DataFlowImpl<T>(initialValue: Any) : SynchronizedObject(), DataFlow<T>, FusibleFlow<T> {
202+
private val _value = atomic(initialValue) // NONE | T!! | NULL | Closed
203+
private var sequence = 0 // serializes updates, value update is in process when sequence is odd
204+
private var hasFresh = false // true when we have freshly allocated slots during update
205+
private var slots = arrayOfNulls<DataFlowSlot?>(INITIAL_SIZE)
206+
private var nSlots = 0 // number of allocated (!free) slots
207+
private var nextIndex = 0 // oracle for the next free slot index
208+
209+
public override val isInitialized: Boolean
210+
get() = _value.value != NONE
211+
212+
override val isClosed: Boolean
213+
get() = _value.value is Closed
214+
215+
@Suppress("UNCHECKED_CAST")
216+
public override var value: T
217+
get() {
218+
val value = _value.value
219+
check(value !== NONE) { "DataFlow is not initialized" }
220+
check(value !is Closed) { "DataFlow is closed" }
221+
return NULL.unbox(value)
222+
}
223+
set(value) {
224+
require(value !is CancellableContinuationImpl<*>) // just in case
225+
check(update(value ?: NULL)) { "DataFlow is closed" }
226+
}
227+
228+
// Update returns false when the data flow was already closed
229+
private fun update(value: Any): Boolean {
230+
var curSequence = 0
231+
var curSlots: Array<DataFlowSlot?> = this.slots // benign race, we will not use it
232+
var curValue = value
233+
var updateFresh = false
234+
var updateStale = true // update all stale values on first pass
235+
synchronized(this) {
236+
val oldValue = _value.value
237+
if (oldValue is Closed) return false // already closed
238+
if (oldValue == value) return true // Don't do anything if value is not changing
239+
_value.value = value
240+
curSequence = sequence
241+
if (curSequence and 1 == 0) { // even value, quiescent state
242+
curSequence++ // make it odd
243+
sequence = curSequence
244+
} else {
245+
// update is already in process, notify it, and return
246+
sequence = curSequence + 2 // change sequence to notify, keep it odd
247+
return true
248+
}
249+
curSlots = slots // read current reference to collectors under lock
250+
}
251+
// Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
252+
// Loop until we're done firing all the changes (this is sort of simple flat combining)
253+
while (true) {
254+
// Benign race on element read from array
255+
for (col in curSlots) {
256+
col?.updateValue(curValue, updateFresh, updateStale)
257+
}
258+
// check if the value was updated again while we were updating the old one
259+
synchronized(this) {
260+
updateFresh = hasFresh // see if we need a loop to update fresh values
261+
updateStale = sequence != curSequence // see if we need a loop to update stale values again
262+
hasFresh = false // reset fresh flag for next update
263+
if (!updateFresh && !updateStale) { // nothing changed
264+
sequence = curSequence + 1 // make sequence even again
265+
return true // done
266+
}
267+
// reread everything for the next loop under the lock
268+
curSequence = sequence
269+
curSlots = slots
270+
curValue = _value.value
271+
}
272+
}
273+
}
274+
275+
override fun close(cause: Throwable?): Boolean = update(Closed(cause))
276+
277+
@Suppress("UNCHECKED_CAST")
278+
override suspend fun collect(collector: FlowCollector<T>) {
279+
val slot = allocateSlot()
280+
try {
281+
while (true) {
282+
var value = slot.takeValue()
283+
// Note: if takeValueSuspend is cancelled, then it bails out of this loop and calls freeSlot
284+
if (value === NONE) {
285+
value = slot.takeValueSuspend()
286+
assert { value !== NONE }
287+
}
288+
if (value is Closed) {
289+
value.cause?.let { throw it }
290+
break
291+
}
292+
collector.emit(NULL.unbox(value))
293+
}
294+
} finally {
295+
freeSlot(slot)
296+
}
297+
}
298+
299+
private fun allocateSlot(): DataFlowSlot = synchronized(this) {
300+
val size = slots.size
301+
if (nSlots >= size) slots = slots.copyOf(2 * size)
302+
var index = nextIndex
303+
var slot: DataFlowSlot
304+
while (true) {
305+
slot = slots[index] ?: DataFlowSlot().also { slots[index] = it }
306+
index++
307+
if (index >= slots.size) index = 0
308+
if (slot.isFree) break
309+
}
310+
// We allocate slot with existing value only when the value is not currently being updated.
311+
// Otherwise update may or may not see the newly allocated slot, so we mark it as "fresh"
312+
// and let update loop again and deliver the value to the fresh slots exactly once.
313+
val fresh = sequence and 1 != 0
314+
val value = if (fresh) NONE else _value.value
315+
if (fresh) hasFresh = true // force update to loop again
316+
slot.allocate(fresh, value)
317+
nextIndex = index
318+
nSlots++
319+
slot
320+
}
321+
322+
private fun freeSlot(slot: DataFlowSlot): Unit = synchronized(this) {
323+
slot.free()
324+
nSlots--
325+
}
326+
327+
override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow<T> {
328+
// context is irrelevant for data flow and it is always conflated
329+
// so it should not do anything unless buffering is requested
330+
return when (capacity) {
331+
Channel.CONFLATED, Channel.RENDEZVOUS -> this
332+
else -> ChannelFlowOperatorImpl(this, context, capacity)
333+
}
334+
}
335+
336+
private class Closed(@JvmField val cause: Throwable?)
337+
}

0 commit comments

Comments
 (0)