Skip to content

Commit 9bbb669

Browse files
authored
Improve the docs and guide on flow cancellation (#2043)
* Improve the docs and guide on flow cancellation * Remove outdated phrase on "flow infrastructure does not introduce additional cancellation points". They were introduced by #2028 * Add a section on "Flow cancellation check" with examples on `Flow.cancellable()`` operator. * Add a bit more detail in `flow` and `cancellable` docs with links to `ensureActive()`.
1 parent 41f3755 commit 9bbb669

File tree

8 files changed

+236
-8
lines changed

8 files changed

+236
-8
lines changed

coroutines-guide.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m
4747
* <a name='suspending-functions'></a>[Suspending functions](docs/flow.md#suspending-functions)
4848
* <a name='flows'></a>[Flows](docs/flow.md#flows)
4949
* <a name='flows-are-cold'></a>[Flows are cold](docs/flow.md#flows-are-cold)
50-
* <a name='flow-cancellation'></a>[Flow cancellation](docs/flow.md#flow-cancellation)
50+
* <a name='flow-cancellation-basics'></a>[Flow cancellation basics](docs/flow.md#flow-cancellation-basics)
5151
* <a name='flow-builders'></a>[Flow builders](docs/flow.md#flow-builders)
5252
* <a name='intermediate-flow-operators'></a>[Intermediate flow operators](docs/flow.md#intermediate-flow-operators)
5353
* <a name='transform-operator'></a>[Transform operator](docs/flow.md#transform-operator)
@@ -79,6 +79,8 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m
7979
* <a name='successful-completion'></a>[Successful completion](docs/flow.md#successful-completion)
8080
* <a name='imperative-versus-declarative'></a>[Imperative versus declarative](docs/flow.md#imperative-versus-declarative)
8181
* <a name='launching-flow'></a>[Launching flow](docs/flow.md#launching-flow)
82+
* <a name='flow-cancellation-checks'></a>[Flow cancellation checks](docs/flow.md#flow-cancellation-checks)
83+
* <a name='making-busy-flow-cancellable'></a>[Making busy flow cancellable](docs/flow.md#making-busy-flow-cancellable)
8284
* <a name='flow-and-reactive-streams'></a>[Flow and Reactive Streams](docs/flow.md#flow-and-reactive-streams)
8385
<!--- TOC_REF docs/channels.md -->
8486
* <a name='channels'></a>[Channels](docs/channels.md#channels)

docs/flow.md

+133-6
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* [Suspending functions](#suspending-functions)
1111
* [Flows](#flows)
1212
* [Flows are cold](#flows-are-cold)
13-
* [Flow cancellation](#flow-cancellation)
13+
* [Flow cancellation basics](#flow-cancellation-basics)
1414
* [Flow builders](#flow-builders)
1515
* [Intermediate flow operators](#intermediate-flow-operators)
1616
* [Transform operator](#transform-operator)
@@ -42,6 +42,8 @@
4242
* [Successful completion](#successful-completion)
4343
* [Imperative versus declarative](#imperative-versus-declarative)
4444
* [Launching flow](#launching-flow)
45+
* [Flow cancellation checks](#flow-cancellation-checks)
46+
* [Making busy flow cancellable](#making-busy-flow-cancellable)
4547
* [Flow and Reactive Streams](#flow-and-reactive-streams)
4648

4749
<!--- END -->
@@ -267,12 +269,10 @@ This is a key reason the `foo()` function (which returns a flow) is not marked w
267269
By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
268270
that is why we see "Flow started" when we call `collect` again.
269271

270-
### Flow cancellation
271-
272-
Flow adheres to the general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
273-
additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be
274-
cancelled when the flow is suspended in a cancellable suspending function (like [delay]), and cannot be cancelled otherwise.
272+
### Flow cancellation basics
275273

274+
Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be
275+
cancelled when the flow is suspended in a cancellable suspending function (like [delay]).
276276
The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
277277
and stops executing its code:
278278

@@ -316,6 +316,8 @@ Done
316316

317317
<!--- TEST -->
318318

319+
See [Flow cancellation checks](#flow-cancellation-checks) section for more details.
320+
319321
### Flow builders
320322

321323
The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
@@ -1777,6 +1779,127 @@ as cancellation and structured concurrency serve this purpose.
17771779
Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] the corresponding flow collection
17781780
coroutine only without cancelling the whole scope or to [join][Job.join] it.
17791781

1782+
### Flow cancellation checks
1783+
1784+
For convenience, the [flow] builder performs additional [ensureActive] checks for cancellation on each emitted value.
1785+
It means that a busy loop emitting from a `flow { ... }` is cancellable:
1786+
1787+
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1788+
1789+
```kotlin
1790+
import kotlinx.coroutines.*
1791+
import kotlinx.coroutines.flow.*
1792+
1793+
//sampleStart
1794+
fun foo(): Flow<Int> = flow {
1795+
for (i in 1..5) {
1796+
println("Emitting $i")
1797+
emit(i)
1798+
}
1799+
}
1800+
1801+
fun main() = runBlocking<Unit> {
1802+
foo().collect { value ->
1803+
if (value == 3) cancel()
1804+
println(value)
1805+
}
1806+
}
1807+
//sampleEnd
1808+
```
1809+
1810+
</div>
1811+
1812+
> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt).
1813+
1814+
We get only numbers up to 3 and a [CancellationException] after trying to emit number 4:
1815+
1816+
```text
1817+
Emitting 1
1818+
1
1819+
Emitting 2
1820+
2
1821+
Emitting 3
1822+
3
1823+
Emitting 4
1824+
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
1825+
```
1826+
1827+
<!--- TEST EXCEPTION -->
1828+
1829+
However, most other flow operators do not do additional cancellation checks on their own for performance reasons.
1830+
For example, if you use [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere,
1831+
then there are no checks for cancellation:
1832+
1833+
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1834+
1835+
```kotlin
1836+
import kotlinx.coroutines.*
1837+
import kotlinx.coroutines.flow.*
1838+
1839+
//sampleStart
1840+
fun main() = runBlocking<Unit> {
1841+
(1..5).asFlow().collect { value ->
1842+
if (value == 3) cancel()
1843+
println(value)
1844+
}
1845+
}
1846+
//sampleEnd
1847+
```
1848+
1849+
</div>
1850+
1851+
> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt).
1852+
1853+
All numbers from 1 to 5 are collected and cancellation gets detected only before return from `runBlocking`:
1854+
1855+
```text
1856+
1
1857+
2
1858+
3
1859+
4
1860+
5
1861+
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
1862+
```
1863+
1864+
<!--- TEST EXCEPTION -->
1865+
1866+
#### Making busy flow cancellable
1867+
1868+
In the case where you have a busy loop with coroutines you must explicitly check for cancellation.
1869+
You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use
1870+
[cancellable] operator provided to do that:
1871+
1872+
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1873+
1874+
```kotlin
1875+
import kotlinx.coroutines.*
1876+
import kotlinx.coroutines.flow.*
1877+
1878+
//sampleStart
1879+
fun main() = runBlocking<Unit> {
1880+
(1..5).asFlow().cancellable().collect { value ->
1881+
if (value == 3) cancel()
1882+
println(value)
1883+
}
1884+
}
1885+
//sampleEnd
1886+
```
1887+
1888+
</div>
1889+
1890+
> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt).
1891+
1892+
With the `cancellable` operator only the numbers from 1 to 3 are collected:
1893+
1894+
```text
1895+
1
1896+
2
1897+
3
1898+
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
1899+
```
1900+
1901+
<!--- TEST EXCEPTION -->
1902+
17801903
### Flow and Reactive Streams
17811904

17821905
For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and project Reactor,
@@ -1813,6 +1936,8 @@ Integration modules include conversions from and to `Flow`, integration with Rea
18131936
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
18141937
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
18151938
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
1939+
[ensureActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html
1940+
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html
18161941
<!--- INDEX kotlinx.coroutines.flow -->
18171942
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
18181943
[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
@@ -1845,4 +1970,6 @@ Integration modules include conversions from and to `Flow`, integration with Rea
18451970
[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
18461971
[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
18471972
[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
1973+
[IntRange.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/kotlin.ranges.-int-range/as-flow.html
1974+
[cancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html
18481975
<!--- END -->

kotlinx-coroutines-core/common/src/flow/Builders.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1919
* Creates a flow from the given suspendable [block].
2020
*
2121
* Example of usage:
22+
*
2223
* ```
2324
* fun fibonacci(): Flow<BigInteger> = flow {
2425
* var x = BigInteger.ZERO
@@ -33,10 +34,13 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
3334
*
3435
* fibonacci().take(100).collect { println(it) }
3536
* ```
36-
* Emissions from [flow] builder are [cancellable] by default.
37+
*
38+
* Emissions from [flow] builder are [cancellable] by default &mdash; each call to [emit][FlowCollector.emit]
39+
* also calls [ensureActive][CoroutineContext.ensureActive].
3740
*
3841
* `emit` should happen strictly in the dispatchers of the [block] in order to preserve the flow context.
3942
* For example, the following code will result in an [IllegalStateException]:
43+
*
4044
* ```
4145
* flow {
4246
* emit(1) // Ok
@@ -45,6 +49,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
4549
* }
4650
* }
4751
* ```
52+
*
4853
* If you want to switch the context of execution of a flow, use the [flowOn] operator.
4954
*/
5055
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

+3
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,9 @@ public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
212212
* Returns a flow which checks cancellation status on each emission and throws
213213
* the corresponding cancellation cause if flow collector was cancelled.
214214
* Note that [flow] builder is [cancellable] by default.
215+
*
216+
* This operator provides a shortcut for `.onEach { currentCoroutineContext().ensureActive() }`.
217+
* See [ensureActive][CoroutineContext.ensureActive] for details.
215218
*/
216219
public fun <T> Flow<T>.cancellable(): Flow<T> {
217220
if (this is AbstractFlow<*>) return this // Fast-path, already cancellable
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
// This file was automatically generated from flow.md by Knit tool. Do not edit.
6+
package kotlinx.coroutines.guide.exampleFlow37
7+
8+
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.flow.*
10+
11+
fun foo(): Flow<Int> = flow {
12+
for (i in 1..5) {
13+
println("Emitting $i")
14+
emit(i)
15+
}
16+
}
17+
18+
fun main() = runBlocking<Unit> {
19+
foo().collect { value ->
20+
if (value == 3) cancel()
21+
println(value)
22+
}
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
// This file was automatically generated from flow.md by Knit tool. Do not edit.
6+
package kotlinx.coroutines.guide.exampleFlow38
7+
8+
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.flow.*
10+
11+
fun main() = runBlocking<Unit> {
12+
(1..5).asFlow().collect { value ->
13+
if (value == 3) cancel()
14+
println(value)
15+
}
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
// This file was automatically generated from flow.md by Knit tool. Do not edit.
6+
package kotlinx.coroutines.guide.exampleFlow39
7+
8+
import kotlinx.coroutines.*
9+
import kotlinx.coroutines.flow.*
10+
11+
fun main() = runBlocking<Unit> {
12+
(1..5).asFlow().cancellable().collect { value ->
13+
if (value == 3) cancel()
14+
println(value)
15+
}
16+
}

kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt

+36
Original file line numberDiff line numberDiff line change
@@ -381,4 +381,40 @@ class FlowGuideTest {
381381
"Event: 3"
382382
)
383383
}
384+
385+
@Test
386+
fun testExampleFlow37() {
387+
test("ExampleFlow37") { kotlinx.coroutines.guide.exampleFlow37.main() }.verifyExceptions(
388+
"Emitting 1",
389+
"1",
390+
"Emitting 2",
391+
"2",
392+
"Emitting 3",
393+
"3",
394+
"Emitting 4",
395+
"Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@6d7b4f4c"
396+
)
397+
}
398+
399+
@Test
400+
fun testExampleFlow38() {
401+
test("ExampleFlow38") { kotlinx.coroutines.guide.exampleFlow38.main() }.verifyExceptions(
402+
"1",
403+
"2",
404+
"3",
405+
"4",
406+
"5",
407+
"Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@3327bd23"
408+
)
409+
}
410+
411+
@Test
412+
fun testExampleFlow39() {
413+
test("ExampleFlow39") { kotlinx.coroutines.guide.exampleFlow39.main() }.verifyExceptions(
414+
"1",
415+
"2",
416+
"3",
417+
"Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@5ec0a365"
418+
)
419+
}
384420
}

0 commit comments

Comments
 (0)