This is a guide to the core features of kotlinx.coroutines, with a series of examples.
Kotlin, as a language, provides only minimal low-level APIs in its standard library to enable various other libraries to utilize coroutines. Unlike many other languages with similar capabilities, async and await are not keywords in Kotlin and are not even part of its standard library. Moreover, Kotlin's concept of suspending functions provides a safer and less error-prone abstraction for asynchronous operations than futures and promises.
kotlinx.coroutines is a rich library for coroutines developed by JetBrains. It contains a number of high-level coroutine-enabled primitives that this guide covers, including launch, async, and others.
You need to add a dependency on the kotlinx-coroutines-core module as explained here to use the primitives from this guide in your projects.
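For example, in a Gradle Kotlin DSL build script the dependency declaration might look like the following sketch (the version string is a placeholder, not a specific release recommended by this guide; substitute the current kotlinx-coroutines version):

```kotlin
// build.gradle.kts (illustrative fragment; <version> is a placeholder)
dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:<version>")
}
```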
- Coroutine basics
- Cancellation and timeouts
- Composing suspending functions
- Coroutine context and dispatchers
- Exception handling
- Channels (experimental)
- Shared mutable state and concurrency
- Select expression (experimental)
- Further reading
This section covers basic coroutine concepts.
Run the following code:
fun main(args: Array<String>) {
GlobalScope.launch { // launch new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello,") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
Running this code produces:
Hello,
World!
Essentially, coroutines are light-weight threads. They are launched with launch coroutine builder in a context of some CoroutineScope. Here we are launching a new coroutine in the GlobalScope, meaning that the lifetime of the new coroutine is limited only by the lifetime of the whole application.
You can achieve the same result by replacing GlobalScope.launch { ... } with thread { ... }, and delay(...) with Thread.sleep(...). Try it.
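As a sketch of that experiment, the thread-based version using only the Kotlin standard library would look like this:

```kotlin
import kotlin.concurrent.thread

fun main() {
    thread { // start a new background thread and continue
        Thread.sleep(1000L) // blocking delay for 1 second
        println("World!")   // print after the delay
    }
    println("Hello,")   // main thread continues while the new thread sleeps
    Thread.sleep(2000L) // block main thread for 2 seconds to keep the JVM alive
}
```

It prints the same "Hello," followed by "World!", but here each concurrent activity occupies a real OS thread.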
If you start by replacing GlobalScope.launch with thread, the compiler produces the following error:
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
That is because delay is a special suspending function that does not block a thread, but suspends the coroutine, and it can only be used from a coroutine.
The first example mixes non-blocking delay(...) and blocking Thread.sleep(...) in the same code. It is easy to lose track of which one is blocking and which one is not. Let's be explicit about blocking using the runBlocking coroutine builder:
fun main(args: Array<String>) {
GlobalScope.launch { // launch new coroutine in background and continue
delay(1000L)
println("World!")
}
println("Hello,") // main thread continues here immediately
runBlocking { // but this expression blocks the main thread
delay(2000L) // ... while we delay for 2 seconds to keep JVM alive
}
}
The result is the same, but this code uses only non-blocking delay.
The main thread that invokes runBlocking blocks until the coroutine inside runBlocking completes.
This example can also be rewritten in a more idiomatic way, using runBlocking to wrap the execution of the main function:
fun main(args: Array<String>) = runBlocking<Unit> { // start main coroutine
GlobalScope.launch { // launch new coroutine in background and continue
delay(1000L)
println("World!")
}
println("Hello,") // main coroutine continues here immediately
delay(2000L) // delaying for 2 seconds to keep JVM alive
}
Here runBlocking<Unit> { ... } works as an adaptor that is used to start the top-level main coroutine. We explicitly specify its Unit return type, because a well-formed main function in Kotlin has to return Unit.
This is also a way to write unit-tests for suspending functions:
class MyTest {
@Test
fun testMySuspendingFunction() = runBlocking<Unit> {
// here we can use suspending functions using any assertion style that we like
}
}
Delaying for a time while another coroutine is working is not a good approach. Let's explicitly wait (in a non-blocking way) until the background Job that we have launched is complete:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = GlobalScope.launch { // launch new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // wait until child coroutine completes
}
Now the result is still the same, but the code of the main coroutine is not tied to the duration of the background job in any way. Much better.
There is still something to be desired for practical usage of coroutines.
When we use GlobalScope.launch, we create a top-level coroutine. Even though it is light-weight, it still consumes some memory resources while it runs. If we forget to keep a reference to the newly launched coroutine, it still runs. What if the code in the coroutine hangs (for example, we erroneously delay for too long), or what if we launched too many coroutines and ran out of memory? Having to manually keep a reference to all the launched coroutines and join them is error-prone.
There is a better solution. We can use structured concurrency in our code. Instead of launching coroutines in the GlobalScope, just like we usually do with threads (threads are always global), we can launch coroutines in the specific scope of the operation we are performing.
In our example, we have a main function that is turned into a coroutine using the runBlocking coroutine builder. Every coroutine builder, including runBlocking, adds an instance of CoroutineScope to the scope of its code block. We can launch coroutines in this scope without having to join them explicitly, because an outer coroutine (runBlocking in our example) does not complete until all the coroutines launched in its scope complete. Thus, we can make our example simpler:
fun main(args: Array<String>) = runBlocking<Unit> { // this: CoroutineScope
launch { // launch new coroutine in the scope of runBlocking
delay(1000L)
println("World!")
}
println("Hello,")
}
In addition to the coroutine scope provided by different builders, it is possible to declare your own scope using coroutineScope builder. It creates new coroutine scope and does not complete until all launched children complete. The main difference between runBlocking and coroutineScope is that the latter does not block the current thread while waiting for all children to complete.
fun main(args: Array<String>) = runBlocking<Unit> { // this: CoroutineScope
launch {
delay(200L)
println("Task from runBlocking")
}
coroutineScope { // Creates a new coroutine scope
launch {
delay(500L)
println("Task from nested launch")
}
delay(100L)
println("Task from coroutine scope") // This line will be printed before nested launch
}
println("Coroutine scope is over") // This line is not printed until nested launch completes
}
Let's extract the block of code inside launch { ... } into a separate function. When you perform the "Extract function" refactoring on this code, you get a new function with the suspend modifier. That is your first suspending function. Suspending functions can be used inside coroutines just like regular functions, but their additional feature is that they can, in turn, use other suspending functions, like delay in this example, to suspend the execution of a coroutine.
fun main(args: Array<String>) = runBlocking<Unit> {
launch { doWorld() }
println("Hello,")
}
// this is your first suspending function
suspend fun doWorld() {
delay(1000L)
println("World!")
}
But what if the extracted function contains a coroutine builder which is invoked on the current scope? In this case, the suspend modifier on the extracted function is not enough. Making doWorld an extension method on CoroutineScope is one of the solutions, but it may not always be applicable, as it does not make the API clearer. The currentScope builder comes to help: it inherits the current CoroutineScope from the coroutine context it is invoked in.
fun main(args: Array<String>) = runBlocking<Unit> {
launchDoWorld()
println("Hello,")
}
// this is your first suspending function
suspend fun launchDoWorld() = currentScope {
launch {
println("World!")
}
}
Run the following code:
fun main(args: Array<String>) = runBlocking<Unit> {
repeat(100_000) { // launch a lot of coroutines
launch {
delay(1000L)
print(".")
}
}
}
It launches 100K coroutines and, after a second, each coroutine prints a dot. Now, try that with threads. What would happen? (Most likely your code will produce some sort of out-of-memory error)
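To get a feel for the difference without actually crashing the JVM, here is a scaled-down thread version (100 threads rather than 100,000; each platform thread typically reserves on the order of a megabyte of stack, so 100K of them would exhaust memory on most machines):

```kotlin
import kotlin.concurrent.thread

fun main() {
    val threads = List(100) { // 100, not 100_000 -- each thread is expensive
        thread {
            Thread.sleep(1000L)
            print(".")
        }
    }
    threads.forEach { it.join() } // wait for all threads to finish
    println()
}
```

Even at this scale you can observe the startup cost of real threads; coroutines have no trouble with three orders of magnitude more.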
The following code launches a long-running coroutine in GlobalScope that prints "I'm sleeping" twice a second and then returns from the main function after some delay:
fun main(args: Array<String>) = runBlocking<Unit> {
GlobalScope.launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // just quit after delay
}
You can run it and see that it prints three lines and terminates:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Active coroutines that were launched in GlobalScope do not keep the process alive. They are like daemon threads.
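The daemon-thread analogy can be made concrete with a plain-threads sketch: a daemon thread does not keep the JVM alive either, so the program below also terminates mid-loop.

```kotlin
import kotlin.concurrent.thread

fun main() {
    thread(isDaemon = true) { // daemon thread, analogous to a GlobalScope coroutine
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            Thread.sleep(500L)
        }
    }
    Thread.sleep(1300L) // main exits after 1.3 seconds; the daemon dies with it
}
```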
This section covers coroutine cancellation and timeouts.
In a long-running application you might need fine-grained control over your background coroutines. For example, a user might have closed the page that launched a coroutine; its result is no longer needed and its operation can be cancelled. The launch function returns a Job that can be used to cancel the running coroutine:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
}
It produces the following output:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
As soon as main invokes job.cancel, we don't see any output from the other coroutine, because it was cancelled. There is also a Job extension function cancelAndJoin that combines cancel and join invocations.
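Conceptually, cancelAndJoin is just cancel followed by join. A sketch of such a helper (named cancelAndJoinSketch here to avoid clashing with the real extension provided by kotlinx.coroutines) would be:

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper illustrating what cancelAndJoin does;
// use the real kotlinx.coroutines extension in actual code.
suspend fun Job.cancelAndJoinSketch() {
    cancel() // request cancellation
    join()   // then wait until the job actually completes
}

fun main() = runBlocking {
    val job = launch {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L)
    job.cancelAndJoinSketch()
    println("main: Now I can quit.")
}
```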
Coroutine cancellation is cooperative. Coroutine code has to cooperate to be cancellable. All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of the coroutine and throw CancellationException when cancelled. However, if a coroutine is working in a computation and does not check for cancellation, then it cannot be cancelled, as the following example shows:
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // computation loop, just wastes CPU
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
Run it to see that it continues to print "I'm sleeping" even after cancellation until the job completes by itself after five iterations.
There are two approaches to making computation code cancellable. The first one is to periodically invoke a suspending function that checks for cancellation; the yield function is a good choice for that purpose. The other one is to explicitly check the cancellation status. Let us try the latter approach.
Replace while (i < 5) in the previous example with while (isActive) and rerun it.
fun main(args: Array<String>) = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
As you can see, now this loop is cancelled. isActive is an extension property that is available inside the code of a coroutine via the CoroutineScope object.
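The first approach, periodically invoking a suspending function, can be sketched by inserting yield() into the computation loop; yield checks for cancellation and throws CancellationException when the job is cancelled:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) { // same computation loop as before
            yield() // suspending call: checks for cancellation on every iteration
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L)
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // yield() makes the loop cancellable
    println("main: Now I can quit.")
}
```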
Cancellable suspending functions throw CancellationException on cancellation, which can be handled in the usual way. For example, the try {...} finally {...} expression and Kotlin's use function execute their finalization actions normally when a coroutine is cancelled:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
Both join and cancelAndJoin wait for all the finalization actions to complete, so the example above produces the following output:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
main: I'm tired of waiting!
I'm running finally
main: Now I can quit.
Any attempt to use a suspending function in the finally block of the previous example causes CancellationException, because the coroutine running this code is cancelled. Usually, this is not a problem, since all well-behaving closing operations (closing a file, cancelling a job, or closing any kind of communication channel) are non-blocking and do not involve any suspending functions. However, in the rare case when you need to suspend in a cancelled coroutine, you can wrap the corresponding code in withContext(NonCancellable) {...} using the withContext function and the NonCancellable context, as the following example shows:
fun main(args: Array<String>) = runBlocking<Unit> {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
}
The most obvious practical reason to cancel the execution of a coroutine is that its execution time has exceeded some timeout. While you can manually track the reference to the corresponding Job and launch a separate coroutine to cancel the tracked one after a delay, there is a ready-to-use withTimeout function that does it. Look at the following example:
fun main(args: Array<String>) = runBlocking<Unit> {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
It produces the following output:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.experimental.TimeoutCancellationException: Timed out waiting for 1300 MILLISECONDS
The TimeoutCancellationException that is thrown by withTimeout is a subclass of CancellationException. We have not seen its stack trace printed on the console before. That is because, inside a cancelled coroutine, CancellationException is considered a normal reason for coroutine completion. However, in this example we have used withTimeout right inside the main function.
Because cancellation is just an exception, all the resources are closed in a usual way.
You can wrap the code with timeout in a try {...} catch (e: TimeoutCancellationException) {...} block if you need to do some additional action specifically on any kind of timeout, or use the withTimeoutOrNull function that is similar to withTimeout but returns null on timeout instead of throwing an exception:
fun main(args: Array<String>) = runBlocking<Unit> {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
println("Result is $result")
}
There is no longer an exception when running this code:
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
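The try/catch alternative mentioned above can be sketched like this; the catch block runs only when the timeout fires:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                println("I'm sleeping $i ...")
                delay(500L)
            }
        }
    } catch (e: TimeoutCancellationException) {
        println("Timed out, cleaning up") // timeout-specific handling goes here
    }
}
```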
This section covers various approaches to composition of suspending functions.
Assume that we have two suspending functions defined elsewhere that do something useful like some kind of remote service call or computation. We just pretend they are useful, but actually each one just delays for a second for the purpose of this example:
suspend fun doSomethingUsefulOne(): Int {
delay(1000L) // pretend we are doing something useful here
return 13
}
suspend fun doSomethingUsefulTwo(): Int {
delay(1000L) // pretend we are doing something useful here, too
return 29
}
What do we do if we need to invoke them sequentially, first doSomethingUsefulOne and then doSomethingUsefulTwo, and compute the sum of their results? In practice, we do this when we use the result of the first function to decide whether we need to invoke the second one, or to decide how to invoke it.
We use a normal sequential invocation, because the code in the coroutine, just like in the regular code, is sequential by default. The following example demonstrates it by measuring the total time it takes to execute both suspending functions:
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = doSomethingUsefulOne()
val two = doSomethingUsefulTwo()
println("The answer is ${one + two}")
}
println("Completed in $time ms")
}
It produces something like this:
The answer is 42
Completed in 2017 ms
What if there are no dependencies between the invocations of doSomethingUsefulOne and doSomethingUsefulTwo, and we want to get the answer faster by doing both concurrently? This is where async comes to help.
Conceptually, async is just like launch. It starts a separate coroutine, which is a light-weight thread that works concurrently with all the other coroutines. The difference is that launch returns a Job and does not carry any resulting value, while async returns a Deferred -- a light-weight non-blocking future that represents a promise to provide a result later. You can use .await() on a deferred value to get its eventual result, but Deferred is also a Job, so you can cancel it if needed.
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
It produces something like this:
The answer is 42
Completed in 1017 ms
This is twice as fast, because we have concurrent execution of the two coroutines. Note that concurrency with coroutines is always explicit.
There is a laziness option for async, using an optional start parameter with a value of CoroutineStart.LAZY. It starts the coroutine only when its result is needed by some await or when its start function is invoked. Run the following example:
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
// some computation
one.start() // start the first one
two.start() // start the second one
println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
It produces something like this:
The answer is 42
Completed in 1017 ms
So, here the two coroutines are defined but not executed as in the previous example, and control over when exactly to start the execution is given to the programmer, by calling start. We first start one, then start two, and then await for the individual coroutines to finish. Note that if we had just called await in println and omitted start on the individual coroutines, we would have gotten sequential behaviour, as await starts the coroutine execution and waits for it to finish, which is not the intended use case for laziness.
The use case for async(start = CoroutineStart.LAZY) is as a replacement for the standard lazy function in cases when the computation of the value involves suspending functions.
We can define async-style functions that invoke doSomethingUsefulOne and doSomethingUsefulTwo asynchronously using the async coroutine builder with an explicit GlobalScope reference. We name such functions with the "Async" suffix to highlight the fact that they only start an asynchronous computation, and one needs to use the resulting deferred value to get the result.
// The result type of somethingUsefulOneAsync is Deferred<Int>
fun somethingUsefulOneAsync() = GlobalScope.async {
doSomethingUsefulOne()
}
// The result type of somethingUsefulTwoAsync is Deferred<Int>
fun somethingUsefulTwoAsync() = GlobalScope.async {
doSomethingUsefulTwo()
}
Note that these xxxAsync functions are not suspending functions. They can be used from anywhere. However, their use always implies asynchronous (here meaning concurrent) execution of their action with the invoking code. The following example shows their use outside of a coroutine:
// note, that we don't have `runBlocking` to the right of `main` in this example
fun main(args: Array<String>) {
val time = measureTimeMillis {
// we can initiate async actions outside of a coroutine
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync()
// but waiting for a result must involve either suspending or blocking.
// here we use `runBlocking { ... }` to block the main thread while waiting for the result
runBlocking {
println("The answer is ${one.await() + two.await()}")
}
}
println("Completed in $time ms")
}
This programming style with async functions is provided here only for illustration, because it is a popular style in other programming languages. Using this style with Kotlin coroutines is strongly discouraged for the reasons that are explained below.
Consider what happens if, between the val one = somethingUsefulOneAsync() line and the one.await() expression, there is some logic error in the code, the program throws an exception, and the operation that was being performed by the program aborts. Normally, a global error-handler could catch this exception, log and report the error for developers, and the program could otherwise continue doing other operations. But here we have somethingUsefulOneAsync still running in the background, even though the operation that initiated it was aborted. This problem does not happen with structured concurrency, as shown in the section below.
Let us take the Concurrent using async example and extract a function that concurrently performs doSomethingUsefulOne and doSomethingUsefulTwo and returns the sum of their results. Because the async coroutine builder is defined as an extension on CoroutineScope, we need to have it in scope, and that is what the coroutineScope function provides:
suspend fun concurrentSum(): Int = coroutineScope {
val one = async { doSomethingUsefulOne() }
val two = async { doSomethingUsefulTwo() }
awaitAll(one, two)
one.await() + two.await()
}
This way, if something goes wrong inside the code of the concurrentSum function and it throws an exception, all the coroutines that were launched in its scope are cancelled.
fun main(args: Array<String>) = runBlocking<Unit> {
val time = measureTimeMillis {
println("The answer is ${concurrentSum()}")
}
println("Completed in $time ms")
}
We still have concurrent execution of both operations as evident from the output of the above main function:
The answer is 42
Completed in 1017 ms
Cancellation is always propagated through the coroutine hierarchy:
fun main(args: Array<String>) = runBlocking<Unit> {
try {
failedConcurrentSum()
} catch(e: ArithmeticException) {
println("Computation failed with ArithmeticException")
}
}
suspend fun failedConcurrentSum(): Int = coroutineScope {
val one = async<Int> {
try {
delay(Long.MAX_VALUE) // Emulates very long computation
42
} finally {
println("First child was cancelled")
}
}
val two = async<Int> {
println("Second child throws an exception")
throw ArithmeticException()
}
awaitAll(one, two)
one.await() + two.await()
}
Note how both the first async and the awaiting parent are cancelled on the failure of one child:
Second child throws an exception
First child was cancelled
Computation failed with ArithmeticException
Coroutines always execute in some context which is represented by the value of CoroutineContext type, defined in the Kotlin standard library.
The coroutine context is a set of various elements. The main elements are the Job of the coroutine, which we've seen before, and its dispatcher, which is covered in this section.
Coroutine context includes a coroutine dispatcher (see CoroutineDispatcher) that determines what thread or threads the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution to a specific thread, dispatch it to a thread pool, or let it run unconfined.
All coroutine builders, like launch and async, accept an optional CoroutineContext parameter that can be used to explicitly specify the dispatcher for the new coroutine and other context elements.
Try the following example:
fun main(args: Array<String>) = runBlocking<Unit> {
launch { // context of the parent, main runBlocking coroutine
println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
println("Default : I'm working in thread ${Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
}
}
It produces the following output (maybe in different order):
Unconfined : I'm working in thread main
Default : I'm working in thread CommonPool-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking : I'm working in thread main
When launch { ... } is used without parameters, it inherits the context (and thus the dispatcher) from the CoroutineScope that it is being launched from. In this case, it inherits the context of the main runBlocking coroutine, which runs in the main thread. Dispatchers.Unconfined is a special dispatcher that also appears to run in the main thread, but it is, in fact, a different mechanism that is explained later. The default dispatcher, which is used when coroutines are launched in GlobalScope, is represented by Dispatchers.Default and uses a shared background pool of threads, so launch(Dispatchers.Default) { ... } uses the same dispatcher as GlobalScope.launch { ... }.
newSingleThreadContext creates a new thread for the coroutine to run. A dedicated thread is a very expensive resource. In a real application it must be either released, when no longer needed, using close function, or stored in a top-level variable and reused throughout the application.
The Dispatchers.Unconfined coroutine dispatcher starts a coroutine in the caller thread, but only until the first suspension point. After suspension, it resumes in the thread that is fully determined by the suspending function that was invoked. The unconfined dispatcher is appropriate when a coroutine does not consume CPU time nor update any shared data (like UI) confined to a specific thread. On the other side, by default, the dispatcher of the outer CoroutineScope is inherited. The default dispatcher for the runBlocking coroutine, in particular, is confined to the invoker thread, so inheriting it has the effect of confining execution to this thread with predictable FIFO scheduling.
fun main(args: Array<String>) = runBlocking<Unit> {
launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
delay(500)
println("Unconfined : After delay in thread ${Thread.currentThread().name}")
}
launch { // context of the parent, main runBlocking coroutine
println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
delay(1000)
println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
}
}
Produces the output:
Unconfined : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main
So, the coroutine that had inherited the context of runBlocking {...} continues to execute in the main thread, while the unconfined one resumed in the default executor thread that the delay function is using.
The unconfined dispatcher is an advanced mechanism that can be helpful in certain corner cases where dispatching a coroutine for later execution is not needed or produces undesirable side-effects, because some operation in the coroutine must be performed right away. The unconfined dispatcher should not be used in general code.
Coroutines can suspend on one thread and resume on another. Even with a single-threaded dispatcher, it might be hard to figure out what a coroutine was doing, where, and when. The common approach to debugging applications with threads is to print the thread name in the log file on each log statement. This feature is universally supported by logging frameworks. When using coroutines, the thread name alone does not give much context, so kotlinx.coroutines includes debugging facilities to make it easier.
Run the following code with the -Dkotlinx.coroutines.debug JVM option:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) = runBlocking<Unit> {
val a = async {
log("I'm computing a piece of the answer")
6
}
val b = async {
log("I'm computing another piece of the answer")
7
}
log("The answer is ${a.await() * b.await()}")
}
There are three coroutines: the main coroutine (#1), which is the runBlocking one, and two coroutines computing the deferred values a (#2) and b (#3). They are all executing in the context of runBlocking and are confined to the main thread.
The output of this code is:
[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42
The log function prints the name of the thread in square brackets, and you can see that it is the main thread, with the identifier of the currently executing coroutine appended to it. This identifier is consecutively assigned to all created coroutines when the debugging mode is turned on.
You can read more about debugging facilities in the documentation for newCoroutineContext function.
Run the following code with the -Dkotlinx.coroutines.debug JVM option (see debug):
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun main(args: Array<String>) {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1")
withContext(ctx2) {
log("Working in ctx2")
}
log("Back to ctx1")
}
}
}
}
It demonstrates several new techniques. One is using runBlocking with an explicitly specified context; the other is using the withContext function to change the context of a coroutine while still staying in the same coroutine, as you can see in the output below:
[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1
Note that this example also uses the use
function from the Kotlin standard library to release the threads that
are created with newSingleThreadContext when they are no longer needed.
The coroutine's Job is part of its context. The coroutine can retrieve it from its own context
using the coroutineContext[Job]
expression:
fun main(args: Array<String>) = runBlocking<Unit> {
println("My job is ${coroutineContext[Job]}")
}
You can get full code here
It produces something like this when running in debug mode:
My job is "coroutine#1":BlockingCoroutine{Active}@6d311334
Note that isActive in CoroutineScope is just a convenient shortcut for
coroutineContext[Job]?.isActive == true
.
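A minimal sketch of this equivalence (the timings are arbitrary; assumes kotlinx.coroutines on the classpath):

```kotlin
import kotlinx.coroutines.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = launch {
        // inside a coroutine both expressions refer to the same Job
        check(isActive == (coroutineContext[Job]?.isActive == true))
        println("isActive inside: $isActive")
        delay(1000)
    }
    delay(100) // give the child a chance to start
    job.cancelAndJoin()
    println("job.isActive after cancel: ${job.isActive}")
}
```

The check would fail if the two expressions ever disagreed; after cancellation, the job reports itself as no longer active.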
When a coroutine is launched in the CoroutineScope of another coroutine, it inherits its context via CoroutineScope.coroutineContext and the Job of the new coroutine becomes a child of the parent coroutine's job. When the parent coroutine is cancelled, all its children are recursively cancelled, too.
However, when GlobalScope is used to launch a coroutine, it is not tied to the scope it was launched from and operates independently.
fun main(args: Array<String>) = runBlocking<Unit> {
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        // it spawns two other jobs, one with GlobalScope
        GlobalScope.launch {
            println("job1: I run in GlobalScope and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        launch {
            delay(100)
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}
You can get full code here
The output of this code is:
job1: I run in GlobalScope and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?
A parent coroutine always waits for completion of all its children. A parent does not have to explicitly track all the children it launches, and it does not have to use Job.join to wait for them at the end:
fun main(args: Array<String>) = runBlocking<Unit> {
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        repeat(3) { i -> // launch a few children jobs
            launch {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
                println("Coroutine $i is done")
            }
        }
        println("request: I'm done and I don't explicitly join my children that are still active")
    }
    request.join() // wait for completion of the request, including all its children
    println("Now processing of the request is complete")
}
You can get full code here
The result is going to be:
request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete
Automatically assigned ids are good when coroutines log often and you just need to correlate log records coming from the same coroutine. However, when a coroutine is tied to the processing of a specific request or to some specific background task, it is better to name it explicitly for debugging purposes. The CoroutineName context element serves the same purpose as a thread name. It is displayed in the name of the thread that is executing the coroutine when the debugging mode is turned on.
The following example demonstrates this concept:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main(args: Array<String>) = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CoroutineName("v1coroutine")) {
        delay(500)
        log("Computing v1")
        252
    }
    val v2 = async(CoroutineName("v2coroutine")) {
        delay(1000)
        log("Computing v2")
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}
You can get full code here
The output it produces with -Dkotlinx.coroutines.debug
JVM option is similar to:
[main @main#1] Started main coroutine
[main @v1coroutine#2] Computing v1
[main @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42
Sometimes we need to define multiple elements for a coroutine context. We can use the +
operator for that.
For example, we can launch a coroutine with an explicitly specified dispatcher and an explicitly specified
name at the same time:
fun main(args: Array<String>) = runBlocking<Unit> {
launch(Dispatchers.Default + CoroutineName("test")) {
println("I'm working in thread ${Thread.currentThread().name}")
}
}
You can get full code here
The output of this code with -Dkotlinx.coroutines.debug
JVM option is:
I'm working in thread CommonPool-worker-1 @test#2
Let us put our knowledge about contexts, children and jobs together. Assume that our application has an object with a lifecycle, but that object is not a coroutine. For example, we are writing an Android application and launch various coroutines in the context of an Android activity to perform asynchronous operations to fetch and update data, do animations, etc. All of these coroutines must be cancelled when the activity is destroyed to avoid memory leaks.
We manage the lifecycle of our coroutines by creating an instance of Job that is tied to the lifecycle of our activity. A job instance is created using the Job() factory function when the activity is created, and it is cancelled when the activity is destroyed, like this:
class Activity : CoroutineScope {
    lateinit var job: Job

    fun create() {
        job = Job()
    }

    fun destroy() {
        job.cancel()
    }
    // to be continued ...
We also implement the CoroutineScope interface in this Activity
class. We only need to provide an override
for its CoroutineScope.coroutineContext property to specify the context for coroutines launched in its
scope. We combine the desired dispatcher (we use Dispatchers.Default in this example) and a job:
    // class Activity continues
    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Default + job
    // to be continued ...
Now, we can launch coroutines in the scope of this Activity
without having to explicitly
specify their context. For the demo, we launch ten coroutines that delay for a different time:
    // class Activity continues
    fun doSomething() {
        // launch ten coroutines for a demo, each working for a different time
        repeat(10) { i ->
            launch {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
                println("Coroutine $i is done")
            }
        }
    }
} // class Activity ends
In our main function we create the activity, call our test doSomething
function, and destroy the activity after 500ms.
This cancels all the coroutines that were launched, which we can confirm by noting that nothing
more is printed even if we wait:
fun main(args: Array<String>) = runBlocking<Unit> {
val activity = Activity()
activity.create() // create an activity
activity.doSomething() // run test function
println("Launched coroutines")
delay(500L) // delay for half a second
println("Destroying activity!")
activity.destroy() // cancels all coroutines
delay(1000) // visually confirm that they don't work
}
You can get full code here
The output of this example is:
Launched coroutines
Coroutine 0 is done
Coroutine 1 is done
Destroying activity!
As you can see, only the first two coroutines printed a message and the others were cancelled
by a single invocation of job.cancel()
in Activity.destroy()
.
Sometimes it is convenient to have the ability to pass some thread-local data, but for coroutines, which are not bound to any particular thread, it is hard to achieve this manually without writing a lot of boilerplate.
For ThreadLocal
,
the asContextElement extension function comes to the rescue. It creates an additional context element
which keeps the value of the given ThreadLocal
and restores it every time the coroutine switches its context.
It is easy to demonstrate it in action:
val threadLocal = ThreadLocal<String?>() // declare thread-local variable

fun main(args: Array<String>) = runBlocking<Unit> {
    threadLocal.set("main")
    println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
        println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}
You can get full code here
In this example we launch a new coroutine in a background thread pool using Dispatchers.Default, so
it works on different threads from the thread pool, but it still has the value of the thread-local variable
that we specified using threadLocal.asContextElement(value = "launch")
,
no matter what thread the coroutine is executed on.
Thus, output (with debug) is:
Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
Launch start, current thread: Thread[CommonPool-worker-1 @coroutine#2,5,main], thread local value: 'launch'
After yield, current thread: Thread[CommonPool-worker-2 @coroutine#2,5,main], thread local value: 'launch'
Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
ThreadLocal
has first-class support and can be used with any primitive that kotlinx.coroutines
provides.
It has one key limitation: when a thread-local is mutated, the new value is not propagated to the coroutine caller
(because a context element cannot track all ThreadLocal
object accesses) and the updated value is lost on the next suspension.
Use withContext to update the value of a thread-local in a coroutine; see asContextElement for more details.
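To make that concrete, here is a minimal sketch of the withContext approach; the requestId thread-local and its values are invented for illustration:

```kotlin
import kotlinx.coroutines.*

val requestId = ThreadLocal<String?>() // hypothetical thread-local, for illustration

fun main(args: Array<String>) = runBlocking<Unit> {
    launch(Dispatchers.Default + requestId.asContextElement(value = "outer")) {
        println("Before withContext: ${requestId.get()}")
        // installing a new context element is the supported way to "update" the value
        withContext(requestId.asContextElement(value = "inner")) {
            println("Inside withContext: ${requestId.get()}")
        }
        // the outer element is restored on resumption
        println("After withContext: ${requestId.get()}")
    }.join()
}
```

The inner value is visible only inside the withContext block; the outer element's value is restored afterwards.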
Alternatively, a value can be stored in a mutable box like class Counter(var i: Int)
, which is, in turn,
stored in a thread-local variable. However, in this case you are fully responsible for synchronizing
potentially concurrent modifications to the variable in this mutable box.
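The mutable-box alternative can be sketched as follows; the Counter class is the hypothetical box from the text, and the single-coroutine usage sidesteps the synchronization caveat:

```kotlin
import kotlinx.coroutines.*

class Counter(var i: Int) // the mutable box from the text

val counterLocal = ThreadLocal<Counter>() // thread-local holding the box

fun main(args: Array<String>) = runBlocking<Unit> {
    val box = Counter(0)
    launch(Dispatchers.Default + counterLocal.asContextElement(value = box)) {
        counterLocal.get().i++ // mutate the box contents
        yield() // may resume on a different thread...
        counterLocal.get().i++ // ...but the same box instance is restored
    }.join()
    println("Counter in box: ${box.i}") // prints "Counter in box: 2"
}
```

Mutations survive suspension because the same box instance is restored on every resumption; with several coroutines sharing one box, you would have to synchronize access yourself.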
For advanced usage, for example for integration with logging MDC, transactional contexts or any other libraries which internally use thread-locals for passing data, see the documentation for the ThreadContextElement interface, which should be implemented.
This section covers exception handling and cancellation on exceptions. We already know that a cancelled coroutine throws CancellationException at suspension points and that it is ignored by the coroutines machinery. But what happens if an exception is thrown during cancellation, or if multiple children of the same coroutine throw an exception?
Coroutine builders come in two flavors: propagating exceptions automatically (launch and actor) or
exposing them to users (async and produce).
The former treat exceptions as unhandled, similar to Java's Thread.uncaughtExceptionHandler
,
while the latter rely on the user to consume the final
exception, for example via await or receive
(produce and receive are covered later in the Channels section).
It can be demonstrated by a simple example that creates new coroutines in GlobalScope:
fun main(args: Array<String>) = runBlocking {
    val job = GlobalScope.launch {
        println("Throwing exception from launch")
        throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
    }
    job.join()
    println("Joined failed job")
    val deferred = GlobalScope.async {
        println("Throwing exception from async")
        throw ArithmeticException() // Nothing is printed, relying on user to call await
    }
    try {
        deferred.await()
        println("Unreached")
    } catch (e: ArithmeticException) {
        println("Caught ArithmeticException")
    }
}
You can get full code here
The output of this code is (with debug):
Throwing exception from launch
Exception in thread "ForkJoinPool.commonPool-worker-2 @coroutine#2" java.lang.IndexOutOfBoundsException
Joined failed job
Throwing exception from async
Caught ArithmeticException
But what if one does not want to print all exceptions to the console?
The CoroutineExceptionHandler context element is used as a generic catch
block of a coroutine where custom logging or exception handling may take place.
It is similar to using Thread.uncaughtExceptionHandler
.
On the JVM it is possible to redefine the global exception handler for all coroutines by registering a CoroutineExceptionHandler via
ServiceLoader
.
The global exception handler is similar to
Thread.defaultUncaughtExceptionHandler
, which is used when no more specific handlers are registered.
On Android, uncaughtExceptionPreHandler
is installed as the global coroutine exception handler.
CoroutineExceptionHandler is invoked only on exceptions which are not expected to be handled by the user, so registering it in the async builder and the like has no effect.
fun main(args: Array<String>) = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }
    val job = GlobalScope.launch(handler) {
        throw AssertionError()
    }
    val deferred = GlobalScope.async(handler) {
        throw ArithmeticException() // Nothing will be printed, relying on user to call deferred.await()
    }
    joinAll(job, deferred)
}
You can get full code here
The output of this code is:
Caught java.lang.AssertionError
Cancellation is tightly bound with exceptions. Coroutines internally use CancellationException
for cancellation; these
exceptions are ignored by all handlers, so they should be used only as a source of additional debug information, which can
be obtained in a catch
block.
When a coroutine is cancelled using Job.cancel without a cause, it terminates, but it does not cancel its parent.
Cancelling without a cause is a mechanism for a parent to cancel its children without cancelling itself.
fun main(args: Array<String>) = runBlocking {
    val job = launch {
        val child = launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                println("Child is cancelled")
            }
        }
        yield()
        println("Cancelling child")
        child.cancel()
        child.join()
        yield()
        println("Parent is not cancelled")
    }
    job.join()
}
You can get full code here
The output of this code is:
Cancelling child
Child is cancelled
Parent is not cancelled
If a coroutine encounters an exception other than CancellationException
, it cancels its parent with that exception.
This behaviour cannot be overridden and is used to provide stable coroutine hierarchies for
structured concurrency which do not depend on the
CoroutineExceptionHandler implementation.
The original exception is handled by the parent when all its children terminate.
This is also the reason why, in these examples, CoroutineExceptionHandler is always installed on a coroutine that is created in GlobalScope. It does not make sense to install an exception handler on a coroutine that is launched in the scope of the main runBlocking, since the main coroutine is always going to be cancelled when its child completes with an exception, despite the installed handler.
fun main(args: Array<String>) = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }
    val job = GlobalScope.launch(handler) {
        launch { // the first child
            try {
                delay(Long.MAX_VALUE)
            } finally {
                withContext(NonCancellable) {
                    println("Children are cancelled, but exception is not handled until all children terminate")
                    delay(100)
                    println("The first child finished its non cancellable block")
                }
            }
        }
        launch { // the second child
            delay(10)
            println("Second child throws an exception")
            throw ArithmeticException()
        }
    }
    job.join()
}
You can get full code here
The output of this code is:
Second child throws an exception
Children are cancelled, but exception is not handled until all children terminate
The first child finished its non cancellable block
Caught java.lang.ArithmeticException
What happens if multiple children of a coroutine throw an exception?
The general rule is "the first exception wins", so the first thrown exception is exposed to the handler.
But that may cause lost exceptions, for example if a coroutine throws an exception in its finally
block.
So, additional exceptions are suppressed.
One of the solutions would have been to report each exception separately, but then Deferred.await would have needed the same mechanism to avoid behavioural inconsistency, and this would cause implementation details of a coroutine (whether it had delegated parts of its work to its children or not) to leak to its exception handler.
fun main(args: Array<String>) = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception with suppressed ${exception.suppressed().contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw ArithmeticException()
            }
        }
        launch {
            throw IOException()
        }
        delay(Long.MAX_VALUE)
    }
    job.join()
}
You can get full code here
The output of this code is:
Caught java.io.IOException with suppressed [java.lang.ArithmeticException]
Note that this mechanism currently works only on Java version 1.7+. The limitation on JS and Native is temporary and will be fixed in the future.
Cancellation exceptions are transparent and unwrapped by default:
fun main(args: Array<String>) = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught original $exception")
    }
    val job = GlobalScope.launch(handler) {
        val inner = launch {
            launch {
                launch {
                    throw IOException()
                }
            }
        }
        try {
            inner.join()
        } catch (e: JobCancellationException) {
            println("Rethrowing JobCancellationException with original cause")
            throw e
        }
    }
    job.join()
}
You can get full code here
The output of this code is:
Rethrowing JobCancellationException with original cause
Caught original java.io.IOException
Deferred values provide a convenient way to transfer a single value between coroutines. Channels provide a way to transfer a stream of values.
Channels are an experimental feature of
kotlinx.coroutines
. Their API is expected to evolve in upcoming updates of the kotlinx.coroutines
library, with potentially breaking changes.
A Channel is conceptually very similar to BlockingQueue
. One key difference is that
instead of a blocking put
operation it has a suspending send, and instead of
a blocking take
operation it has a suspending receive.
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}
You can get full code here
The output of this code is:
1
4
9
16
25
Done!
Unlike a queue, a channel can be closed to indicate that no more elements are coming.
On the receiver side it is convenient to use a regular for
loop to receive elements
from the channel.
Conceptually, a close is like sending a special close token to the channel. The iteration stops as soon as this close token is received, so there is a guarantee that all elements sent before the close are received:
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}
You can get full code here
The pattern where a coroutine is producing a sequence of elements is quite common. This is a part of the producer-consumer pattern that is often found in concurrent code. You could abstract such a producer into a function that takes a channel as its parameter, but this goes contrary to the common sense that results must be returned from functions.
There is a convenience coroutine builder named produce that makes it easy to do it right on the producer side,
and an extension function consumeEach that replaces a for
loop on the consumer side:
fun CoroutineScope.produceSquares() = produce<Int> {
for (x in 1..5) send(x * x)
}
fun main(args: Array<String>) = runBlocking<Unit> {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
You can get full code here
A pipeline is a pattern where one coroutine is producing a possibly infinite stream of values:
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
And another coroutine or coroutines are consuming that stream, doing some processing, and producing some other results. In the example below the numbers are just squared:
fun CoroutineScope.square(numbers: ReceiveChannel<Int>) = produce<Int> {
for (x in numbers) send(x * x)
}
The main code starts and connects the whole pipeline:
fun main(args: Array<String>) = runBlocking<Unit> {
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
for (i in 1..5) println(squares.receive()) // print first five
println("Done!") // we are done
coroutineContext.cancelChildren() // cancel children coroutines
}
You can get full code here
All functions that create coroutines are defined as extensions on CoroutineScope, so that we can rely on structured concurrency to make sure that we don't have lingering global coroutines in our application.
Let's take pipelines to the extreme with an example that generates prime numbers using a pipeline of coroutines. We start with an infinite sequence of numbers.
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
The following pipeline stage filters an incoming stream of numbers, removing all the numbers that are divisible by the given prime number:
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
Now we build our pipeline by starting a stream of numbers from 2, taking a prime number from the current channel, and launching a new pipeline stage for each prime number found:
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
The following example prints the first ten prime numbers, running the whole pipeline in the context of the main thread. Since all the coroutines are launched in the scope of the main runBlocking coroutine we don't have to keep an explicit list of all the coroutines we have started. We use cancelChildren extension function to cancel all the children coroutines after we have printed the first ten prime numbers.
fun main(args: Array<String>) = runBlocking<Unit> {
var cur = numbersFrom(2)
for (i in 1..10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
You can get full code here
The output of this code is:
2
3
5
7
11
13
17
19
23
29
Note that you can build the same pipeline using the
buildIterator
coroutine builder from the standard library.
Replace produce
with buildIterator
, send
with yield
, receive
with next
,
ReceiveChannel
with Iterator
, and get rid of the coroutine scope. You will not need runBlocking
either.
However, the benefit of a pipeline that uses channels as shown above is that it can actually use
multiple CPU cores if you run it in the Dispatchers.Default context.
Anyway, this is an extremely impractical way to find prime numbers. In practice, pipelines do involve some
other suspending invocations (like asynchronous calls to remote services) and these pipelines cannot be
built using buildSequence
/buildIterator
, because they do not allow arbitrary suspension, unlike
produce
, which is fully asynchronous.
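For comparison, here is a sketch of the iterator-based variant of the primes pipeline; it assumes Kotlin 1.3+, where the builder described above is available in the standard library under the name iterator:

```kotlin
// stdlib iterator builder; in Kotlin 1.3+ buildIterator was renamed to iterator
fun numbersFrom(start: Int): Iterator<Int> = iterator {
    var x = start
    while (true) yield(x++) // infinite stream of integers from start
}

fun filter(numbers: Iterator<Int>, prime: Int): Iterator<Int> = iterator {
    for (x in numbers) if (x % prime != 0) yield(x) // drop multiples of prime
}

fun main(args: Array<String>) {
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.next()
        println(prime) // prints the first ten primes: 2 3 5 7 11 13 17 19 23 29
        cur = filter(cur, prime)
    }
}
```

No coroutine scope or runBlocking is needed here, but, unlike the channel version, this pipeline runs entirely on the calling thread.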
Multiple coroutines may receive from the same channel, distributing work between themselves. Let us start with a producer coroutine that is periodically producing integers (ten numbers per second):
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
Then we can have several processor coroutines. In this example, they just print their id and received number:
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
Now let us launch five processors and let them work for almost a second. See what happens:
fun main(args: Array<String>) = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
You can get full code here
The output will be similar to the following one, albeit the processor ids that receive each particular integer may be different:
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
Note that cancelling a producer coroutine closes its channel, thus eventually terminating the iteration over the channel that the processor coroutines are doing.
Also, pay attention to how we explicitly iterate over the channel with a for
loop to perform fan-out in the launchProcessor
code.
Unlike consumeEach
, this for
loop pattern is perfectly safe to use from multiple coroutines. If one of the processor
coroutines fails, the others will still be processing the channel, while a processor that is written via consumeEach
always consumes (cancels) the underlying channel on its normal or abnormal completion.
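A minimal sketch of that difference (the early-stopping processor and the element count are invented for illustration): the first processor takes one element and stops; since a direct receive does not cancel the channel, the second processor can still drain the rest:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>()
    launch {
        for (x in 1..4) channel.send(x)
        channel.close()
    }
    // processor #0 stops after a single element; receiving directly
    // does not cancel the channel, so the remaining elements are not lost
    launch {
        val first = channel.receive()
        println("Processor #0 stopped after $first")
    }
    // processor #1 keeps iterating until the channel is closed
    launch {
        for (msg in channel) println("Processor #1 received $msg")
    }
}
```

Had processor #0 used consumeEach and then completed, the channel would have been cancelled on its completion and processor #1 would receive nothing more.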
Multiple coroutines may send to the same channel. For example, let us have a channel of strings, and a suspending function that repeatedly sends a specified string to this channel with a specified delay:
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
Now, let us see what happens if we launch a couple of coroutines sending strings (in this example we launch them in the context of the main thread as main coroutine's children):
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
You can get full code here
The output is:
foo
foo
BAR!
foo
foo
BAR!
The channels shown so far had no buffer. Unbuffered channels transfer elements when sender and receiver meet each other (aka rendezvous). If send is invoked first, it is suspended until receive is invoked; if receive is invoked first, it is suspended until send is invoked.
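This rendezvous behavior can be sketched as follows (the 100 ms pause is arbitrary, just to make sure the sender suspends first):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>() // no capacity specified -- rendezvous channel
    val sender = launch {
        println("Before send")
        channel.send(42) // suspends here: no receiver has arrived yet
        println("After send")
    }
    delay(100) // let the sender reach send and suspend
    println("Receiving...")
    println("Received ${channel.receive()}") // rendezvous completes the suspended send
    sender.join()
}
```

"Before send" is printed, then the sender suspends inside send until receive is invoked; only after the rendezvous does the sender resume and print "After send".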
Both the Channel() factory function and the produce builder take an optional capacity
parameter to
specify the buffer size. A buffer allows senders to send multiple elements before suspending,
similar to a BlockingQueue
with a specified capacity, which blocks when the buffer is full.
Take a look at the behavior of the following code:
fun main(args: Array<String>) = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
}
You can get full code here
It prints "Sending" five times using a buffered channel with a capacity of four:
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4
The first four elements are added to the buffer and the sender suspends when trying to send the fifth one.
Send and receive operations on channels are fair with respect to the order of their invocation from
multiple coroutines. They are served in first-in first-out order, e.g. the first coroutine to invoke receive
gets the element. In the following example two coroutines "ping" and "pong" are
receiving the "ball" object from the shared "table" channel.
data class Ball(var hits: Int)

fun main(args: Array<String>) = runBlocking<Unit> {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
You can get full code here
The "ping" coroutine is started first, so it is the first one to receive the ball. Even though "ping" coroutine immediately starts receiving the ball again after sending it back to the table, the ball gets received by the "pong" coroutine, because it was already waiting for it:
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
Note that sometimes channels may produce executions that look unfair due to the nature of the executor that is being used. See this issue for details.
A ticker channel is a special rendezvous channel that produces Unit
every time a given delay passes since the last consumption from this channel.
Though it may seem useless standalone, it is a handy building block for creating complex time-based produce
pipelines and operators that do windowing and other time-dependent processing.
A ticker channel can be used in select to perform an "on tick" action.
To create such a channel, use the ticker factory method. To indicate that no further elements are needed, use the ReceiveChannel.cancel method on it.
Now let's see how it works in practice:
fun main(args: Array<String>) = runBlocking<Unit> {
val tickerChannel = ticker(delay = 100, initialDelay = 0) // create ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have a 100ms delay
println("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// Emulate large consumption delays
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}
You can get full code here
It prints the following lines:
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
Note that the ticker is aware of possible consumer pauses and, by default, adjusts the delay before the next
produced element if a pause occurs, trying to maintain a fixed rate of produced elements.
Optionally, a mode
parameter equal to TickerMode.FIXED_DELAY can be specified to maintain a fixed
delay between elements instead.
Coroutines can be executed concurrently using a multi-threaded dispatcher like Dispatchers.Default. This presents all the usual concurrency problems, the main one being synchronization of access to shared mutable state. Some solutions to this problem in the land of coroutines are similar to the solutions in the multi-threaded world, but others are unique.
Let us launch a hundred coroutines all doing the same action a thousand times. We'll also measure their completion time for further comparisons:
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
    val n = 100 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")
}
We start with a very simple action that increments a shared mutable variable using the multi-threaded Dispatchers.Default that backs GlobalScope.
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
    GlobalScope.massiveRun {
        counter++
    }
    println("Counter = $counter")
}
You can get full code here
What does it print at the end? It is highly unlikely to print "Counter = 100000", because a hundred coroutines increment the counter a thousand times each from multiple threads without any synchronization.
Note: if you have an old system with 2 or fewer CPUs, then you will consistently see 100000, because the thread pool is running in only one thread in this case. To reproduce the problem you'll need to make the following change:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    CoroutineScope(mtContext).massiveRun { // use it instead of Dispatchers.Default in this sample and below
        counter++
    }
    println("Counter = $counter")
}
You can get full code here
There is a common misconception that making a variable volatile solves the concurrency problem. Let us try it:
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    GlobalScope.massiveRun {
        counter++
    }
    println("Counter = $counter")
}
You can get full code here
This code works slower, but we still don't get "Counter = 100000" at the end, because volatile variables guarantee linearizable (this is a technical term for "atomic") reads and writes to the corresponding variable, but do not provide atomicity of larger actions (increment in our case).
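To see why, it helps to spell out the three separate steps hidden inside counter++. The following sketch makes the read-modify-write sequence explicit (the lostUpdatesDemo name is ours; plain threads are used to keep it self-contained), so an update can be lost whenever another thread interleaves between the read and the write:

```kotlin
import kotlin.concurrent.thread

@Volatile
var sharedCounter = 0

// Runs the given number of threads, each performing the decomposed
// increment the given number of times, and returns the final counter value.
fun lostUpdatesDemo(threads: Int, increments: Int): Int {
    sharedCounter = 0
    val workers = List(threads) {
        thread {
            repeat(increments) {
                // counter++ is really three steps; another thread can run
                // between the read and the write, and its update is then lost
                val read = sharedCounter    // 1: volatile read
                val incremented = read + 1  // 2: compute new value
                sharedCounter = incremented // 3: volatile write (may overwrite)
            }
        }
    }
    workers.forEach { it.join() }
    return sharedCounter
}

fun main() {
    val result = lostUpdatesDemo(threads = 10, increments = 100_000)
    println("Result: $result (expected ${10 * 100_000})")
}
```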
The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized, linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding operations that need to be performed on shared state.
In the case of a simple counter we can use the AtomicInteger class, which has an atomic incrementAndGet operation:
val counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
    GlobalScope.massiveRun {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}
You can get full code here
This is the fastest solution for this particular problem. It works for plain counters, collections, queues and other standard data structures and basic operations on them. However, it does not easily scale to complex state or to complex operations that do not have ready-to-use thread-safe implementations.
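For state that is slightly more complex than a counter, a common lock-free technique is a compare-and-set loop over an immutable snapshot held in an AtomicReference. This is an illustrative sketch under that approach (the addElement and casDemo helpers are ours, not part of any library):

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.*

// Shared state: an immutable list held in an atomic reference.
val listRef = AtomicReference<List<Int>>(emptyList())

fun addElement(element: Int) {
    while (true) {
        val old = listRef.get()
        val new = old + element // build a new immutable snapshot
        // commit only if nobody changed the state in the meantime, else retry
        if (listRef.compareAndSet(old, new)) return
    }
}

// Launches 100 coroutines, each adding 10 elements, and returns the final size.
fun casDemo(): Int {
    listRef.set(emptyList())
    runBlocking {
        val jobs = List(100) { i ->
            launch(Dispatchers.Default) { repeat(10) { addElement(i) } }
        }
        jobs.forEach { it.join() }
    }
    return listRef.get().size
}

fun main() {
    println("List size = ${casDemo()}")
}
```

The CAS loop retries until its snapshot wins, so no updates are lost, at the cost of rebuilding the snapshot on every retry under contention.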
Thread confinement is an approach to the problem of shared mutable state where all access to the particular shared
state is confined to a single thread. It is typically used in UI applications, where all UI state is confined to
the single event-dispatch/application thread. It is easy to apply with coroutines by using a
single-threaded context.
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    GlobalScope.massiveRun { // run each coroutine with Dispatchers.Default
        withContext(counterContext) { // but confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter")
}
You can get full code here
This code works very slowly, because it does fine-grained thread confinement: each individual increment switches from the multi-threaded Dispatchers.Default context to the single-threaded context using a withContext block.
In practice, thread confinement is performed in large chunks, e.g. big pieces of state-updating business logic are confined to the single thread. The following example does it like that, running each coroutine in the single-threaded context to start with. Here we use CoroutineScope() function to convert coroutine context reference to CoroutineScope:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    CoroutineScope(counterContext).massiveRun { // run each coroutine in the single-threaded context
        counter++
    }
    println("Counter = $counter")
}
You can get full code here
This now works much faster and produces the correct result.
The mutual exclusion solution to the problem is to protect all modifications of the shared state with a critical section that is never executed concurrently. In the blocking world you'd typically use synchronized or ReentrantLock for that.
The coroutine alternative is called Mutex. It has lock and unlock functions to delimit a critical section. The key difference is that Mutex.lock() is a suspending function; it does not block a thread.
There is also a withLock extension function that conveniently represents the mutex.lock(); try { ... } finally { mutex.unlock() } pattern:
val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    GlobalScope.massiveRun {
        mutex.withLock {
            counter++
        }
    }
    println("Counter = $counter")
}
You can get full code here
The locking in this example is fine-grained, so it pays the price. However, it is a good choice for some situations where you absolutely must modify some shared state periodically, but there is no natural thread that this state is confined to.
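One caveat worth knowing: unlike synchronized or ReentrantLock, this Mutex is not reentrant, so attempting to lock it again from inside the critical section would suspend forever. A small sketch probing this with tryLock (the mutexIsReentrant name is ours):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

// Returns whether the mutex can be acquired again while already held.
fun mutexIsReentrant(): Boolean = runBlocking {
    val mutex = Mutex()
    mutex.withLock {
        // tryLock returns false here: the mutex is already held by this
        // coroutine, and kotlinx.coroutines Mutex does not support reentrancy
        mutex.tryLock()
    }
}

fun main() {
    println("Mutex is reentrant: ${mutexIsReentrant()}") // prints false
}
```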
An actor is an entity made up of a combination of a coroutine, the state that is confined and encapsulated into this coroutine, and a channel to communicate with other coroutines. A simple actor can be written as a function, but an actor with a complex state is better suited for a class.
There is an actor coroutine builder that conveniently combines the actor's mailbox channel into its scope (to receive messages from) and combines the send channel into the resulting job object, so that a single reference to the actor can be carried around as its handle.
The first step of using an actor is to define the class of messages that the actor is going to process. Kotlin's sealed classes are well suited for that purpose. We define the CounterMsg sealed class with an IncCounter message to increment the counter and a GetCounter message to get its value. The latter needs to send a response, so the CompletableDeferred communication primitive, which represents a single value that will be known (communicated) in the future, is used here for that purpose.
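On its own, a CompletableDeferred is just a one-shot container for a future value: one coroutine completes it, another awaits it. A minimal sketch (the requestReply name is ours):

```kotlin
import kotlinx.coroutines.*

// One coroutine completes the deferred; the caller awaits the result.
fun requestReply(): Int = runBlocking {
    val response = CompletableDeferred<Int>()
    launch {
        delay(100) // pretend to compute something
        response.complete(42) // publish the result exactly once
    }
    response.await() // suspends until complete() is called
}

fun main() {
    println("Reply: ${requestReply()}")
}
```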
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
Then we define a function that launches an actor using an actor coroutine builder:
// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
The main code is straightforward:
fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    GlobalScope.massiveRun {
        counter.send(IncCounter)
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shut down the actor
}
You can get full code here
It does not matter (for correctness) what context the actor itself is executed in. An actor is a coroutine and a coroutine is executed sequentially, so confinement of the state to the specific coroutine works as a solution to the problem of shared mutable state. Indeed, actors may modify their own private state, but can only affect each other through messages (avoiding the need for any locks).
An actor is more efficient than locking under load, because in this case it always has work to do and does not have to switch to a different context at all.
Note that the actor coroutine builder is a dual of the produce coroutine builder: an actor is associated with the channel that it receives messages from, while a producer is associated with the channel that it sends elements to.
Select expression makes it possible to await multiple suspending functions simultaneously and select the first one that becomes available.
Select expressions are an experimental feature of kotlinx.coroutines. Their API is expected to evolve in upcoming updates of the kotlinx.coroutines library, with potentially breaking changes.
Let us have two producers of strings: fizz and buzz. The fizz producer sends the "Fizz" string every 300 ms:
fun CoroutineScope.fizz() = produce<String> {
    while (true) { // sends "Fizz" every 300 ms
        delay(300)
        send("Fizz")
    }
}
And the buzz producer sends the "Buzz!" string every 500 ms:
fun CoroutineScope.buzz() = produce<String> {
    while (true) { // sends "Buzz!" every 500 ms
        delay(500)
        send("Buzz!")
    }
}
Using the receive suspending function we can receive either from one channel or the other. But the select expression allows us to wait on both simultaneously using its onReceive clauses:
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value -> // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value -> // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}
Let us run it all seven times:
fun main(args: Array<String>) = runBlocking<Unit> {
    val fizz = fizz()
    val buzz = buzz()
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
    coroutineContext.cancelChildren() // cancel fizz & buzz coroutines
}
You can get full code here
The result of this code is:
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
buzz -> 'Buzz!'
The onReceive clause in select fails when the channel is closed, causing the corresponding select to throw an exception. We can use the onReceiveOrNull clause to perform a specific action when the channel is closed. The following example also shows that select is an expression that returns the result of its selected clause:
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'a' is closed"
            else
                "a -> '$value'"
        }
        b.onReceiveOrNull { value ->
            if (value == null)
                "Channel 'b' is closed"
            else
                "b -> '$value'"
        }
    }
Let's use it with a channel a that produces the "Hello" string four times and a channel b that produces "World" four times:
fun main(args: Array<String>) = runBlocking<Unit> {
    val a = produce<String> {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String> {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // print first eight results
        println(selectAorB(a, b))
    }
    coroutineContext.cancelChildren()
}
You can get full code here
The result of this code is quite interesting, so we'll analyze it in more detail:
a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed
There are a couple of observations to make here.
First of all, select is biased towards the first clause: when several clauses are selectable at the same time, the first one among them gets selected. Here both channels are constantly producing strings, so the a channel, being the first clause in select, wins. However, because we are using an unbuffered channel, a gets suspended from time to time on its send invocation and gives b a chance to send, too.
The second observation is that onReceiveOrNull is immediately selected when the channel is already closed.
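If this bias is undesirable, the library also provides a selectUnbiased function that tries the clauses in random order when several are ready at once. A sketch of the previous idea with the bias removed (the selectAorBUnbiased name and the demo channels are ours; which clause wins when both channels have a value becomes random):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

suspend fun selectAorBUnbiased(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    selectUnbiased<String> { // clauses are tried in random order when several are ready
        a.onReceive { value -> "a -> '$value'" }
        b.onReceive { value -> "b -> '$value'" }
    }

fun main() = runBlocking<Unit> {
    // buffered channels preloaded with values, so both clauses start ready
    val a = Channel<String>(2).apply { send("one"); send("two") }
    val b = Channel<String>(2).apply { send("uno"); send("dos") }
    repeat(4) { println(selectAorBUnbiased(a, b)) }
}
```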
The select expression has an onSend clause that can be put to great use in combination with the biased nature of selection. Let us write an example of a producer of integers that sends its values to a side channel when the consumers on its primary channel cannot keep up:
fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}
The consumer is going to be quite slow, taking 250 ms to process each number:
fun main(args: Array<String>) = runBlocking<Unit> {
    val side = Channel<Int>() // allocate the side channel
    launch { // this is a very fast consumer for the side channel
        side.consumeEach { println("Side channel has $it") }
    }
    produceNumbers(side).consumeEach {
        println("Consuming $it")
        delay(250) // let us digest the consumed number properly, do not hurry
    }
    println("Done consuming")
    coroutineContext.cancelChildren()
}
You can get full code here
So let us see what happens:
Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming
Deferred values can be selected using onAwait clause. Let us start with an async function that returns a deferred string value after a random delay:
fun CoroutineScope.asyncString(time: Int) = async {
    delay(time.toLong())
    "Waited for $time ms"
}
Let us start a dozen of them with a random delay.
fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}
Now the main function awaits the first of them to complete and counts the number of deferred values that are still active. Note that we've used the fact that a select expression is a Kotlin DSL, so we can provide clauses for it using arbitrary code. In this case we iterate over a list of deferred values to provide an onAwait clause for each deferred value.
fun main(args: Array<String>) = runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'"
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active")
}
You can get full code here
The output is:
Deferred 4 produced answer 'Waited for 128 ms'
11 coroutines are still active
Let us write a channel producer function that consumes a channel of deferred string values and waits for each received deferred value, but only until the next deferred value arrives or the channel is closed. This example puts together onReceiveOrNull and onAwait clauses in the same select:
fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
    var current = input.receive() // start with the first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return the next deferred value from this select or null
            input.onReceiveOrNull { update ->
                update // replaces the next value to wait for
            }
            current.onAwait { value ->
                send(value) // send the value that the current deferred has produced
                input.receiveOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed")
            break // out of the loop
        } else {
            current = next
        }
    }
}
To test it, we'll use a simple async function that resolves to a specified string after a specified time:
fun CoroutineScope.asyncString(str: String, time: Long) = async {
    delay(time)
    str
}
The main function just launches a coroutine to print results of switchMapDeferreds and sends some test data to it:
fun main(args: Array<String>) = runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // the channel for the test
    launch { // launch the printing coroutine
        for (s in switchMapDeferreds(chan))
            println(s) // print each received string
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // enough time for "BEGIN" to be produced
    chan.send(asyncString("Slow", 500))
    delay(100) // not enough time to produce the slow one
    chan.send(asyncString("Replace", 100))
    delay(500) // give it time before the last one
    chan.send(asyncString("END", 500))
    delay(1000) // give it time to process
    chan.close() // close the channel ...
    delay(500) // ... and wait some time to let it finish
}
You can get full code here
The result of this code:
BEGIN
Replace
END
Channel was closed