Skip to content

Coroutines launch on default dispatcher when they should not #2003

Closed
@maxpert

Description

@maxpert

In one of our production services we had a fixed thread-pool for handling blocking DB calls, and we are using withContext and asCoroutineDispatcher to launch such calls in context of DB threads. These calls have to stick to these thread-pool we have created otherwise they fail:

          val executor = ThreadPoolExecutor(
                affinity,
                MAX_IO_THREADS,
                DEFAULT_TTL_SECONDS_THREAD,
                TimeUnit.SECONDS,
                SynchronousQueue()
            ) { runnable ->
                Thread(runnable).also {
                    it.name = "io-pool-${it.id}"
                    it.isDaemon = true
                    it.priority = Thread.NORM_PRIORITY
                }
            }
          
          val dispatcher = executor.asCoroutineDispatcher()

Notice SynchronousQueue to block spinning off any further threads/calls in when pool is full. Turns out looking at code of ExecutorCoroutineDispatcherBase

override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
executor.execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
DefaultExecutor.enqueue(block)
}
}
turns out we launch/enqueue or coroutine on DefaultExecutor. This has multiple problems:

  • It breaks the foundational assumption of respecting dispatcher that was supplied, worst part of this is that it's undocumented caveat.
  • It can result in a flurry of undesired threads; ultimately leading to OOM with it's existing behavior. Having an unbound queue or unbound set of threads is exactly what should be avoided at all costs.

Here is a simple example to reproduce this issue locally:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

val executor = ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, SynchronousQueue()) { runnable ->
    Thread(runnable).also {
        it.name = "io-pool-${it.id}"
        it.isDaemon = true
        it.priority = Thread.NORM_PRIORITY
    }
}

fun main() {
    runBlocking(executor.asCoroutineDispatcher()) {
        for (i in 1..5) {
            launch {
                println(Thread.currentThread().name)
                Thread.sleep(1000)
            }
        }
    }
}

I believe the subsequent co-routines should be rejected; launching by default on DefaultExecutor is an undesired behavior and breaks fundamental assumption.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions