Skip to content

Amortize the cost of coroutine dispatch using message queue in all JS… #1027

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/js/src/CoroutineContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ internal actual fun createDefaultDispatcher(): CoroutineDispatcher = when {
// For details see https://github.com/Kotlin/kotlinx.coroutines/issues/236
// The check for ReactNative is based on https://github.com/facebook/react-native/commit/3c65e62183ce05893be0822da217cb803b121c61
jsTypeOf(navigator) != UNDEFINED && navigator != null && navigator.product == "ReactNative" ->
NodeDispatcher()
NodeDispatcher
// Check if we are running under jsdom. WindowDispatcher doesn't work under jsdom because it accesses MessageEvent#source.
// It is not implemented in jsdom, see https://github.com/jsdom/jsdom/blob/master/Changelog.md
// "It's missing a few semantics, especially around origins, as well as MessageEvent source."
isJsdom() -> NodeDispatcher()
isJsdom() -> NodeDispatcher
// Check if we are in the browser and must use window.postMessage to avoid setTimeout throttling
jsTypeOf(window) != UNDEFINED && window.asDynamic() != null && jsTypeOf(window.asDynamic().addEventListener) != UNDEFINED ->
window.asCoroutineDispatcher()
// Fallback to NodeDispatcher when browser environment is not detected
else -> NodeDispatcher()
else -> NodeDispatcher
}

private fun isJsdom() = jsTypeOf(navigator) != UNDEFINED &&
Expand Down
75 changes: 52 additions & 23 deletions kotlinx-coroutines-core/js/src/JSDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@
package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import org.w3c.dom.*
import kotlin.coroutines.*
import kotlin.js.*

private const val MAX_DELAY = Int.MAX_VALUE.toLong()

private fun delayToInt(timeMillis: Long): Int =
timeMillis.coerceIn(0, MAX_DELAY).toInt()

internal class NodeDispatcher : CoroutineDispatcher(), Delay {
override fun dispatch(context: CoroutineContext, block: Runnable) {
setTimeout({ block.run() }, 0)
}
internal object NodeDispatcher : CoroutineDispatcher(), Delay {
override fun dispatch(context: CoroutineContext, block: Runnable) = NodeJsMessageQueue.enqueue(block)

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val handle = setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
Expand All @@ -37,48 +36,77 @@ internal class NodeDispatcher : CoroutineDispatcher(), Delay {
}

internal class WindowDispatcher(private val window: Window) : CoroutineDispatcher(), Delay {
private val messageName = "dispatchCoroutine"
private val queue = WindowMessageQueue(window)

override fun dispatch(context: CoroutineContext, block: Runnable) = queue.enqueue(block)

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
}

private val queue = object : MessageQueue() {
override fun schedule() {
window.postMessage(messageName, "*")
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
val handle = window.setTimeout({ block.run() }, delayToInt(timeMillis))
return object : DisposableHandle {
override fun dispose() {
window.clearTimeout(handle)
}
}
}
}

private class WindowMessageQueue(private val window: Window) : MessageQueue() {
private val messageName = "dispatchCoroutine"

init {
window.addEventListener("message", { event: dynamic ->
if (event.source == window && event.data == messageName) {
event.stopPropagation()
queue.process()
process()
}
}, true)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
queue.enqueue(block)
override fun schedule() {
Promise.resolve(Unit).then({ process() })
}

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
override fun reschedule() {
window.postMessage(messageName, "*")
}
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
val handle = window.setTimeout({ block.run() }, delayToInt(timeMillis))
return object : DisposableHandle {
override fun dispose() {
window.clearTimeout(handle)
}
}
private object NodeJsMessageQueue : MessageQueue() {
override fun schedule() {
// next tick is even faster than resolve
process.nextTick({ process() })
}

override fun reschedule() {
setTimeout({ process() }, 0)
}
}

/**
* An abstraction over JS scheduling mechanism that leverages micro-batching of [dispatch] blocks without
* paying the cost of JS callbacks scheduling on every dispatch.
*
* Queue uses two scheduling mechanisms:
* 1) [schedule] is used to schedule the initial processing of the message queue.
* JS engine-specific microtask mechanism is used in order to boost performance on short runs and a dispatch batch
* 2) [reschedule] is used to schedule processing of the queue after yield to the JS event loop.
* JS engine-specific macrotask mechanism is used not to starve animations and non-coroutines macrotasks.
*
* Yet there could be a long tail of "slow" reschedules, but it should be amortized by the queue size.
*/
internal abstract class MessageQueue : ArrayQueue<Runnable>() {
val yieldEvery = 16 // yield to JS event loop after this many processed messages
val yieldEvery = 16 // yield to JS macrotask event loop after this many processed messages

private var scheduled = false

abstract fun schedule()

abstract fun reschedule()

fun enqueue(element: Runnable) {
addLast(element)
if (!scheduled) {
Expand All @@ -98,7 +126,7 @@ internal abstract class MessageQueue : ArrayQueue<Runnable>() {
if (isEmpty) {
scheduled = false
} else {
schedule()
reschedule()
}
}
}
Expand All @@ -108,3 +136,4 @@ internal abstract class MessageQueue : ArrayQueue<Runnable>() {
// using them via "window" (which only works in browser)
private external fun setTimeout(handler: dynamic, timeout: Int = definedExternally): Int
private external fun clearTimeout(handle: Int = definedExternally)
private external val process: dynamic
4 changes: 4 additions & 0 deletions kotlinx-coroutines-core/js/test/MessageQueueTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class MessageQueueTest {
assertFalse(scheduled)
scheduled = true
}

override fun reschedule() {
schedule()
}
}

inner class Box(val i: Int): Runnable {
Expand Down