Skip to content

Support yield in immediate dispatchers #1667

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public abstract class CoroutineDispatcher :
*
* This method should generally be exception-safe. An exception thrown from this method
* may leave the coroutines that use this dispatcher in the inconsistent and hard to debug state.
*
* **Note**: This method must not immediately call [block]. Doing so would result in [StackOverflowError]
* when [yield] is repeatedly called from a loop. However, an implementation that returns `false` from
* [isDispatchNeeded] can delegate this function to `dispatch` method of [Dispatchers.Unconfined], which is
* integrated with [yield] to avoid this problem.
*/
public abstract fun dispatch(context: CoroutineContext, block: Runnable)

Expand Down
26 changes: 25 additions & 1 deletion kotlinx-coroutines-core/common/src/Unconfined.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,36 @@
package kotlinx.coroutines

import kotlin.coroutines.*
import kotlin.jvm.*

/**
* A coroutine dispatcher that is not confined to any specific thread.
*/
internal object Unconfined : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
override fun dispatch(context: CoroutineContext, block: Runnable) { throw UnsupportedOperationException() }

override fun dispatch(context: CoroutineContext, block: Runnable) {
// It can only be called by the "yield" function. See also code of "yield" function.
val yieldContext = context[YieldContext]
if (yieldContext != null) {
// report to "yield" that it is an unconfined dispatcher and don't call "block.run()"
yieldContext.dispatcherWasUnconfined = true
return
}
throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
"If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
"isDispatchNeeded and dispatch calls.")
}

override fun toString(): String = "Unconfined"
}

/**
* Used to detect calls to [Unconfined.dispatch] from [yield] function.
*/
internal class YieldContext : AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<YieldContext>

@JvmField
var dispatcherWasUnconfined = false
}
33 changes: 27 additions & 6 deletions kotlinx-coroutines-core/common/src/Yield.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,42 @@ import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

/**
* Yields the thread (or thread pool) of the current coroutine dispatcher to other coroutines to run.
* If the coroutine dispatcher does not have its own thread pool (like [Dispatchers.Unconfined]), this
* function does nothing but check if the coroutine's [Job] was completed.
* Yields the thread (or thread pool) of the current coroutine dispatcher to other coroutines to run if possible.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed when this suspending function is invoked or while
* this function is waiting for dispatch, it resumes with a [CancellationException].
*
* **Note**: This function always [checks for cancellation][ensureActive] even when it does not suspend.
*
* ### Implementation details
*
* If the coroutine dispatcher is [Unconfined][Dispatchers.Unconfined], this
* functions suspends only when there are other unconfined coroutines working and forming an event-loop.
* For other dispatchers, this function calls [CoroutineDispatcher.dispatch] and
* always suspends to be resumed later regardless of the result of [CoroutineDispatcher.isDispatchNeeded].
* If there is no [CoroutineDispatcher] in the context, it does not suspend.
*/
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val context = uCont.context
context.checkCompletion()
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
if (!cont.dispatcher.isDispatchNeeded(context)) {
return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
if (cont.dispatcher.isDispatchNeeded(context)) {
// this is a regular dispatcher -- do simple dispatchYield
cont.dispatchYield(context, Unit)
} else {
// This is either an "immediate" dispatcher or the Unconfined dispatcher
// This code detects the Unconfined dispatcher even if it was wrapped into another dispatcher
val yieldContext = YieldContext()
cont.dispatchYield(context + yieldContext, Unit)
// Special case for the unconfined dispatcher that can yield only in existing unconfined loop
if (yieldContext.dispatcherWasUnconfined) {
// Means that the Unconfined dispatcher got the call, but did not do anything.
// See also code of "Unconfined.dispatch" function.
return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
}
// Otherwise, it was some other dispatcher that successfully dispatched the coroutine
}
cont.dispatchYield(Unit)
COROUTINE_SUSPENDED
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ internal class DispatchedContinuation<in T>(
}

// used by "yield" implementation
internal fun dispatchYield(value: T) {
val context = continuation.context
internal fun dispatchYield(context: CoroutineContext, value: T) {
_state = value
resumeMode = MODE_CANCELLABLE
dispatcher.dispatchYield(context, this)
Expand Down
57 changes: 57 additions & 0 deletions kotlinx-coroutines-core/common/test/ImmediateYieldTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.coroutines.*
import kotlin.test.*

class ImmediateYieldTest : TestBase() {

// See https://github.com/Kotlin/kotlinx.coroutines/issues/1474
@Test
fun testImmediateYield() = runTest {
expect(1)
launch(ImmediateDispatcher(coroutineContext[ContinuationInterceptor])) {
expect(2)
yield()
expect(4)
}
expect(3) // after yield
yield() // yield back
finish(5)
}

// imitate immediate dispatcher
private class ImmediateDispatcher(job: ContinuationInterceptor?) : CoroutineDispatcher() {
val delegate: CoroutineDispatcher = job as CoroutineDispatcher

override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

override fun dispatch(context: CoroutineContext, block: Runnable) =
delegate.dispatch(context, block)
}

@Test
fun testWrappedUnconfinedDispatcherYield() = runTest {
expect(1)
launch(wrapperDispatcher(Dispatchers.Unconfined)) {
expect(2)
yield() // Would not work with wrapped unconfined dispatcher
expect(3)
}
finish(4) // after launch
}

@Test
fun testWrappedUnconfinedDispatcherYieldStackOverflow() = runTest {
expect(1)
withContext(wrapperDispatcher(Dispatchers.Unconfined)) {
repeat(100_000) {
yield()
}
}
finish(2)
}
}
5 changes: 3 additions & 2 deletions kotlinx-coroutines-core/common/test/TestBase.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ public class RecoverableTestCancellationException(message: String? = null) : Can
public fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
return object : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) {
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
dispatcher.isDispatchNeeded(context)
override fun dispatch(context: CoroutineContext, block: Runnable) =
dispatcher.dispatch(context, block)
}
}
}

