From 71fa2111244050214db01a5263e9b1bfa052916a Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 17 Jul 2019 18:33:01 +0300 Subject: [PATCH 1/3] Promote the bare minimum Flow API to stable for incoming 1.3.0-RC * Extract SafeFlow for nicer stacktraces --- .../common/src/flow/Builders.kt | 23 +++++-------------- .../common/src/flow/Flow.kt | 2 -- .../common/src/flow/FlowCollector.kt | 3 --- .../common/src/flow/internal/NopCollector.kt | 2 -- .../common/src/flow/operators/Context.kt | 1 - .../common/src/flow/operators/Transform.kt | 7 ------ .../common/src/flow/terminal/Collect.kt | 2 -- .../common/src/flow/terminal/Collection.kt | 4 ---- .../common/src/flow/terminal/Count.kt | 2 +- .../common/src/flow/terminal/Reduce.kt | 4 ---- 10 files changed, 7 insertions(+), 43 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 5c01d00973..3578ec9dc0 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -44,12 +44,12 @@ import kotlin.jvm.* * ``` * If you want to switch the context of execution of a flow, use the [flowOn] operator. */ -@ExperimentalCoroutinesApi -public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow { - return object : Flow { - override suspend fun collect(collector: FlowCollector) { - SafeCollector(collector, coroutineContext).block() - } +public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block) + +// Named anonymous object +private class SafeFlow(private val block: suspend FlowCollector.() -> Unit) : Flow { + override suspend fun collect(collector: FlowCollector) { + SafeCollector(collector, coroutineContext).block() } } @@ -90,7 +90,6 @@ public fun (suspend () -> T).asFlow(): Flow = unsafeFlow { /** * Creates a flow that produces values from the given iterable. */ -@ExperimentalCoroutinesApi public fun Iterable.asFlow(): Flow = unsafeFlow { forEach { value -> emit(value) @@ -100,7 +99,6 @@ public fun Iterable.asFlow(): Flow = unsafeFlow { /** * Creates a flow that produces values from the given iterable. */ -@ExperimentalCoroutinesApi public fun Iterator.asFlow(): Flow = unsafeFlow { forEach { value -> emit(value) @@ -110,7 +108,6 @@ public fun Iterator.asFlow(): Flow = unsafeFlow { /** * Creates a flow that produces values from the given sequence. */ -@ExperimentalCoroutinesApi public fun Sequence.asFlow(): Flow = unsafeFlow { forEach { value -> emit(value) @@ -120,7 +117,6 @@ public fun Sequence.asFlow(): Flow = unsafeFlow { /** * Creates a flow that produces values from the given array of elements. */ -@ExperimentalCoroutinesApi public fun flowOf(vararg elements: T): Flow = unsafeFlow { for (element in elements) { emit(element) @@ -130,7 +126,6 @@ public fun flowOf(vararg elements: T): Flow = unsafeFlow { /** * Creates flow that produces a given [value]. */ -@ExperimentalCoroutinesApi public fun flowOf(value: T): Flow = unsafeFlow { /* * Implementation note: this is just an "optimized" overload of flowOf(vararg) @@ -142,7 +137,6 @@ public fun flowOf(value: T): Flow = unsafeFlow { /** * Returns an empty flow. */ -@ExperimentalCoroutinesApi public fun emptyFlow(): Flow = EmptyFlow private object EmptyFlow : Flow { @@ -152,7 +146,6 @@ private object EmptyFlow : Flow { /** * Creates a flow that produces values from the given array. */ -@ExperimentalCoroutinesApi public fun Array.asFlow(): Flow = unsafeFlow { forEach { value -> emit(value) @@ -162,7 +155,6 @@ public fun Array.asFlow(): Flow = unsafeFlow { /** * Creates flow that produces values from the given array. */ -@ExperimentalCoroutinesApi public fun IntArray.asFlow(): Flow = unsafeFlow { forEach { value -> emit(value) @@ -172,7 +164,6 @@ public fun IntArray.asFlow(): Flow = unsafeFlow { /** * Creates flow that produces values from the given array. */ -@ExperimentalCoroutinesApi public fun LongArray.asFlow(): Flow = unsafeFlow { forEach { value -> emit(value) @@ -182,7 +173,6 @@ public fun LongArray.asFlow(): Flow = unsafeFlow { /** * Creates flow that produces values from the given range. */ -@ExperimentalCoroutinesApi public fun IntRange.asFlow(): Flow = unsafeFlow { forEach { value -> emit(value) @@ -192,7 +182,6 @@ public fun IntRange.asFlow(): Flow = unsafeFlow { /** * Creates flow that produces values from the given range. */ -@ExperimentalCoroutinesApi public fun LongRange.asFlow(): Flow = flow { forEach { value -> emit(value) diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index b81030582c..cd5020f940 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -157,7 +157,6 @@ import kotlin.coroutines.* * Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with * reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from `kotlinx-coroutines-reactive` module. */ -@ExperimentalCoroutinesApi public interface Flow { /** * Accepts the given [collector] and [emits][FlowCollector.emit] values into it. @@ -195,7 +194,6 @@ public interface Flow { * } * ``` */ -@FlowPreview public abstract class AbstractFlow : Flow { @InternalCoroutinesApi diff --git a/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt b/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt index bb0d5b5d6a..7254c6d7bb 100644 --- a/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt +++ b/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt @@ -4,8 +4,6 @@ package kotlinx.coroutines.flow -import kotlinx.coroutines.* - /** * [FlowCollector] is used as an intermediate or a terminal collector of the flow and represents * an entity that accepts values emitted by the [Flow]. @@ -13,7 +11,6 @@ import kotlinx.coroutines.* * This interface should usually not be implemented directly, but rather used as a receiver in a [flow] builder when implementing a custom operator. * Implementations of this interface are not thread-safe. */ -@ExperimentalCoroutinesApi public interface FlowCollector { /** diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt index 297d4d1480..f83f31348f 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt @@ -4,8 +4,6 @@ package kotlinx.coroutines.flow.internal -import kotlinx.coroutines.flow.* - internal object NopCollector : ConcurrentFlowCollector { override suspend fun emit(value: Any?) { // does nothing diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index c9aa555df8..8f3325c508 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -265,4 +265,3 @@ private fun checkFlowContext(context: CoroutineContext) { "Flow context cannot contain job in it. Had $context" } } - diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt index e19312bd3f..f0949742ba 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt @@ -17,7 +17,6 @@ import kotlinx.coroutines.flow.unsafeTransform as transform /** * Returns a flow containing only values of the original flow that matches the given [predicate]. */ -@ExperimentalCoroutinesApi public inline fun Flow.filter(crossinline predicate: suspend (T) -> Boolean): Flow = transform { value -> if (predicate(value)) return@transform emit(value) } @@ -25,7 +24,6 @@ public inline fun Flow.filter(crossinline predicate: suspend (T) -> Boole /** * Returns a flow containing only values of the original flow that do not match the given [predicate]. */ -@ExperimentalCoroutinesApi public inline fun Flow.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow = transform { value -> if (!predicate(value)) return@transform emit(value) } @@ -33,14 +31,12 @@ public inline fun Flow.filterNot(crossinline predicate: suspend (T) -> Bo /** * Returns a flow containing only values that are instances of specified type [R]. */ -@ExperimentalCoroutinesApi @Suppress("UNCHECKED_CAST") public inline fun Flow<*>.filterIsInstance(): Flow = filter { it is R } as Flow /** * Returns a flow containing only values of the original flow that are not null. */ -@ExperimentalCoroutinesApi public fun Flow.filterNotNull(): Flow = transform { value -> if (value != null) return@transform emit(value) } @@ -48,7 +44,6 @@ public fun Flow.filterNotNull(): Flow = transform { value /** * Returns a flow containing the results of applying the given [transform] function to each value of the original flow. */ -@ExperimentalCoroutinesApi public inline fun Flow.map(crossinline transform: suspend (value: T) -> R): Flow = transform { value -> return@transform emit(transform(value)) } @@ -56,7 +51,6 @@ public inline fun Flow.map(crossinline transform: suspend (value: T) - /** * Returns a flow that contains only non-null results of applying the given [transform] function to each value of the original flow. */ -@ExperimentalCoroutinesApi public inline fun Flow.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow = transform { value -> val transformed = transform(value) ?: return@transform return@transform emit(transformed) @@ -76,7 +70,6 @@ public fun Flow.withIndex(): Flow> = flow { /** * Returns a flow which performs the given [action] on each value of the original flow. */ -@ExperimentalCoroutinesApi public fun Flow.onEach(action: suspend (T) -> Unit): Flow = transform { value -> action(value) return@transform emit(value) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt index eca4e68dd5..42ac800365 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt @@ -27,7 +27,6 @@ import kotlin.jvm.* * .collect() // trigger collection of the flow * ``` */ -@ExperimentalCoroutinesApi // tentatively stable in 1.3.0 public suspend fun Flow<*>.collect() = collect(NopCollector) /** @@ -69,7 +68,6 @@ public fun Flow.launchIn(scope: CoroutineScope): Job = scope.launch { * } * ``` */ -@ExperimentalCoroutinesApi public suspend inline fun Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector { override suspend fun emit(value: T) = action(value) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collection.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collection.kt index 836ea7e9a2..e07be61688 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collection.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collection.kt @@ -7,25 +7,21 @@ package kotlinx.coroutines.flow -import kotlinx.coroutines.* import kotlin.jvm.* /** * Collects given flow into a [destination] */ -@ExperimentalCoroutinesApi public suspend fun Flow.toList(destination: MutableList = ArrayList()): List = toCollection(destination) /** * Collects given flow into a [destination] */ -@ExperimentalCoroutinesApi public suspend fun Flow.toSet(destination: MutableSet = LinkedHashSet()): Set = toCollection(destination) /** * Collects given flow into a [destination] */ -@ExperimentalCoroutinesApi public suspend fun > Flow.toCollection(destination: C): C { collect { value -> destination.add(value) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Count.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Count.kt index 1d737002d0..d57dfdefc2 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Count.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Count.kt @@ -27,7 +27,7 @@ public suspend fun Flow.count(): Int { * Returns the number of elements matching the given predicate. */ @ExperimentalCoroutinesApi -public suspend fun Flow.count(predicate: suspend (T) -> Boolean): Int { +public suspend fun Flow.count(predicate: suspend (T) -> Boolean): Int { var i = 0 collect { value -> if (predicate(value)) { diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index 8db762e12a..a15c2fce67 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -55,7 +55,6 @@ public suspend inline fun Flow.fold( * Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow * that contains more than one element. */ -@ExperimentalCoroutinesApi public suspend fun Flow.single(): T { var result: Any? = NULL collect { value -> @@ -72,7 +71,6 @@ public suspend fun Flow.single(): T { * The terminal operator, that awaits for one and only one value to be published. * Throws [IllegalStateException] for flow that contains more than one element. */ -@ExperimentalCoroutinesApi public suspend fun Flow.singleOrNull(): T? { var result: T? = null collect { value -> @@ -87,7 +85,6 @@ public suspend fun Flow.singleOrNull(): T? { * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. * Throws [NoSuchElementException] if the flow was empty. */ -@ExperimentalCoroutinesApi public suspend fun Flow.first(): T { var result: Any? = NULL try { @@ -107,7 +104,6 @@ public suspend fun Flow.first(): T { * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection. * Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate]. */ -@ExperimentalCoroutinesApi public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { var result: Any? = NULL try { From d59ac07770d00f269b67aee4074697390297d885 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 17 Jul 2019 18:34:47 +0300 Subject: [PATCH 2/3] Demote switchMap and combineLatest to preview features as we may want to rework in #1262 and #1335 --- .../common/src/flow/operators/Merge.kt | 2 +- .../common/src/flow/operators/Zip.kt | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt index b05d5a8338..e0a3ed25a5 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt @@ -125,7 +125,7 @@ public fun Flow>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY * ``` * produces `aa bb b_last` */ -@ExperimentalCoroutinesApi +@FlowPreview public fun Flow.switchMap(transform: suspend (value: T) -> Flow): Flow = scopedFlow { downstream -> var previousFlow: Job? = null collect { value -> diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt index e9d99d321a..43530a0deb 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt @@ -28,7 +28,7 @@ import kotlinx.coroutines.flow.unsafeFlow as flow * } * ``` */ -@ExperimentalCoroutinesApi +@FlowPreview public fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = flow { coroutineScope { val firstChannel = asFairChannel(this@combineLatest) @@ -80,7 +80,7 @@ public fun Flow.combineLatest(other: Flow, transform: suspen * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -@ExperimentalCoroutinesApi +@FlowPreview public inline fun Flow.combineLatest( other: Flow, other2: Flow, @@ -97,7 +97,7 @@ public inline fun Flow.combineLatest( * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -@ExperimentalCoroutinesApi +@FlowPreview public inline fun Flow.combineLatest( other: Flow, other2: Flow, @@ -116,7 +116,7 @@ public inline fun Flow.combineLatest( * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -@ExperimentalCoroutinesApi +@FlowPreview public inline fun Flow.combineLatest( other: Flow, other2: Flow, @@ -137,7 +137,7 @@ public inline fun Flow.combineLatest( * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -@ExperimentalCoroutinesApi +@FlowPreview public inline fun Flow.combineLatest(vararg others: Flow, crossinline transform: suspend (Array) -> R): Flow = combineLatest(*others, arrayFactory = { arrayOfNulls(others.size + 1) }, transform = { transform(it) }) From 1ecd044fd1d3924c43978aa39fb30ae2afcbed00 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 19 Jul 2019 15:38:21 +0300 Subject: [PATCH 3/3] Make unsafeFlow less explicit, return preview status to AbstractFlow --- .../kotlinx-coroutines-core.txt | 6 ++-- .../common/src/flow/Builders.kt | 36 +++++++------------ .../common/src/flow/Channels.kt | 2 +- .../common/src/flow/Flow.kt | 1 + .../common/src/flow/internal/FlowCoroutine.kt | 2 +- .../common/src/flow/internal/SafeCollector.kt | 14 +++++++- .../common/src/flow/operators/Delay.kt | 2 +- .../common/src/flow/operators/Distinct.kt | 2 +- .../common/src/flow/operators/Errors.kt | 2 +- .../common/src/flow/operators/Limit.kt | 2 +- .../common/src/flow/operators/Merge.kt | 2 +- .../common/src/flow/operators/Transform.kt | 2 +- .../common/src/flow/operators/Zip.kt | 2 +- .../common/src/flow/terminal/Reduce.kt | 2 +- .../jvm/test/flow/FlatMapStressTest.kt | 1 + 15 files changed, 39 insertions(+), 39 deletions(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index e4dca6662e..3e20e88bba 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -946,7 +946,6 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun toSet (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; - public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; @@ -972,9 +971,8 @@ public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt { public static final fun checkIndexOverflow (I)I } -public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector { - public fun (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V - public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +public final class kotlinx/coroutines/flow/internal/SafeCollectorKt { + public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; } public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/coroutines/flow/internal/ConcurrentFlowCollector { diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 3578ec9dc0..1405507fa0 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -11,6 +11,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.channels.Channel.Factory.BUFFERED import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.flow.internal.unsafeFlow as flow import kotlin.coroutines.* import kotlin.jvm.* @@ -53,24 +54,11 @@ private class SafeFlow(private val block: suspend FlowCollector.() -> Unit } } -/** - * An analogue of the [flow] builder that does not check the context of execution of the resulting flow. - * Used in our own operators where we trust the context of invocations. - */ -@PublishedApi -internal inline fun unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector.() -> Unit): Flow { - return object : Flow { - override suspend fun collect(collector: FlowCollector) { - collector.block() - } - } -} - /** * Creates a flow that produces a single value from the given functional type. */ @FlowPreview -public fun (() -> T).asFlow(): Flow = unsafeFlow { +public fun (() -> T).asFlow(): Flow = flow { emit(invoke()) } @@ -83,14 +71,14 @@ public fun (() -> T).asFlow(): Flow = unsafeFlow { * ``` */ @FlowPreview -public fun (suspend () -> T).asFlow(): Flow = unsafeFlow { +public fun (suspend () -> T).asFlow(): Flow = flow { emit(invoke()) } /** * Creates a flow that produces values from the given iterable. */ -public fun Iterable.asFlow(): Flow = unsafeFlow { +public fun Iterable.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -99,7 +87,7 @@ public fun Iterable.asFlow(): Flow = unsafeFlow { /** * Creates a flow that produces values from the given iterable. */ -public fun Iterator.asFlow(): Flow = unsafeFlow { +public fun Iterator.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -108,7 +96,7 @@ public fun Iterator.asFlow(): Flow = unsafeFlow { /** * Creates a flow that produces values from the given sequence. */ -public fun Sequence.asFlow(): Flow = unsafeFlow { +public fun Sequence.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -117,7 +105,7 @@ public fun Sequence.asFlow(): Flow = unsafeFlow { /** * Creates a flow that produces values from the given array of elements. */ -public fun flowOf(vararg elements: T): Flow = unsafeFlow { +public fun flowOf(vararg elements: T): Flow = flow { for (element in elements) { emit(element) } @@ -126,7 +114,7 @@ public fun flowOf(vararg elements: T): Flow = unsafeFlow { /** * Creates flow that produces a given [value]. */ -public fun flowOf(value: T): Flow = unsafeFlow { +public fun flowOf(value: T): Flow = flow { /* * Implementation note: this is just an "optimized" overload of flowOf(vararg) * which significantly reduce the footprint of widespread single-value flows. @@ -146,7 +134,7 @@ private object EmptyFlow : Flow { /** * Creates a flow that produces values from the given array. */ -public fun Array.asFlow(): Flow = unsafeFlow { +public fun Array.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -155,7 +143,7 @@ public fun Array.asFlow(): Flow = unsafeFlow { /** * Creates flow that produces values from the given array. */ -public fun IntArray.asFlow(): Flow = unsafeFlow { +public fun IntArray.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -164,7 +152,7 @@ public fun IntArray.asFlow(): Flow = unsafeFlow { /** * Creates flow that produces values from the given array. */ -public fun LongArray.asFlow(): Flow = unsafeFlow { +public fun LongArray.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -173,7 +161,7 @@ public fun LongArray.asFlow(): Flow = unsafeFlow { /** * Creates flow that produces values from the given range. */ -public fun IntRange.asFlow(): Flow = unsafeFlow { +public fun IntRange.asFlow(): Flow = flow { forEach { value -> emit(value) } diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt index 05afaacf64..a554a4addf 100644 --- a/kotlinx-coroutines-core/common/src/flow/Channels.kt +++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt @@ -13,7 +13,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.internal.* import kotlin.coroutines.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Emits all elements from the given [channel] to this flow collector and [cancels][cancel] (consumes) diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index cd5020f940..bda326f85d 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -194,6 +194,7 @@ public interface Flow { * } * ``` */ +@FlowPreview public abstract class AbstractFlow : Flow { @InternalCoroutinesApi diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt index 258869f135..f0b5b391fa 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt @@ -11,7 +11,7 @@ import kotlinx.coroutines.internal.* import kotlinx.coroutines.intrinsics.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Creates a [CoroutineScope] and calls the specified suspend block with this scope. diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt index 30f5484cad..66a6060ad6 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt @@ -9,7 +9,6 @@ import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* import kotlin.coroutines.* -@PublishedApi internal class SafeCollector( private val collector: FlowCollector, private val collectContext: CoroutineContext @@ -99,3 +98,16 @@ internal class SafeCollector( return parent.transitiveCoroutineParent(collectJob) } } + +/** + * An analogue of the [flow] builder that does not check the context of execution of the resulting flow. + * Used in our own operators where we trust the context of invocations. + */ +@PublishedApi +internal inline fun unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector.() -> Unit): Flow { + return object : Flow { + override suspend fun collect(collector: FlowCollector) { + collector.block() + } + } +} diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 2f01061d36..8d74be5584 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -12,7 +12,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.selects.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Delays the emission of values from this flow for the given [timeMillis]. diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt b/kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt index a9a0d63eed..89491f4166 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt @@ -10,7 +10,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Returns flow where all subsequent repetitions of the same value are filtered out. diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt index c744842f58..9b7a91f155 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt @@ -11,7 +11,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import kotlin.coroutines.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Catches exceptions in the flow completion and calls a specified [action] with diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt index a1e78fa550..7f638f9814 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt @@ -10,7 +10,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Returns a flow that ignores first [count] elements. diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt index e0a3ed25a5..e593d0355f 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt @@ -16,7 +16,7 @@ import kotlinx.coroutines.internal.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Name of the property that defines the value of [DEFAULT_CONCURRENCY]. diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt index f0949742ba..37ba0d39c3 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt @@ -11,7 +11,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow import kotlinx.coroutines.flow.unsafeTransform as transform /** diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt index 43530a0deb..72822bbe4c 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt @@ -13,7 +13,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.selects.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Returns a [Flow] whose values are generated with [transform] function by combining diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index a15c2fce67..875e6b6634 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -10,7 +10,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow import kotlin.jvm.* /** diff --git a/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt index 9092a18083..699d9c6473 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* +import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.scheduling.* import org.junit.Assume.* import org.junit.Test