Skip to content

Commit c02c44c

Browse files
author
Rick Busarow
committed
shareIn and cache operators
Fixes Kotlin#1261
1 parent a930b0c commit c02c44c

File tree

5 files changed

+646
-0
lines changed

5 files changed

+646
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines.flow.internal
6+
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.channels.*
9+
import kotlinx.coroutines.flow.*
10+
import kotlinx.coroutines.internal.*
11+
import kotlinx.coroutines.sync.*
12+
13+
internal fun <T> Flow<T>.asCachedFlow(
14+
cacheHistory: Int
15+
): Flow<T> {
16+
17+
require(cacheHistory > 0) { "cacheHistory parameter must be greater than 0, but was $cacheHistory" }
18+
19+
val cache = CircularArray<T>(cacheHistory)
20+
21+
return onEach { value ->
22+
// While flowing, also record all values in the cache.
23+
cache.add(value)
24+
}.onStart {
25+
// Before emitting any values in sourceFlow,
26+
// emit any cached values starting with the oldest.
27+
cache.forEach { emit(it) }
28+
}
29+
}
30+
31+
internal fun <T> Flow<T>.asSharedFlow(
32+
scope: CoroutineScope, cacheHistory: Int
33+
): Flow<T> = SharedFlow(this, scope, cacheHistory)
34+
35+
/**
36+
* An auto-resetting [broadcast] flow. It tracks the number of active collectors, and automatically resets when
37+
* the number reaches 0.
38+
*
39+
* `SharedFlow` has an optional [cache], where the latest _n_ elements emitted by the source Flow will be replayed to
40+
* late collectors.
41+
*
42+
* ### Upon reset
43+
* 1) The underlying [BroadcastChannel] is closed. A new BroadcastChannel will be created when a new collector starts.
44+
* 2) The cache is reset. New collectors will not receive values from before the reset, but will generate a new cache.
45+
*/
46+
internal class SharedFlow<T>(
47+
private val sourceFlow: Flow<T>,
48+
private val scope: CoroutineScope,
49+
private val cacheHistory: Int
50+
) : Flow<T> {
51+
52+
private var refCount = 0
53+
private var cache = CircularArray<T>(cacheHistory)
54+
private val mutex = Mutex(false)
55+
56+
init {
57+
require(cacheHistory >= 0) { "cacheHistory parameter must be at least 0, but was $cacheHistory" }
58+
}
59+
60+
public override suspend fun collect(
61+
collector: FlowCollector<T>
62+
) = collector.emitAll(createFlow())
63+
64+
// Replay happens per new collector, if cacheHistory > 0.
65+
private suspend fun createFlow(): Flow<T> = getChannel()
66+
.asFlow()
67+
.replayIfNeeded()
68+
.onCompletion { onCollectEnd() }
69+
70+
// lazy holder for the BroadcastChannel, which is reset whenever all collection ends
71+
private var lazyChannelRef = createLazyChannel()
72+
73+
// must be lazy so that the broadcast doesn't begin immediately after a reset
74+
private fun createLazyChannel() = lazy(LazyThreadSafetyMode.NONE) {
75+
sourceFlow.cacheIfNeeded()
76+
.broadcastIn(scope)
77+
}
78+
79+
private fun Flow<T>.replayIfNeeded(): Flow<T> = if (cacheHistory > 0) {
80+
onStart {
81+
cache.forEach {
82+
emit(it)
83+
}
84+
}
85+
} else this
86+
87+
private fun Flow<T>.cacheIfNeeded(): Flow<T> = if (cacheHistory > 0) {
88+
onEach { value ->
89+
// While flowing, also record all values in the cache.
90+
cache.add(value)
91+
}
92+
} else this
93+
94+
private fun reset() {
95+
cache = CircularArray(cacheHistory)
96+
lazyChannelRef = createLazyChannel()
97+
}
98+
99+
private suspend fun onCollectEnd() = mutex.withLock { if (--refCount == 0) reset() }
100+
private suspend fun getChannel(): BroadcastChannel<T> = mutex.withLock {
101+
refCount++
102+
lazyChannelRef.value
103+
}
104+
105+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
@file:JvmMultifileClass
6+
@file:JvmName("FlowKt")
7+
8+
package kotlinx.coroutines.flow
9+
10+
import kotlinx.coroutines.*
11+
import kotlinx.coroutines.channels.*
12+
import kotlinx.coroutines.flow.internal.*
13+
import kotlin.jvm.*
14+
15+
/**
16+
* A "cached" [Flow] which will record the last [history] collected values.
17+
*
18+
* When a collector begins collecting after values have already been recorded,
19+
* those values will be collected *before* values from the receiver [Flow] are collected.
20+
*
21+
* example:
22+
* ```Kotlin
23+
* val ints = flowOf(1, 2, 3, 4).cache(2) // cache the last 2 values
24+
*
25+
* ints.take(4).collect { ... } // 4 values are emitted, but also recorded. The last 2 remain.
26+
*
27+
* ints.collect { ... } // collects [3, 4, 1, 2, 3, 4]
28+
* ```
29+
*
30+
* Throws [IllegalArgumentException] if size parameter is not greater than 0
31+
*
32+
* @param history the number of items to keep in the [Flow]'s history -- must be greater than 0
33+
*/
34+
@FlowPreview
35+
public fun <T> Flow<T>.cache(history: Int): Flow<T> = asCachedFlow(history)
36+
37+
/**
38+
* Creates a [broadcast] coroutine which collects the [Flow] receiver and shares with multiple collectors.
39+
*
40+
* A [BroadcastChannel] with [default][Channel.Factory.BUFFERED] buffer size is created.
41+
* Use [buffer] operator on the flow before calling `shareIn` to specify a value other than
42+
* default and to control what happens when data is produced faster than it is consumed,
43+
* that is to control back-pressure behavior.
44+
*
45+
* Concurrent collectors will all collect from a single [broadcast] flow. This flow will be cancelled automatically
46+
* when it is no longer being collected, and the underlying channel will be closed.
47+
*
48+
* If a new collector is added after the channel has been closed, a new channel will be created.
49+
*
50+
* By default, this flow is effectively **stateless** in that collectors will only receive values emitted after collection begins.
51+
*
52+
* example:
53+
*
54+
* ```
55+
* val sourceFlow = flowOf(1, 2, 3, 4, 5)
56+
* .onStart { println("start source") }
57+
* .onEach { println("emit $it") }
58+
* .onCompletion { println("complete source") }
59+
* .shareIn(this)
60+
*
61+
* val a = async { sourceFlow.toList() }
62+
* val b = async { sourceFlow.toList() } // collect concurrently
63+
*
64+
* println(a.await())
65+
* println(b.await())
66+
*
67+
* println("** break **")
68+
*
69+
* println(sourceFlow.toList())
70+
*
71+
* prints:
72+
*
73+
* start source
74+
* emit 1
75+
* emit 2
76+
* emit 3
77+
* emit 4
78+
* emit 5
79+
* complete source
80+
* [1, 2, 3, 4, 5]
81+
* [1, 2, 3, 4, 5]
82+
* ** break **
83+
* start source
84+
* emit 1
85+
* emit 2
86+
* emit 3
87+
* emit 4
88+
* emit 5
89+
* complete source
90+
* [1, 2, 3, 4, 5]
91+
*
92+
* ```
93+
* ### Caching
94+
*
95+
* When a shared flow is cached, the values are recorded as they are emitted from the source Flow.
96+
* They are then replayed for each new subscriber.
97+
*
98+
* When a shared flow is reset, the cached values are cleared.
99+
*
100+
* example:
101+
*
102+
* ```
103+
* val sourceFlow = flowOf(1, 2, 3, 4, 5)
104+
* .onEach {
105+
* delay(50)
106+
* println("emit $it")
107+
* }.shareIn(this, 1)
108+
*
109+
* val a = async { sourceFlow.toList() }
110+
* delay(125)
111+
* val b = async { sourceFlow.toList() } // begin collecting after "emit 3"
112+
*
113+
* println(a.await())
114+
* println(b.await())
115+
*
116+
* println("** break **")
117+
*
118+
* println(sourceFlow.toList()) // the shared flow has been reset, so the cached values are cleared
119+
*
120+
* prints:
121+
*
122+
* emit 1
123+
* emit 2
124+
* emit 3
125+
* emit 4
126+
* emit 5
127+
* [1, 2, 3, 4, 5]
128+
* [2, 3, 4, 5]
129+
* ** break **
130+
* emit 1
131+
* emit 2
132+
* emit 3
133+
* emit 4
134+
* emit 5
135+
* [1, 2, 3, 4, 5]
136+
*
137+
* ```
138+
*
139+
* In order to have cached values persist across resets, use `cache(n)` before `shareIn(...)`.
140+
*
141+
* example:
142+
*
143+
* ```
144+
* // resets cache whenever the Flow is reset
145+
* flowOf(1, 2, 3).shareIn(myScope, 3)
146+
*
147+
* // persists cache across resets
148+
* flowOf(1, 2, 3).cached(3).shareIn(myScope)
149+
* ```
150+
*
151+
* ### Cancellation semantics
152+
* 1) Flow consumer is cancelled when the original channel is cancelled.
153+
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
154+
* 3) Collection is cancelled when the (scope)[CoroutineScope] parameter is cancelled,
155+
* thereby ending the consumer when it has run out of elements.
156+
* 4) If the flow consumer fails with an exception, subscription is cancelled.
157+
*
158+
* @param scope The [CoroutineScope] used to create the [broadcast] coroutine. Cancellation of this scope
159+
* will close the underlying [BroadcastChannel].
160+
* @param cacheHistory (default = 0). Any value greater than zero will add a [cache] to the shared Flow.
161+
*
162+
*/
163+
@FlowPreview
164+
fun <T> Flow<T>.shareIn(
165+
scope: CoroutineScope, cacheHistory: Int = 0
166+
): Flow<T> = asSharedFlow(scope, cacheHistory)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package kotlinx.coroutines.internal
2+
3+
import kotlinx.coroutines.*
4+
5+
6+
/**
7+
* CircularArray implementation which will hold the latest of up to `size` elements.
8+
*
9+
* After the cache has been filled, all further additions will overwrite the least recent value.
10+
*
11+
* @param size the maximum number of elements to store in the array
12+
*/
13+
internal class CircularArray<T>(size: Int) : Iterable<T> {
14+
15+
private val array: Array<Any?> = arrayOfNulls(size)
16+
private var count: Int = 0
17+
private var tail: Int = -1
18+
19+
/**
20+
* Adds [item] to the [CircularArray].
21+
*
22+
* If the `CircularArray` has not yet been filled,
23+
* `item` will simply be added to the next available slot.
24+
*
25+
* If the `CircularArray` has already been filled,
26+
* `item` will replace the oldest item already in the array.
27+
*
28+
* example:
29+
* ```
30+
* val ca = CircularArray<T>(3)
31+
*
32+
* ca.add(0) // ca contents : [0, null, null]
33+
* ca.add(1) // ca contents : [0, 1, null]
34+
* ca.add(2) // ca contents : [0, 1, 2]
35+
* // overwrite the oldest value
36+
* ca.add(3) // ca contents : [3, 1, 2]
37+
* ```
38+
*/
39+
public fun add(item: T) {
40+
tail = (tail + 1) % array.size
41+
array[tail] = item
42+
if (count < array.size) count++
43+
}
44+
45+
/**
46+
* Iterates over the [CircularArray].
47+
*
48+
* Order is always first-in-first-out, with the oldest item being used first.
49+
*
50+
* example:
51+
* ```
52+
* val ca = CircularArray<Int>(3)
53+
*
54+
* ca.add(0) // ca contents : [0, null, null]
55+
* ca.add(1) // ca contents : [0, 1, null]
56+
* ca.add(2) // ca contents : [0, 1, 2]
57+
* // overwrite the oldest value
58+
* ca.add(3) // ca contents : [3, 1, 2]
59+
*
60+
* ca.forEach { ... } // order : [1, 2, 3]
61+
* ```
62+
*/
63+
public override fun iterator(): Iterator<T> = object : Iterator<T> {
64+
private val arraySnapshot = array.copyOf()
65+
private val tailSnapshot = tail
66+
67+
private var _index = 0
68+
69+
private val head: Int
70+
get() = when (count) {
71+
arraySnapshot.size -> (tailSnapshot + 1) % count
72+
else -> 0
73+
}
74+
75+
@Suppress("UNCHECKED_CAST")
76+
private fun get(index: Int): T = when (count) {
77+
arraySnapshot.size -> arraySnapshot[(head + index) % arraySnapshot.size] as T
78+
else -> arraySnapshot[index] as T
79+
}
80+
81+
override fun hasNext(): Boolean = _index < count
82+
override fun next(): T = get(_index++)
83+
84+
}
85+
86+
public override fun toString(): String = "$classSimpleName[array=${contentToString()}]"
87+
88+
private fun contentToString(): String = joinToString { "$it" }
89+
}

0 commit comments

Comments
 (0)