Skip to content

Commit 91b70eb

Browse files
committed
Support yield in immediate dispatchers
So yield now checks for "Unconfined" dispatcher instead of "isDispatchNeeded" and works properly for immediate dispatchers. Fixes #1474
1 parent 7e895fc commit 91b70eb

File tree

7 files changed

+147
-37
lines changed

7 files changed

+147
-37
lines changed

kotlinx-coroutines-core/common/src/Unconfined.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import kotlin.coroutines.*
1111
*/
1212
internal object Unconfined : CoroutineDispatcher() {
1313
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
14-
override fun dispatch(context: CoroutineContext, block: Runnable) { throw UnsupportedOperationException() }
14+
// Just in case somebody wraps Unconfined dispatcher casing the "dispatch" to be called from "yield"
15+
override fun dispatch(context: CoroutineContext, block: Runnable) = block.run()
1516
override fun toString(): String = "Unconfined"
1617
}

kotlinx-coroutines-core/common/src/Yield.kt

+13-6
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,27 @@ import kotlin.coroutines.*
88
import kotlin.coroutines.intrinsics.*
99

1010
/**
11-
* Yields the thread (or thread pool) of the current coroutine dispatcher to other coroutines to run.
12-
* If the coroutine dispatcher does not have its own thread pool (like [Dispatchers.Unconfined]), this
13-
* function does nothing but check if the coroutine's [Job] was completed.
11+
* Yields the thread (or thread pool) of the current coroutine dispatcher to other coroutines to run if possible.
12+
*
1413
* This suspending function is cancellable.
1514
* If the [Job] of the current coroutine is cancelled or completed when this suspending function is invoked or while
1615
* this function is waiting for dispatch, it resumes with a [CancellationException].
16+
*
17+
* **Note**: This function always [checks for cancellation][ensureActive] even when it does not suspend.
18+
*
19+
* ### Implementation details
20+
*
21+
* If the coroutine dispatcher is [Unconfined][Dispatchers.Unconfined], this
22+
* functions suspends only when there are other unconfined coroutines working and forming an event-loop.
23+
* For other dispatchers, this function does not call [CoroutineDispatcher.isDispatchNeeded] and
24+
* always suspends to be resumed later. If there is no [CoroutineDispatcher] in the context, it does not suspend.
1725
*/
1826
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
1927
val context = uCont.context
2028
context.checkCompletion()
2129
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
22-
if (!cont.dispatcher.isDispatchNeeded(context)) {
23-
return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
24-
}
30+
// Special case for the unconfined dispatcher that can yield only in existing unconfined loop
31+
if (cont.dispatcher === Unconfined) return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
2532
cont.dispatchYield(Unit)
2633
COROUTINE_SUSPENDED
2734
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlin.coroutines.*
8+
import kotlin.test.*
9+
10+
class ImmediateYieldTest : TestBase() {
11+
12+
// See https://github.com/Kotlin/kotlinx.coroutines/issues/1474
13+
@Test
14+
fun testImmediateYield() = runTest {
15+
expect(1)
16+
launch(ImmediateDispatcher(coroutineContext[ContinuationInterceptor])) {
17+
expect(2)
18+
yield()
19+
expect(4)
20+
}
21+
expect(3) // after yield
22+
yield() // yield back
23+
finish(5)
24+
}
25+
26+
// imitate immediate dispatcher
27+
private class ImmediateDispatcher(job: ContinuationInterceptor?) : CoroutineDispatcher() {
28+
val delegate: CoroutineDispatcher = job as CoroutineDispatcher
29+
30+
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
31+
32+
override fun dispatch(context: CoroutineContext, block: Runnable) =
33+
delegate.dispatch(context, block)
34+
}
35+
36+
@Test
37+
fun testWrappedUnconfinedDispatcherYield() = runTest {
38+
expect(1)
39+
launch(wrapperDispatcher(Dispatchers.Unconfined)) {
40+
expect(2)
41+
yield() // Would not work with wrapped unconfined dispatcher
42+
expect(3)
43+
}
44+
finish(4) // after launch
45+
}
46+
}

ui/kotlinx-coroutines-android/test/HandlerDispatcherTest.kt

+14
Original file line numberDiff line numberDiff line change
@@ -141,4 +141,18 @@ class HandlerDispatcherTest : TestBase() {
141141
// TODO compile against API 22+ so this can be invoked without reflection.
142142
private val Message.isAsynchronous: Boolean
143143
get() = Message::class.java.getDeclaredMethod("isAsynchronous").invoke(this) as Boolean
144+
145+
@Test
146+
fun testImmediateDispatcherYield() = runBlocking(Dispatchers.Main) {
147+
expect(1)
148+
// launch in the immediate dispatcher
149+
launch(Dispatchers.Main.immediate) {
150+
expect(2)
151+
yield()
152+
expect(4)
153+
}
154+
expect(3) // after yield
155+
yield() // yield back
156+
finish(5)
157+
}
144158
}

ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt

