Skip to content

Improve the docs and guide on flow cancellation #2043

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion coroutines-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m
* <a name='suspending-functions'></a>[Suspending functions](docs/flow.md#suspending-functions)
* <a name='flows'></a>[Flows](docs/flow.md#flows)
* <a name='flows-are-cold'></a>[Flows are cold](docs/flow.md#flows-are-cold)
* <a name='flow-cancellation'></a>[Flow cancellation](docs/flow.md#flow-cancellation)
* <a name='flow-cancellation-basics'></a>[Flow cancellation basics](docs/flow.md#flow-cancellation-basics)
* <a name='flow-builders'></a>[Flow builders](docs/flow.md#flow-builders)
* <a name='intermediate-flow-operators'></a>[Intermediate flow operators](docs/flow.md#intermediate-flow-operators)
* <a name='transform-operator'></a>[Transform operator](docs/flow.md#transform-operator)
Expand Down Expand Up @@ -79,6 +79,8 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m
* <a name='successful-completion'></a>[Successful completion](docs/flow.md#successful-completion)
* <a name='imperative-versus-declarative'></a>[Imperative versus declarative](docs/flow.md#imperative-versus-declarative)
* <a name='launching-flow'></a>[Launching flow](docs/flow.md#launching-flow)
* <a name='flow-cancellation-checks'></a>[Flow cancellation checks](docs/flow.md#flow-cancellation-checks)
* <a name='making-busy-flow-cancellable'></a>[Making busy flow cancellable](docs/flow.md#making-busy-flow-cancellable)
* <a name='flow-and-reactive-streams'></a>[Flow and Reactive Streams](docs/flow.md#flow-and-reactive-streams)
<!--- TOC_REF docs/channels.md -->
* <a name='channels'></a>[Channels](docs/channels.md#channels)
Expand Down
132 changes: 126 additions & 6 deletions docs/flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* [Suspending functions](#suspending-functions)
* [Flows](#flows)
* [Flows are cold](#flows-are-cold)
* [Flow cancellation](#flow-cancellation)
* [Flow cancellation basics](#flow-cancellation-basics)
* [Flow builders](#flow-builders)
* [Intermediate flow operators](#intermediate-flow-operators)
* [Transform operator](#transform-operator)
Expand Down Expand Up @@ -42,6 +42,8 @@
* [Successful completion](#successful-completion)
* [Imperative versus declarative](#imperative-versus-declarative)
* [Launching flow](#launching-flow)
* [Flow cancellation checks](#flow-cancellation-checks)
* [Making busy flow cancellable](#making-busy-flow-cancellable)
* [Flow and Reactive Streams](#flow-and-reactive-streams)

<!--- END -->
Expand Down Expand Up @@ -267,12 +269,10 @@ This is a key reason the `foo()` function (which returns a flow) is not marked w
By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
that is why we see "Flow started" when we call `collect` again.

### Flow cancellation

Flow adheres to the general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be
cancelled when the flow is suspended in a cancellable suspending function (like [delay]), and cannot be cancelled otherwise.
### Flow cancellation basics

Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be
cancelled when the flow is suspended in a cancellable suspending function (like [delay]).
The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
and stops executing its code:

Expand Down Expand Up @@ -316,6 +316,8 @@ Done

<!--- TEST -->

See [Flow cancellation checks](#flow-cancellation-checks) section for more details.

### Flow builders

The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
Expand Down Expand Up @@ -1777,6 +1779,120 @@ as cancellation and structured concurrency serve this purpose.
Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] the corresponding flow collection
coroutine only without cancelling the whole scope or to [join][Job.join] it.

### Flow cancellation checks

For convenience, the [flow] builder performs additional [ensureActive] checks for cancellation on each emitted value.
It means that a busy loop emitting from a `flow { ... }` is cancellable:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun foo(): Flow<Int> = flow {
for (i in 1..5) { emit(i) }
}

fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt).

We get only numbers up to 3 and a [CancellationException]:

```text
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
```

<!--- TEST EXCEPTION -->

However, most other flow operators do not do additional cancellation checks on their own for performance reasons.
For example, if you use [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere,
then there are no checks for cancellation:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt).

All numbers from 1 to 5 are collected and cancellation gets detected only before return from `runBlocking`:

```text
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
```

<!--- TEST EXCEPTION -->

#### Making busy flow cancellable

In the case where you have a busy loop with coroutines you must explicitly check for cancellation.
You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use
[cancellable] operator provided to do that:

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

//sampleStart
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
//sampleEnd
```

</div>

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt).

With the `cancellable` operator only the numbers from 1 to 3 are collected:

```text
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
```

<!--- TEST EXCEPTION -->

### Flow and Reactive Streams

For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and project Reactor,
Expand Down Expand Up @@ -1813,6 +1929,8 @@ Integration modules include conversions from and to `Flow`, integration with Rea
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
[ensureActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html
<!--- INDEX kotlinx.coroutines.flow -->
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
Expand Down Expand Up @@ -1845,4 +1963,6 @@ Integration modules include conversions from and to `Flow`, integration with Rea
[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
[IntRange.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/kotlin.ranges.-int-range/as-flow.html
[cancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html
<!--- END -->
7 changes: 6 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
* Creates a flow from the given suspendable [block].
*
* Example of usage:
*
* ```
* fun fibonacci(): Flow<BigInteger> = flow {
* var x = BigInteger.ZERO
Expand All @@ -33,10 +34,13 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
*
* fibonacci().take(100).collect { println(it) }
* ```
* Emissions from [flow] builder are [cancellable] by default.
*
* Emissions from [flow] builder are [cancellable] by default &mdash; each call to [emit][FlowCollector.emit]
* also calls [ensureActive][CoroutineContext.ensureActive].
*
* `emit` should happen strictly in the dispatchers of the [block] in order to preserve the flow context.
* For example, the following code will result in an [IllegalStateException]:
*
* ```
* flow {
* emit(1) // Ok
Expand All @@ -45,6 +49,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
* }
* }
* ```
*
* If you want to switch the context of execution of a flow, use the [flowOn] operator.
*/
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
Expand Down
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
* Returns a flow which checks cancellation status on each emission and throws
* the corresponding cancellation cause if flow collector was cancelled.
* Note that [flow] builder is [cancellable] by default.
*
* This operator provides a shortcut for `.onEach { currentCoroutineContext().ensureActive() }`.
* See [ensureActive][CoroutineContext.ensureActive] for details.
*/
public fun <T> Flow<T>.cancellable(): Flow<T> {
if (this is AbstractFlow<*>) return this // Fast-path, already cancellable
Expand Down
20 changes: 20 additions & 0 deletions kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

// This file was automatically generated from flow.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.exampleFlow37

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun foo(): Flow<Int> = flow {
for (i in 1..5) { emit(i) }
}

fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
16 changes: 16 additions & 0 deletions kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

// This file was automatically generated from flow.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.exampleFlow38

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
16 changes: 16 additions & 0 deletions kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

// This file was automatically generated from flow.md by Knit tool. Do not edit.
package kotlinx.coroutines.guide.exampleFlow39

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
32 changes: 32 additions & 0 deletions kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -381,4 +381,36 @@ class FlowGuideTest {
"Event: 3"
)
}

@Test
fun testExampleFlow37() {
test("ExampleFlow37") { kotlinx.coroutines.guide.exampleFlow37.main() }.verifyExceptions(
"1",
"2",
"3",
"Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@6d7b4f4c"
)
}

@Test
fun testExampleFlow38() {
test("ExampleFlow38") { kotlinx.coroutines.guide.exampleFlow38.main() }.verifyExceptions(
"1",
"2",
"3",
"4",
"5",
"Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@3327bd23"
)
}

@Test
fun testExampleFlow39() {
test("ExampleFlow39") { kotlinx.coroutines.guide.exampleFlow39.main() }.verifyExceptions(
"1",
"2",
"3",
"Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@5ec0a365"
)
}
}