Description
In one of our production services we had a fixed thread-pool for handling blocking DB calls, and we are using withContext
and asCoroutineDispatcher
to launch such calls in context of DB threads. These calls have to stick to these thread-pool we have created otherwise they fail:
val executor = ThreadPoolExecutor(
affinity,
MAX_IO_THREADS,
DEFAULT_TTL_SECONDS_THREAD,
TimeUnit.SECONDS,
SynchronousQueue()
) { runnable ->
Thread(runnable).also {
it.name = "io-pool-${it.id}"
it.isDaemon = true
it.priority = Thread.NORM_PRIORITY
}
}
val dispatcher = executor.asCoroutineDispatcher()
Notice SynchronousQueue
to block spinning off any further threads/calls in when pool is full. Turns out looking at code of ExecutorCoroutineDispatcherBase
kotlinx.coroutines/kotlinx-coroutines-core/jvm/src/Executors.kt
Lines 80 to 87 in d7de5f5
DefaultExecutor
. This has multiple problems:
- It breaks the foundational assumption of respecting dispatcher that was supplied, worst part of this is that it's undocumented caveat.
- It can result in a flurry of undesired threads; ultimately leading to OOM with it's existing behavior. Having an unbound queue or unbound set of threads is exactly what should be avoided at all costs.
Here is a simple example to reproduce this issue locally:
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
val executor = ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, SynchronousQueue()) { runnable ->
Thread(runnable).also {
it.name = "io-pool-${it.id}"
it.isDaemon = true
it.priority = Thread.NORM_PRIORITY
}
}
fun main() {
runBlocking(executor.asCoroutineDispatcher()) {
for (i in 1..5) {
launch {
println(Thread.currentThread().name)
Thread.sleep(1000)
}
}
}
}
I believe the subsequent co-routines should be rejected; launching by default on DefaultExecutor
is an undesired behavior and breaks fundamental assumption.