+35-30
Original file line numberDiff line numberDiff line change
@@ -115,36 +115,41 @@ private class PulseTimer : AnimationTimer() {
115115
}
116116
}
117117

118-
internal fun initPlatform(): Boolean {
119-
/*
120-
* Try to instantiate JavaFx platform in a way which works
121-
* both on Java 8 and Java 11 and does not produce "illegal reflective access":
122-
*
123-
* 1) Try to invoke javafx.application.Platform.startup if this class is
124-
* present in a classpath.
125-
* 2) If it is not successful and does not because it is already started,
126-
* fallback to PlatformImpl.
127-
*
128-
* Ignore exception anyway in case of unexpected changes in API, in that case
129-
* user will have to instantiate it manually.
130-
*/
131-
val runnable = Runnable {}
132-
return runCatching {
133-
// Invoke public API if it is present
134-
Class.forName("javafx.application.Platform")
135-
.getMethod("startup", java.lang.Runnable::class.java)
136-
.invoke(null, runnable)
137-
}.recoverCatching { exception ->
138-
// Recover -> check re-initialization
139-
val cause = exception.cause
140-
if (exception is InvocationTargetException && cause is IllegalStateException
141-
&& "Toolkit already initialized" == cause.message) {
142-
// Toolkit is already initialized -> success, return
143-
Unit
144-
} else { // Fallback to Java 8 API
145-
Class.forName("com.sun.javafx.application.PlatformImpl")
118+
internal fun initPlatform(): Boolean = PlatformInitializer.success
119+
120+
// Lazily try to initialize JavaFx platform just once
121+
private object PlatformInitializer {
122+
val success = run {
123+
/*
124+
* Try to instantiate JavaFx platform in a way which works
125+
* both on Java 8 and Java 11 and does not produce "illegal reflective access":
126+
*
127+
* 1) Try to invoke javafx.application.Platform.startup if this class is
128+
* present in a classpath.
129+
* 2) If it is not successful and does not because it is already started,
130+
* fallback to PlatformImpl.
131+
*
132+
* Ignore exception anyway in case of unexpected changes in API, in that case
133+
* user will have to instantiate it manually.
134+
*/
135+
val runnable = Runnable {}
136+
runCatching {
137+
// Invoke public API if it is present
138+
Class.forName("javafx.application.Platform")
146139
.getMethod("startup", java.lang.Runnable::class.java)
147140
.invoke(null, runnable)
148-
}
149-
}.isSuccess
141+
}.recoverCatching { exception ->
142+
// Recover -> check re-initialization
143+
val cause = exception.cause
144+
if (exception is InvocationTargetException && cause is IllegalStateException
145+
&& "Toolkit already initialized" == cause.message) {
146+
// Toolkit is already initialized -> success, return
147+
Unit
148+
} else { // Fallback to Java 8 API
149+
Class.forName("com.sun.javafx.application.PlatformImpl")
150+
.getMethod("startup", java.lang.Runnable::class.java)
151+
.invoke(null, runnable)
152+
}
153+
}.isSuccess
154+
}
150155
}

ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt

+22
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,26 @@ class JavaFxTest : TestBase() {
3434
finish(4)
3535
}
3636
}
37+
38+
@Test
39+
fun testImmediateDispatcherYield() {
40+
if (!initPlatform()) {
41+
println("Skipping JavaFxTest in headless environment")
42+
return // ignore test in headless environments
43+
}
44+
45+
runBlocking(Dispatchers.JavaFx) {
46+
expect(1)
47+
check(Platform.isFxApplicationThread())
48+
// launch in the immediate dispatcher
49+
launch(Dispatchers.JavaFx.immediate) {
50+
expect(2)
51+
yield()
52+
expect(4)
53+
}
54+
expect(3) // after yield
55+
yield() // yield back
56+
finish(5)
57+
}
58+
}
3759
}

ui/kotlinx-coroutines-swing/test/SwingTest.kt

+15
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package kotlinx.coroutines.swing
66

7+
import javafx.application.*
78
import kotlinx.coroutines.*
89
import org.junit.*
910
import org.junit.Test
@@ -83,4 +84,18 @@ class SwingTest : TestBase() {
8384
private suspend fun join(component: SwingTest.SwingComponent) {
8485
component.coroutineContext[Job]!!.join()
8586
}
87+
88+
@Test
89+
fun testImmediateDispatcherYield() = runBlocking(Dispatchers.Swing) {
90+
expect(1)
91+
// launch in the immediate dispatcher
92+
launch(Dispatchers.Swing.immediate) {
93+
expect(2)
94+
yield()
95+
expect(4)
96+
}
97+
expect(3) // after yield
98+
yield() // yield back
99+
finish(5)
100+
}
86101
}

0 commit comments

Comments
 (0)