` type for synchronously computed values:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun foo(): Flow = flow { // flow builder
+ for (i in 1..3) {
+ delay(100) // pretend we are doing something useful here
+ emit(i) // emit next value
+ }
+}
+
+fun main() = runBlocking {
+ // Launch a concurrent coroutine to see that the main thread is not blocked
+ launch {
+ for (k in 1..3) {
+ println("I'm not blocked $k")
+ delay(100)
+ }
+ }
+ // Collect the flow
+ foo().collect { value -> println(value) }
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt).
+
+This code waits 100ms before printing each number without blocking the main thread. This is verified
+by printing "I'm not blocked" every 100ms from a separate coroutine that is running in the main thread:
+
+```text
+I'm not blocked 1
+1
+I'm not blocked 2
+2
+I'm not blocked 3
+3
+```
+
+
+
+Notice the following differences of the code with the [Flow] from the earlier examples:
+
+* A builder function for [Flow] type is called [flow].
+* Code inside the `flow { ... }` builder block can suspend.
+* The function `foo()` is no longer marked with `suspend` modifier.
+* Values are _emitted_ from the flow using [emit][FlowCollector.emit] function.
+* Values are _collected_ from the flow using [collect][collect] function.
+
+> You can replace [delay] with `Thread.sleep` in the body of `foo`'s `flow { ... }` and see that the main
+thread is blocked in this case.
+
+### Flows are cold
+
+Flows are _cold_ streams similarly to sequences — the code inside a [flow] builder does not
+run until the flow is collected. This becomes clear in the following example:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun foo(): Flow = flow {
+ println("Flow started")
+ for (i in 1..3) {
+ delay(100)
+ emit(i)
+ }
+}
+
+fun main() = runBlocking {
+ println("Calling foo...")
+ val flow = foo()
+ println("Calling collect...")
+ flow.collect { value -> println(value) }
+ println("Calling collect again...")
+ flow.collect { value -> println(value) }
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt).
+
+Which prints:
+
+```text
+Calling foo...
+Calling collect...
+Flow started
+1
+2
+3
+Calling collect again...
+Flow started
+1
+2
+3
+```
+
+
+
+That is a key reason why the `foo()` function (which returns a flow) is not marked with `suspend` modifier.
+By itself, `foo()` returns quickly and does not wait for anything. The flow starts every time it is collected,
+that is why we see that when we call `collect` again, we get "Flow started" printed again.
+
+### Flow cancellation
+
+Flow adheres to general cooperative cancellation of coroutines. However, flow infrastructure does not introduce
+additional cancellation points. It is fully transparent for cancellation. As usual, flow collection can be
+cancelled when the flow is suspended in a cancellable suspending function (like [delay]) and cannot be cancelled otherwise.
+
+The following example shows how the flow gets cancelled on timeout when running in [withTimeoutOrNull] block
+and stops executing its code:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun foo(): Flow = flow {
+ for (i in 1..3) {
+ delay(100)
+ println("Emitting $i")
+ emit(i)
+ }
+}
+
+fun main() = runBlocking {
+ withTimeoutOrNull(250) { // Timeout after 250ms
+ foo().collect { value -> println(value) }
+ }
+ println("Done")
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-06.kt).
+
+Notice how only two numbers get emitted by the flow in `foo()` function, producing the following output:
+
+```text
+Emitting 1
+1
+Emitting 2
+2
+Done
+```
+
+
+
+### Flow builders
+
+The `flow { ... }` builder from the previous examples is the most basic one. There are other builders for
+convenient declaration of flows:
+
+* [flowOf] builder that defines a flow emitting a fixed set of values.
+* Various collections and sequences can be converted to flows using `.asFlow()` extension functions.
+
+Thus, the example that prints numbers from 1 to 3 from a flow can be written as:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun main() = runBlocking {
+//sampleStart
+ // Convert an integer range to a flow
+ (1..3).asFlow().collect { value -> println(value) }
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-07.kt).
+
+
+
+### Intermediate flow operators
+
+Flows can be transformed with operators similarly to collections and sequences.
+Intermediate operators are applied to an upstream flow and return a downstream flow.
+These operators are cold, just like flows are. A call to such an operator is not
+a suspending function itself. It works quickly, returning the definition of a new transformed flow.
+
+The basic operators have familiar names like [map] and [filter].
+The important difference from sequences is that blocks of
+code inside those operators can call suspending functions.
+
+For example, a flow of incoming requests can be
+mapped to results with the [map] operator even when performing a request is a long-running
+operation that is implemented by a suspending function:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+suspend fun performRequest(request: Int): String {
+ delay(1000) // imitate long-running asynchronous work
+ return "response $request"
+}
+
+fun main() = runBlocking {
+ (1..3).asFlow() // a flow of requests
+ .map { request -> performRequest(request) }
+ .collect { response -> println(response) }
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-08.kt).
+
+It produces the following three lines, each line appearing after a second:
+
+```text
+response 1
+response 2
+response 3
+```
+
+
+
+#### Transform operator
+
+Among the flow transformation operators, the most general one is called [transform]. It can be used to imitate
+simple transformations like [map] and [filter] as well as implement more complex transformations.
+Using `transform` operator, you can [emit][FlowCollector.emit] arbitrary values an arbitrary number of times.
+
+For example, using `transform` we can emit a string before performing a long-running asynchronous request
+and follow it with a response:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+suspend fun performRequest(request: Int): String {
+ delay(1000) // imitate long-running asynchronous work
+ return "response $request"
+}
+
+fun main() = runBlocking {
+//sampleStart
+ (1..3).asFlow() // a flow of requests
+ .transform { request ->
+ emit("Making request $request")
+ emit(performRequest(request))
+ }
+ .collect { response -> println(response) }
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-09.kt).
+
+The output of this code is:
+
+```text
+Making request 1
+response 1
+Making request 2
+response 2
+Making request 3
+response 3
+```
+
+
+
+#### Size-limiting operators
+
+Size-limiting intermediate operators like [take] cancel the execution of the flow when the corresponding limit
+is reached. Cancellation in coroutines is always performed by throwing an exception so that all the resource-management
+functions (like `try { ... } finally { ... }` blocks) operate normally in case of cancellation:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun numbers(): Flow = flow {
+ try {
+ emit(1)
+ emit(2)
+ println("This line will not execute")
+ emit(3)
+ } finally {
+ println("Finally in numbers")
+ }
+}
+
+fun main() = runBlocking {
+ numbers()
+ .take(2) // take only the first two
+ .collect { value -> println(value) }
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-10.kt).
+
+The output of this code clearly shows that execution of the `flow { ... }` body in `numbers()` function
+had stopped after emitting the second number:
+
+```text
+1
+2
+Finally in numbers
+```
+
+
+
+### Terminal flow operators
+
+Terminal operators on flows are _suspending functions_ that start a collection of the flow.
+The [collect] operator is the most basic one, but there are other terminal operators for
+convenience:
+
+* Conversion to various collections like [toList] and [toSet].
+* Operators to get the [first] value and to ensure that a flow emits a [single] value.
+* Reducing a flow to a value with [reduce] and [fold].
+
+For example:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun main() = runBlocking {
+//sampleStart
+ val sum = (1..5).asFlow()
+ .map { it * it } // squares of numbers from 1 to 5
+ .reduce { a, b -> a + b } // sum them (terminal operator)
+ println(sum)
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-11.kt).
+
+Prints a single number:
+
+```text
+55
+```
+
+
+
+### Flows are sequential
+
+Each individual collection of a flow is performed sequentially unless special operators that operate
+on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator.
+No new coroutines are launched by default.
+Each emitted value is processed by all intermediate operators from
+upstream to downstream and is delivered to the terminal operator after that.
+
+See the following example that filters even integers and maps them to strings:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun main() = runBlocking {
+//sampleStart
+ (1..5).asFlow()
+ .filter {
+ println("Filter $it")
+ it % 2 == 0
+ }
+ .map {
+ println("Map $it")
+ "string $it"
+ }.collect {
+ println("Collect $it")
+ }
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-12.kt).
+
+Producing:
+
+```text
+Filter 1
+Filter 2
+Map 2
+Collect string 2
+Filter 3
+Filter 4
+Map 4
+Collect string 4
+Filter 5
+```
+
+
+
+### Flow context
+
+Collection of a flow always happens in the context of the calling coroutine. For example, if there is
+a `foo` flow, then the following code runs in the context specified
+by the author of this code, regardless of implementation details of the `foo` flow:
+
+
+
+```kotlin
+withContext(context) {
+ foo.collect { value ->
+ println(value) // run in the specified context
+ }
+}
+```
+
+
+
+
+
+This property of a flow is called _context preservation_.
+
+So, by default, code in the `flow { ... }` builder runs in the context that is provided by a collector
+of the corresponding flow. For example, consider the implementation of `foo` that prints the thread
+it is called on and emits three numbers:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
+
+//sampleStart
+fun foo(): Flow = flow {
+ log("Started foo flow")
+ for (i in 1..3) {
+ emit(i)
+ }
+}
+
+fun main() = runBlocking {
+ foo().collect { value -> log("Collected $value") }
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-13.kt).
+
+Running this code produces:
+
+```text
+[main @coroutine#1] Started foo flow
+[main @coroutine#1] Collected 1
+[main @coroutine#1] Collected 2
+[main @coroutine#1] Collected 3
+```
+
+
+
+Since `foo().collect` is called from the main thread, the body of `foo`'s flow is also called in the main thread.
+This is a perfect default for fast-running or asynchronous code that does not care about the execution context and
+does not block the caller.
+
+#### Wrong emission withContext
+
+However, the long-running CPU-consuming code might need to be executed in the context of [Dispatchers.Default] and UI-updating
+code might need to be executed in the context of [Dispatchers.Main]. Usually, [withContext] is used
+to change the context in code using Kotlin coroutines, but code in the `flow { ... }` builder has to honor context
+preservation property and is not allowed to [emit][FlowCollector.emit] from a different context.
+
+Try running the following code:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun foo(): Flow = flow {
+ // WRONG way to change context for CPU-consuming code in flow builder
+ kotlinx.coroutines.withContext(Dispatchers.Default) {
+ for (i in 1..3) {
+ Thread.sleep(100) // pretend we are computing it in CPU-consuming way
+ emit(i) // emit next value
+ }
+ }
+}
+
+fun main() = runBlocking {
+ foo().collect { value -> println(value) }
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-14.kt).
+
+This code produces the following exception:
+
+
+
+> Note that we had to use a fully qualified name of [kotlinx.coroutines.withContext][withContext] function in this example to
+demonstrate this exception. A short name of `withContext` would have resolved to a special stub function that
+produces compilation error to prevent us from running into this problem.
+
+#### flowOn operator
+
+The exception refers to [flowOn] function that shall be used to change the context of flow emission.
+The correct way of changing the context of a flow is shown in the below example, which also prints
+names of the corresponding threads to show how it all works:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
+
+//sampleStart
+fun foo(): Flow = flow {
+ for (i in 1..3) {
+ Thread.sleep(100) // pretend we are computing it in CPU-consuming way
+ log("Emitting $i")
+ emit(i) // emit next value
+ }
+}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
+
+fun main() = runBlocking {
+ foo().collect { value ->
+ log("Collected $value")
+ }
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-15.kt).
+
+Notice how `flow { ... }` works in the background thread, while collection happens in the main thread:
+
+
+
+Another observation here is that [flowOn] operator had changed the default sequential nature of the flow.
+Now collection happens in one coroutine ("coroutine#1") and emission happens in another coroutine
+("coroutine#2") that is running in another thread concurrently with collecting coroutine. The [flowOn] operator
+creates another coroutine for an upstream flow when it has to change the [CoroutineDispatcher] in its context.
+
+### Buffering
+
+Running different parts of a flow in different coroutines can be helpful from the standpoint of overall time it takes
+to collect the flow, especially when long-running asynchronous operations are involved. For example, consider a case when
+emission by `foo()` flow is slow, taking 100 ms to produce an element; and collector is also slow,
+taking 300 ms to process an element. Let us see how long does it take to collect such a flow with three numbers:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlin.system.*
+
+//sampleStart
+fun foo(): Flow = flow {
+ for (i in 1..3) {
+ delay(100) // pretend we are asynchronously waiting 100 ms
+ emit(i) // emit next value
+ }
+}
+
+fun main() = runBlocking {
+ val time = measureTimeMillis {
+ foo().collect { value ->
+ delay(300) // pretend we are processing it for 300 ms
+ println(value)
+ }
+ }
+ println("Collected in $time ms")
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-16.kt).
+
+It produces something like this, the whole collection taking around 1200 ms (three numbers times 400 ms each):
+
+```text
+1
+2
+3
+Collected in 1220 ms
+```
+
+
+
+We can use [buffer] operator on a flow to run emitting code of `foo()` concurrently with collecting code,
+as opposed to running them sequentially:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlin.system.*
+
+fun foo(): Flow = flow {
+ for (i in 1..3) {
+ delay(100) // pretend we are asynchronously waiting 100 ms
+ emit(i) // emit next value
+ }
+}
+
+fun main() = runBlocking {
+//sampleStart
+ val time = measureTimeMillis {
+ foo()
+ .buffer() // buffer emissions, don't wait
+ .collect { value ->
+ delay(300) // pretend we are processing it for 300 ms
+ println(value)
+ }
+ }
+ println("Collected in $time ms")
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-17.kt).
+
+It produces the same numbers faster, as we have effectively created a processing pipeline,
+only having to wait 100 ms for the first number and then spending only 300 ms to process
+each number. This way it takes around 1000 ms to run:
+
+```text
+1
+2
+3
+Collected in 1071 ms
+```
+
+
+
+> Note that [flowOn] operator uses the same buffering mechanism when it has to change [CoroutineDispatcher],
+but here we explicitly request buffering without changing execution context.
+
+#### Conflation
+
+When flow represents partial results of some operation or operation status updates, it may not be necessary
+to process each value, but only to process the most recent ones. In this case, [conflate] operator can be used to skip
+intermediate values when a collector is too slow to process them. Building on the previous example:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlin.system.*
+
+fun foo(): Flow = flow {
+ for (i in 1..3) {
+ delay(100) // pretend we are asynchronously waiting 100 ms
+ emit(i) // emit next value
+ }
+}
+
+fun main() = runBlocking {
+//sampleStart
+ val time = measureTimeMillis {
+ foo()
+ .conflate() // conflate emissions, don't process each one
+ .collect { value ->
+ delay(300) // pretend we are processing it for 300 ms
+ println(value)
+ }
+ }
+ println("Collected in $time ms")
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-18.kt).
+
+We see that while the first number was being processed the second and the third ones were already produced, so
+the second one was _conflated_ and only the most recent (the third one) was delivered to the collector:
+
+```text
+1
+3
+Collected in 758 ms
+```
+
+
+
+#### Processing the latest value
+
+Conflation is one way to speed up processing when both emitter and collector are slow. It does that by dropping emitted values.
+The other way is to cancel slow collector and restart it every time a new value is emitted. There is
+a family of `xxxLatest` operators that perform the same essential logic of `xxx` operator, but cancel the
+code in their block on a new value. Let us change the previous example from [conflate] to [collectLatest]:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlin.system.*
+
+fun foo(): Flow = flow {
+ for (i in 1..3) {
+ delay(100) // pretend we are asynchronously waiting 100 ms
+ emit(i) // emit next value
+ }
+}
+
+fun main() = runBlocking {
+//sampleStart
+ val time = measureTimeMillis {
+ foo()
+ .collectLatest { value -> // cancel & restart on the latest value
+ println("Collecting $value")
+ delay(300) // pretend we are processing it for 300 ms
+ println("Done $value")
+ }
+ }
+ println("Collected in $time ms")
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-19.kt).
+
+Since the body of [collectLatest] takes 300 ms, but new values are emitted every 100 ms, we see that the block
+is run on every value, but completes only for the last value:
+
+```text
+Collecting 1
+Collecting 2
+Collecting 3
+Done 3
+Collected in 741 ms
+```
+
+
+
+### Composing multiple flows
+
+There are several ways to compose multiple flows.
+
+#### Zip
+
+Similarly to [Sequence.zip] extension function in the Kotlin standard library,
+flows have [zip] operator that combines the corresponding values of two flows:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun main() = runBlocking {
+//sampleStart
+ val nums = (1..3).asFlow() // numbers 1..3
+ val strs = flowOf("one", "two", "three") // strings
+ nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
+ .collect { println(it) } // collect and print
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-20.kt).
+
+This example prints:
+
+```text
+1 -> one
+2 -> two
+3 -> three
+```
+
+
+
+#### Combine
+
+When flow represents the most recent value of some variable or operation (see also a related
+section on [conflation](#conflation)) it might be needed to perform a computation that depends on
+the most recent values of the corresponding flows and to recompute it whenever any of upstream
+flows emit a value. The corresponding family of operators is called [combine].
+
+For example, if the numbers in the previous example update every 300ms, but strings update every 400 ms,
+then zipping them using [zip] operator would still produce the same result,
+albeit results are going to be printed every 400 ms:
+
+> We use [onEach] intermediate operator in this example to delay each element and thus make the code
+that emits sample flows more declarative and shorter.
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun main() = runBlocking {
+//sampleStart
+ val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
+ val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
+ val startTime = System.currentTimeMillis() // remember the start time
+ nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
+ .collect { value -> // collect and print
+ println("$value at ${System.currentTimeMillis() - startTime} ms from start")
+ }
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-21.kt).
+
+
+
+However, using [combine] operator here instead of [zip]:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun main() = runBlocking {
+//sampleStart
+ val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
+ val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
+ val startTime = currentTimeMillis() // remember the start time
+ nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
+ .collect { value -> // collect and print
+ println("$value at ${System.currentTimeMillis() - startTime} ms from start")
+ }
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-22.kt).
+
+We get quite a different output, where a line is printed at each emission from either `nums` or `strs` flows:
+
+```text
+1 -> one at 452 ms from start
+2 -> one at 651 ms from start
+2 -> two at 854 ms from start
+3 -> two at 952 ms from start
+3 -> three at 1256 ms from start
+```
+
+
+
+### Flattening flows
+
+Flows represent asynchronously received sequences of values, so it is quite easy to get in a situation where
+each value triggers a request for another sequence of values. For example, we can have the following
+function that returns a flow of two strings 500 ms apart:
+
+
+
+```kotlin
+fun requestFlow(i: Int): Flow = flow {
+ emit("$i: First")
+ delay(500) // wait 500 ms
+ emit("$i: Second")
+}
+```
+
+
+
+
+
+Now if we have a flow of three integers and call `requestFlow` for each of them like this:
+
+
+
+```kotlin
+(1..3).asFlow().map { requestFlow(it) }
+```
+
+
+
+
+
+Then we end up with a flow of flows (`Flow>`) that needs to be _flattened_ into a single flow for
+further processing. Collections and sequences have [flatten][Sequence.flatten] and [flatMap][Sequence.flatMap]
+operators for this purpose. However, the asynchronous nature of flows calls for different _modes_ of flattening
+thus there is a family of flattening operators on flows.
+
+#### flatMapConcat
+
+Concatenating mode is implemented by [flatMapConcat] and [flattenConcat] operators. They are the most direct
+analogues of the corresponding sequence operators. They wait for inner flow to complete before
+starting to collect the next one as the following example shows:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun requestFlow(i: Int): Flow = flow {
+ emit("$i: First")
+ delay(500) // wait 500 ms
+ emit("$i: Second")
+}
+
+fun main() = runBlocking {
+//sampleStart
+ val startTime = currentTimeMillis() // remember the start time
+ (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
+ .flatMapConcat { requestFlow(it) }
+ .collect { value -> // collect and print
+ println("$value at ${System.currentTimeMillis() - startTime} ms from start")
+ }
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-23.kt).
+
+The sequential nature of [flatMapConcat] is clearly seen in the output:
+
+```text
+1: First at 121 ms from start
+1: Second at 622 ms from start
+2: First at 727 ms from start
+2: Second at 1227 ms from start
+3: First at 1328 ms from start
+3: Second at 1829 ms from start
+```
+
+
+
+#### flatMapMerge
+
+Another flattening mode is to concurrently collect all the incoming flows and merge their values into
+a single flow so that values are emitted as soon as possible.
+It is implemented by [flatMapMerge] and [flattenMerge] operators. They both accept an optional
+`concurrency` parameter that limits the number of concurrent flows that are collected at the same time
+(it is equal to [DEFAULT_CONCURRENCY] by default).
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun requestFlow(i: Int): Flow = flow {
+ emit("$i: First")
+ delay(500) // wait 500 ms
+ emit("$i: Second")
+}
+
+fun main() = runBlocking {
+//sampleStart
+ val startTime = currentTimeMillis() // remember the start time
+ (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
+ .flatMapMerge { requestFlow(it) }
+ .collect { value -> // collect and print
+ println("$value at ${System.currentTimeMillis() - startTime} ms from start")
+ }
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-24.kt).
+
+The concurrent nature of [flatMapMerge] is obvious:
+
+```text
+1: First at 136 ms from start
+2: First at 231 ms from start
+3: First at 333 ms from start
+1: Second at 639 ms from start
+2: Second at 732 ms from start
+3: Second at 833 ms from start
+```
+
+
+
+> Note that [flatMapMerge] call its block of code (`{ requestFlow(it) }` in this example) sequentially, but
+collects the resulting flows concurrently, so it is equivalent to performing a sequential
+`map { requestFlow(it) }` first and then calling [flattenMerge] on the result.
+
+#### flatMapLatest
+
+In a similar way to [collectLatest] operator that was shown in
+["Processing the latest value"](#processing-the-latest-value) section, there is the corresponding "Latest"
+flattening mode where collection of the previous flow is cancelled as soon as new flow is emitted.
+It is implemented by [flatMapLatest] operator.
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun requestFlow(i: Int): Flow = flow {
+ emit("$i: First")
+ delay(500) // wait 500 ms
+ emit("$i: Second")
+}
+
+fun main() = runBlocking {
+//sampleStart
+ val startTime = currentTimeMillis() // remember the start time
+ (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
+ .flatMapLatest { requestFlow(it) }
+ .collect { value -> // collect and print
+ println("$value at ${System.currentTimeMillis() - startTime} ms from start")
+ }
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-25.kt).
+
+The output of this example speaks for the way [flatMapLatest] works:
+
+```text
+1: First at 142 ms from start
+2: First at 322 ms from start
+3: First at 425 ms from start
+3: Second at 931 ms from start
+```
+
+
+
+> Note that [flatMapLatest] cancels all the code in its block (`{ requestFlow(it) }` in this example) on a new value.
+It makes no difference in this particular example, because the call to `requestFlow` itself is fast, not-suspending,
+and cannot be cancelled. However, it would show up if we were to use suspending functions like `delay` in there.
+
+### Flow exceptions
+
+Flow collection can complete with an exception when emitter or any code inside any of the operators throw an exception.
+There are several ways to handle these exceptions.
+
+#### Collector try and catch
+
+A collector can use Kotlin's [`try/catch`][exceptions] block to handle exceptions:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun foo(): Flow = flow {
+ for (i in 1..3) {
+ println("Emitting $i")
+ emit(i) // emit next value
+ }
+}
+
+fun main() = runBlocking {
+ try {
+ foo().collect { value ->
+ println(value)
+ check(value <= 1) { "Collected $value" }
+ }
+ } catch (e: Throwable) {
+ println("Caught $e")
+ }
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-26.kt).
+
+This code successfully catches an exception in [collect] terminal operator and,
+as you can see, no more values are emitted after that:
+
+```text
+Emitting 1
+1
+Emitting 2
+2
+Caught java.lang.IllegalStateException: Collected 2
+```
+
+
+
+#### Everything is caught
+
+The previous example actually catches any exception happening in emitter or in any intermediate or terminal operators.
+For example, let us change the code so that emitted values are [mapped][map] to strings,
+but the corresponding code produces an exception:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun foo(): Flow =
+ flow {
+ for (i in 1..3) {
+ println("Emitting $i")
+ emit(i) // emit next value
+ }
+ }
+ .map { value ->
+ check(value <= 1) { "Crashed on $value" }
+ "string $value"
+ }
+
+fun main() = runBlocking {
+ try {
+ foo().collect { value -> println(value) }
+ } catch (e: Throwable) {
+ println("Caught $e")
+ }
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-27.kt).
+
+This exception is still caught and collection is stopped:
+
+```text
+Emitting 1
+string 1
+Emitting 2
+Caught java.lang.IllegalStateException: Crashed on 2
+```
+
+
+
+### Exception transparency
+
+But how can code of emitter encapsulate its exception handling behavior?
+
+Flows must be _transparent to exceptions_ and it is a violation of exception transparency to [emit][FlowCollector.emit] values in the
+`flow { ... }` builder from inside of `try/catch` block. This guarantees that a collector throwing an exception
+can always catch it using `try/catch` as in the previous example.
+
+The emitter can use [catch] operator that preserves this exception transparency and allows encapsulation
+of its exception handling. The body of the `catch` operator can analyze an exception
+and react to it in different ways depending on which exception was caught:
+
+* Exceptions can be rethrown using `throw`.
+* Exceptions can be turned into emission of values using [emit][FlowCollector.emit] from the body of [catch].
+* Exceptions can be ignored, logged, or processed by some other code.
+
+For example, let us emit a text on catching an exception:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun foo(): Flow =
+ flow {
+ for (i in 1..3) {
+ println("Emitting $i")
+ emit(i) // emit next value
+ }
+ }
+ .map { value ->
+ check(value <= 1) { "Crashed on $value" }
+ "string $value"
+ }
+
+fun main() = runBlocking {
+//sampleStart
+ foo()
+ .catch { e -> emit("Caught $e") } // emit on exception
+ .collect { value -> println(value) }
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-28.kt).
+
+The output of the example is the same, even though we do not have `try/catch` around the code anymore.
+
+
+
+#### Transparent catch
+
+The [catch] intermediate operator, honoring exception transparency, catches only upstream exceptions
+(that is an exception from all the operators above `catch`, but not below it).
+If the block in `collect { ... }` (placed below `catch`) throws an exception then it escapes:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun foo(): Flow = flow {
+ for (i in 1..3) {
+ println("Emitting $i")
+ emit(i)
+ }
+}
+
+fun main() = runBlocking {
+ foo()
+ .catch { e -> println("Caught $e") } // does not catch downstream exceptions
+ .collect { value ->
+ check(value <= 1) { "Collected $value" }
+ println(value)
+ }
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-29.kt).
+
+The "Caught ..." message is not printed despite the `catch` operator:
+
+
+
+#### Catching declaratively
+
+We can combine a declarative nature of [catch] operator with a desire to handle all exceptions by moving the body
+of [collect] operator into [onEach] and putting it before the `catch` operator. Collection of this flow must
+be triggered by a call to `collect()` without parameters:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun foo(): Flow = flow {
+ for (i in 1..3) {
+ println("Emitting $i")
+ emit(i)
+ }
+}
+
+fun main() = runBlocking {
+//sampleStart
+ foo()
+ .onEach { value ->
+ check(value <= 1) { "Collected $value" }
+ println(value)
+ }
+ .catch { e -> println("Caught $e") }
+ .collect()
+//sampleEnd
+}
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-30.kt).
+
+Now we can see that "Caught ..." message is printed and thus we can catch all exceptions without explicitly
+using a `try/catch` block:
+
+
+
+### Flow completion
+
+When flow collection completes (normally or exceptionally) it may be needed to execute some action.
+As you might have already noticed, it also can be done in two ways: imperative and declarative.
+
+#### Imperative finally block
+
+In addition to `try`/`catch`, a collector can also use `finally` block to execute an action
+upon `collect` completion.
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun foo(): Flow = (1..3).asFlow()
+
+fun main() = runBlocking {
+ try {
+ foo().collect { value -> println(value) }
+ } finally {
+ println("Done")
+ }
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-31.kt).
+
+This code prints three numbers produced by the `foo()` flow followed by "Done" string:
+
+```text
+1
+2
+3
+Done
+```
+
+
+
+#### Declarative handling
+
+For declarative approach, flow has [onCompletion] intermediate operator that is invoked
+when the flow is completely collected.
+
+The previous example can be rewritten using [onCompletion] operator and produces the same output:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun foo(): Flow = (1..3).asFlow()
+
+fun main() = runBlocking {
+//sampleStart
+ foo()
+ .onCompletion { println("Done") }
+ .collect { value -> println(value) }
+//sampleEnd
+}
+```
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-32.kt).
+
+
+
+The key advantage of [onCompletion] is a nullable `Throwable` parameter of the lambda that can be used
+to determine whether flow collection was completed normally or exceptionally. In the following
+example `foo()` flow throws exception after emitting number 1:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun foo(): Flow = flow {
+ emit(1)
+ throw RuntimeException()
+}
+
+fun main() = runBlocking {
+ foo()
+ .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
+ .catch { cause -> println("Caught exception") }
+ .collect { value -> println(value) }
+}
+//sampleEnd
+```
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-33.kt).
+
+As you may expect, it prints:
+
+```text
+1
+Flow completed exceptionally
+Caught exception
+```
+
+
+
+[onCompletion] operator, unlike [catch], does not handle the exception. As we can see from the above
+example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
+and can be handled with `catch` operator.
+
+#### Upstream exceptions only
+
+Just like [catch] operator, [onCompletion] sees only exception coming from upstream and does not
+see downstream exceptions. For example, run the following code:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+fun foo(): Flow = (1..3).asFlow()
+
+fun main() = runBlocking {
+ foo()
+ .onCompletion { cause -> println("Flow completed with $cause") }
+ .collect { value ->
+ check(value <= 1) { "Collected $value" }
+ println(value)
+ }
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).
+
+And you can see the completion cause is null, yet collection failed with exception:
+
+```text
+1
+Flow completed with null
+Exception in thread "main" java.lang.IllegalStateException: Collected 2
+```
+
+
+
+### Imperative versus declarative
+
+Now we know how to collect flow, handle its completion and exceptions in both imperative and declarative ways.
+The natural question here is which approach should be preferred and why.
+As a library, we do not advocate for any particular approach and believe that both options
+are valid and should be selected according to your own preferences and code style.
+
+### Launching flow
+
+It is convenient to use flows to represent asynchronous events that are coming from some source.
+In this case, we need an analogue of `addEventListener` function that registers a piece of code with a reaction
+on incoming events and continues further work. The [onEach] operator can serve this role.
+However, `onEach` is an intermediate operator. We also need a terminal operator to collect the flow.
+Otherwise, just calling `onEach` has no effect.
+
+If we use [collect] terminal operator after `onEach`, then code after it waits until the flow is collected:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+//sampleStart
+// Imitate a flow of events
+fun events(): Flow = (1..3).asFlow().onEach { delay(100) }
+
+fun main() = runBlocking {
+ events()
+ .onEach { event -> println("Event: $event") }
+ .collect() // <--- Collecting the flow waits
+ println("Done")
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-35.kt).
+
+As you can see, it prints:
+
+```text
+Event: 1
+Event: 2
+Event: 3
+Done
+```
+
+
+
+Here [launchIn] terminal operator comes in handy. Replacing `collect` with `launchIn` we can
+launch collection of the flow in a separate coroutine, so that execution of further code
+immediately continues:
+
+
+
+```kotlin
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+// Imitate a flow of events
+fun events(): Flow = (1..3).asFlow().onEach { delay(100) }
+
+//sampleStart
+fun main() = runBlocking {
+ events()
+ .onEach { event -> println("Event: $event") }
+ .launchIn(this) // <--- Launching the flow in a separate coroutine
+ println("Done")
+}
+//sampleEnd
+```
+
+
+
+> You can get full code [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-36.kt).
+
+It prints:
+
+```text
+Done
+Event: 1
+Event: 2
+Event: 3
+```
+
+
+
+The required parameter to `launchIn` must specify a [CoroutineScope] in which the coroutine to collect the flow is
+launched. In the above example this scope comes from [runBlocking]
+coroutine builder, so while the flow is running this [runBlocking] scope waits for completion of its child coroutine
+and keeps the main function from returning and terminating this example.
+
+In real applications a scope is going to come from some entity with a limited
+lifetime. As soon as the lifetime of this entity is terminated the corresponding scope is cancelled, cancelling
+collection of the corresponding flow. This way the pair of `onEach { ... }.launchIn(scope)` works
+like `addEventListener`. However, there is no need for the corresponding `removeEventListener` function,
+as cancellation and structured concurrency serve this purpose.
+
+Note, that [launchIn] also returns a [Job] which can be used to [cancel][Job.cancel] the corresponding flow collection
+coroutine only without cancelling the whole scope or to [join][Job.join] it.
+
+
+
+[collections]: https://kotlinlang.org/docs/reference/collections-overview.html
+[List]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/-list/index.html
+[forEach]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.collections/for-each.html
+[Sequence]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/index.html
+[Sequence.zip]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/zip.html
+[Sequence.flatten]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flatten.html
+[Sequence.flatMap]: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/flat-map.html
+[exceptions]: https://kotlinlang.org/docs/reference/exceptions.html
+
+
+
+[delay]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
+[withTimeoutOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-timeout-or-null.html
+[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
+[Dispatchers.Main]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-main.html
+[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
+[CoroutineDispatcher]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/index.html
+[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html
+[runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html
+[Job]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/index.html
+[Job.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/cancel.html
+[Job.join]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-job/join.html
+
+[Flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html
+[flow]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow.html
+[FlowCollector.emit]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow-collector/emit.html
+[collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect.html
+[flowOf]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-of.html
+[map]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/map.html
+[filter]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/filter.html
+[transform]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/transform.html
+[take]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/take.html
+[toList]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-list.html
+[toSet]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/to-set.html
+[first]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/first.html
+[single]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/single.html
+[reduce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/reduce.html
+[fold]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/fold.html
+[flowOn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flow-on.html
+[buffer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/buffer.html
+[conflate]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/conflate.html
+[collectLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/collect-latest.html
+[zip]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/zip.html
+[combine]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/combine.html
+[onEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-each.html
+[flatMapConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-concat.html
+[flattenConcat]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-concat.html
+[flatMapMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-merge.html
+[flattenMerge]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flatten-merge.html
+[DEFAULT_CONCURRENCY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-d-e-f-a-u-l-t_-c-o-n-c-u-r-r-e-n-c-y.html
+[flatMapLatest]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/flat-map-latest.html
+[catch]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html
+[onCompletion]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/on-completion.html
+[launchIn]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/launch-in.html
+
diff --git a/docs/select-expression.md b/docs/select-expression.md
index 35480abf36..f36fa09b6b 100644
--- a/docs/select-expression.md
+++ b/docs/select-expression.md
@@ -21,7 +21,7 @@ class SelectGuideTest {
-* [Select expression (experimental)](#select-expression-experimental)
+* [Select Expression (experimental)](#select-expression-experimental)
* [Selecting from channels](#selecting-from-channels)
* [Selecting on close](#selecting-on-close)
* [Selecting to send](#selecting-to-send)
@@ -32,7 +32,7 @@ class SelectGuideTest {
-## Select expression (experimental)
+## Select Expression (experimental)
Select expression makes it possible to await multiple suspending functions simultaneously and _select_
the first one that becomes available.
diff --git a/gradle.properties b/gradle.properties
index 1fd92b1e16..6b3060e680 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,11 +1,11 @@
# Kotlin
-version=1.3.0-RC2-SNAPSHOT
+version=1.3.0-SNAPSHOT
group=org.jetbrains.kotlinx
-kotlin_version=1.3.41
+kotlin_version=1.3.50
# Dependencies
junit_version=4.12
-atomicfu_version=0.12.9
+atomicfu_version=0.12.10
html_version=0.6.8
lincheck_version=2.0
dokka_version=0.9.16-rdev-2-mpp-hacks
diff --git a/knit/src/Knit.kt b/knit/src/Knit.kt
index abb66dfac4..d3e0a35894 100644
--- a/knit/src/Knit.kt
+++ b/knit/src/Knit.kt
@@ -28,6 +28,9 @@ const val INCLUDE_DIRECTIVE = "INCLUDE"
const val CLEAR_DIRECTIVE = "CLEAR"
const val TEST_DIRECTIVE = "TEST"
+const val KNIT_AUTONUMBER_PLACEHOLDER = '#'
+const val KNIT_AUTONUMBER_REGEX = "([0-9a-z]+)"
+
const val TEST_OUT_DIRECTIVE = "TEST_OUT"
const val MODULE_DIRECTIVE = "MODULE"
@@ -36,6 +39,9 @@ const val INDEX_DIRECTIVE = "INDEX"
const val CODE_START = "```kotlin"
const val CODE_END = "```"
+const val SAMPLE_START = "//sampleStart"
+const val SAMPLE_END = "//sampleEnd"
+
const val TEST_START = "```text"
const val TEST_END = "```"
@@ -73,6 +79,9 @@ fun knit(markdownFile: File): Boolean {
println("*** Reading $markdownFile")
val tocLines = arrayListOf()
var knitRegex: Regex? = null
+ var knitAutonumberGroup = 0
+ var knitAutonumberDigits = 0
+ var knitAutonumberIndex = 1
val includes = arrayListOf()
val codeLines = arrayListOf()
val testLines = arrayListOf()
@@ -122,7 +131,18 @@ fun knit(markdownFile: File): Boolean {
requireSingleLine(directive)
require(!directive.param.isEmpty()) { "$KNIT_DIRECTIVE directive must include regex parameter" }
require(knitRegex == null) { "Only one KNIT directive is supported"}
- knitRegex = Regex("\\((" + directive.param + ")\\)")
+ var str = directive.param
+ val i = str.indexOf(KNIT_AUTONUMBER_PLACEHOLDER)
+ if (i >= 0) {
+ val j = str.lastIndexOf(KNIT_AUTONUMBER_PLACEHOLDER)
+ knitAutonumberDigits = j - i + 1
+ require(str.substring(i, j + 1) == KNIT_AUTONUMBER_PLACEHOLDER.toString().repeat(knitAutonumberDigits)) {
+ "$KNIT_DIRECTIVE can only use a contiguous range of '$KNIT_AUTONUMBER_PLACEHOLDER' for auto-numbering"
+ }
+ knitAutonumberGroup = str.substring(0, i).count { it == '(' } + 2 // note: it does not understand escaped open braces
+ str = str.substring(0, i) + KNIT_AUTONUMBER_REGEX + str.substring(j + 1)
+ }
+ knitRegex = Regex("\\((" + str + ")\\)")
continue@mainLoop
}
INCLUDE_DIRECTIVE -> {
@@ -183,7 +203,9 @@ fun knit(markdownFile: File): Boolean {
if (inLine.startsWith(CODE_START)) {
require(testOut == null || testLines.isEmpty()) { "Previous test was not emitted with $TEST_DIRECTIVE" }
codeLines += ""
- readUntilTo(CODE_END, codeLines)
+ readUntilTo(CODE_END, codeLines) { line ->
+ !line.startsWith(SAMPLE_START) && !line.startsWith(SAMPLE_END)
+ }
continue@mainLoop
}
if (inLine.startsWith(TEST_START)) {
@@ -212,8 +234,19 @@ fun knit(markdownFile: File): Boolean {
remainingApiRefNames += apiRef.name
}
}
- knitRegex?.find(inLine)?.let { knitMatch ->
+ knitRegex?.find(inLine)?.let knitRegexMatch@{ knitMatch ->
val fileName = knitMatch.groups[1]!!.value
+ if (knitAutonumberDigits != 0) {
+ val numGroup = knitMatch.groups[knitAutonumberGroup]!!
+ val num = knitAutonumberIndex.toString().padStart(knitAutonumberDigits, '0')
+ if (numGroup.value != num) { // update and retry with this line if a different number
+ val r = numGroup.range
+ val newLine = inLine.substring(0, r.first) + num + inLine.substring(r.last + 1)
+ updateLineAndRetry(newLine)
+ return@knitRegexMatch
+ }
+ }
+ knitAutonumberIndex++
val file = File(markdownFile.parentFile, fileName)
require(files.add(file)) { "Duplicate file: $file"}
println("Knitting $file ...")
@@ -328,11 +361,11 @@ private fun flushTestOut(parentDir: File?, testOut: String?, testOutLines: Mutab
private fun MarkdownTextReader.readUntil(marker: String): List =
arrayListOf().also { readUntilTo(marker, it) }
-private fun MarkdownTextReader.readUntilTo(marker: String, list: MutableList) {
+private fun MarkdownTextReader.readUntilTo(marker: String, list: MutableList, linePredicate: (String) -> Boolean = { true }) {
while (true) {
val line = readLine() ?: break
if (line.startsWith(marker)) break
- list += line
+ if (linePredicate(line)) list += line
}
}
@@ -404,6 +437,12 @@ class MarkdownTextReader(r: Reader) : LineNumberReader(r) {
return line
}
+ fun updateLineAndRetry(line: String) {
+ outText.removeAt(outText.lastIndex)
+ outText += line
+ putBackLine = line
+ }
+
fun replaceUntilNextDirective(lines: List): Boolean {
skip = true
while (true) {
diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
index bed497909d..7b8f96b6e4 100644
--- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
@@ -96,10 +96,10 @@ internal abstract class AbstractSendChannel : SendChannel {
* @suppress **This is unstable API and it is subject to change.**
*/
protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
- queue.addLastIfPrev(SendBuffered(element), { prev ->
+ queue.addLastIfPrev(SendBuffered(element)) { prev ->
if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
true
- })
+ }
return null
}
@@ -112,9 +112,10 @@ internal abstract class AbstractSendChannel : SendChannel {
queue: LockFreeLinkedListHead,
element: E
) : AddLastDesc>(queue, SendBuffered(element)) {
- override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
- if (affected is ReceiveOrClosed<*>) return OFFER_FAILED
- return null
+ override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
+ is Closed<*> -> affected
+ is ReceiveOrClosed<*> -> OFFER_FAILED
+ else -> null
}
}
@@ -168,18 +169,23 @@ internal abstract class AbstractSendChannel : SendChannel {
}
private suspend fun sendSuspend(element: E): Unit = suspendAtomicCancellableCoroutine sc@ { cont ->
- val send = SendElement(element, cont)
loop@ while (true) {
- val enqueueResult = enqueueSend(send)
- when (enqueueResult) {
- null -> { // enqueued successfully
- cont.removeOnCancellation(send)
- return@sc
- }
- is Closed<*> -> {
- helpClose(enqueueResult)
- cont.resumeWithException(enqueueResult.sendException)
- return@sc
+ if (full) {
+ val send = SendElement(element, cont)
+ val enqueueResult = enqueueSend(send)
+ when {
+ enqueueResult == null -> { // enqueued successfully
+ cont.removeOnCancellation(send)
+ return@sc
+ }
+ enqueueResult is Closed<*> -> {
+ helpClose(enqueueResult)
+ cont.resumeWithException(enqueueResult.sendException)
+ return@sc
+ }
+ enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
+ enqueueResult is Receive<*> -> {} // try to offer instead
+ else -> error("enqueueSend returned $enqueueResult")
}
}
// hm... receiver is waiting or buffer is not full. try to offer
@@ -206,12 +212,12 @@ internal abstract class AbstractSendChannel : SendChannel {
* * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
* * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
*/
- private fun enqueueSend(send: SendElement): Any? {
+ private fun enqueueSend(send: Send): Any? {
if (isBufferAlwaysFull) {
- queue.addLastIfPrev(send, { prev ->
+ queue.addLastIfPrev(send) { prev ->
if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
true
- })
+ }
} else {
if (!queue.addLastIfPrevAndIf(send, { prev ->
if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
@@ -333,10 +339,10 @@ internal abstract class AbstractSendChannel : SendChannel {
) : RemoveFirstDesc>(queue) {
@JvmField var resumeToken: Any? = null
- override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
- if (affected !is ReceiveOrClosed<*>) return OFFER_FAILED
- if (affected is Closed<*>) return affected
- return null
+ override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
+ is Closed<*> -> affected
+ !is ReceiveOrClosed<*> -> OFFER_FAILED
+ else -> null
}
override fun validatePrepared(node: ReceiveOrClosed): Boolean {
@@ -346,30 +352,6 @@ internal abstract class AbstractSendChannel : SendChannel {
}
}
- private inner class TryEnqueueSendDesc(
- element: E,
- select: SelectInstance,
- block: suspend (SendChannel) -> R
- ) : AddLastDesc>(queue, SendSelect(element, this@AbstractSendChannel, select, block)) {
- override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
- if (affected is ReceiveOrClosed<*>) {
- return affected as? Closed<*> ?: ENQUEUE_FAILED
- }
- return null
- }
-
- override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
- if (!isBufferFull) return ENQUEUE_FAILED
- return super.onPrepare(affected, next)
- }
-
- override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
- super.finishOnSuccess(affected, next)
- // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
- node.disposeOnSelect()
- }
- }
-
final override val onSend: SelectClause2>
get() = object : SelectClause2> {
override fun registerSelectClause2(select: SelectInstance, param: E, block: suspend (SendChannel) -> R) {
@@ -381,26 +363,36 @@ internal abstract class AbstractSendChannel : SendChannel {
while (true) {
if (select.isSelected) return
if (full) {
- val enqueueOp = TryEnqueueSendDesc(element, select, block)
- val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
+ val node = SendSelect(element, this, select, block)
+ val enqueueResult = enqueueSend(node)
when {
- enqueueResult === ALREADY_SELECTED -> return
- enqueueResult === ENQUEUE_FAILED -> {} // retry
- enqueueResult is Closed<*> -> throw recoverStackTrace(enqueueResult.sendException)
- else -> error("performAtomicIfNotSelected(TryEnqueueSendDesc) returned $enqueueResult")
- }
- } else {
- val offerResult = offerSelectInternal(element, select)
- when {
- offerResult === ALREADY_SELECTED -> return
- offerResult === OFFER_FAILED -> {} // retry
- offerResult === OFFER_SUCCESS -> {
- block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
+ enqueueResult == null -> { // enqueued successfully
+ select.disposeOnSelect(node)
return
}
- offerResult is Closed<*> -> throw recoverStackTrace(offerResult.sendException)
- else -> error("offerSelectInternal returned $offerResult")
+ enqueueResult is Closed<*> -> {
+ helpClose(enqueueResult)
+ throw recoverStackTrace(enqueueResult.sendException)
+ }
+ enqueueResult === ENQUEUE_FAILED -> {} // try to offer
+ enqueueResult is Receive<*> -> {} // try to offer
+ else -> error("enqueueSend returned $enqueueResult ")
+ }
+ }
+ // hm... receiver is waiting or buffer is not full. try to offer
+ val offerResult = offerSelectInternal(element, select)
+ when {
+ offerResult === ALREADY_SELECTED -> return
+ offerResult === OFFER_FAILED -> {} // retry
+ offerResult === OFFER_SUCCESS -> {
+ block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
+ return
+ }
+ offerResult is Closed<*> -> {
+ helpClose(offerResult)
+ throw recoverStackTrace(offerResult.sendException)
}
+ else -> error("offerSelectInternal returned $offerResult")
}
}
}
@@ -443,7 +435,7 @@ internal abstract class AbstractSendChannel : SendChannel {
@JvmField val channel: SendChannel,
@JvmField val select: SelectInstance,
@JvmField val block: suspend (SendChannel) -> R
- ) : LockFreeLinkedListNode(), Send, DisposableHandle {
+ ) : Send(), DisposableHandle {
override fun tryResumeSend(idempotent: Any?): Any? =
if (select.trySelect(idempotent)) SELECT_STARTED else null
@@ -452,11 +444,7 @@ internal abstract class AbstractSendChannel : SendChannel {
block.startCoroutine(receiver = channel, completion = select.completion)
}
- fun disposeOnSelect() {
- select.disposeOnSelect(this)
- }
-
- override fun dispose() {
+ override fun dispose() { // invoked on select completion
remove()
}
@@ -470,7 +458,7 @@ internal abstract class AbstractSendChannel : SendChannel {
internal class SendBuffered(
@JvmField val element: E
- ) : LockFreeLinkedListNode(), Send {
+ ) : Send() {
override val pollResult: Any? get() = element
override fun tryResumeSend(idempotent: Any?): Any? = SEND_RESUMED
override fun completeResumeSend(token: Any) { assert { token === SEND_RESUMED } }
@@ -556,8 +544,8 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel receiveSuspend(onClose: Int): R = suspendAtomicCancellableCoroutine sc@ { cont ->
- val receive = ReceiveElement(cont as CancellableContinuation, onClose)
+ private suspend fun receiveSuspend(receiveMode: Int): R = suspendAtomicCancellableCoroutine sc@ { cont ->
+ val receive = ReceiveElement(cont as CancellableContinuation, receiveMode)
while (true) {
if (enqueueReceive(receive)) {
removeReceiveOnCancel(cont, receive)
@@ -578,7 +566,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel): Boolean {
val result = if (isBufferAlwaysEmpty)
- queue.addLastIfPrev(receive, { it !is Send }) else
+ queue.addLastIfPrev(receive) { it !is Send } else
queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
if (result) onReceiveEnqueued()
return result
@@ -659,10 +647,10 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel) return affected
- if (affected !is Send) return POLL_FAILED
- return null
+ override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
+ is Closed<*> -> affected
+ !is Send -> POLL_FAILED
+ else -> null
}
@Suppress("UNCHECKED_CAST")
@@ -674,30 +662,6 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel(
- select: SelectInstance,
- block: suspend (Any?) -> R,
- receiveMode: Int
- ) : AddLastDesc>(queue, ReceiveSelect(select, block, receiveMode)) {
- override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
- if (affected is Send) return ENQUEUE_FAILED
- return null
- }
-
- override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
- if (!isBufferEmpty) return ENQUEUE_FAILED
- return super.onPrepare(affected, next)
- }
-
- override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
- super.finishOnSuccess(affected, next)
- // notify the there is one more receiver
- onReceiveEnqueued()
- // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
- node.removeOnSelectCompletion()
- }
- }
-
final override val onReceive: SelectClause1
get() = object : SelectClause1 {
override fun registerSelectClause1(select: SelectInstance, block: suspend (E) -> R) {
@@ -710,7 +674,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel R, RECEIVE_THROWS_ON_CLOSE)) return
} else {
val pollResult = pollSelectInternal(select)
when {
@@ -738,7 +702,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel R, RECEIVE_NULL_ON_CLOSE)) return
} else {
val pollResult = pollSelectInternal(select)
when {
@@ -775,7 +739,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel R, RECEIVE_RESULT)) return
} else {
val pollResult = pollSelectInternal(select)
when {
@@ -794,18 +758,15 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel registerEnqueueDesc(
- select: SelectInstance, block: suspend (E) -> R,
+ private fun enqueueReceiveSelect(
+ select: SelectInstance,
+ block: suspend (Any?) -> R,
receiveMode: Int
): Boolean {
- @Suppress("UNCHECKED_CAST")
- val enqueueOp = TryEnqueueReceiveDesc(select, block as suspend (Any?) -> R, receiveMode)
- val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return true
- return when {
- enqueueResult === ALREADY_SELECTED -> true
- enqueueResult === ENQUEUE_FAILED -> false // retry
- else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
- }
+ val node = ReceiveSelect(this, select, block, receiveMode)
+ val result = enqueueReceive(node)
+ if (result) select.disposeOnSelect(node)
+ return result
}
// ------ protected ------
@@ -917,7 +878,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel cont.resumeWithException(closed.receiveException)
}
}
- override fun toString(): String = "ReceiveElement[$cont,receiveMode=$receiveMode]"
+ override fun toString(): String = "ReceiveElement[receiveMode=$receiveMode]"
}
private class ReceiveHasNext(
@@ -957,10 +918,11 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel(
+ private class ReceiveSelect(
+ @JvmField val channel: AbstractChannel,
@JvmField val select: SelectInstance,
@JvmField val block: suspend (Any?) -> R,
@JvmField val receiveMode: Int
@@ -987,13 +949,9 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel Unit
/**
* Represents sending waiter in the queue.
*/
-internal interface Send {
- val pollResult: Any? // E | Closed
- fun tryResumeSend(idempotent: Any?): Any?
- fun completeResumeSend(token: Any)
- fun resumeSendClosed(closed: Closed<*>)
+internal abstract class Send : LockFreeLinkedListNode() {
+ abstract val pollResult: Any? // E | Closed
+ abstract fun tryResumeSend(idempotent: Any?): Any?
+ abstract fun completeResumeSend(token: Any)
+ abstract fun resumeSendClosed(closed: Closed<*>)
}
/**
@@ -1074,11 +1032,11 @@ internal interface ReceiveOrClosed {
internal class SendElement(
override val pollResult: Any?,
@JvmField val cont: CancellableContinuation
-) : LockFreeLinkedListNode(), Send {
+) : Send() {
override fun tryResumeSend(idempotent: Any?): Any? = cont.tryResume(Unit, idempotent)
override fun completeResumeSend(token: Any) = cont.completeResume(token)
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
- override fun toString(): String = "SendElement($pollResult)[$cont]"
+ override fun toString(): String = "SendElement($pollResult)"
}
/**
@@ -1086,7 +1044,7 @@ internal class SendElement(
*/
internal class Closed(
@JvmField val closeCause: Throwable?
-) : LockFreeLinkedListNode(), Send, ReceiveOrClosed {
+) : Send(), ReceiveOrClosed {
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt
index a2a6da2e0c..f13a15c2ec 100644
--- a/kotlinx-coroutines-core/common/src/channels/Channel.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt
@@ -530,7 +530,7 @@ public interface Channel : SendChannel, ReceiveChannel {
/**
* Requests buffered channel with a default buffer capacity in `Channel(...)` factory function --
* the `ArrayChannel` gets created with a default capacity.
- * This capacity is equal to 16 by default and can be overridden by setting
+ * This capacity is equal to 64 by default and can be overridden by setting
* [DEFAULT_BUFFER_PROPERTY_NAME] on JVM.
*/
public const val BUFFERED = -2
@@ -545,7 +545,7 @@ public interface Channel : SendChannel, ReceiveChannel {
public const val DEFAULT_BUFFER_PROPERTY_NAME = "kotlinx.coroutines.channels.defaultBuffer"
internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
- 16, 1, UNLIMITED - 1
+ 64, 1, UNLIMITED - 1
)
}
}
diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt
index bf88b6a062..a579d7a247 100644
--- a/kotlinx-coroutines-core/common/src/channels/Produce.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt
@@ -126,7 +126,7 @@ public fun CoroutineScope.produce(
return coroutine
}
-private class ProducerCoroutine(
+internal open class ProducerCoroutine(
parentContext: CoroutineContext, channel: Channel
) : ChannelCoroutine(parentContext, channel, active = true), ProducerScope {
override val isActive: Boolean
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
index 3bae2ebd38..4711b88418 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
@@ -58,7 +58,7 @@ public abstract class ChannelFlow(
protected abstract suspend fun collectTo(scope: ProducerScope)
// shared code to create a suspend lambda from collectTo function in one place
- private val collectToFun: suspend (ProducerScope) -> Unit
+ internal val collectToFun: suspend (ProducerScope) -> Unit
get() = { collectTo(it) }
private val produceCapacity: Int
@@ -140,13 +140,11 @@ internal class ChannelFlowOperatorImpl(
private fun FlowCollector.withUndispatchedContextCollector(emitContext: CoroutineContext): FlowCollector = when (this) {
// SendingCollector & NopCollector do not care about the context at all and can be used as is
is SendingCollector, is NopCollector -> this
- // Original collector is concurrent, so wrap into ConcurrentUndispatchedContextCollector (also concurrent)
- is ConcurrentFlowCollector -> ConcurrentUndispatchedContextCollector(this, emitContext)
// Otherwise just wrap into UndispatchedContextCollector interface implementation
else -> UndispatchedContextCollector(this, emitContext)
}
-private open class UndispatchedContextCollector(
+private class UndispatchedContextCollector(
downstream: FlowCollector,
private val emitContext: CoroutineContext
) : FlowCollector {
@@ -157,12 +155,6 @@ private open class UndispatchedContextCollector(
withContextUndispatched(emitContext, countOrElement, emitRef, value)
}
-// named class for a combination of UndispatchedContextCollector & ConcurrentFlowCollector interface
-private class ConcurrentUndispatchedContextCollector(
- downstream: ConcurrentFlowCollector,
- emitContext: CoroutineContext
-) : UndispatchedContextCollector(downstream, emitContext), ConcurrentFlowCollector
-
// Efficiently computes block(value) in the newContext
private suspend fun withContextUndispatched(
newContext: CoroutineContext,
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt b/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt
deleted file mode 100644
index f37cc1caec..0000000000
--- a/kotlinx-coroutines-core/common/src/flow/internal/Concurrent.kt
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
- */
-
-package kotlinx.coroutines.flow.internal
-
-import kotlinx.atomicfu.*
-import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.*
-import kotlinx.coroutines.channels.ArrayChannel
-import kotlinx.coroutines.flow.*
-
-internal fun FlowCollector.asConcurrentFlowCollector(): ConcurrentFlowCollector =
- this as? ConcurrentFlowCollector ?: SerializingCollector(this)
-
-// Flow collector that supports concurrent emit calls.
-// It is internal for now but may be public in the future.
-// Two basic implementations are here: SendingCollector and ConcurrentFlowCollector
-internal interface ConcurrentFlowCollector : FlowCollector
-
-/**
- * Collection that sends to channel. It is marked as [ConcurrentFlowCollector] because it can be used concurrently.
- *
- * @suppress **This an internal API and should not be used from general code.**
- */
-@InternalCoroutinesApi
-public class SendingCollector(
- private val channel: SendChannel
-) : ConcurrentFlowCollector {
- override suspend fun emit(value: T) = channel.send(value)
-}
-
-// Effectively serializes access to downstream collector for merging
-// This is basically a converted from FlowCollector interface to ConcurrentFlowCollector
-private class SerializingCollector(
- private val downstream: FlowCollector
-) : ConcurrentFlowCollector {
- // Let's try to leverage the fact that merge is never contended
- // Should be Any, but KT-30796
- private val _channel = atomic?>(null)
- private val inProgressLock = atomic(false)
-
- private val channel: ArrayChannel
- get() = _channel.updateAndGet { value ->
- if (value != null) return value
- ArrayChannel(Channel.CHANNEL_DEFAULT_CAPACITY)
- }!!
-
- public override suspend fun emit(value: T) {
- if (!inProgressLock.tryAcquire()) {
- channel.send(value ?: NULL)
- if (inProgressLock.tryAcquire()) {
- helpEmit()
- }
- return
- }
- downstream.emit(value)
- helpEmit()
- }
-
- @Suppress("UNCHECKED_CAST")
- private suspend fun helpEmit() {
- while (true) {
- while (true) {
- val element = _channel.value?.poll() ?: break // todo: pollOrClosed
- downstream.emit(NULL.unbox(element))
- }
- inProgressLock.release()
- // Enforce liveness
- if (_channel.value?.isEmpty != false || !inProgressLock.tryAcquire()) break
- }
- }
-}
-
-@Suppress("NOTHING_TO_INLINE")
-private inline fun AtomicBoolean.tryAcquire(): Boolean = compareAndSet(false, true)
-
-@Suppress("NOTHING_TO_INLINE")
-private inline fun AtomicBoolean.release() {
- value = false
-}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
index adc3a17d16..1917afb8d7 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/FlowCoroutine.kt
@@ -52,6 +52,18 @@ internal fun scopedFlow(@BuilderInference block: suspend CoroutineScope.(Flo
flowScope { block(collector) }
}
+internal fun CoroutineScope.flowProduce(
+ context: CoroutineContext,
+ capacity: Int = 0,
+ @BuilderInference block: suspend ProducerScope.() -> Unit
+): ReceiveChannel {
+ val channel = Channel(capacity)
+ val newContext = newCoroutineContext(context)
+ val coroutine = FlowProduceCoroutine(newContext, channel)
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+ return coroutine
+}
+
private class FlowCoroutine(
context: CoroutineContext,
uCont: Continuation
@@ -61,3 +73,13 @@ private class FlowCoroutine(
return cancelImpl(cause)
}
}
+
+private class FlowProduceCoroutine(
+ parentContext: CoroutineContext,
+ channel: Channel
+) : ProducerCoroutine(parentContext, channel) {
+ public override fun childCancelled(cause: Throwable): Boolean {
+ if (cause is ChildCancelledException) return true
+ return cancelImpl(cause)
+ }
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
index f621be034e..289a4ebcab 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
@@ -38,17 +38,21 @@ internal class ChannelFlowTransformLatest(
}
internal class ChannelFlowMerge(
- flow: Flow>,
+ private val flow: Flow>,
private val concurrency: Int,
context: CoroutineContext = EmptyCoroutineContext,
- capacity: Int = Channel.OPTIONAL_CHANNEL
-) : ChannelFlowOperator, T>(flow, context, capacity) {
+ capacity: Int = Channel.BUFFERED
+) : ChannelFlow(context, capacity) {
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow =
ChannelFlowMerge(flow, concurrency, context, capacity)
- // The actual merge implementation with concurrency limit
- private suspend fun mergeImpl(scope: CoroutineScope, collector: ConcurrentFlowCollector) {
+ override fun produceImpl(scope: CoroutineScope): ReceiveChannel {
+ return scope.flowProduce(context, capacity, block = collectToFun)
+ }
+
+ override suspend fun collectTo(scope: ProducerScope) {
val semaphore = Semaphore(concurrency)
+ val collector = SendingCollector(scope)
val job: Job? = coroutineContext[Job]
flow.collect { inner ->
/*
@@ -68,19 +72,6 @@ internal class ChannelFlowMerge(
}
}
- // Fast path in ChannelFlowOperator calls this function (channel was not created yet)
- override suspend fun flowCollect(collector: FlowCollector) {
- // this function should not have been invoked when channel was explicitly requested
- assert { capacity == Channel.OPTIONAL_CHANNEL }
- flowScope {
- mergeImpl(this, collector.asConcurrentFlowCollector())
- }
- }
-
- // Slow path when output channel is required (and was created)
- override suspend fun collectTo(scope: ProducerScope) =
- mergeImpl(scope, SendingCollector(scope))
-
override fun additionalToStringProps(): String =
"concurrency=$concurrency, "
}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt
index f83f31348f..d1b6ad2bf5 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/NopCollector.kt
@@ -4,7 +4,9 @@
package kotlinx.coroutines.flow.internal
-internal object NopCollector : ConcurrentFlowCollector {
+import kotlinx.coroutines.flow.*
+
+internal object NopCollector : FlowCollector {
override suspend fun emit(value: Any?) {
// does nothing
}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt
index 8761058e71..fec0ee96e0 100644
--- a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt
+++ b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt
@@ -77,8 +77,11 @@ internal class SafeCollector(
*/
if (emissionParentJob !== collectJob) {
error(
- "Flow invariant is violated: emission from another coroutine is detected (child of $emissionParentJob, expected child of $collectJob). " +
- "FlowCollector is not thread-safe and concurrent emissions are prohibited. To mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
+ "Flow invariant is violated:\n" +
+ "\t\tEmission from another coroutine is detected.\n" +
+ "\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +
+ "\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +
+ "\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
)
}
@@ -91,8 +94,10 @@ internal class SafeCollector(
}
if (result != collectContextSize) {
error(
- "Flow invariant is violated: flow was collected in $collectContext, but emission happened in $currentContext. " +
- "Please refer to 'flow' documentation or use 'flowOn' instead"
+ "Flow invariant is violated:\n" +
+ "\t\tFlow was collected in $collectContext,\n" +
+ "\t\tbut emission happened in $currentContext.\n" +
+ "\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
)
}
}
diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SendingCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/SendingCollector.kt
new file mode 100644
index 0000000000..b6d578fedc
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/flow/internal/SendingCollector.kt
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow.internal
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlinx.coroutines.flow.*
+
+/**
+ * Collection that sends to channel
+ * @suppress **This an internal API and should not be used from general code.**
+ */
+@InternalCoroutinesApi
+public class SendingCollector(
+ private val channel: SendChannel
+) : FlowCollector {
+ override suspend fun emit(value: T) = channel.send(value)
+}
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
index 85b9b07c6b..5f2f46629f 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
@@ -23,13 +23,13 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
* ```
* flow {
* emit(1)
- * delay(99)
+ * delay(90)
* emit(2)
- * delay(99)
+ * delay(90)
* emit(3)
- * delay(1001)
+ * delay(1010)
* emit(4)
- * delay(1001)
+ * delay(1010)
* emit(5)
* }.debounce(1000)
* ```
diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt
index c053307099..fc1c72f067 100644
--- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt
+++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.internal
@@ -66,7 +66,7 @@ public expect open class RemoveFirstDesc(queue: LockFreeLinkedListNode): Abst
public expect abstract class AbstractAtomicDesc : AtomicDesc {
final override fun prepare(op: AtomicOp<*>): Any?
final override fun complete(op: AtomicOp<*>, failure: Any?)
- protected open fun failure(affected: LockFreeLinkedListNode, next: Any): Any?
+ protected open fun failure(affected: LockFreeLinkedListNode): Any?
protected open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean
protected abstract fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? // non-null on failure
protected abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt
index b42fde3e07..4626fe1d38 100644
--- a/kotlinx-coroutines-core/common/src/selects/Select.kt
+++ b/kotlinx-coroutines-core/common/src/selects/Select.kt
@@ -110,11 +110,6 @@ public interface SelectInstance {
*/
public fun performAtomicTrySelect(desc: AtomicDesc): Any?
- /**
- * Performs action atomically when [isSelected] is `false`.
- */
- public fun performAtomicIfNotSelected(desc: AtomicDesc): Any?
-
/**
* Returns completion continuation of this select instance.
* This select instance must be _selected_ first.
@@ -129,6 +124,7 @@ public interface SelectInstance {
/**
* Disposes the specified handle when this instance is selected.
+ * Note, that [DisposableHandle.dispose] could be called multiple times.
*/
public fun disposeOnSelect(handle: DisposableHandle)
}
@@ -311,10 +307,17 @@ internal class SelectBuilderImpl(
internal fun handleBuilderException(e: Throwable) {
if (trySelect(null)) {
resumeWithException(e)
- } else {
- // Cannot handle this exception -- builder was already resumed with a different exception,
- // so treat it as "unhandled exception"
- handleCoroutineException(context, e)
+ } else if (e !is CancellationException) {
+ /*
+ * Cannot handle this exception -- builder was already resumed with a different exception,
+ * so treat it as "unhandled exception". But only if it is not the completion reason
+ * and it's not the cancellation. Otherwise, in the face of structured concurrency
+ * the same exception will be reported to theglobal exception handler.
+ */
+ val result = getResult()
+ if (result !is CompletedExceptionally || unwrap(result.cause) !== unwrap(e)) {
+ handleCoroutineException(context, e)
+ }
}
}
@@ -322,16 +325,14 @@ internal class SelectBuilderImpl(
override fun disposeOnSelect(handle: DisposableHandle) {
val node = DisposeNode(handle)
- while (true) { // lock-free loop on state
- val state = this.state
- if (state === this) {
- if (addLastIf(node, { this.state === this }))
- return
- } else { // already selected
- handle.dispose()
- return
- }
+ // check-add-check pattern is Ok here since handle.dispose() is safe to be called multiple times
+ if (!isSelected) {
+ addLast(node) // add handle to list
+ // double-check node after adding
+ if (!isSelected) return // all ok - still not selected
}
+ // already selected
+ handle.dispose()
}
private fun doAfterSelect() {
@@ -361,12 +362,11 @@ internal class SelectBuilderImpl(
}
}
- override fun performAtomicTrySelect(desc: AtomicDesc): Any? = AtomicSelectOp(desc, true).perform(null)
- override fun performAtomicIfNotSelected(desc: AtomicDesc): Any? = AtomicSelectOp(desc, false).perform(null)
+ override fun performAtomicTrySelect(desc: AtomicDesc): Any? =
+ AtomicSelectOp(desc).perform(null)
private inner class AtomicSelectOp(
- @JvmField val desc: AtomicDesc,
- @JvmField val select: Boolean
+ @JvmField val desc: AtomicDesc
) : AtomicOp() {
override fun prepare(affected: Any?): Any? {
// only originator of operation makes preparation move of installing descriptor into this selector's state
@@ -398,7 +398,7 @@ internal class SelectBuilderImpl(
}
private fun completeSelect(failure: Any?) {
- val selectSuccess = select && failure == null
+ val selectSuccess = failure == null
val update = if (selectSuccess) null else this@SelectBuilderImpl
if (_state.compareAndSet(this@AtomicSelectOp, update)) {
if (selectSuccess)
diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt
index fa198e1371..3c72915379 100644
--- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt
+++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt
@@ -246,16 +246,11 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 {
}
is LockedQueue -> {
check(state.owner !== owner) { "Already locked by $owner" }
- val enqueueOp = TryEnqueueLockDesc(this, owner, state, select, block)
- val failure = select.performAtomicIfNotSelected(enqueueOp)
- when {
- failure == null -> { // successfully enqueued
- select.disposeOnSelect(enqueueOp.node)
- return
- }
- failure === ALREADY_SELECTED -> return // already selected -- bail out
- failure === ENQUEUE_FAIL -> {} // retry
- else -> error("performAtomicIfNotSelected(TryEnqueueLockDesc) returned $failure")
+ val node = LockSelect(owner, this, select, block)
+ if (state.addLastIf(node) { _state.value === state }) {
+ // successfully enqueued
+ select.disposeOnSelect(node)
+ return
}
}
is OpDescriptor -> state.perform(this) // help
@@ -291,19 +286,6 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 {
}
}
- private class TryEnqueueLockDesc(
- @JvmField val mutex: MutexImpl,
- owner: Any?,
- queue: LockedQueue,
- select: SelectInstance,
- block: suspend (Mutex) -> R
- ) : AddLastDesc>(queue, LockSelect(owner, mutex, select, block)) {
- override fun onPrepare(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode): Any? {
- if (mutex._state.value !== queue) return ENQUEUE_FAIL
- return super.onPrepare(affected, next)
- }
- }
-
public override fun holdsLock(owner: Any) =
_state.value.let { state ->
when (state) {
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt
index 0b1b208fea..b68e115637 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/BufferTest.kt
@@ -11,7 +11,7 @@ import kotlin.test.*
class BufferTest : TestBase() {
private val n = 50 // number of elements to emit for test
- private val defaultBufferSize = 16 // expected default buffer size (per docs)
+ private val defaultBufferSize = 64 // expected default buffer size (per docs)
// Use capacity == -1 to check case of "no buffer"
private fun checkBuffer(capacity: Int, op: suspend Flow.() -> Flow) = runTest {
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeFastPathTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeFastPathTest.kt
new file mode 100644
index 0000000000..a92189c45c
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlatMapMergeFastPathTest.kt
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class FlatMapMergeFastPathTest : FlatMapMergeBaseTest() {
+
+ override fun Flow.flatMap(mapper: suspend (T) -> Flow): Flow = flatMapMerge(transform = mapper).buffer(64)
+
+ @Test
+ override fun testFlatMapConcurrency() = runTest {
+ var concurrentRequests = 0
+ val flow = (1..100).asFlow().flatMapMerge(concurrency = 2) { value ->
+ flow {
+ ++concurrentRequests
+ emit(value)
+ delay(Long.MAX_VALUE)
+ }
+ }.buffer(64)
+
+ val consumer = launch {
+ flow.collect { value ->
+ expect(value)
+ }
+ }
+
+ repeat(4) {
+ yield()
+ }
+
+ assertEquals(2, concurrentRequests)
+ consumer.cancelAndJoin()
+ finish(3)
+ }
+
+ @Test
+ fun testCancellationExceptionDownstream() = runTest {
+ val flow = flow {
+ emit(1)
+ hang { expect(2) }
+ }.flatMapMerge {
+ flow {
+ emit(it)
+ expect(1)
+ throw CancellationException("")
+ }
+ }.buffer(64)
+
+ assertFailsWith(flow)
+ finish(3)
+ }
+
+ @Test
+ fun testCancellationExceptionUpstream() = runTest {
+ val flow = flow {
+ expect(1)
+ emit(1)
+ expect(2)
+ yield()
+ throw CancellationException("")
+ }.flatMapMerge {
+ flow {
+ expect(3)
+ emit(it)
+ hang { expect(4) }
+ }
+ }.buffer(64)
+
+ assertFailsWith(flow)
+ finish(5)
+ }
+
+ @Test
+ fun testCancellation() = runTest {
+ val result = flow {
+ emit(1)
+ emit(2)
+ emit(3)
+ emit(4)
+ expectUnreached() // Cancelled by take
+ emit(5)
+ }.flatMapMerge(2) { v -> flow { emit(v) } }
+ .buffer(64)
+ .take(2)
+ .toList()
+ assertEquals(listOf(1, 2), result)
+ }
+}
diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt
index e1f1d4a624..c15f503c45 100644
--- a/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt
+++ b/kotlinx-coroutines-core/common/test/flow/operators/FlattenMergeTest.kt
@@ -14,7 +14,7 @@ class FlattenMergeTest : FlatMapMergeBaseTest() {
@Test
override fun testFlatMapConcurrency() = runTest {
var concurrentRequests = 0
- val flow = (1..100).asFlow().map() { value ->
+ val flow = (1..100).asFlow().map { value ->
flow {
++concurrentRequests
emit(value)
@@ -36,4 +36,19 @@ class FlattenMergeTest : FlatMapMergeBaseTest() {
consumer.cancelAndJoin()
finish(3)
}
+
+ @Test
+ fun testContextPreservationAcrossFlows() = runTest {
+ val result = flow {
+ flowOf(1, 2).flatMapMerge {
+ flow {
+ yield()
+ emit(it)
+ }
+ }.collect {
+ emit(it)
+ }
+ }.toList()
+ assertEquals(listOf(1, 2), result)
+ }
}
diff --git a/kotlinx-coroutines-core/common/test/selects/SelectLinkedListChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectLinkedListChannelTest.kt
new file mode 100644
index 0000000000..a066f6b3a9
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/selects/SelectLinkedListChannelTest.kt
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.selects
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.channels.*
+import kotlin.test.*
+
+class SelectLinkedListChannelTest : TestBase() {
+ @Test
+ fun testSelectSendWhenClosed() = runTest {
+ expect(1)
+ val c = Channel(Channel.UNLIMITED)
+ c.send(1) // enqueue buffered element
+ c.close() // then close
+ assertFailsWith {
+ // select sender should fail
+ expect(2)
+ select {
+ c.onSend(2) {
+ expectUnreached()
+ }
+ }
+ }
+ finish(3)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
index ed8b8d3691..6072cc2cbb 100644
--- a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
+++ b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt
@@ -403,6 +403,29 @@ class SelectRendezvousChannelTest : TestBase() {
finish(10)
}
+ @Test
+ fun testSelectSendWhenClosed() = runTest {
+ expect(1)
+ val c = Channel(Channel.RENDEZVOUS)
+ val sender = launch(start = CoroutineStart.UNDISPATCHED) {
+ expect(2)
+ c.send(1) // enqueue sender
+ expectUnreached()
+ }
+ c.close() // then close
+ assertFailsWith {
+ // select sender should fail
+ expect(3)
+ select {
+ c.onSend(2) {
+ expectUnreached()
+ }
+ }
+ }
+ sender.cancel()
+ finish(4)
+ }
+
// only for debugging
internal fun SelectBuilder.default(block: suspend () -> R) {
this as SelectBuilderImpl // type assertion
diff --git a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt
index 3f179d1d81..6050901058 100644
--- a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt
+++ b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("unused")
@@ -129,13 +129,13 @@ public actual abstract class AbstractAtomicDesc : AtomicDesc() {
actual final override fun prepare(op: AtomicOp<*>): Any? {
val affected = affectedNode
val next = affected._next
- val failure = failure(affected, next)
+ val failure = failure(affected)
if (failure != null) return failure
return onPrepare(affected, next)
}
actual final override fun complete(op: AtomicOp<*>, failure: Any?) = onComplete()
- protected actual open fun failure(affected: LockFreeLinkedListNode, next: Any): Any? = null // Never fails by default
+ protected actual open fun failure(affected: LockFreeLinkedListNode): Any? = null // Never fails by default
protected actual open fun retry(affected: LockFreeLinkedListNode, next: Any): Boolean = false // Always succeeds
protected actual abstract fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode)
}
diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt
index c5ce53724b..7d7f4ba7be 100644
--- a/kotlinx-coroutines-core/jvm/src/Executors.kt
+++ b/kotlinx-coroutines-core/jvm/src/Executors.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
@@ -35,15 +35,27 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea
*/
@JvmName("from") // this is for a nice Java API, see issue #255
public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
- // we know that an implementation of Executor.asCoroutineDispatcher actually returns a closeable one
- (this as Executor).asCoroutineDispatcher() as ExecutorCoroutineDispatcher
+ ExecutorCoroutineDispatcherImpl(this)
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*/
@JvmName("from") // this is for a nice Java API, see issue #255
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
- ExecutorCoroutineDispatcherImpl(this)
+ (this as? DispatcherExecutor)?.dispatcher ?: ExecutorCoroutineDispatcherImpl(this)
+
+/**
+ * Converts an instance of [CoroutineDispatcher] to an implementation of [Executor].
+ *
+ * It returns the original executor when used on the result of [Executor.asCoroutineDispatcher] extensions.
+ */
+public fun CoroutineDispatcher.asExecutor(): Executor =
+ (this as? ExecutorCoroutineDispatcher)?.executor ?: DispatcherExecutor(this)
+
+private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) : Executor {
+ override fun execute(block: Runnable) = dispatcher.dispatch(EmptyCoroutineContext, block)
+ override fun toString(): String = dispatcher.toString()
+}
private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
init {
diff --git a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt
index 7d28de2574..d3d168a427 100644
--- a/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt
+++ b/kotlinx-coroutines-core/jvm/src/internal/LockFreeLinkedList.kt
@@ -254,26 +254,6 @@ public actual open class LockFreeLinkedListNode {
finishRemove(removed.ref)
}
- public open fun describeRemove() : AtomicDesc? {
- if (isRemoved) return null // fast path if was already removed
- return object : AbstractAtomicDesc() {
- private val _originalNext = atomic(null)
- override val affectedNode: Node? get() = this@LockFreeLinkedListNode
- override val originalNext get() = _originalNext.value
- override fun failure(affected: Node, next: Any): Any? =
- if (next is Removed) ALREADY_REMOVED else null
- override fun onPrepare(affected: Node, next: Node): Any? {
- // Note: onPrepare must use CAS to make sure the stale invocation is not
- // going to overwrite the previous decision on successful preparation.
- // Result of CAS is irrelevant, but we must ensure that it is set when invoker completes
- _originalNext.compareAndSet(null, next)
- return null // always success
- }
- override fun updatedNext(affected: Node, next: Node) = next.removed()
- override fun finishOnSuccess(affected: Node, next: Node) = finishRemove(next)
- }
- }
-
public actual fun removeFirstOrNull(): Node? {
while (true) { // try to linearize
val first = next as Node
@@ -376,7 +356,7 @@ public actual open class LockFreeLinkedListNode {
final override val originalNext: Node? get() = _originalNext.value
// check node predicates here, must signal failure if affect is not of type T
- protected override fun failure(affected: Node, next: Any): Any? =
+ protected override fun failure(affected: Node): Any? =
if (affected === queue) LIST_EMPTY else null
// validate the resulting node (return false if it should be deleted)
@@ -408,7 +388,7 @@ public actual open class LockFreeLinkedListNode {
protected abstract val affectedNode: Node?
protected abstract val originalNext: Node?
protected open fun takeAffectedNode(op: OpDescriptor): Node = affectedNode!!
- protected open fun failure(affected: Node, next: Any): Any? = null // next: Node | Removed
+ protected open fun failure(affected: Node): Any? = null // next: Node | Removed
protected open fun retry(affected: Node, next: Any): Boolean = false // next: Node | Removed
protected abstract fun onPrepare(affected: Node, next: Node): Any? // non-null on failure
protected abstract fun updatedNext(affected: Node, next: Node): Any
@@ -460,7 +440,7 @@ public actual open class LockFreeLinkedListNode {
continue // and retry
}
// next: Node | Removed
- val failure = failure(affected, next)
+ val failure = failure(affected)
if (failure != null) return failure // signal failure
if (retry(affected, next)) continue // retry operation
val prepareOp = PrepareOp(next as Node, op as AtomicOp, this)
@@ -684,8 +664,6 @@ public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() {
// just a defensive programming -- makes sure that list head sentinel is never removed
public actual final override fun remove(): Boolean = throw UnsupportedOperationException()
- public final override fun describeRemove(): Nothing = throw UnsupportedOperationException()
-
internal fun validate() {
var prev: Node = this
var cur: Node = next as Node
diff --git a/kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt b/kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt
index 296dc81794..4e25da96f5 100644
--- a/kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt
@@ -18,5 +18,6 @@ class CancellableContinuationJvmTest : TestBase() {
it.resume(Unit)
assertTrue(it.toString().contains("kotlinx.coroutines.CancellableContinuationJvmTest.checkToString(CancellableContinuationJvmTest.kt"))
}
+ suspend {}() // Eliminate tail-call optimization
}
}
diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt
index a959b4ebb4..2cf4361867 100644
--- a/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt
@@ -1,11 +1,13 @@
/*
- * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
-import org.junit.Test
-import java.util.concurrent.Executors
+import org.junit.*
+import org.junit.Assert.*
+import java.util.concurrent.*
+import kotlin.coroutines.*
class ExecutorsTest : TestBase() {
private fun checkThreadName(prefix: String) {
@@ -32,7 +34,7 @@ class ExecutorsTest : TestBase() {
}
@Test
- fun testToExecutor() {
+ fun testExecutorToDispatcher() {
val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "TestExecutor") }
runBlocking(executor.asCoroutineDispatcher()) {
checkThreadName("TestExecutor")
@@ -40,6 +42,41 @@ class ExecutorsTest : TestBase() {
executor.shutdown()
}
+ @Test
+ fun testConvertedDispatcherToExecutor() {
+ val executor: ExecutorService = Executors.newSingleThreadExecutor { r -> Thread(r, "TestExecutor") }
+ val dispatcher: CoroutineDispatcher = executor.asCoroutineDispatcher()
+ assertSame(executor, dispatcher.asExecutor())
+ executor.shutdown()
+ }
+
+ @Test
+ fun testDefaultDispatcherToExecutor() {
+ val latch = CountDownLatch(1)
+ Dispatchers.Default.asExecutor().execute {
+ checkThreadName("DefaultDispatcher")
+ latch.countDown()
+ }
+ latch.await()
+ }
+
+ @Test
+ fun testCustomDispatcherToExecutor() {
+ expect(1)
+ val dispatcher = object : CoroutineDispatcher() {
+ override fun dispatch(context: CoroutineContext, block: Runnable) {
+ expect(2)
+ block.run()
+ }
+ }
+ val executor = dispatcher.asExecutor()
+ assertSame(dispatcher, executor.asCoroutineDispatcher())
+ executor.execute {
+ expect(3)
+ }
+ finish(4)
+ }
+
@Test
fun testTwoThreads() {
val ctx1 = newSingleThreadContext("Ctx1")
diff --git a/kotlinx-coroutines-core/jvm/test/flow/CombineStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/CombineStressTest.kt
new file mode 100644
index 0000000000..3b5c36f9e9
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/flow/CombineStressTest.kt
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import org.junit.*
+
+class CombineStressTest : TestBase() {
+
+ @Test
+ public fun testCancellation() = runTest {
+ withContext(Dispatchers.Default + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
+ flow {
+ expect(1)
+ repeat(1_000 * stressTestMultiplier) {
+ emit(it)
+ }
+ }.flatMapLatest {
+ combine(flowOf(it), flowOf(it)) { arr -> arr[0] }
+ }.collect()
+ finish(2)
+ reset()
+ }
+ }
+
+ @Test
+ public fun testFailure() = runTest {
+ val innerIterations = 100 * stressTestMultiplierSqrt
+ val outerIterations = 10 * stressTestMultiplierSqrt
+ withContext(Dispatchers.Default + CoroutineExceptionHandler { _, _ -> expectUnreached() }) {
+ repeat(outerIterations) {
+ try {
+ flow {
+ expect(1)
+ repeat(innerIterations) {
+ emit(it)
+ }
+ }.flatMapLatest {
+ combine(flowOf(it), flowOf(it)) { arr -> arr[0] }
+ }.onEach {
+ if (it >= innerIterations / 2) throw TestException()
+ }.collect()
+ } catch (e: TestException) {
+ expect(2)
+ }
+ finish(3)
+ reset()
+ }
+ }
+ }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt
index e6a299efcd..a35e848196 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-03.kt
@@ -8,12 +8,10 @@ package kotlinx.coroutines.guide.basic03
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
val job = GlobalScope.launch { // launch a new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // wait until child coroutine completes
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt b/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt
index a348ef4a39..56e785fb7f 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-basic-07.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.basic07
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
GlobalScope.launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
@@ -16,5 +15,4 @@ fun main() = runBlocking {
}
}
delay(1300L) // just quit after delay
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-01.kt
index e44b703308..ebf5171e2e 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-01.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-01.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.cancel01
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
@@ -20,5 +19,4 @@ fun main() = runBlocking {
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-02.kt
index 518c0be541..e3127b41ba 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-02.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.cancel02
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
val startTime = currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
@@ -25,5 +24,4 @@ fun main() = runBlocking {
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-03.kt b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-03.kt
index 8c1e3f83b0..d47ecd9dda 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-03.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-03.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.cancel03
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
val startTime = currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
@@ -25,5 +24,4 @@ fun main() = runBlocking {
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-04.kt
index 002521e547..45c97851aa 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-04.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-04.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.cancel04
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
val job = launch {
try {
repeat(1000) { i ->
@@ -23,5 +22,4 @@ fun main() = runBlocking {
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-05.kt
index 5c7debb6c5..9f2cac1c23 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-05.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-05.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.cancel05
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
val job = launch {
try {
repeat(1000) { i ->
@@ -27,5 +26,4 @@ fun main() = runBlocking {
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-06.kt
index 299ceb276b..f06d1187e4 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-06.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-06.kt
@@ -8,12 +8,10 @@ package kotlinx.coroutines.guide.cancel06
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-07.kt b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-07.kt
index 1116f91357..e2880c9129 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-cancel-07.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-cancel-07.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.cancel07
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
@@ -17,5 +16,4 @@ fun main() = runBlocking {
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt
index d3ab53b79e..36c6db316b 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt
@@ -9,7 +9,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
-//sampleStart
val channel = Channel()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
@@ -18,5 +17,4 @@ fun main() = runBlocking {
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt
index 9ab469f8dc..59f5a76807 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt
@@ -9,7 +9,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
-//sampleStart
val channel = Channel()
launch {
for (x in 1..5) channel.send(x * x)
@@ -18,5 +17,4 @@ fun main() = runBlocking {
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt b/kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt
index c6550b4d1d..5c9cfb181f 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt
@@ -13,9 +13,7 @@ fun CoroutineScope.produceSquares(): ReceiveChannel = produce {
}
fun main() = runBlocking {
-//sampleStart
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt
index 02ac7bb0f5..4eb6c37d2c 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt
@@ -9,13 +9,11 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
-//sampleStart
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
for (i in 1..5) println(squares.receive()) // print first five
println("Done!") // we are done
coroutineContext.cancelChildren() // cancel children coroutines
-//sampleEnd
}
fun CoroutineScope.produceNumbers() = produce {
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt
index 625b52c72a..8b80764af5 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt
@@ -9,7 +9,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
-//sampleStart
var cur = numbersFrom(2)
for (i in 1..10) {
val prime = cur.receive()
@@ -17,7 +16,6 @@ fun main() = runBlocking {
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
-//sampleEnd
}
fun CoroutineScope.numbersFrom(start: Int) = produce {
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt
index b88a1b0091..452e056d34 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt
@@ -9,12 +9,10 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
-//sampleStart
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
-//sampleEnd
}
fun CoroutineScope.produceNumbers() = produce {
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt b/kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt
index 048729677b..9fc852e5c0 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt
@@ -9,7 +9,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
-//sampleStart
val channel = Channel()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
@@ -17,7 +16,6 @@ fun main() = runBlocking {
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
-//sampleEnd
}
suspend fun sendString(channel: SendChannel, s: String, time: Long) {
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt b/kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt
index 6c5598097c..c9916d4184 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt
@@ -9,7 +9,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
-//sampleStart
val channel = Channel(4) // create buffered channel
val sender = launch { // launch sender coroutine
repeat(10) {
@@ -20,5 +19,4 @@ fun main() = runBlocking {
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt b/kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt
index ae9d95c771..fb293257e8 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.channel09
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
-//sampleStart
data class Ball(var hits: Int)
fun main() = runBlocking {
@@ -28,4 +27,3 @@ suspend fun player(name: String, table: Channel) {
table.send(ball) // send the ball back
}
}
-//sampleEnd
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-compose-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-compose-01.kt
index 5dfb770046..ab9ef608f4 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-compose-01.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-compose-01.kt
@@ -9,14 +9,12 @@ import kotlinx.coroutines.*
import kotlin.system.*
fun main() = runBlocking {
-//sampleStart
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
-//sampleEnd
}
suspend fun doSomethingUsefulOne(): Int {
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-compose-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-compose-02.kt
index d78e5141f4..9e46c6c48f 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-compose-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-compose-02.kt
@@ -9,14 +9,12 @@ import kotlinx.coroutines.*
import kotlin.system.*
fun main() = runBlocking {
-//sampleStart
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
-//sampleEnd
}
suspend fun doSomethingUsefulOne(): Int {
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-compose-03.kt b/kotlinx-coroutines-core/jvm/test/guide/example-compose-03.kt
index aa6dd6f7bf..1dc2fd9bb2 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-compose-03.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-compose-03.kt
@@ -9,7 +9,6 @@ import kotlinx.coroutines.*
import kotlin.system.*
fun main() = runBlocking {
-//sampleStart
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
@@ -19,7 +18,6 @@ fun main() = runBlocking {
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
-//sampleEnd
}
suspend fun doSomethingUsefulOne(): Int {
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt
index ea6860e01a..ad0b021488 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-compose-04.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.compose04
import kotlinx.coroutines.*
import kotlin.system.*
-//sampleStart
// note that we don't have `runBlocking` to the right of `main` in this example
fun main() {
val time = measureTimeMillis {
@@ -23,7 +22,6 @@ fun main() {
}
println("Completed in $time ms")
}
-//sampleEnd
fun somethingUsefulOneAsync() = GlobalScope.async {
doSomethingUsefulOne()
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-compose-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-compose-05.kt
index e3c5403d52..e02f33e0bd 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-compose-05.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-compose-05.kt
@@ -9,12 +9,10 @@ import kotlinx.coroutines.*
import kotlin.system.*
fun main() = runBlocking {
-//sampleStart
val time = measureTimeMillis {
println("The answer is ${concurrentSum()}")
}
println("Completed in $time ms")
-//sampleEnd
}
suspend fun concurrentSum(): Int = coroutineScope {
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-01.kt
index 13534c7625..c3a9f5afeb 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-01.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-01.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.context01
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
launch { // context of the parent, main runBlocking coroutine
println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
}
@@ -21,5 +20,4 @@ fun main() = runBlocking {
launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
}
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-02.kt
index d7be586aea..d1ec85fa9b 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-02.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.context02
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
delay(500)
@@ -19,5 +18,4 @@ fun main() = runBlocking {
delay(1000)
println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
}
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-03.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-03.kt
index a26d3a0863..e52976d095 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-03.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-03.kt
@@ -10,7 +10,6 @@ import kotlinx.coroutines.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main() = runBlocking {
-//sampleStart
val a = async {
log("I'm computing a piece of the answer")
6
@@ -20,5 +19,4 @@ fun main() = runBlocking {
7
}
log("The answer is ${a.await() * b.await()}")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-04.kt
index 55cfecc25b..b4a8a3f821 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-04.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-04.kt
@@ -10,7 +10,6 @@ import kotlinx.coroutines.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main() {
-//sampleStart
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
@@ -22,5 +21,4 @@ fun main() {
}
}
}
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-05.kt
index d014f563a6..338e3c9d88 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-05.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-05.kt
@@ -8,7 +8,5 @@ package kotlinx.coroutines.guide.context05
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
println("My job is ${coroutineContext[Job]}")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt
index 563d418c1b..b37b06b85c 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-06.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.context06
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
// launch a coroutine to process some kind of incoming request
val request = launch {
// it spawns two other jobs, one with GlobalScope
@@ -29,5 +28,4 @@ fun main() = runBlocking {
request.cancel() // cancel processing of the request
delay(1000) // delay a second to see what happens
println("main: Who has survived request cancellation?")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-07.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-07.kt
index 27bff49b2b..825f572a34 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-07.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-07.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.context07
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
// launch a coroutine to process some kind of incoming request
val request = launch {
repeat(3) { i -> // launch a few children jobs
@@ -21,5 +20,4 @@ fun main() = runBlocking {
}
request.join() // wait for completion of the request, including all its children
println("Now processing of the request is complete")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-08.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-08.kt
index 2a278d29d2..1083d77da4 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-08.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-08.kt
@@ -10,7 +10,6 @@ import kotlinx.coroutines.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main() = runBlocking(CoroutineName("main")) {
-//sampleStart
log("Started main coroutine")
// run two background value computations
val v1 = async(CoroutineName("v1coroutine")) {
@@ -24,5 +23,4 @@ fun main() = runBlocking(CoroutineName("main")) {
6
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-09.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-09.kt
index b72041b3ae..386e52544f 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-09.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-09.kt
@@ -8,9 +8,7 @@ package kotlinx.coroutines.guide.context09
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
launch(Dispatchers.Default + CoroutineName("test")) {
println("I'm working in thread ${Thread.currentThread().name}")
}
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-10.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-10.kt
index bce5f14353..3dde44670e 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-10.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-10.kt
@@ -28,7 +28,6 @@ class Activity : CoroutineScope by CoroutineScope(Dispatchers.Default) {
} // class Activity ends
fun main() = runBlocking {
-//sampleStart
val activity = Activity()
activity.doSomething() // run test function
println("Launched coroutines")
@@ -36,5 +35,4 @@ fun main() = runBlocking {
println("Destroying activity!")
activity.destroy() // cancels all coroutines
delay(1000) // visually confirm that they don't work
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-11.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-11.kt
index a26d8c6a10..4a50d86c0f 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-context-11.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-11.kt
@@ -10,7 +10,6 @@ import kotlinx.coroutines.*
val threadLocal = ThreadLocal() // declare thread-local variable
fun main() = runBlocking {
-//sampleStart
threadLocal.set("main")
println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
@@ -20,5 +19,4 @@ fun main() = runBlocking {
}
job.join()
println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt
index 80da1dfffb..818ab285c8 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-02.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.exceptions02
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
@@ -19,5 +18,4 @@ fun main() = runBlocking {
throw ArithmeticException() // Nothing will be printed, relying on user to call deferred.await()
}
joinAll(job, deferred)
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-03.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-03.kt
index eadbb9bf01..2b1e8e62b1 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-03.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-03.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.exceptions03
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
val job = launch {
val child = launch {
try {
@@ -25,5 +24,4 @@ fun main() = runBlocking {
println("Parent is not cancelled")
}
job.join()
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt
index e741d39ea9..02024ce206 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-04.kt
@@ -8,7 +8,6 @@ package kotlinx.coroutines.guide.exceptions04
import kotlinx.coroutines.*
fun main() = runBlocking {
-//sampleStart
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
@@ -31,5 +30,4 @@ fun main() = runBlocking {
}
}
job.join()
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt
index 0faf8d4f08..636c4a1f5d 100644
--- a/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-exceptions-06.kt
@@ -9,7 +9,6 @@ import kotlinx.coroutines.*
import java.io.*
fun main() = runBlocking {
-//sampleStart
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught original $exception")
}
@@ -29,5 +28,4 @@ fun main() = runBlocking {
}
}
job.join()
-//sampleEnd
}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt
new file mode 100644
index 0000000000..020f458b44
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-flow-01.kt
@@ -0,0 +1,12 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package kotlinx.coroutines.guide.flow01
+
+fun foo(): List = listOf(1, 2, 3)
+
+fun main() {
+ foo().forEach { value -> println(value) }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt b/kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt
new file mode 100644
index 0000000000..66fc1639b5
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-flow-02.kt
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package kotlinx.coroutines.guide.flow02
+
+fun foo(): Sequence = sequence { // sequence builder
+ for (i in 1..3) {
+ Thread.sleep(100) // pretend we are computing it
+ yield(i) // yield next value
+ }
+}
+
+fun main() {
+ foo().forEach { value -> println(value) }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt b/kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt
new file mode 100644
index 0000000000..393a0fa3a0
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-flow-03.kt
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package kotlinx.coroutines.guide.flow03
+
+import kotlinx.coroutines.*
+
+suspend fun foo(): List {
+ delay(1000) // pretend we are doing something asynchronous here
+ return listOf(1, 2, 3)
+}
+
+fun main() = runBlocking {
+ foo().forEach { value -> println(value) }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt b/kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt
new file mode 100644
index 0000000000..9a3c05cd2c
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-flow-04.kt
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package kotlinx.coroutines.guide.flow04
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun foo(): Flow = flow { // flow builder
+ for (i in 1..3) {
+ delay(100) // pretend we are doing something useful here
+ emit(i) // emit next value
+ }
+}
+
+fun main() = runBlocking {
+ // Launch a concurrent coroutine to see that the main thread is not blocked
+ launch {
+ for (k in 1..3) {
+ println("I'm not blocked $k")
+ delay(100)
+ }
+ }
+ // Collect the flow
+ foo().collect { value -> println(value) }
+}
diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt b/kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt
new file mode 100644
index 0000000000..c1e05e2e3c
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/guide/example-flow-05.kt
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
+package kotlinx.coroutines.guide.flow05
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+
+fun foo(): Flow = flow {
+ println("Flow started")
+ for (i in 1..3) {
+ delay(100)
+ emit(i)
+ }
+}
+
+fun main() = runBlocking