Skip to content

New flow builder: channelFlow (and its alias callbackFlow) and supple… #1214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,8 @@ public final class kotlinx/coroutines/channels/ConflatedBroadcastChannel : kotli
}

public final class kotlinx/coroutines/channels/ProduceKt {
public static final fun awaitClose (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun awaitClose$default (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun produce (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun produce$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
Expand Down Expand Up @@ -796,6 +798,10 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static final fun callbackFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun callbackFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun channelFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun channelFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
Expand Down
30 changes: 30 additions & 0 deletions kotlinx-coroutines-core/common/src/channels/Produce.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,36 @@ public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
val channel: SendChannel<E>
}

/**
* Suspends the current coroutine until the channel is either [closed][SendChannel.close] or [cancelled][ReceiveChannel.cancel]
* and invokes the given [block] before resuming the coroutine.
*
* Note that when producer channel is cancelled this function resumes with cancellation exception,
* so putting the code after calling this function would not lead to its execution in case of cancellation.
* That is why this code takes a lambda parameter.
*
* Example of usage:
* ```
* val callbackEventsStream = produce {
* val disposable = registerChannelInCallback(channel)
* awaitClose { disposable.dispose() }
* }
* ```
*/
@ExperimentalCoroutinesApi
public suspend fun <T> ProducerScope<T>.awaitClose(block: () -> Unit = {}) {
check(kotlin.coroutines.coroutineContext[Job] === this) { "awaitClose() can be invoke only from the producer context" }
try {
suspendCancellableCoroutine<Unit> { cont ->
invokeOnClose {
cont.resume(Unit)
}
}
} finally {
block()
}
}

/**
* Launches new coroutine to produce a stream of values by sending them to a channel
* and returns a reference to the coroutine as a [ReceiveChannel]. This resulting
Expand Down
123 changes: 93 additions & 30 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -200,38 +200,14 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
}

/**
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
* that is provided to the builder's [block] of code. It allows elements to be
* produced by the code that is running in a different context, e.g. from a callback-based API.
*
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
* on the resulting flow. The [block] is not suspending deliberately, if you need suspending scope, [flow] builder
* should be used instead.
*
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
* The provided channel can later be used by any external service to communicate with flow and its buffer determines
* backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used).
*
* Example of usage:
* ```
* fun flowFrom(api: CallbackBasedApi): Flow<T> = flowViaChannel { channel ->
* val callback = object : Callback { // implementation of some callback interface
* override fun onNextValue(value: T) {
* channel.offer(value) // Note: offer drops value when buffer is full
* }
* override fun onApiError(cause: Throwable) {
* channel.cancel("API Error", CancellationException(cause))
* }
* override fun onCompleted() = channel.close()
* }
* api.register(callback)
* channel.invokeOnClose {
* api.unregister(callback)
* }
* }
* ```
* @suppress
*/
@FlowPreview
@Deprecated(
message = "Use channelFlow instead",
level = DeprecationLevel.WARNING,
replaceWith = ReplaceWith("channelFlow(bufferSize, block)")
)
public fun <T> flowViaChannel(
bufferSize: Int = 16,
@BuilderInference block: CoroutineScope.(channel: SendChannel<T>) -> Unit
Expand All @@ -249,3 +225,90 @@ public fun <T> flowViaChannel(
}
}
}

/**
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
* that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
* produced by the code that is running in a different context or running concurrently.
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
* on the resulting flow.
*
* This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used concurrently from different contexts.
* The resulting flow will complete as soon as [ProducerScope], to artificially prolong it [awaitClose] can be used.
* For more detailed example please refer to [callbackFlow] documentation.
*
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
* The provided channel can later be used by any external service to communicate with the flow and its buffer determines
* backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
*
* Examples of usage:
* ```
* fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
* launch {
* collect { value -> send(value) }
* }
* other.collect { value -> send(value) }
* }
*
* fun <T> contextualFlow(): Flow<T> = channelFlow {
* launch(Dispatchers.IO) {
* send(computeIoValue())
* }
*
* launch(Dispatchers.Default) {
* send(computeCpuValue())
* }
* }
* ```
*/
@FlowPreview
public fun <T> channelFlow(bufferSize: Int = 16, @BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
flow {
coroutineScope {
val channel = produce(capacity = bufferSize, block = block)
channel.consumeEach { value ->
emit(value)
}
}
}

/**
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
* that is provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
* produced by the code that is running in a different context or running concurrently.
*
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
* on the resulting flow.
*
* This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used from any context,
* e.g. from the callback-based API. The flow completes as soon as its scope completes, thus if you are using channel from the
* callback-based API, to artificially prolong scope lifetime and avoid memory-leaks related to unregistered resources,
* [awaitClose] extension should be used. [awaitClose] argument will be invoked when either flow consumer cancels flow collection
* or when callback-based API invokes [SendChannel.close] manually.
*
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
* The provided channel can later be used by any external service to communicate with the flow and its buffer determines
* backpressure buffer size or its behaviour (e.g. in the case when [Channel.CONFLATED] was used).
*
* Example of usage:
* ```
* fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
* val callback = object : Callback { // implementation of some callback interface
* override fun onNextValue(value: T) {
* // Note: offer drops value when buffer is full
* // Channel.UNLIMITED can be used to avoid overfill
* offer(value)
* }
* override fun onApiError(cause: Throwable) {
* cancel(CancellationException("API Error", cause))
* }
* override fun onCompleted() = channel.close()
* }
* api.register(callback)
* // Suspend until either onCompleted or external cancellation are invoked
* await { api.unregister(callback) }
* }
* ```
*/
public inline fun <T> callbackFlow(bufferSize: Int = 16, @BuilderInference crossinline block: suspend ProducerScope<T>.() -> Unit): Flow<T> =
channelFlow(bufferSize) { block() }
54 changes: 54 additions & 0 deletions kotlinx-coroutines-core/common/test/channels/ProduceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,60 @@ class ProduceTest : TestBase() {
cancelOnCompletion(coroutineContext)
}

@Test
fun testAwaitConsumerCancellation() = runTest {
val parent = Job()
val channel = produce<Int>(parent) {
expect(2)
awaitClose { expect(4) }
}
expect(1)
yield()
expect(3)
channel.cancel()
parent.complete()
parent.join()
finish(5)
}

@Test
fun testAwaitProducerCancellation() = runTest {
val parent = Job()
produce<Int>(parent) {
expect(2)
launch {
expect(3)
[email protected]()
}
awaitClose { expect(4) }
}
expect(1)
parent.complete()
parent.join()
finish(5)
}

@Test
fun testAwaitParentCancellation() = runTest {
val parent = Job()
produce<Int>(parent) {
expect(2)
awaitClose { expect(4) }
}
expect(1)
yield()
expect(3)
parent.cancelAndJoin()
finish(5)
}

@Test
fun testAwaitIllegalState() = runTest {
val channel = produce<Int> { }
@Suppress("RemoveExplicitTypeArguments") // KT-31525
assertFailsWith<IllegalStateException> { (channel as ProducerScope<*>).awaitClose<Nothing>() }
}

private suspend fun cancelOnCompletion(coroutineContext: CoroutineContext) = CoroutineScope(coroutineContext).apply {
val source = Channel<Int>()
expect(1)
Expand Down
Loading