Expand Down
14 changes: 14 additions & 0 deletions ui/kotlinx-coroutines-android/test/HandlerDispatcherTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,18 @@ class HandlerDispatcherTest : TestBase() {
// TODO compile against API 22+ so this can be invoked without reflection.
private val Message.isAsynchronous: Boolean
get() = Message::class.java.getDeclaredMethod("isAsynchronous").invoke(this) as Boolean

@Test
fun testImmediateDispatcherYield() = runBlocking(Dispatchers.Main) {
expect(1)
// launch in the immediate dispatcher
launch(Dispatchers.Main.immediate) {
expect(2)
yield()
expect(4)
}
expect(3) // after yield
yield() // yield back
finish(5)
}
}
65 changes: 35 additions & 30 deletions ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -115,36 +115,41 @@ private class PulseTimer : AnimationTimer() {
}
}

internal fun initPlatform(): Boolean {
/*
* Try to instantiate JavaFx platform in a way which works
* both on Java 8 and Java 11 and does not produce "illegal reflective access":
*
* 1) Try to invoke javafx.application.Platform.startup if this class is
* present in a classpath.
* 2) If it is not successful and does not because it is already started,
* fallback to PlatformImpl.
*
* Ignore exception anyway in case of unexpected changes in API, in that case
* user will have to instantiate it manually.
*/
val runnable = Runnable {}
return runCatching {
// Invoke public API if it is present
Class.forName("javafx.application.Platform")
.getMethod("startup", java.lang.Runnable::class.java)
.invoke(null, runnable)
}.recoverCatching { exception ->
// Recover -> check re-initialization
val cause = exception.cause
if (exception is InvocationTargetException && cause is IllegalStateException
&& "Toolkit already initialized" == cause.message) {
// Toolkit is already initialized -> success, return
Unit
} else { // Fallback to Java 8 API
Class.forName("com.sun.javafx.application.PlatformImpl")
internal fun initPlatform(): Boolean = PlatformInitializer.success

// Lazily try to initialize JavaFx platform just once
private object PlatformInitializer {
val success = run {
/*
* Try to instantiate JavaFx platform in a way which works
* both on Java 8 and Java 11 and does not produce "illegal reflective access":
*
* 1) Try to invoke javafx.application.Platform.startup if this class is
* present in a classpath.
* 2) If it is not successful and does not because it is already started,
* fallback to PlatformImpl.
*
* Ignore exception anyway in case of unexpected changes in API, in that case
* user will have to instantiate it manually.
*/
val runnable = Runnable {}
runCatching {
// Invoke public API if it is present
Class.forName("javafx.application.Platform")
.getMethod("startup", java.lang.Runnable::class.java)
.invoke(null, runnable)
}
}.isSuccess
}.recoverCatching { exception ->
// Recover -> check re-initialization
val cause = exception.cause
if (exception is InvocationTargetException && cause is IllegalStateException
&& "Toolkit already initialized" == cause.message) {
// Toolkit is already initialized -> success, return
Unit
} else { // Fallback to Java 8 API
Class.forName("com.sun.javafx.application.PlatformImpl")
.getMethod("startup", java.lang.Runnable::class.java)
.invoke(null, runnable)
}
}.isSuccess
}
}
22 changes: 22 additions & 0 deletions ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,26 @@ class JavaFxTest : TestBase() {
finish(4)
}
}

@Test
fun testImmediateDispatcherYield() {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return // ignore test in headless environments
}

runBlocking(Dispatchers.JavaFx) {
expect(1)
check(Platform.isFxApplicationThread())
// launch in the immediate dispatcher
launch(Dispatchers.JavaFx.immediate) {
expect(2)
yield()
expect(4)
}
expect(3) // after yield
yield() // yield back
finish(5)
}
}
}
15 changes: 15 additions & 0 deletions ui/kotlinx-coroutines-swing/test/SwingTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.swing

import javafx.application.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
Expand Down Expand Up @@ -83,4 +84,18 @@ class SwingTest : TestBase() {
private suspend fun join(component: SwingTest.SwingComponent) {
component.coroutineContext[Job]!!.join()
}

@Test
fun testImmediateDispatcherYield() = runBlocking(Dispatchers.Swing) {
expect(1)
// launch in the immediate dispatcher
launch(Dispatchers.Swing.immediate) {
expect(2)
yield()
expect(4)
}
expect(3) // after yield
yield() // yield back
finish(5)
}
}