diff --git a/coroutines-guide.md b/coroutines-guide.md index 0b7b842acb..4b3c09c40f 100644 --- a/coroutines-guide.md +++ b/coroutines-guide.md @@ -47,7 +47,7 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m * [Suspending functions](docs/flow.md#suspending-functions) * [Flows](docs/flow.md#flows) * [Flows are cold](docs/flow.md#flows-are-cold) - * [Flow cancellation](docs/flow.md#flow-cancellation) + * [Flow cancellation basics](docs/flow.md#flow-cancellation-basics) * [Flow builders](docs/flow.md#flow-builders) * [Intermediate flow operators](docs/flow.md#intermediate-flow-operators) * [Transform operator](docs/flow.md#transform-operator) @@ -79,6 +79,8 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m * [Successful completion](docs/flow.md#successful-completion) * [Imperative versus declarative](docs/flow.md#imperative-versus-declarative) * [Launching flow](docs/flow.md#launching-flow) + * [Flow cancellation checks](docs/flow.md#flow-cancellation-checks) + * [Making busy flow cancellable](docs/flow.md#making-busy-flow-cancellable) * [Flow and Reactive Streams](docs/flow.md#flow-and-reactive-streams) * [Channels](docs/channels.md#channels) diff --git a/docs/flow.md b/docs/flow.md index 3f14b10417..9b331588c3 100644 --- a/docs/flow.md +++ b/docs/flow.md @@ -10,7 +10,7 @@ * [Suspending functions](#suspending-functions) * [Flows](#flows) * [Flows are cold](#flows-are-cold) - * [Flow cancellation](#flow-cancellation) + * [Flow cancellation basics](#flow-cancellation-basics) * [Flow builders](#flow-builders) * [Intermediate flow operators](#intermediate-flow-operators) * [Transform operator](#transform-operator) @@ -42,6 +42,8 @@ * [Successful completion](#successful-completion) * [Imperative versus declarative](#imperative-versus-declarative) * [Launching flow](#launching-flow) + * [Flow cancellation checks](#flow-cancellation-checks) + * [Making busy flow cancellable](#making-busy-flow-cancellable) * [Flow and Reactive Streams](#flow-and-reactive-streams) @@ -267,12 +269,10 @@ This is a key reason the `foo()` function (which returns a flow) is not marked w By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected, that is why we see "Flow started" when we call `collect` again. -### Flow cancellation - -Flow adheres to the general cooperative cancellation of coroutines. However, flow infrastructure does not introduce -additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be -cancelled when the flow is suspended in a cancellable suspending function (like [delay]), and cannot be cancelled otherwise. +### Flow cancellation basics +Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be +cancelled when the flow is suspended in a cancellable suspending function (like [delay]). The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block and stops executing its code: @@ -316,6 +316,8 @@ Done +See [Flow cancellation checks](#flow-cancellation-checks) section for more details. + ### Flow builders The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for @@ -1777,6 +1779,127 @@ as cancellation and structured concurrency serve this purpose. Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] the corresponding flow collection coroutine only without cancelling the whole scope or to [join][Job.join] it. +### Flow cancellation checks + +For convenience, the [flow] builder performs additional [ensureActive] checks for cancellation on each emitted value. +It means that a busy loop emitting from a `flow { ... }` is cancellable: + +
+ +```kotlin +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* + +//sampleStart +fun foo(): Flow = flow { + for (i in 1..5) { + println("Emitting $i") + emit(i) + } +} + +fun main() = runBlocking { + foo().collect { value -> + if (value == 3) cancel() + println(value) + } +} +//sampleEnd +``` + +
+ +> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt). + +We get only numbers up to 3 and a [CancellationException] after trying to emit number 4: + +```text +Emitting 1 +1 +Emitting 2 +2 +Emitting 3 +3 +Emitting 4 +Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c +``` + + + +However, most other flow operators do not do additional cancellation checks on their own for performance reasons. +For example, if you use [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere, +then there are no checks for cancellation: + +
+ +```kotlin +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* + +//sampleStart +fun main() = runBlocking { + (1..5).asFlow().collect { value -> + if (value == 3) cancel() + println(value) + } +} +//sampleEnd +``` + +
+ +> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt). + +All numbers from 1 to 5 are collected and cancellation gets detected only before return from `runBlocking`: + +```text +1 +2 +3 +4 +5 +Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23 +``` + + + +#### Making busy flow cancellable + +In the case where you have a busy loop with coroutines you must explicitly check for cancellation. +You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use +[cancellable] operator provided to do that: + +
+ +```kotlin +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* + +//sampleStart +fun main() = runBlocking { + (1..5).asFlow().cancellable().collect { value -> + if (value == 3) cancel() + println(value) + } +} +//sampleEnd +``` + +
+ +> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt). + +With the `cancellable` operator only the numbers from 1 to 3 are collected: + +```text +1 +2 +3 +Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365 +``` + + + ### Flow and Reactive Streams For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and project Reactor, @@ -1813,6 +1936,8 @@ Integration modules include conversions from and to `Flow`, integration with Rea [Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html [Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html [Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html +[ensureActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html +[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html [Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html [flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html @@ -1845,4 +1970,6 @@ Integration modules include conversions from and to `Flow`, integration with Rea [catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html [onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html [launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html +[IntRange.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/kotlin.ranges.-int-range/as-flow.html +[cancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 74a0ad2583..8fd9ae76a4 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow * Creates a flow from the given suspendable [block]. * * Example of usage: + * * ``` * fun fibonacci(): Flow = flow { * var x = BigInteger.ZERO @@ -33,10 +34,13 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow * * fibonacci().take(100).collect { println(it) } * ``` - * Emissions from [flow] builder are [cancellable] by default. + * + * Emissions from [flow] builder are [cancellable] by default — each call to [emit][FlowCollector.emit] + * also calls [ensureActive][CoroutineContext.ensureActive]. * * `emit` should happen strictly in the dispatchers of the [block] in order to preserve the flow context. * For example, the following code will result in an [IllegalStateException]: + * * ``` * flow { * emit(1) // Ok @@ -45,6 +49,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow * } * } * ``` + * * If you want to switch the context of execution of a flow, use the [flowOn] operator. */ public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index afdcd9ed18..010d781c02 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -212,6 +212,9 @@ public fun Flow.flowOn(context: CoroutineContext): Flow { * Returns a flow which checks cancellation status on each emission and throws * the corresponding cancellation cause if flow collector was cancelled. * Note that [flow] builder is [cancellable] by default. + * + * This operator provides a shortcut for `.onEach { currentCoroutineContext().ensureActive() }`. + * See [ensureActive][CoroutineContext.ensureActive] for details. */ public fun Flow.cancellable(): Flow { if (this is AbstractFlow<*>) return this // Fast-path, already cancellable diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt b/kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt new file mode 100644 index 0000000000..229e55b130 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt @@ -0,0 +1,23 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +// This file was automatically generated from flow.md by Knit tool. Do not edit. +package kotlinx.coroutines.guide.exampleFlow37 + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* + +fun foo(): Flow = flow { + for (i in 1..5) { + println("Emitting $i") + emit(i) + } +} + +fun main() = runBlocking { + foo().collect { value -> + if (value == 3) cancel() + println(value) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt b/kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt new file mode 100644 index 0000000000..fcbbb24ef7 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt @@ -0,0 +1,16 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +// This file was automatically generated from flow.md by Knit tool. Do not edit. +package kotlinx.coroutines.guide.exampleFlow38 + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* + +fun main() = runBlocking { + (1..5).asFlow().collect { value -> + if (value == 3) cancel() + println(value) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt b/kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt new file mode 100644 index 0000000000..86275c7c4a --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt @@ -0,0 +1,16 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +// This file was automatically generated from flow.md by Knit tool. Do not edit. +package kotlinx.coroutines.guide.exampleFlow39 + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* + +fun main() = runBlocking { + (1..5).asFlow().cancellable().collect { value -> + if (value == 3) cancel() + println(value) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt index 39a7853b5f..15c9e1e4c6 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt @@ -381,4 +381,40 @@ class FlowGuideTest { "Event: 3" ) } + + @Test + fun testExampleFlow37() { + test("ExampleFlow37") { kotlinx.coroutines.guide.exampleFlow37.main() }.verifyExceptions( + "Emitting 1", + "1", + "Emitting 2", + "2", + "Emitting 3", + "3", + "Emitting 4", + "Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@6d7b4f4c" + ) + } + + @Test + fun testExampleFlow38() { + test("ExampleFlow38") { kotlinx.coroutines.guide.exampleFlow38.main() }.verifyExceptions( + "1", + "2", + "3", + "4", + "5", + "Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@3327bd23" + ) + } + + @Test + fun testExampleFlow39() { + test("ExampleFlow39") { kotlinx.coroutines.guide.exampleFlow39.main() }.verifyExceptions( + "1", + "2", + "3", + "Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@5ec0a365" + ) + } }