diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index e4dca6662e..3e20e88bba 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -946,7 +946,6 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun toSet (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; - public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; @@ -972,9 +971,8 @@ public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt { public static final fun checkIndexOverflow (I)I } -public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector { - public fun (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V - public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +public final class kotlinx/coroutines/flow/internal/SafeCollectorKt { + public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; } public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/coroutines/flow/internal/ConcurrentFlowCollector { diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 5c01d00973..1405507fa0 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -11,6 +11,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.channels.Channel.Factory.BUFFERED import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.flow.internal.unsafeFlow as flow import kotlin.coroutines.* import kotlin.jvm.* @@ -44,25 +45,12 @@ import kotlin.jvm.* * ``` * If you want to switch the context of execution of a flow, use the [flowOn] operator. */ -@ExperimentalCoroutinesApi -public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow { - return object : Flow { - override suspend fun collect(collector: FlowCollector) { - SafeCollector(collector, coroutineContext).block() - } - } -} +public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block) -/** - * An analogue of the [flow] builder that does not check the context of execution of the resulting flow. - * Used in our own operators where we trust the context of invocations. - */ -@PublishedApi -internal inline fun unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector.() -> Unit): Flow { - return object : Flow { - override suspend fun collect(collector: FlowCollector) { - collector.block() - } +// Named anonymous object +private class SafeFlow(private val block: suspend FlowCollector.() -> Unit) : Flow { + override suspend fun collect(collector: FlowCollector) { + SafeCollector(collector, coroutineContext).block() } } @@ -70,7 +58,7 @@ internal inline fun unsafeFlow(@BuilderInference crossinline block: suspend * Creates a flow that produces a single value from the given functional type. */ @FlowPreview -public fun (() -> T).asFlow(): Flow = unsafeFlow { +public fun (() -> T).asFlow(): Flow = flow { emit(invoke()) } @@ -83,15 +71,14 @@ public fun (() -> T).asFlow(): Flow = unsafeFlow { * ``` */ @FlowPreview -public fun (suspend () -> T).asFlow(): Flow = unsafeFlow { +public fun (suspend () -> T).asFlow(): Flow = flow { emit(invoke()) } /** * Creates a flow that produces values from the given iterable. */ -@ExperimentalCoroutinesApi -public fun Iterable.asFlow(): Flow = unsafeFlow { +public fun Iterable.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -100,8 +87,7 @@ public fun Iterable.asFlow(): Flow = unsafeFlow { /** * Creates a flow that produces values from the given iterable. */ -@ExperimentalCoroutinesApi -public fun Iterator.asFlow(): Flow = unsafeFlow { +public fun Iterator.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -110,8 +96,7 @@ public fun Iterator.asFlow(): Flow = unsafeFlow { /** * Creates a flow that produces values from the given sequence. */ -@ExperimentalCoroutinesApi -public fun Sequence.asFlow(): Flow = unsafeFlow { +public fun Sequence.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -120,8 +105,7 @@ public fun Sequence.asFlow(): Flow = unsafeFlow { /** * Creates a flow that produces values from the given array of elements. */ -@ExperimentalCoroutinesApi -public fun flowOf(vararg elements: T): Flow = unsafeFlow { +public fun flowOf(vararg elements: T): Flow = flow { for (element in elements) { emit(element) } @@ -130,8 +114,7 @@ public fun flowOf(vararg elements: T): Flow = unsafeFlow { /** * Creates flow that produces a given [value]. */ -@ExperimentalCoroutinesApi -public fun flowOf(value: T): Flow = unsafeFlow { +public fun flowOf(value: T): Flow = flow { /* * Implementation note: this is just an "optimized" overload of flowOf(vararg) * which significantly reduce the footprint of widespread single-value flows. @@ -142,7 +125,6 @@ public fun flowOf(value: T): Flow = unsafeFlow { /** * Returns an empty flow. */ -@ExperimentalCoroutinesApi public fun emptyFlow(): Flow = EmptyFlow private object EmptyFlow : Flow { @@ -152,8 +134,7 @@ private object EmptyFlow : Flow { /** * Creates a flow that produces values from the given array. */ -@ExperimentalCoroutinesApi -public fun Array.asFlow(): Flow = unsafeFlow { +public fun Array.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -162,8 +143,7 @@ public fun Array.asFlow(): Flow = unsafeFlow { /** * Creates flow that produces values from the given array. */ -@ExperimentalCoroutinesApi -public fun IntArray.asFlow(): Flow = unsafeFlow { +public fun IntArray.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -172,8 +152,7 @@ public fun IntArray.asFlow(): Flow = unsafeFlow { /** * Creates flow that produces values from the given array. */ -@ExperimentalCoroutinesApi -public fun LongArray.asFlow(): Flow = unsafeFlow { +public fun LongArray.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -182,8 +161,7 @@ public fun LongArray.asFlow(): Flow = unsafeFlow { /** * Creates flow that produces values from the given range. */ -@ExperimentalCoroutinesApi -public fun IntRange.asFlow(): Flow = unsafeFlow { +public fun IntRange.asFlow(): Flow = flow { forEach { value -> emit(value) } @@ -192,7 +170,6 @@ public fun IntRange.asFlow(): Flow = unsafeFlow { /** * Creates flow that produces values from the given range. */ -@ExperimentalCoroutinesApi public fun LongRange.asFlow(): Flow = flow { forEach { value -> emit(value) diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt index 05afaacf64..a554a4addf 100644 --- a/kotlinx-coroutines-core/common/src/flow/Channels.kt +++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt @@ -13,7 +13,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.internal.* import kotlin.coroutines.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Emits all elements from the given [channel] to this flow collector and [cancels][cancel] (consumes) diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index b81030582c..bda326f85d 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -157,7 +157,6 @@ import kotlin.coroutines.* * Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with * reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from `kotlinx-coroutines-reactive` module. */ -@ExperimentalCoroutinesApi public interface Flow { /** * Accepts the given [collector] and [emits][FlowCollector.emit] values into it. diff --git a/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt b/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt index bb0d5b5d6a..7254c6d7bb 100644 --- a/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt +++ b/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt @@ -4,8 +4,6 @@ package kotlinx.coroutines.flow -import kotlinx.coroutines.* - /** * [FlowCollector] is used as an intermediate or a terminal collector of the flow and represents * an entity that accepts values emitted by the [Flow]. @@ -13,7 +11,6 @@ import kotlinx.coroutines.* * This interface should usually not be implemented directly, but rather used as a receiver in a [flow] builder when implementing a custom operator. * Implementations of this interface are not thread-safe. */ -@ExperimentalCoroutinesApi public interface FlowCollector { /** diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt index 258869f135..f0b5b391fa 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt @@ -11,7 +11,7 @@ import kotlinx.coroutines.internal.* import kotlinx.coroutines.intrinsics.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Creates a [CoroutineScope] and calls the specified suspend block with this scope. diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt index 297d4d1480..f83f31348f 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt @@ -4,8 +4,6 @@ package kotlinx.coroutines.flow.internal -import kotlinx.coroutines.flow.* - internal object NopCollector : ConcurrentFlowCollector { override suspend fun emit(value: Any?) { // does nothing diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt index 30f5484cad..66a6060ad6 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt @@ -9,7 +9,6 @@ import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* import kotlin.coroutines.* -@PublishedApi internal class SafeCollector( private val collector: FlowCollector, private val collectContext: CoroutineContext @@ -99,3 +98,16 @@ internal class SafeCollector( return parent.transitiveCoroutineParent(collectJob) } } + +/** + * An analogue of the [flow] builder that does not check the context of execution of the resulting flow. + * Used in our own operators where we trust the context of invocations. + */ +@PublishedApi +internal inline fun unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector.() -> Unit): Flow { + return object : Flow { + override suspend fun collect(collector: FlowCollector) { + collector.block() + } + } +} diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index c9aa555df8..8f3325c508 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -265,4 +265,3 @@ private fun checkFlowContext(context: CoroutineContext) { "Flow context cannot contain job in it. Had $context" } } - diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 2f01061d36..8d74be5584 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -12,7 +12,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.selects.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Delays the emission of values from this flow for the given [timeMillis]. diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt b/kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt index a9a0d63eed..89491f4166 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt @@ -10,7 +10,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Returns flow where all subsequent repetitions of the same value are filtered out. diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt index c744842f58..9b7a91f155 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt @@ -11,7 +11,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import kotlin.coroutines.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Catches exceptions in the flow completion and calls a specified [action] with diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt index a1e78fa550..7f638f9814 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Limit.kt @@ -10,7 +10,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Returns a flow that ignores first [count] elements. diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt index b05d5a8338..e593d0355f 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt @@ -16,7 +16,7 @@ import kotlinx.coroutines.internal.* import kotlinx.coroutines.sync.* import kotlin.coroutines.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Name of the property that defines the value of [DEFAULT_CONCURRENCY]. @@ -125,7 +125,7 @@ public fun Flow>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY * ``` * produces `aa bb b_last` */ -@ExperimentalCoroutinesApi +@FlowPreview public fun Flow.switchMap(transform: suspend (value: T) -> Flow): Flow = scopedFlow { downstream -> var previousFlow: Job? = null collect { value -> diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt index e19312bd3f..37ba0d39c3 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Transform.kt @@ -11,13 +11,12 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow import kotlinx.coroutines.flow.unsafeTransform as transform /** * Returns a flow containing only values of the original flow that matches the given [predicate]. */ -@ExperimentalCoroutinesApi public inline fun Flow.filter(crossinline predicate: suspend (T) -> Boolean): Flow = transform { value -> if (predicate(value)) return@transform emit(value) } @@ -25,7 +24,6 @@ public inline fun Flow.filter(crossinline predicate: suspend (T) -> Boole /** * Returns a flow containing only values of the original flow that do not match the given [predicate]. */ -@ExperimentalCoroutinesApi public inline fun Flow.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow = transform { value -> if (!predicate(value)) return@transform emit(value) } @@ -33,14 +31,12 @@ public inline fun Flow.filterNot(crossinline predicate: suspend (T) -> Bo /** * Returns a flow containing only values that are instances of specified type [R]. */ -@ExperimentalCoroutinesApi @Suppress("UNCHECKED_CAST") public inline fun Flow<*>.filterIsInstance(): Flow = filter { it is R } as Flow /** * Returns a flow containing only values of the original flow that are not null. */ -@ExperimentalCoroutinesApi public fun Flow.filterNotNull(): Flow = transform { value -> if (value != null) return@transform emit(value) } @@ -48,7 +44,6 @@ public fun Flow.filterNotNull(): Flow = transform { value /** * Returns a flow containing the results of applying the given [transform] function to each value of the original flow. */ -@ExperimentalCoroutinesApi public inline fun Flow.map(crossinline transform: suspend (value: T) -> R): Flow = transform { value -> return@transform emit(transform(value)) } @@ -56,7 +51,6 @@ public inline fun Flow.map(crossinline transform: suspend (value: T) - /** * Returns a flow that contains only non-null results of applying the given [transform] function to each value of the original flow. */ -@ExperimentalCoroutinesApi public inline fun Flow.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow = transform { value -> val transformed = transform(value) ?: return@transform return@transform emit(transformed) @@ -76,7 +70,6 @@ public fun Flow.withIndex(): Flow> = flow { /** * Returns a flow which performs the given [action] on each value of the original flow. */ -@ExperimentalCoroutinesApi public fun Flow.onEach(action: suspend (T) -> Unit): Flow = transform { value -> action(value) return@transform emit(value) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt index e9d99d321a..72822bbe4c 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt @@ -13,7 +13,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.selects.* import kotlin.jvm.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Returns a [Flow] whose values are generated with [transform] function by combining @@ -28,7 +28,7 @@ import kotlinx.coroutines.flow.unsafeFlow as flow * } * ``` */ -@ExperimentalCoroutinesApi +@FlowPreview public fun Flow.combineLatest(other: Flow, transform: suspend (T1, T2) -> R): Flow = flow { coroutineScope { val firstChannel = asFairChannel(this@combineLatest) @@ -80,7 +80,7 @@ public fun Flow.combineLatest(other: Flow, transform: suspen * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -@ExperimentalCoroutinesApi +@FlowPreview public inline fun Flow.combineLatest( other: Flow, other2: Flow, @@ -97,7 +97,7 @@ public inline fun Flow.combineLatest( * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -@ExperimentalCoroutinesApi +@FlowPreview public inline fun Flow.combineLatest( other: Flow, other2: Flow, @@ -116,7 +116,7 @@ public inline fun Flow.combineLatest( * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -@ExperimentalCoroutinesApi +@FlowPreview public inline fun Flow.combineLatest( other: Flow, other2: Flow, @@ -137,7 +137,7 @@ public inline fun Flow.combineLatest( * Returns a [Flow] whose values are generated with [transform] function by combining * the most recently emitted values by each flow. */ -@ExperimentalCoroutinesApi +@FlowPreview public inline fun Flow.combineLatest(vararg others: Flow, crossinline transform: suspend (Array) -> R): Flow = combineLatest(*others, arrayFactory = { arrayOfNulls(others.size + 1) }, transform = { transform(it) }) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt index eca4e68dd5..42ac800365 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt @@ -27,7 +27,6 @@ import kotlin.jvm.* * .collect() // trigger collection of the flow * ``` */ -@ExperimentalCoroutinesApi // tentatively stable in 1.3.0 public suspend fun Flow<*>.collect() = collect(NopCollector) /** @@ -69,7 +68,6 @@ public fun Flow.launchIn(scope: CoroutineScope): Job = scope.launch { * } * ``` */ -@ExperimentalCoroutinesApi public suspend inline fun Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector { override suspend fun emit(value: T) = action(value) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collection.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collection.kt index 836ea7e9a2..e07be61688 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collection.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collection.kt @@ -7,25 +7,21 @@ package kotlinx.coroutines.flow -import kotlinx.coroutines.* import kotlin.jvm.* /** * Collects given flow into a [destination] */ -@ExperimentalCoroutinesApi public suspend fun Flow.toList(destination: MutableList = ArrayList()): List = toCollection(destination) /** * Collects given flow into a [destination] */ -@ExperimentalCoroutinesApi public suspend fun Flow.toSet(destination: MutableSet = LinkedHashSet()): Set = toCollection(destination) /** * Collects given flow into a [destination] */ -@ExperimentalCoroutinesApi public suspend fun > Flow.toCollection(destination: C): C { collect { value -> destination.add(value) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Count.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Count.kt index 1d737002d0..d57dfdefc2 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Count.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Count.kt @@ -27,7 +27,7 @@ public suspend fun Flow.count(): Int { * Returns the number of elements matching the given predicate. */ @ExperimentalCoroutinesApi -public suspend fun Flow.count(predicate: suspend (T) -> Boolean): Int { +public suspend fun Flow.count(predicate: suspend (T) -> Boolean): Int { var i = 0 collect { value -> if (predicate(value)) { diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index 8db762e12a..875e6b6634 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -10,7 +10,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* -import kotlinx.coroutines.flow.unsafeFlow as flow +import kotlinx.coroutines.flow.internal.unsafeFlow as flow import kotlin.jvm.* /** @@ -55,7 +55,6 @@ public suspend inline fun Flow.fold( * Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow * that contains more than one element. */ -@ExperimentalCoroutinesApi public suspend fun Flow.single(): T { var result: Any? = NULL collect { value -> @@ -72,7 +71,6 @@ public suspend fun Flow.single(): T { * The terminal operator, that awaits for one and only one value to be published. * Throws [IllegalStateException] for flow that contains more than one element. */ -@ExperimentalCoroutinesApi public suspend fun Flow.singleOrNull(): T? { var result: T? = null collect { value -> @@ -87,7 +85,6 @@ public suspend fun Flow.singleOrNull(): T? { * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. * Throws [NoSuchElementException] if the flow was empty. */ -@ExperimentalCoroutinesApi public suspend fun Flow.first(): T { var result: Any? = NULL try { @@ -107,7 +104,6 @@ public suspend fun Flow.first(): T { * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection. * Throws [NoSuchElementException] if the flow has not contained elements matching the [predicate]. */ -@ExperimentalCoroutinesApi public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { var result: Any? = NULL try { diff --git a/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt index 9092a18083..699d9c6473 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/FlatMapStressTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* +import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.scheduling.* import org.junit.Assume.* import org.junit.Test