Skip to content

Commit 572daa6

Browse files
committed
Improve the docs and guide on flow cancellation
* Remove outdated phrase on "flow infrastructure does not introduce additional cancellation points". They were introduced by #2028 * Add a section on "Flow cancellation check" with examples on `Flow.cancellable()`` operator. * Add a bit more detail in `flow` and `cancellable` docs with links to `ensureActive()`.
1 parent 1eeed50 commit 572daa6

File tree

5 files changed

+170
-8
lines changed

5 files changed

+170
-8
lines changed

coroutines-guide.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m
4747
* <a name='suspending-functions'></a>[Suspending functions](docs/flow.md#suspending-functions)
4848
* <a name='flows'></a>[Flows](docs/flow.md#flows)
4949
* <a name='flows-are-cold'></a>[Flows are cold](docs/flow.md#flows-are-cold)
50-
* <a name='flow-cancellation'></a>[Flow cancellation](docs/flow.md#flow-cancellation)
50+
* <a name='flow-cancellation-basics'></a>[Flow cancellation basics](docs/flow.md#flow-cancellation-basics)
5151
* <a name='flow-builders'></a>[Flow builders](docs/flow.md#flow-builders)
5252
* <a name='intermediate-flow-operators'></a>[Intermediate flow operators](docs/flow.md#intermediate-flow-operators)
5353
* <a name='transform-operator'></a>[Transform operator](docs/flow.md#transform-operator)
@@ -79,6 +79,8 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m
7979
* <a name='successful-completion'></a>[Successful completion](docs/flow.md#successful-completion)
8080
* <a name='imperative-versus-declarative'></a>[Imperative versus declarative](docs/flow.md#imperative-versus-declarative)
8181
* <a name='launching-flow'></a>[Launching flow](docs/flow.md#launching-flow)
82+
* <a name='flow-cancellation-checks'></a>[Flow cancellation checks](docs/flow.md#flow-cancellation-checks)
83+
* <a name='making-busy-flow-cancellable'></a>[Making busy flow cancellable](docs/flow.md#making-busy-flow-cancellable)
8284
* <a name='flow-and-reactive-streams'></a>[Flow and Reactive Streams](docs/flow.md#flow-and-reactive-streams)
8385
<!--- TOC_REF docs/channels.md -->
8486
* <a name='channels'></a>[Channels](docs/channels.md#channels)

docs/flow.md

+126-6
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* [Suspending functions](#suspending-functions)
1111
* [Flows](#flows)
1212
* [Flows are cold](#flows-are-cold)
13-
* [Flow cancellation](#flow-cancellation)
13+
* [Flow cancellation basics](#flow-cancellation-basics)
1414
* [Flow builders](#flow-builders)
1515
* [Intermediate flow operators](#intermediate-flow-operators)
1616
* [Transform operator](#transform-operator)
@@ -42,6 +42,8 @@
4242
* [Successful completion](#successful-completion)
4343
* [Imperative versus declarative](#imperative-versus-declarative)
4444
* [Launching flow](#launching-flow)
45+
* [Flow cancellation checks](#flow-cancellation-checks)
46+
* [Making busy flow cancellable](#making-busy-flow-cancellable)
4547
* [Flow and Reactive Streams](#flow-and-reactive-streams)
4648

4749
<!--- END -->
@@ -267,12 +269,10 @@ This is a key reason the `foo()` function (which returns a flow) is not marked w
267269
By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
268270
that is why we see "Flow started" when we call `collect` again.
269271

270-
### Flow cancellation
271-
272-
Flow adheres to the general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
273-
additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be
274-
cancelled when the flow is suspended in a cancellable suspending function (like [delay]), and cannot be cancelled otherwise.
272+
### Flow cancellation basics
275273

274+
Flow adheres to the general cooperative cancellation of coroutines. As usual, flow collection can be
275+
cancelled when the flow is suspended in a cancellable suspending function (like [delay]).
276276
The following example shows how the flow gets cancelled on a timeout when running in a [withTimeoutOrNull] block
277277
and stops executing its code:
278278

@@ -316,6 +316,8 @@ Done
316316

317317
<!--- TEST -->
318318

319+
See [Flow cancellation checks](#flow-cancellation-checks) section for more details.
320+
319321
### Flow builders
320322

321323
The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
@@ -1777,6 +1779,120 @@ as cancellation and structured concurrency serve this purpose.
17771779
Note that [launchIn] also returns a [Job], which can be used to [cancel][Job.cancel] the corresponding flow collection
17781780
coroutine only without cancelling the whole scope or to [join][Job.join] it.
17791781

1782+
### Flow cancellation checks
1783+
1784+
For convenience, the [flow] builder performs additional [ensureActive] checks for cancellation on each emitted value.
1785+
It means that a busy loop emitting from a `flow { ... }` is cancellable:
1786+
1787+
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1788+
1789+
```kotlin
1790+
import kotlinx.coroutines.*
1791+
import kotlinx.coroutines.flow.*
1792+
1793+
//sampleStart
1794+
fun foo(): Flow<Int> = flow {
1795+
for (i in 1..5) { emit(i) }
1796+
}
1797+
1798+
fun main() = runBlocking<Unit> {
1799+
foo().collect { value ->
1800+
if (value == 3) cancel()
1801+
println(value)
1802+
}
1803+
}
1804+
//sampleEnd
1805+
```
1806+
1807+
</div>
1808+
1809+
> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-37.kt).
1810+
1811+
We get only numbers up to 3 and a [CancellationException]:
1812+
1813+
```text
1814+
1
1815+
2
1816+
3
1817+
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
1818+
```
1819+
1820+
<!--- TEST EXCEPTION -->
1821+
1822+
However, most other flow operators do not do additional cancellation checks on their own for performance reasons.
1823+
For example, if you use [IntRange.asFlow] extension to write the same busy loop and don't suspend anywhere,
1824+
then there are no checks for cancellation:
1825+
1826+
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1827+
1828+
```kotlin
1829+
import kotlinx.coroutines.*
1830+
import kotlinx.coroutines.flow.*
1831+
1832+
//sampleStart
1833+
fun main() = runBlocking<Unit> {
1834+
(1..5).asFlow().collect { value ->
1835+
if (value == 3) cancel()
1836+
println(value)
1837+
}
1838+
}
1839+
//sampleEnd
1840+
```
1841+
1842+
</div>
1843+
1844+
> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-38.kt).
1845+
1846+
All numbers from 1 to 5 are collected and cancellation gets detected only before return from `runBlocking`:
1847+
1848+
```text
1849+
1
1850+
2
1851+
3
1852+
4
1853+
5
1854+
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
1855+
```
1856+
1857+
<!--- TEST EXCEPTION -->
1858+
1859+
#### Making busy flow cancellable
1860+
1861+
In the case where you have a busy loop with coroutines you must explicitly check for cancellation.
1862+
You can add `.onEach { currentCoroutineContext().ensureActive() }`, but there is a ready-to-use
1863+
[cancellable] operator provided to do that:
1864+
1865+
<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">
1866+
1867+
```kotlin
1868+
import kotlinx.coroutines.*
1869+
import kotlinx.coroutines.flow.*
1870+
1871+
//sampleStart
1872+
fun main() = runBlocking<Unit> {
1873+
(1..5).asFlow().().collect { value ->
1874+
if (value == 3) cancel()
1875+
println(value)
1876+
}
1877+
}
1878+
//sampleEnd
1879+
```
1880+
1881+
</div>
1882+
1883+
> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-39.kt).
1884+
1885+
With the `cancellable` operator only the numbers from 1 to 3 are collected:
1886+
1887+
```text
1888+
1
1889+
2
1890+
3
1891+
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
1892+
```
1893+
1894+
<!--- TEST EXCEPTION -->
1895+
17801896
### Flow and Reactive Streams
17811897

17821898
For those who are familiar with [Reactive Streams](https://www.reactive-streams.org/) or reactive frameworks such as RxJava and project Reactor,
@@ -1813,6 +1929,8 @@ Integration modules include conversions from and to `Flow`, integration with Rea
18131929
[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
18141930
[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
18151931
[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
1932+
[ensureActive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/ensure-active.html
1933+
[CancellationException]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-cancellation-exception/index.html
18161934
<!--- INDEX kotlinx.coroutines.flow -->
18171935
[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
18181936
[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
@@ -1845,4 +1963,6 @@ Integration modules include conversions from and to `Flow`, integration with Rea
18451963
[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
18461964
[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
18471965
[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
1966+
[IntRange.asFlow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/kotlin.ranges.-int-range/as-flow.html
1967+
[cancellable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/cancellable.html
18481968
<!--- END -->

kotlinx-coroutines-core/common/src/flow/Builders.kt

+6-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
1919
* Creates a flow from the given suspendable [block].
2020
*
2121
* Example of usage:
22+
*
2223
* ```
2324
* fun fibonacci(): Flow<BigInteger> = flow {
2425
* var x = BigInteger.ZERO
@@ -33,10 +34,13 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
3334
*
3435
* fibonacci().take(100).collect { println(it) }
3536
* ```
36-
* Emissions from [flow] builder are [cancellable] by default.
37+
*
38+
* Emissions from [flow] builder are [cancellable] by default &mdash; each call to [emit][FlowCollector.emit]
39+
* also calls [ensureActive][CoroutineContext.ensureActive].
3740
*
3841
* `emit` should happen strictly in the dispatchers of the [block] in order to preserve the flow context.
3942
* For example, the following code will result in an [IllegalStateException]:
43+
*
4044
* ```
4145
* flow {
4246
* emit(1) // Ok
@@ -45,6 +49,7 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
4549
* }
4650
* }
4751
* ```
52+
*
4853
* If you want to switch the context of execution of a flow, use the [flowOn] operator.
4954
*/
5055
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

kotlinx-coroutines-core/common/src/flow/operators/Context.kt

+3
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,9 @@ public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
212212
* Returns a flow which checks cancellation status on each emission and throws
213213
* the corresponding cancellation cause if flow collector was cancelled.
214214
* Note that [flow] builder is [cancellable] by default.
215+
*
216+
* This operator provides a shortcut for `.onEach { currentCoroutineContext().ensureActive() }`.
217+
* See [ensureActive][CoroutineContext.ensureActive] for details.
215218
*/
216219
public fun <T> Flow<T>.cancellable(): Flow<T> {
217220
if (this is AbstractFlow<*>) return this // Fast-path, already cancellable

kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt

+32
Original file line numberDiff line numberDiff line change
@@ -381,4 +381,36 @@ class FlowGuideTest {
381381
"Event: 3"
382382
)
383383
}
384+
385+
@Test
386+
fun testExampleFlow37() {
387+
test("ExampleFlow37") { kotlinx.coroutines.guide.exampleFlow37.main() }.verifyExceptions(
388+
"1",
389+
"2",
390+
"3",
391+
"Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@6d7b4f4c"
392+
)
393+
}
394+
395+
@Test
396+
fun testExampleFlow38() {
397+
test("ExampleFlow38") { kotlinx.coroutines.guide.exampleFlow38.main() }.verifyExceptions(
398+
"1",
399+
"2",
400+
"3",
401+
"4",
402+
"5",
403+
"Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@3327bd23"
404+
)
405+
}
406+
407+
@Test
408+
fun testExampleFlow39() {
409+
test("ExampleFlow39") { kotlinx.coroutines.guide.exampleFlow39.main() }.verifyExceptions(
410+
"1",
411+
"2",
412+
"3",
413+
"Exception in thread \"main\" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=\"coroutine#1\":BlockingCoroutine{Cancelled}@5ec0a365"
414+
)
415+
}
384416
}

0 commit comments

Comments
 (0)