diff --git a/common/kotlinx-coroutines-core-common/src/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/Dispatched.kt index 32ee51f0a2..d7f7a97717 100644 --- a/common/kotlinx-coroutines-core-common/src/Dispatched.kt +++ b/common/kotlinx-coroutines-core-common/src/Dispatched.kt @@ -21,26 +21,40 @@ internal object UndispatchedEventLoop { @JvmField internal val threadLocalEventLoop = CommonThreadLocal { EventLoop() } - inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, block: () -> Unit) { + /** + * Executes given [block] as part of current event loop, updating related to block [continuation] + * mode and state if continuation is not resumed immediately. + * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty). + * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise. + */ + inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, + doYield: Boolean = false, block: () -> Unit) : Boolean { val eventLoop = threadLocalEventLoop.get() if (eventLoop.isActive) { + // If we are yielding and queue is empty, we can bail out as part of fast path + if (doYield && eventLoop.queue.isEmpty) { + return false + } + continuation._state = contState continuation.resumeMode = mode eventLoop.queue.addLast(continuation) - return + return true } runEventLoop(eventLoop, block) + return false } - fun resumeUndispatched(task: DispatchedTask<*>) { + fun resumeUndispatched(task: DispatchedTask<*>): Boolean { val eventLoop = threadLocalEventLoop.get() if (eventLoop.isActive) { eventLoop.queue.addLast(task) - return + return true } runEventLoop(eventLoop, { task.resume(task.delegate, MODE_UNDISPATCHED) }) + return false } inline fun runEventLoop(eventLoop: EventLoop, block: () -> Unit) { @@ -227,6 +241,11 @@ internal interface DispatchedTask : Runnable { } } +internal fun DispatchedContinuation.yieldUndispatched(): Boolean = + UndispatchedEventLoop.execute(this, Unit, MODE_CANCELLABLE, doYield = true) { + run() + } + internal fun DispatchedTask.dispatch(mode: Int = MODE_CANCELLABLE) { val delegate = this.delegate if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) { diff --git a/common/kotlinx-coroutines-core-common/src/Yield.kt b/common/kotlinx-coroutines-core-common/src/Yield.kt index 632dcba0b0..78ab27fb87 100644 --- a/common/kotlinx-coroutines-core-common/src/Yield.kt +++ b/common/kotlinx-coroutines-core-common/src/Yield.kt @@ -19,7 +19,9 @@ public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { u val context = uCont.context context.checkCompletion() val cont = uCont.intercepted() as? DispatchedContinuation ?: return@sc Unit - if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit + if (!cont.dispatcher.isDispatchNeeded(context)) { + return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit + } cont.dispatchYield(Unit) COROUTINE_SUSPENDED } diff --git a/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt b/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt index a6bf8f6180..5cfb8e8df1 100644 --- a/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt +++ b/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt @@ -8,6 +8,7 @@ internal class ArrayQueue { private var elements = arrayOfNulls(16) private var head = 0 private var tail = 0 + val isEmpty: Boolean get() = head == tail public fun addLast(element: T) { elements[tail] = element diff --git a/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt b/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt index 8866057a09..f37c35657c 100644 --- a/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt +++ b/common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt @@ -54,7 +54,7 @@ class UnconfinedTest : TestBase() { } @Test - fun enterMultipleTimes() = runTest { + fun testEnterMultipleTimes() = runTest { launch(Unconfined) { expect(1) } @@ -70,5 +70,46 @@ class UnconfinedTest : TestBase() { finish(4) } + @Test + fun testYield() = runTest { + expect(1) + launch(Dispatchers.Unconfined) { + expect(2) + yield() + launch { + expect(4) + } + expect(3) + yield() + expect(5) + }.join() + + finish(6) + } + + @Test + fun testCancellationWihYields() = runTest { + expect(1) + GlobalScope.launch(Dispatchers.Unconfined) { + val job = coroutineContext[Job]!! + expect(2) + yield() + GlobalScope.launch(Dispatchers.Unconfined) { + expect(4) + job.cancel() + expect(5) + } + expect(3) + + try { + yield() + } finally { + expect(6) + } + } + + finish(7) + } + class TestException : Throwable() }