Skip to content

Commit bc46e8d

Browse files
committed
Promote the bare minimum Flow API to stable for incoming 1.3.0-RC
* Extract SafeFlow for nicer stacktraces
1 parent 693142c commit bc46e8d

File tree

10 files changed

+7
-43
lines changed

10 files changed

+7
-43
lines changed

kotlinx-coroutines-core/common/src/flow/Builders.kt

+6-17
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ import kotlin.jvm.*
4444
* ```
4545
* If you want to switch the context of execution of a flow, use the [flowOn] operator.
4646
*/
47-
@ExperimentalCoroutinesApi
48-
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
49-
return object : Flow<T> {
50-
override suspend fun collect(collector: FlowCollector<T>) {
51-
SafeCollector(collector, coroutineContext).block()
52-
}
47+
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
48+
49+
// Named anonymous object
50+
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : Flow<T> {
51+
override suspend fun collect(collector: FlowCollector<T>) {
52+
SafeCollector(collector, coroutineContext).block()
5353
}
5454
}
5555

@@ -90,7 +90,6 @@ public fun <T> (suspend () -> T).asFlow(): Flow<T> = unsafeFlow {
9090
/**
9191
* Creates a flow that produces values from the given iterable.
9292
*/
93-
@ExperimentalCoroutinesApi
9493
public fun <T> Iterable<T>.asFlow(): Flow<T> = unsafeFlow {
9594
forEach { value ->
9695
emit(value)
@@ -100,7 +99,6 @@ public fun <T> Iterable<T>.asFlow(): Flow<T> = unsafeFlow {
10099
/**
101100
* Creates a flow that produces values from the given iterable.
102101
*/
103-
@ExperimentalCoroutinesApi
104102
public fun <T> Iterator<T>.asFlow(): Flow<T> = unsafeFlow {
105103
forEach { value ->
106104
emit(value)
@@ -110,7 +108,6 @@ public fun <T> Iterator<T>.asFlow(): Flow<T> = unsafeFlow {
110108
/**
111109
* Creates a flow that produces values from the given sequence.
112110
*/
113-
@ExperimentalCoroutinesApi
114111
public fun <T> Sequence<T>.asFlow(): Flow<T> = unsafeFlow {
115112
forEach { value ->
116113
emit(value)
@@ -120,7 +117,6 @@ public fun <T> Sequence<T>.asFlow(): Flow<T> = unsafeFlow {
120117
/**
121118
* Creates a flow that produces values from the given array of elements.
122119
*/
123-
@ExperimentalCoroutinesApi
124120
public fun <T> flowOf(vararg elements: T): Flow<T> = unsafeFlow {
125121
for (element in elements) {
126122
emit(element)
@@ -130,7 +126,6 @@ public fun <T> flowOf(vararg elements: T): Flow<T> = unsafeFlow {
130126
/**
131127
* Creates flow that produces a given [value].
132128
*/
133-
@ExperimentalCoroutinesApi
134129
public fun <T> flowOf(value: T): Flow<T> = unsafeFlow {
135130
/*
136131
* Implementation note: this is just an "optimized" overload of flowOf(vararg)
@@ -142,7 +137,6 @@ public fun <T> flowOf(value: T): Flow<T> = unsafeFlow {
142137
/**
143138
* Returns an empty flow.
144139
*/
145-
@ExperimentalCoroutinesApi
146140
public fun <T> emptyFlow(): Flow<T> = EmptyFlow
147141

148142
private object EmptyFlow : Flow<Nothing> {
@@ -152,7 +146,6 @@ private object EmptyFlow : Flow<Nothing> {
152146
/**
153147
* Creates a flow that produces values from the given array.
154148
*/
155-
@ExperimentalCoroutinesApi
156149
public fun <T> Array<T>.asFlow(): Flow<T> = unsafeFlow {
157150
forEach { value ->
158151
emit(value)
@@ -162,7 +155,6 @@ public fun <T> Array<T>.asFlow(): Flow<T> = unsafeFlow {
162155
/**
163156
* Creates flow that produces values from the given array.
164157
*/
165-
@ExperimentalCoroutinesApi
166158
public fun IntArray.asFlow(): Flow<Int> = unsafeFlow {
167159
forEach { value ->
168160
emit(value)
@@ -172,7 +164,6 @@ public fun IntArray.asFlow(): Flow<Int> = unsafeFlow {
172164
/**
173165
* Creates flow that produces values from the given array.
174166
*/
175-
@ExperimentalCoroutinesApi
176167
public fun LongArray.asFlow(): Flow<Long> = unsafeFlow {
177168
forEach { value ->
178169
emit(value)
@@ -182,7 +173,6 @@ public fun LongArray.asFlow(): Flow<Long> = unsafeFlow {
182173
/**
183174
* Creates flow that produces values from the given range.
184175
*/
185-
@ExperimentalCoroutinesApi
186176
public fun IntRange.asFlow(): Flow<Int> = unsafeFlow {
187177
forEach { value ->
188178
emit(value)
@@ -192,7 +182,6 @@ public fun IntRange.asFlow(): Flow<Int> = unsafeFlow {
192182
/**
193183
* Creates flow that produces values from the given range.
194184
*/
195-
@ExperimentalCoroutinesApi
196185
public fun LongRange.asFlow(): Flow<Long> = flow {
197186
forEach { value ->
198187
emit(value)

kotlinx-coroutines-core/common/src/flow/Flow.kt

-2
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@ import kotlin.coroutines.*
157157
* Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with
158158
* reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from `kotlinx-coroutines-reactive` module.
159159
*/
160-
@ExperimentalCoroutinesApi
161160
public interface Flow<out T> {
162161
/**
163162
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
@@ -195,7 +194,6 @@ public interface Flow<out T> {
195194
* }
196195
* ```
197196
*/
198-
@FlowPreview
199197
public abstract class AbstractFlow<T> : Flow<T> {
200198

201199
@InternalCoroutinesApi

kotlinx-coroutines-core/common/src/flow/FlowCollector.kt

-3
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,13 @@
44

55
package kotlinx.coroutines.flow
66

7-
import kotlinx.coroutines.*
8-
97
/**
108
* [FlowCollector] is used as an intermediate or a terminal collector of the flow and represents
119
* an entity that accepts values emitted by the [Flow].
1210
*
1311
* This interface should usually not be implemented directly, but rather used as a receiver in a [flow] builder when implementing a custom operator.
1412
* Implementations of this interface are not thread-safe.
1513
*/
16-
@ExperimentalCoroutinesApi
1714
public interface FlowCollector<in T> {
1815

1916
/**

kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt

-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44

55
package kotlinx.coroutines.flow.internal
66

7-
import kotlinx.coroutines.flow.*
8-
97
internal object NopCollector : ConcurrentFlowCollector<Any?> {
108
override suspend fun emit(value: Any?) {
119
// does nothing

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

-1
Original file line numberDiff line numberDiff line change
@@ -265,4 +265,3 @@ private fun checkFlowContext(context: CoroutineContext) {
265265
"Flow context cannot contain job in it. Had $context"
266266
}
267267
}
268-

kotlinx-coroutines-core/common/src/flow/operators/Transform.kt

-7
Original file line numberDiff line numberDiff line change
@@ -17,46 +17,40 @@ import kotlinx.coroutines.flow.unsafeTransform as transform
1717
/**
1818
* Returns a flow containing only values of the original flow that matches the given [predicate].
1919
*/
20-
@ExperimentalCoroutinesApi
2120
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
2221
if (predicate(value)) return@transform emit(value)
2322
}
2423

2524
/**
2625
* Returns a flow containing only values of the original flow that do not match the given [predicate].
2726
*/
28-
@ExperimentalCoroutinesApi
2927
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
3028
if (!predicate(value)) return@transform emit(value)
3129
}
3230

