diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index d2249bfdd0..59ce6770d9 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -4,6 +4,8 @@ package kotlinx.coroutines +import kotlinx.coroutines.scheduling.* +import kotlinx.coroutines.scheduling.CoroutineScheduler import java.util.concurrent.locks.* import kotlin.contracts.* import kotlin.coroutines.* @@ -95,6 +97,12 @@ private class BlockingCoroutine( val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE // note: process next even may loose unpark flag, so check if completed before parking if (isCompleted) break + if (blockedThread is CoroutineScheduler.Worker) { + val queue = blockedThread.localQueue + while (queue.size > 0) { + queue.poll()?.let { blockedThread.scheduler.dispatch(it) } + } + } parkNanos(this, parkNanos) } } finally { // paranoia diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingDispatchLocalTasksTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingDispatchLocalTasksTest.kt new file mode 100644 index 0000000000..7fce423d0f --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/RunBlockingDispatchLocalTasksTest.kt @@ -0,0 +1,36 @@ +package kotlinx.coroutines + +import java.util.concurrent.* +import kotlin.coroutines.* +import kotlin.coroutines.intrinsics.* +import kotlin.test.* + +class RunBlockingDispatchLocalTasksTest { + + // A coroutine deadlock occurs if there is a task in the local queue + // of the blocking thread before it is parked by the nested runBlocking + @Test(timeout = 1000) + fun testEmptyLocalTasksBeforePark() { + runBlocking(Dispatchers.IO) { + val latch = CountDownLatch(1) + lateinit var launchContinuation: Continuation + lateinit var runBlockingContinuation: Continuation + CoroutineScope(Dispatchers.IO).launch { + suspendCoroutineUninterceptedOrReturn { + launchContinuation = it + latch.countDown() + COROUTINE_SUSPENDED + } + yield() + runBlockingContinuation.resume(Unit) + } + latch.await() + runBlocking { + suspendCancellableCoroutine { + runBlockingContinuation = it + launchContinuation.resume(Unit) + } + } + } + } +}