Skip to content

Stabilize core flow #1352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,6 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun toSet (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
Expand All @@ -972,9 +971,8 @@ public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
public static final fun checkIndexOverflow (I)I
}

public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
public fun <init> (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final class kotlinx/coroutines/flow/internal/SafeCollectorKt {
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
}

public final class kotlinx/coroutines/flow/internal/SendingCollector : kotlinx/coroutines/flow/internal/ConcurrentFlowCollector {
Expand Down
57 changes: 17 additions & 40 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
import kotlin.coroutines.*
import kotlin.jvm.*

Expand Down Expand Up @@ -44,33 +45,20 @@ import kotlin.jvm.*
* ```
* If you want to switch the context of execution of a flow, use the [flowOn] operator.
*/
@ExperimentalCoroutinesApi
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
SafeCollector(collector, coroutineContext).block()
}
}
}
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

/**
* An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
* Used in our own operators where we trust the context of invocations.
*/
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
SafeCollector(collector, coroutineContext).block()
}
}

/**
* Creates a flow that produces a single value from the given functional type.
*/
@FlowPreview
public fun <T> (() -> T).asFlow(): Flow<T> = unsafeFlow {
public fun <T> (() -> T).asFlow(): Flow<T> = flow {
emit(invoke())
}

Expand All @@ -83,15 +71,14 @@ public fun <T> (() -> T).asFlow(): Flow<T> = unsafeFlow {
* ```
*/
@FlowPreview
public fun <T> (suspend () -> T).asFlow(): Flow<T> = unsafeFlow {
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
emit(invoke())
}

/**
* Creates a flow that produces values from the given iterable.
*/
@ExperimentalCoroutinesApi
public fun <T> Iterable<T>.asFlow(): Flow<T> = unsafeFlow {
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
Expand All @@ -100,8 +87,7 @@ public fun <T> Iterable<T>.asFlow(): Flow<T> = unsafeFlow {
/**
* Creates a flow that produces values from the given iterable.
*/
@ExperimentalCoroutinesApi
public fun <T> Iterator<T>.asFlow(): Flow<T> = unsafeFlow {
public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
Expand All @@ -110,8 +96,7 @@ public fun <T> Iterator<T>.asFlow(): Flow<T> = unsafeFlow {
/**
* Creates a flow that produces values from the given sequence.
*/
@ExperimentalCoroutinesApi
public fun <T> Sequence<T>.asFlow(): Flow<T> = unsafeFlow {
public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
Expand All @@ -120,8 +105,7 @@ public fun <T> Sequence<T>.asFlow(): Flow<T> = unsafeFlow {
/**
* Creates a flow that produces values from the given array of elements.
*/
@ExperimentalCoroutinesApi
public fun <T> flowOf(vararg elements: T): Flow<T> = unsafeFlow {
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
Expand All @@ -130,8 +114,7 @@ public fun <T> flowOf(vararg elements: T): Flow<T> = unsafeFlow {
/**
* Creates flow that produces a given [value].
*/
@ExperimentalCoroutinesApi
public fun <T> flowOf(value: T): Flow<T> = unsafeFlow {
public fun <T> flowOf(value: T): Flow<T> = flow {
/*
* Implementation note: this is just an "optimized" overload of flowOf(vararg)
* which significantly reduce the footprint of widespread single-value flows.
Expand All @@ -142,7 +125,6 @@ public fun <T> flowOf(value: T): Flow<T> = unsafeFlow {
/**
* Returns an empty flow.
*/
@ExperimentalCoroutinesApi
public fun <T> emptyFlow(): Flow<T> = EmptyFlow

private object EmptyFlow : Flow<Nothing> {
Expand All @@ -152,8 +134,7 @@ private object EmptyFlow : Flow<Nothing> {
/**
* Creates a flow that produces values from the given array.
*/
@ExperimentalCoroutinesApi
public fun <T> Array<T>.asFlow(): Flow<T> = unsafeFlow {
public fun <T> Array<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
Expand All @@ -162,8 +143,7 @@ public fun <T> Array<T>.asFlow(): Flow<T> = unsafeFlow {
/**
* Creates flow that produces values from the given array.
*/
@ExperimentalCoroutinesApi
public fun IntArray.asFlow(): Flow<Int> = unsafeFlow {
public fun IntArray.asFlow(): Flow<Int> = flow {
forEach { value ->
emit(value)
}
Expand All @@ -172,8 +152,7 @@ public fun IntArray.asFlow(): Flow<Int> = unsafeFlow {
/**
* Creates flow that produces values from the given array.
*/
@ExperimentalCoroutinesApi
public fun LongArray.asFlow(): Flow<Long> = unsafeFlow {
public fun LongArray.asFlow(): Flow<Long> = flow {
forEach { value ->
emit(value)
}
Expand All @@ -182,8 +161,7 @@ public fun LongArray.asFlow(): Flow<Long> = unsafeFlow {
/**
* Creates flow that produces values from the given range.
*/
@ExperimentalCoroutinesApi
public fun IntRange.asFlow(): Flow<Int> = unsafeFlow {
public fun IntRange.asFlow(): Flow<Int> = flow {
forEach { value ->
emit(value)
}
Expand All @@ -192,7 +170,6 @@ public fun IntRange.asFlow(): Flow<Int> = unsafeFlow {
/**
* Creates flow that produces values from the given range.
*/
@ExperimentalCoroutinesApi
public fun LongRange.asFlow(): Flow<Long> = flow {
forEach { value ->
emit(value)
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/Channels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
* Emits all elements from the given [channel] to this flow collector and [cancels][cancel] (consumes)
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/Flow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ import kotlin.coroutines.*
* Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with
* reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from `kotlinx-coroutines-reactive` module.
*/
@ExperimentalCoroutinesApi
public interface Flow<out T> {
/**
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
Expand Down
3 changes: 0 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/FlowCollector.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@

package kotlinx.coroutines.flow

import kotlinx.coroutines.*

/**
* [FlowCollector] is used as an intermediate or a terminal collector of the flow and represents
* an entity that accepts values emitted by the [Flow].
*
* This interface should usually not be implemented directly, but rather used as a receiver in a [flow] builder when implementing a custom operator.
* Implementations of this interface are not thread-safe.
*/
@ExperimentalCoroutinesApi
public interface FlowCollector<in T> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
* Creates a [CoroutineScope] and calls the specified suspend block with this scope.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.flow.*

internal object NopCollector : ConcurrentFlowCollector<Any?> {
override suspend fun emit(value: Any?) {
// does nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*

@PublishedApi
internal class SafeCollector<T>(
private val collector: FlowCollector<T>,
private val collectContext: CoroutineContext
Expand Down Expand Up @@ -99,3 +98,16 @@ internal class SafeCollector<T>(
return parent.transitiveCoroutineParent(collectJob)
}
}

/**
* An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
* Used in our own operators where we trust the context of invocations.
*/
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,3 @@ private fun checkFlowContext(context: CoroutineContext) {
"Flow context cannot contain job in it. Had $context"
}
}

2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
* Delays the emission of values from this flow for the given [timeMillis].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
* Returns flow where all subsequent repetitions of the same value are filtered out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
* Catches exceptions in the flow completion and calls a specified [action] with
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Limit.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
* Returns a flow that ignores first [count] elements.
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import kotlinx.coroutines.internal.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
* Name of the property that defines the value of [DEFAULT_CONCURRENCY].
Expand Down Expand Up @@ -125,7 +125,7 @@ public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY
* ```
* produces `aa bb b_last`
*/
@ExperimentalCoroutinesApi
@FlowPreview
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = scopedFlow { downstream ->
var previousFlow: Job? = null
collect { value ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,46 @@ package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
import kotlinx.coroutines.flow.unsafeTransform as transform

/**
* Returns a flow containing only values of the original flow that matches the given [predicate].
*/
@ExperimentalCoroutinesApi
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
if (predicate(value)) return@transform emit(value)
}

/**
* Returns a flow containing only values of the original flow that do not match the given [predicate].
*/
@ExperimentalCoroutinesApi
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
if (!predicate(value)) return@transform emit(value)
}

/**
* Returns a flow containing only values that are instances of specified type [R].
*/
@ExperimentalCoroutinesApi
@Suppress("UNCHECKED_CAST")
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>

/**
* Returns a flow containing only values of the original flow that are not null.
*/
@ExperimentalCoroutinesApi
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T> = transform<T?, T> { value ->
if (value != null) return@transform emit(value)
}

/**
* Returns a flow containing the results of applying the given [transform] function to each value of the original flow.
*/
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
return@transform emit(transform(value))
}

/**
* Returns a flow that contains only non-null results of applying the given [transform] function to each value of the original flow.
*/
@ExperimentalCoroutinesApi
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R> = transform { value ->
val transformed = transform(value) ?: return@transform
return@transform emit(transformed)
Expand All @@ -76,7 +70,6 @@ public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {
/**
* Returns a flow which performs the given [action] on each value of the original flow.
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
action(value)
return@transform emit(value)
Expand Down
Loading