3331
/**
3432
* Returns a flow containing only values that are instances of specified type [R].
3533
*/
36-
@ExperimentalCoroutinesApi
3734
@Suppress("UNCHECKED_CAST")
3835
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
3936

4037
/**
4138
* Returns a flow containing only values of the original flow that are not null.
4239
*/
43-
@ExperimentalCoroutinesApi
4440
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T> = transform<T?, T> { value ->
4541
if (value != null) return@transform emit(value)
4642
}
4743

4844
/**
4945
* Returns a flow containing the results of applying the given [transform] function to each value of the original flow.
5046
*/
51-
@ExperimentalCoroutinesApi
5247
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
5348
return@transform emit(transform(value))
5449
}
5550

5651
/**
5752
* Returns a flow that contains only non-null results of applying the given [transform] function to each value of the original flow.
5853
*/
59-
@ExperimentalCoroutinesApi
6054
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R> = transform { value ->
6155
val transformed = transform(value) ?: return@transform
6256
return@transform emit(transformed)
@@ -65,7 +59,6 @@ public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend
6559
/**
6660
* Returns a flow which performs the given [action] on each value of the original flow.
6761
*/
68-
@ExperimentalCoroutinesApi
6962
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
7063
action(value)
7164
return@transform emit(value)

kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt

-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import kotlin.jvm.*
2727
* .collect() // trigger collection of the flow
2828
* ```
2929
*/
30-
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
3130
public suspend fun Flow<*>.collect() = collect(NopCollector)
3231

3332
/**
@@ -69,7 +68,6 @@ public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
6968
* }
7069
* ```
7170
*/
72-
@ExperimentalCoroutinesApi
7371
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
7472
collect(object : FlowCollector<T> {
7573
override suspend fun emit(value: T) = action(value)

kotlinx-coroutines-core/common/src/flow/terminal/Collection.kt

-4
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,21 @@
77

88
package kotlinx.coroutines.flow
99

10-
import kotlinx.coroutines.*
1110
import kotlin.jvm.*
1211

1312
/**
1413
* Collects given flow into a [destination]
1514
*/
16-
@ExperimentalCoroutinesApi
1715
public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T> = toCollection(destination)
1816

1917
/**
2018
* Collects given flow into a [destination]
2119
*/
22-
@ExperimentalCoroutinesApi
2320
public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T> = toCollection(destination)
2421

2522
/**
2623
* Collects given flow into a [destination]
2724
*/
28-
@ExperimentalCoroutinesApi
2925
public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C {
3026
collect { value ->
3127
destination.add(value)

kotlinx-coroutines-core/common/src/flow/terminal/Count.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public suspend fun <T> Flow<T>.count(): Int {
2727
* Returns the number of elements matching the given predicate.
2828
*/
2929
@ExperimentalCoroutinesApi
30-
public suspend fun <T> Flow<T>.count(predicate: suspend (T) -> Boolean): Int {
30+
public suspend fun <T> Flow<T>.count(predicate: suspend (T) -> Boolean): Int {
3131
var i = 0
3232
collect { value ->
3333
if (predicate(value)) {

kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt

-4
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public suspend inline fun <T, R> Flow<T>.fold(
5555
* Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow
5656
* that contains more than one element.
5757
*/
58-
@ExperimentalCoroutinesApi
5958
public suspend fun <T> Flow<T>.single(): T {
6059
var result: Any? = NULL
6160
collect { value ->
@@ -72,7 +71,6 @@ public suspend fun <T> Flow<T>.single(): T {
7271
* The terminal operator, that awaits for one and only one value to be published.
7372
* Throws [IllegalStateException] for flow that contains more than one element.
7473
*/
75-
@ExperimentalCoroutinesApi
7674
public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {
7775
var result: T? = null
7876
collect { value ->
@@ -87,7 +85,6 @@ public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {
8785
* The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
8886
* Throws [NoSuchElementException] if the flow was empty.
8987
*/
90-
@ExperimentalCoroutinesApi
9188
public suspend fun <T> Flow<T>.first(): T {
9289
var result: Any? = NULL
9390
try {
@@ -107,7 +104,6 @@ public suspend fun <T> Flow<T>.first(): T {
107104
* The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
108105
* Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate].
109106
*/
110-
@ExperimentalCoroutinesApi
111107
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
112108
var result: Any? = NULL
113109
try {

0 commit comments

Comments
 (0)