diff --git a/coroutines-guide.md b/coroutines-guide.md
index 0b7b842acb..4b3c09c40f 100644
--- a/coroutines-guide.md
+++ b/coroutines-guide.md
@@ -47,7 +47,7 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m
* [Suspending functions](docs/flow.md#suspending-functions)
* [Flows](docs/flow.md#flows)
* [Flows are cold](docs/flow.md#flows-are-cold)
- * [Flow cancellation](docs/flow.md#flow-cancellation)
+ * [Flow cancellation basics](docs/flow.md#flow-cancellation-basics)
* [Flow builders](docs/flow.md#flow-builders)
* [Intermediate flow operators](docs/flow.md#intermediate-flow-operators)
* [Transform operator](docs/flow.md#transform-operator)
@@ -79,6 +79,8 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m
* [Successful completion](docs/flow.md#successful-completion)
* [Imperative versus declarative](docs/flow.md#imperative-versus-declarative)
* [Launching flow](docs/flow.md#launching-flow)
+ * [Flow cancellation checks](docs/flow.md#flow-cancellation-checks)
+ * [Making busy flow cancellable](docs/flow.md#making-busy-flow-cancellable)
* [Flow and Reactive Streams](docs/flow.md#flow-and-reactive-streams)
* [Channels](docs/channels.md#channels)
diff --git a/docs/flow.md b/docs/flow.md
index 3f14b10417..9b331588c3 100644
--- a/docs/flow.md
+++ b/docs/flow.md
@@ -10,7 +10,7 @@
* [Suspending functions](#suspending-functions)
* [Flows](#flows)
* [Flows are cold](#flows-are-cold)
- * [Flow cancellation](#flow-cancellation)
+ * [Flow cancellation basics](#flow-cancellation-basics)
* [Flow builders](#flow-builders)
* [Intermediate flow operators](#intermediate-flow-operators)
* [Transform operator](#transform-operator)
@@ -42,6 +42,8 @@
* [Successful completion](#successful-completion)
* [Imperative versus declarative](#imperative-versus-declarative)
* [Launching flow](#launching-flow)
+ * [Flow cancellation checks](#flow-cancellation-checks)
+ * [Making busy flow cancellable](#making-busy-flow-cancellable)
* [Flow and Reactive Streams](#flow-and-reactive-streams)
@@ -267,12 +269,10 @@ This is a key reason the `foo()` function (which returns a flow) is not marked w
By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
that is why we see "Flow started" when we call `collect` again.
-### Flow cancellation
-
-Flow adheres to the general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
-additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be
-cancelled when the flow is suspended in a cancellable suspending function (like [delay]), and cannot be cancelled otherwise.
+### Flow cancellation basics
+Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be
+cancelled when the flow is suspended in a cancellable suspending function (like [delay]).
The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
and stops executing its code:
@@ -316,6 +316,8 @@ Done
+See [Flow cancellation checks](#flow-cancellation-checks) section for more details.
+
### Flow builders
The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
@@ -1777,6 +1779,127 @@ as cancellation and structured concurrency serve this purpose.
Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] the corresponding flow collection
coroutine only without cancelling the whole scope or to [join][Job.join] it.
+### Flow cancellation checks
+
+For convenience, the [flow] builder performs additional [ensureActive] checks for cancellation on each emitted value.
+It means that a busy loop emitting from a `flow { ... }` is cancellable:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun foo(): Flow = flow {
+ for (i in 1..5) {
+ println("Emitting $i")
+ emit(i)
+ }
+}
+
+fun main() = runBlocking {
+ foo().collect { value ->
+ if (value == 3) cancel()
+ println(value)
+ }
+}
+//sampleEnd
+```
+
+
+
+> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt).
+
+We get only numbers up to 3 and a [CancellationException] after trying to emit number 4:
+
+```text
+Emitting 1
+1
+Emitting 2
+2
+Emitting 3
+3
+Emitting 4
+Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
+```
+
+
+
+However, most other flow operators do not do additional cancellation checks on their own for performance reasons.
+For example, if you use [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere,
+then there are no checks for cancellation:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun main() = runBlocking {
+ (1..5).asFlow().collect { value ->
+ if (value == 3) cancel()
+ println(value)
+ }
+}
+//sampleEnd
+```
+
+
+
+> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt).
+
+All numbers from 1 to 5 are collected and cancellation gets detected only before return from `runBlocking`:
+
+```text
+1
+2
+3
+4
+5
+Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
+```
+
+
+
+#### Making busy flow cancellable
+
+In the case where you have a busy loop with coroutines you must explicitly check for cancellation.
+You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use
+[cancellable] operator provided to do that:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun main() = runBlocking {
+ (1..5).asFlow().cancellable().collect { value ->
+ if (value == 3) cancel()
+ println(value)
+ }
+}
+//sampleEnd
+```
+
+
+
+> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt).
+
+With the `cancellable` operator only the numbers from 1 to 3 are collected:
+
+```text
+1
+2
+3
+Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
+```
+
+
+
### Flow and Reactive Streams
For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and project Reactor,
@@ -1813,6 +1936,8 @@ Integration modules include conversions from and to `Flow`, integration with Rea
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
+[ensureActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html
+[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
@@ -1845,4 +1970,6 @@ Integration modules include conversions from and to `Flow`, integration with Rea
[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
+[IntRange.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/kotlin.ranges.-int-range/as-flow.html
+[cancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html
diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt
index 74a0ad2583..8fd9ae76a4 100644
--- a/kotlinx-coroutines-core/common/src/flow/Builders.kt
+++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt
@@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
* Creates a flow from the given suspendable [block].
*
* Example of usage:
+ *
* ```
* fun fibonacci(): Flow = flow {
* var x = BigInteger.ZERO
@@ -33,10 +34,13 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
*
* fibonacci().take(100).collect { println(it) }
* ```
- * Emissions from [flow] builder are [cancellable] by default.
+ *
+ * Emissions from [flow] builder are [cancellable] by default — each call to [emit][FlowCollector.emit]
+ * also calls [ensureActive][CoroutineContext.ensureActive].
*
* `emit` should happen strictly in the dispatchers of the [block] in order to preserve the flow context.
* For example, the following code will result in an [IllegalStateException]:
+ *
* ```
* flow {
* emit(1) // Ok
@@ -45,6 +49,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
* }
* }
* ```
+ *
* If you want to switch the context of execution of a flow, use the [flowOn] operator.
*/
public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block)
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
index afdcd9ed18..010d781c02 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt
@@ -212,6 +212,9 @@ public fun Flow.flowOn(context: CoroutineContext): Flow {
* Returns a flow which checks cancellation status on each emission and throws
* the corresponding cancellation cause if flow collector was cancelled.
* Note that [flow] builder is [cancellable] by default.
+ *
+ * This operator provides a shortcut for `.onEach { currentCoroutineContext().ensureActive() }`.
+ * See [ensureActive][CoroutineContext.ensureActive] for details.
*/
public fun Flow.cancellable(): Flow {
if (this is AbstractFlow<*>) return this // Fast-path, already cancellable
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt b/kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt
new file mode 100644
index 0000000000..229e55b130
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+// This file was automatically generated from flow.md by Knit tool. Do not edit.
+package kotlinx.coroutines.guide.exampleFlow37
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun foo(): Flow = flow {
+ for (i in 1..5) {
+ println("Emitting $i")
+ emit(i)
+ }
+}
+
+fun main() = runBlocking {
+ foo().collect { value ->
+ if (value == 3) cancel()
+ println(value)
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt b/kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt
new file mode 100644
index 0000000000..fcbbb24ef7
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt
@@ -0,0 +1,16 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+// This file was automatically generated from flow.md by Knit tool. Do not edit.
+package kotlinx.coroutines.guide.exampleFlow38
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun main() = runBlocking {
+ (1..5).asFlow().collect { value ->
+ if (value == 3) cancel()
+ println(value)
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt b/kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt
new file mode 100644
index 0000000000..86275c7c4a
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt
@@ -0,0 +1,16 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+// This file was automatically generated from flow.md by Knit tool. Do not edit.
+package kotlinx.coroutines.guide.exampleFlow39
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun main() = runBlocking {
+ (1..5).asFlow().cancellable().collect { value ->
+ if (value == 3) cancel()
+ println(value)
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt
index 39a7853b5f..15c9e1e4c6 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt
@@ -381,4 +381,40 @@ class FlowGuideTest {
"Event: 3"
)
}
+
+ @Test
+ fun testExampleFlow37() {
+ test("ExampleFlow37") { kotlinx.coroutines.guide.exampleFlow37.main() }.verifyExceptions(
+ "Emitting 1",
+ "1",
+ "Emitting 2",
+ "2",
+ "Emitting 3",
+ "3",
+ "Emitting 4",
+ "Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@6d7b4f4c"
+ )
+ }
+
+ @Test
+ fun testExampleFlow38() {
+ test("ExampleFlow38") { kotlinx.coroutines.guide.exampleFlow38.main() }.verifyExceptions(
+ "1",
+ "2",
+ "3",
+ "4",
+ "5",
+ "Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@3327bd23"
+ )
+ }
+
+ @Test
+ fun testExampleFlow39() {
+ test("ExampleFlow39") { kotlinx.coroutines.guide.exampleFlow39.main() }.verifyExceptions(
+ "1",
+ "2",
+ "3",
+ "Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@5ec0a365"
+ )
+ }
}