Skip to content

Commit dc4a474

Browse files
authored
Stabilize core flow (#1352)
* Promote the bare minimum Flow API to stable for incoming 1.3.0-RC * Extract SafeFlow for nicer stacktraces * Demote switchMap and combineLatest to preview features as we may want to rework in #1262 and #1335 * Make unsafeFlow less explicit, return preview status to AbstractFlow
1 parent 97863c3 commit dc4a474

File tree

21 files changed

+50
-86
lines changed

21 files changed

+50
-86
lines changed

binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+2-4
Original file line numberDiff line numberDiff line change
@@ -946,7 +946,6 @@ public final class kotlinx/coroutines/flow/FlowKt {
946946
public static final fun toSet (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
947947
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
948948
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
949-
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
950949
public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
951950
public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
952951
public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
@@ -972,9 +971,8 @@ public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
972971
public static final fun checkIndexOverflow (I)I
973972
}
974973

975-
public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
976-
public fun <init> (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V
977-
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
974+
public final class kotlinx/coroutines/flow/internal/SafeCollectorKt {
975+
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
978976
}
979977

980978
public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/coroutines/flow/internal/ConcurrentFlowCollector {

kotlinx-coroutines-core/common/src/flow/Builders.kt

+17-40
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import kotlinx.coroutines.*
1111
import kotlinx.coroutines.channels.*
1212
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
1313
import kotlinx.coroutines.flow.internal.*
14+
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1415
import kotlin.coroutines.*
1516
import kotlin.jvm.*
1617

@@ -44,33 +45,20 @@ import kotlin.jvm.*
4445
* ```
4546
* If you want to switch the context of execution of a flow, use the [flowOn] operator.
4647
*/
47-
@ExperimentalCoroutinesApi
48-
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
49-
return object : Flow<T> {
50-
override suspend fun collect(collector: FlowCollector<T>) {
51-
SafeCollector(collector, coroutineContext).block()
52-
}
53-
}
54-
}
48+
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
5549

56-
/**
57-
* An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
58-
* Used in our own operators where we trust the context of invocations.
59-
*/
60-
@PublishedApi
61-
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
62-
return object : Flow<T> {
63-
override suspend fun collect(collector: FlowCollector<T>) {
64-
collector.block()
65-
}
50+
// Named anonymous object
51+
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : Flow<T> {
52+
override suspend fun collect(collector: FlowCollector<T>) {
53+
SafeCollector(collector, coroutineContext).block()
6654
}
6755
}
6856

6957
/**
7058
* Creates a flow that produces a single value from the given functional type.
7159
*/
7260
@FlowPreview
73-
public fun <T> (() -> T).asFlow(): Flow<T> = unsafeFlow {
61+
public fun <T> (() -> T).asFlow(): Flow<T> = flow {
7462
emit(invoke())
7563
}
7664

@@ -83,15 +71,14 @@ public fun <T> (() -> T).asFlow(): Flow<T> = unsafeFlow {
8371
* ```
8472
*/
8573
@FlowPreview
86-
public fun <T> (suspend () -> T).asFlow(): Flow<T> = unsafeFlow {
74+
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
8775
emit(invoke())
8876
}
8977

9078
/**
9179
* Creates a flow that produces values from the given iterable.
9280
*/
93-
@ExperimentalCoroutinesApi
94-
public fun <T> Iterable<T>.asFlow(): Flow<T> = unsafeFlow {
81+
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
9582
forEach { value ->
9683
emit(value)
9784
}
@@ -100,8 +87,7 @@ public fun <T> Iterable<T>.asFlow(): Flow<T> = unsafeFlow {
10087
/**
10188
* Creates a flow that produces values from the given iterable.
10289
*/
103-
@ExperimentalCoroutinesApi
104-
public fun <T> Iterator<T>.asFlow(): Flow<T> = unsafeFlow {
90+
public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
10591
forEach { value ->
10692
emit(value)
10793
}
@@ -110,8 +96,7 @@ public fun <T> Iterator<T>.asFlow(): Flow<T> = unsafeFlow {
11096
/**
11197
* Creates a flow that produces values from the given sequence.
11298
*/
113-
@ExperimentalCoroutinesApi
114-
public fun <T> Sequence<T>.asFlow(): Flow<T> = unsafeFlow {
99+
public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
115100
forEach { value ->
116101
emit(value)
117102
}
@@ -120,8 +105,7 @@ public fun <T> Sequence<T>.asFlow(): Flow<T> = unsafeFlow {
120105
/**
121106
* Creates a flow that produces values from the given array of elements.
122107
*/
123-
@ExperimentalCoroutinesApi
124-
public fun <T> flowOf(vararg elements: T): Flow<T> = unsafeFlow {
108+
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
125109
for (element in elements) {
126110
emit(element)
127111
}
@@ -130,8 +114,7 @@ public fun <T> flowOf(vararg elements: T): Flow<T> = unsafeFlow {
130114
/**
131115
* Creates flow that produces a given [value].
132116
*/
133-
@ExperimentalCoroutinesApi
134-
public fun <T> flowOf(value: T): Flow<T> = unsafeFlow {
117+
public fun <T> flowOf(value: T): Flow<T> = flow {
135118
/*
136119
* Implementation note: this is just an "optimized" overload of flowOf(vararg)
137120
* which significantly reduce the footprint of widespread single-value flows.
@@ -142,7 +125,6 @@ public fun <T> flowOf(value: T): Flow<T> = unsafeFlow {
142125
/**
143126
* Returns an empty flow.
144127
*/
145-
@ExperimentalCoroutinesApi
146128
public fun <T> emptyFlow(): Flow<T> = EmptyFlow
147129

148130
private object EmptyFlow : Flow<Nothing> {
@@ -152,8 +134,7 @@ private object EmptyFlow : Flow<Nothing> {
152134
/**
153135
* Creates a flow that produces values from the given array.
154136
*/
155-
@ExperimentalCoroutinesApi
156-
public fun <T> Array<T>.asFlow(): Flow<T> = unsafeFlow {
137+
public fun <T> Array<T>.asFlow(): Flow<T> = flow {
157138
forEach { value ->
158139
emit(value)
159140
}
@@ -162,8 +143,7 @@ public fun <T> Array<T>.asFlow(): Flow<T> = unsafeFlow {
162143
/**
163144
* Creates flow that produces values from the given array.
164145
*/
165-
@ExperimentalCoroutinesApi
166-
public fun IntArray.asFlow(): Flow<Int> = unsafeFlow {
146+
public fun IntArray.asFlow(): Flow<Int> = flow {
167147
forEach { value ->
168148
emit(value)
169149
}
@@ -172,8 +152,7 @@ public fun IntArray.asFlow(): Flow<Int> = unsafeFlow {
172152
/**
173153
* Creates flow that produces values from the given array.
174154
*/
175-
@ExperimentalCoroutinesApi
176-
public fun LongArray.asFlow(): Flow<Long> = unsafeFlow {
155+
public fun LongArray.asFlow(): Flow<Long> = flow {
177156
forEach { value ->
178157
emit(value)
179158
}
@@ -182,8 +161,7 @@ public fun LongArray.asFlow(): Flow<Long> = unsafeFlow {
182161
/**
183162
* Creates flow that produces values from the given range.
184163
*/
185-
@ExperimentalCoroutinesApi
186-
public fun IntRange.asFlow(): Flow<Int> = unsafeFlow {
164+
public fun IntRange.asFlow(): Flow<Int> = flow {
187165
forEach { value ->
188166
emit(value)
189167
}
@@ -192,7 +170,6 @@ public fun IntRange.asFlow(): Flow<Int> = unsafeFlow {
192170
/**
193171
* Creates flow that produces values from the given range.
194172
*/
195-
@ExperimentalCoroutinesApi
196173
public fun LongRange.asFlow(): Flow<Long> = flow {
197174
forEach { value ->
198175
emit(value)

kotlinx-coroutines-core/common/src/flow/Channels.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import kotlinx.coroutines.channels.*
1313
import kotlinx.coroutines.flow.internal.*
1414
import kotlin.coroutines.*
1515
import kotlin.jvm.*
16-
import kotlinx.coroutines.flow.unsafeFlow as flow
16+
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1717

1818
/**
1919
* Emits all elements from the given [channel] to this flow collector and [cancels][cancel] (consumes)

kotlinx-coroutines-core/common/src/flow/Flow.kt

-1
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@ import kotlin.coroutines.*
157157
* Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with
158158
* reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from `kotlinx-coroutines-reactive` module.
159159
*/
160-
@ExperimentalCoroutinesApi
161160
public interface Flow<out T> {
162161
/**
163162
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.

kotlinx-coroutines-core/common/src/flow/FlowCollector.kt

-3
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,13 @@
44

55
package kotlinx.coroutines.flow
66

7-
import kotlinx.coroutines.*
8-
97
/**
108
* [FlowCollector] is used as an intermediate or a terminal collector of the flow and represents
119
* an entity that accepts values emitted by the [Flow].
1210
*
1311
* This interface should usually not be implemented directly, but rather used as a receiver in a [flow] builder when implementing a custom operator.
1412
* Implementations of this interface are not thread-safe.
1513
*/
16-
@ExperimentalCoroutinesApi
1714
public interface FlowCollector<in T> {
1815

1916
/**

kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import kotlinx.coroutines.internal.*
1111
import kotlinx.coroutines.intrinsics.*
1212
import kotlin.coroutines.*
1313
import kotlin.coroutines.intrinsics.*
14-
import kotlinx.coroutines.flow.unsafeFlow as flow
14+
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1515

1616
/**
1717
* Creates a [CoroutineScope] and calls the specified suspend block with this scope.

kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt

-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44

55
package kotlinx.coroutines.flow.internal
66

7-
import kotlinx.coroutines.flow.*
8-
97
internal object NopCollector : ConcurrentFlowCollector<Any?> {
108
override suspend fun emit(value: Any?) {
119
// does nothing

kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt

+13-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import kotlinx.coroutines.flow.*
99
import kotlinx.coroutines.internal.*
1010
import kotlin.coroutines.*
1111

12-
@PublishedApi
1312
internal class SafeCollector<T>(
1413
private val collector: FlowCollector<T>,
1514
private val collectContext: CoroutineContext
@@ -99,3 +98,16 @@ internal class SafeCollector<T>(
9998
return parent.transitiveCoroutineParent(collectJob)
10099
}
101100
}
101+
102+
/**
103+
* An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
104+
* Used in our own operators where we trust the context of invocations.
105+
*/
106+
@PublishedApi
107+
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
108+
return object : Flow<T> {
109+
override suspend fun collect(collector: FlowCollector<T>) {
110+
collector.block()
111+
}
112+
}
113+
}

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

-1
Original file line numberDiff line numberDiff line change
@@ -265,4 +265,3 @@ private fun checkFlowContext(context: CoroutineContext) {
265265
"Flow context cannot contain job in it. Had $context"
266266
}
267267
}
268-

kotlinx-coroutines-core/common/src/flow/operators/Delay.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import kotlinx.coroutines.channels.*
1212
import kotlinx.coroutines.flow.internal.*
1313
import kotlinx.coroutines.selects.*
1414
import kotlin.jvm.*
15-
import kotlinx.coroutines.flow.unsafeFlow as flow
15+
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1616

1717
/**
1818
* Delays the emission of values from this flow for the given [timeMillis].

kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ package kotlinx.coroutines.flow
1010
import kotlinx.coroutines.*
1111
import kotlinx.coroutines.flow.internal.*
1212
import kotlin.jvm.*
13-
import kotlinx.coroutines.flow.unsafeFlow as flow
13+
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1414

1515
/**
1616
* Returns flow where all subsequent repetitions of the same value are filtered out.

kotlinx-coroutines-core/common/src/flow/operators/Errors.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import kotlinx.coroutines.*
1111
import kotlinx.coroutines.internal.*
1212
import kotlin.coroutines.*
1313
import kotlin.jvm.*
14-
import kotlinx.coroutines.flow.unsafeFlow as flow
14+
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1515

1616
/**
1717
* Catches exceptions in the flow completion and calls a specified [action] with

kotlinx-coroutines-core/common/src/flow/operators/Limit.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ package kotlinx.coroutines.flow
1010
import kotlinx.coroutines.*
1111
import kotlinx.coroutines.flow.internal.*
1212
import kotlin.jvm.*
13-
import kotlinx.coroutines.flow.unsafeFlow as flow
13+
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1414

1515
/**
1616
* Returns a flow that ignores first [count] elements.

kotlinx-coroutines-core/common/src/flow/operators/Merge.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import kotlinx.coroutines.internal.*
1616
import kotlinx.coroutines.sync.*
1717
import kotlin.coroutines.*
1818
import kotlin.jvm.*
19-
import kotlinx.coroutines.flow.unsafeFlow as flow
19+
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
2020

2121
/**
2222
* Name of the property that defines the value of [DEFAULT_CONCURRENCY].
@@ -125,7 +125,7 @@ public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY
125125
* ```
126126
* produces `aa bb b_last`
127127
*/
128-
@ExperimentalCoroutinesApi
128+
@FlowPreview
129129
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow { downstream ->
130130
var previousFlow: Job? = null
131131
collect { value ->

kotlinx-coroutines-core/common/src/flow/operators/Transform.kt

+1-8
Original file line numberDiff line numberDiff line change
@@ -11,52 +11,46 @@ package kotlinx.coroutines.flow
1111
import kotlinx.coroutines.*
1212
import kotlinx.coroutines.flow.internal.*
1313
import kotlin.jvm.*
14-
import kotlinx.coroutines.flow.unsafeFlow as flow
14+
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1515
import kotlinx.coroutines.flow.unsafeTransform as transform
1616

1717
/**
1818
* Returns a flow containing only values of the original flow that matches the given [predicate].
1919
*/
20-
@ExperimentalCoroutinesApi
2120
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
2221
if (predicate(value)) return@transform emit(value)
2322
}
2423

2524
/**
2625
* Returns a flow containing only values of the original flow that do not match the given [predicate].
2726
*/
28-
@ExperimentalCoroutinesApi
2927
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
3028
if (!predicate(value)) return@transform emit(value)
3129
}
3230

3331
/**
3432
* Returns a flow containing only values that are instances of specified type [R].
3533
*/
36-
@ExperimentalCoroutinesApi
3734
@Suppress("UNCHECKED_CAST")
3835
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
3936

4037
/**
4138
* Returns a flow containing only values of the original flow that are not null.
4239
*/
43-
@ExperimentalCoroutinesApi
4440
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T> = transform<T?, T> { value ->
4541
if (value != null) return@transform emit(value)
4642
}
4743

4844
/**
4945
* Returns a flow containing the results of applying the given [transform] function to each value of the original flow.
5046
*/
51-
@ExperimentalCoroutinesApi
5247
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
5348
return@transform emit(transform(value))
5449
}
5550

5651
/**
5752
* Returns a flow that contains only non-null results of applying the given [transform] function to each value of the original flow.
5853
*/
59-
@ExperimentalCoroutinesApi
6054
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R> = transform { value ->
6155
val transformed = transform(value) ?: return@transform
6256
return@transform emit(transformed)
@@ -76,7 +70,6 @@ public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {
7670
/**
7771
* Returns a flow which performs the given [action] on each value of the original flow.
7872
*/
79-
@ExperimentalCoroutinesApi
8073
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
8174
action(value)
8275
return@transform emit(value)

0 commit comments

Comments
 (0)