Skip to content

Implement yield for unconfined dispatchers #750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions common/kotlinx-coroutines-core-common/src/Dispatched.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,40 @@ internal object UndispatchedEventLoop {
@JvmField
internal val threadLocalEventLoop = CommonThreadLocal { EventLoop() }

inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, block: () -> Unit) {
/**
* Executes given [block] as part of current event loop, updating related to block [continuation]
* mode and state if continuation is not resumed immediately.
* [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
* Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
*/
inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int,
doYield: Boolean = false, block: () -> Unit) : Boolean {
val eventLoop = threadLocalEventLoop.get()
if (eventLoop.isActive) {
// If we are yielding and queue is empty, we can bail out as part of fast path
if (doYield && eventLoop.queue.isEmpty) {
return false
}

continuation._state = contState
continuation.resumeMode = mode
eventLoop.queue.addLast(continuation)
return
return true
}

runEventLoop(eventLoop, block)
return false
}

fun resumeUndispatched(task: DispatchedTask<*>) {
fun resumeUndispatched(task: DispatchedTask<*>): Boolean {
val eventLoop = threadLocalEventLoop.get()
if (eventLoop.isActive) {
eventLoop.queue.addLast(task)
return
return true
}

runEventLoop(eventLoop, { task.resume(task.delegate, MODE_UNDISPATCHED) })
return false
}

inline fun runEventLoop(eventLoop: EventLoop, block: () -> Unit) {
Expand Down Expand Up @@ -227,6 +241,11 @@ internal interface DispatchedTask<in T> : Runnable {
}
}

internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
UndispatchedEventLoop.execute(this, Unit, MODE_CANCELLABLE, doYield = true) {
run()
}

internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
val delegate = this.delegate
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
Expand Down
4 changes: 3 additions & 1 deletion common/kotlinx-coroutines-core-common/src/Yield.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { u
val context = uCont.context
context.checkCompletion()
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
if (!cont.dispatcher.isDispatchNeeded(context)) {
return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
}
cont.dispatchYield(Unit)
COROUTINE_SUSPENDED
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ internal class ArrayQueue<T : Any> {
private var elements = arrayOfNulls<Any>(16)
private var head = 0
private var tail = 0
val isEmpty: Boolean get() = head == tail

public fun addLast(element: T) {
elements[tail] = element
Expand Down
43 changes: 42 additions & 1 deletion common/kotlinx-coroutines-core-common/test/UnconfinedTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class UnconfinedTest : TestBase() {
}

@Test
fun enterMultipleTimes() = runTest {
fun testEnterMultipleTimes() = runTest {
launch(Unconfined) {
expect(1)
}
Expand All @@ -70,5 +70,46 @@ class UnconfinedTest : TestBase() {
finish(4)
}

@Test
fun testYield() = runTest {
expect(1)
launch(Dispatchers.Unconfined) {
expect(2)
yield()
launch {
expect(4)
}
expect(3)
yield()
expect(5)
}.join()

finish(6)
}

@Test
fun testCancellationWihYields() = runTest {
expect(1)
GlobalScope.launch(Dispatchers.Unconfined) {
val job = coroutineContext[Job]!!
expect(2)
yield()
GlobalScope.launch(Dispatchers.Unconfined) {
expect(4)
job.cancel()
expect(5)
}
expect(3)

try {
yield()
} finally {
expect(6)
}
}

finish(7)
}

class TestException : Throwable()
}