Skip to content

EventLoop integration and reuse for runBlocking and Unconfined dispatchers #889

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Dec 21, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ public abstract interface class kotlinx/coroutines/DisposableHandle {
public abstract fun dispose ()V
}

public final class kotlinx/coroutines/EventLoopKt {
public static final fun processNextEventInCurrentThread ()J
}

public abstract class kotlinx/coroutines/ExecutorCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, java/io/Closeable {
public fun <init> ()V
public abstract fun close ()V
Expand Down
122 changes: 58 additions & 64 deletions common/kotlinx-coroutines-core-common/src/Dispatched.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,70 +12,64 @@ import kotlin.jvm.*
@SharedImmutable
private val UNDEFINED = Symbol("UNDEFINED")

@NativeThreadLocal
internal object UndispatchedEventLoop {
data class EventLoop(
@JvmField var isActive: Boolean = false,
@JvmField val queue: ArrayQueue<Runnable> = ArrayQueue()
)

@JvmField
internal val threadLocalEventLoop = CommonThreadLocal { EventLoop() }

/**
* Executes given [block] as part of current event loop, updating related to block [continuation]
* mode and state if continuation is not resumed immediately.
* [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
* Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
*/
inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int,
doYield: Boolean = false, block: () -> Unit) : Boolean {
val eventLoop = threadLocalEventLoop.get()
if (eventLoop.isActive) {
// If we are yielding and queue is empty, we can bail out as part of fast path
if (doYield && eventLoop.queue.isEmpty) {
return false
}

continuation._state = contState
continuation.resumeMode = mode
eventLoop.queue.addLast(continuation)
return true
}

runEventLoop(eventLoop, block)
return false
/**
* Executes given [block] as part of current event loop, updating current continuation
* mode and state if continuation is not resumed immediately.
* [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
* Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
*/
private inline fun DispatchedContinuation<*>.executeUnconfined(
contState: Any?, mode: Int, doYield: Boolean = false,
block: () -> Unit
) : Boolean {
val eventLoop = ThreadLocalEventLoop.eventLoop
// If we are yielding and unconfined queue is empty, we can bail out as part of fast path
if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
return if (eventLoop.isUnconfinedLoopActive) {
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
_state = contState
resumeMode = mode
eventLoop.dispatchUnconfined(this)
true // queued into the active loop
} else {
// Was not active -- run event loop until all unconfined tasks are executed
runUnconfinedEventLoop(eventLoop, block = block)
false
}
}

fun resumeUndispatched(task: DispatchedTask<*>): Boolean {
val eventLoop = threadLocalEventLoop.get()
if (eventLoop.isActive) {
eventLoop.queue.addLast(task)
return true
private fun DispatchedTask<*>.resumeUnconfined() {
val eventLoop = ThreadLocalEventLoop.eventLoop
if (eventLoop.isUnconfinedLoopActive) {
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
eventLoop.dispatchUnconfined(this)
} else {
// Was not active -- run event loop until all unconfined tasks are executed
runUnconfinedEventLoop(eventLoop) {
resume(delegate, MODE_UNDISPATCHED)
}

runEventLoop(eventLoop, { task.resume(task.delegate, MODE_UNDISPATCHED) })
return false
}
}

inline fun runEventLoop(eventLoop: EventLoop, block: () -> Unit) {
try {
eventLoop.isActive = true
block()
while (true) {
val nextEvent = eventLoop.queue.removeFirstOrNull() ?: return
nextEvent.run()
}
} catch (e: Throwable) {
/*
* This exception doesn't happen normally, only if user either submitted throwing runnable
* or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
*/
eventLoop.queue.clear()
throw DispatchException("Unexpected exception in undispatched event loop, clearing pending tasks", e)
} finally {
eventLoop.isActive = false
private inline fun runUnconfinedEventLoop(
eventLoop: EventLoop,
block: () -> Unit
) {
eventLoop.incrementUseCount(unconfined = true)
try {
block()
while (eventLoop.processNextEvent() <= 0) {
// break when all unconfined continuations where executed
if (eventLoop.isUnconfinedQueueEmpty) break
}
} catch (e: Throwable) {
/*
* This exception doesn't happen normally, only if user either submitted throwing runnable
* or if we have a bug in implementation. Throw an exception that better explains the problem.
*/
throw DispatchException("Unexpected exception in unconfined event loop", e)
} finally {
eventLoop.decrementUseCount(unconfined = true)
}
}

Expand Down Expand Up @@ -109,7 +103,7 @@ internal class DispatchedContinuation<in T>(
resumeMode = MODE_ATOMIC_DEFAULT
dispatcher.dispatch(context, this)
} else {
UndispatchedEventLoop.execute(this, state, MODE_ATOMIC_DEFAULT) {
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
Expand All @@ -124,7 +118,7 @@ internal class DispatchedContinuation<in T>(
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
UndispatchedEventLoop.execute(this, value, MODE_CANCELLABLE) {
executeUnconfined(value, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatched(value)
}
Expand All @@ -141,7 +135,7 @@ internal class DispatchedContinuation<in T>(
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
UndispatchedEventLoop.execute(this, state, MODE_CANCELLABLE) {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatchedWithException(exception)
}
Expand Down Expand Up @@ -207,7 +201,7 @@ internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable)
}

internal abstract class DispatchedTask<in T>(
@JvmField var resumeMode: Int
@JvmField public var resumeMode: Int
) : SchedulerTask() {
public abstract val delegate: Continuation<T>

Expand Down Expand Up @@ -248,7 +242,7 @@ internal abstract class DispatchedTask<in T>(
}

internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
UndispatchedEventLoop.execute(this, Unit, MODE_CANCELLABLE, doYield = true) {
executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
run()
}

Expand All @@ -261,7 +255,7 @@ internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
if (dispatcher.isDispatchNeeded(context)) {
dispatcher.dispatch(context, this)
} else {
UndispatchedEventLoop.resumeUndispatched(this)
resumeUnconfined()
}
} else {
resume(delegate, mode)
Expand Down
135 changes: 135 additions & 0 deletions common/kotlinx-coroutines-core-common/src/EventLoop.common.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.coroutines.internal.*

/**
* Extended by [CoroutineDispatcher] implementations that have event loop inside and can
* be asked to process next event from their event queue.
*
* It may optionally implement [Delay] interface and support time-scheduled tasks.
* It is created or pigged back onto (see [ThreadLocalEventLoop])
* by [runBlocking] and by [Dispatchers.Unconfined].
*
* @suppress **This an internal API and should not be used from general code.**
*/
internal abstract class EventLoop : CoroutineDispatcher() {
/**
* Counts the number of nested [runBlocking] and [Dispatchers.Unconfined] that use this event loop.
*/
private var useCount = 0L

/**
* Set to true on any use by [runBlocking], because it potentially leaks this loop to other threads, so
* this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
* by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
*/
private var shared = false

/**
* Queue used by [Dispatchers.Unconfined] tasks.
* These tasks are thread-local for performance and take precedence over the rest of the queue.
*/
private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null

/**
* Processes next event in this event loop.
*
* The result of this function is to be interpreted like this:
* * `<= 0` -- there are potentially more events for immediate processing;
* * `> 0` -- a number of nanoseconds to wait for next scheduled event;
* * [Long.MAX_VALUE] -- no more events.
*
* **NOTE**: Must be invoked only from the event loop's thread
* (no check for performance reasons, may be added in the future).
*/
public open fun processNextEvent(): Long {
if (!processUnconfinedEvent()) return Long.MAX_VALUE
return nextTime
}

protected open val nextTime: Long
get() {
val queue = unconfinedQueue ?: return Long.MAX_VALUE
return if (queue.isEmpty) Long.MAX_VALUE else 0L
}

protected fun processUnconfinedEvent(): Boolean {
val queue = unconfinedQueue ?: return false
val task = queue.removeFirstOrNull() ?: return false
task.run()
return true
}
/**
* Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
* parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
* By default, event loop implementation is thread-local and should not processed in the context
* (current thread's event loop should be processed instead).
*/
public open fun shouldBeProcessedFromContext(): Boolean = false

/**
* Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
* into the current event loop.
*/
public fun dispatchUnconfined(task: DispatchedTask<*>) {
val queue = unconfinedQueue ?:
ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
queue.addLast(task)
}

public val isActive: Boolean
get() = useCount > 0

public val isUnconfinedLoopActive: Boolean
get() = useCount >= delta(unconfined = true)

// May only be used from the event loop's thread
public val isUnconfinedQueueEmpty: Boolean
get() = unconfinedQueue?.isEmpty ?: false

private fun delta(unconfined: Boolean) =
if (unconfined) (1L shl 32) else 1L

fun incrementUseCount(unconfined: Boolean = false) {
useCount += delta(unconfined)
if (!unconfined) shared = true
}

fun decrementUseCount(unconfined: Boolean = false) {
useCount -= delta(unconfined)
if (useCount > 0) return
check(useCount == 0L) { "Extra decrementUseCount" }
if (shared) {
// shut it down and remove from ThreadLocalEventLoop
shutdown()
}
}

protected open fun shutdown() {}
}

@NativeThreadLocal
internal object ThreadLocalEventLoop {
private val ref = CommonThreadLocal<EventLoop?>()

internal val eventLoop: EventLoop
get() = ref.get() ?: createEventLoop().also { ref.set(it) }

internal fun currentOrNull(): EventLoop? =
ref.get()

internal fun resetEventLoop() {
ref.set(null)
}

internal fun setEventLoop(eventLoop: EventLoop) {
ref.set(eventLoop)
}
}

internal expect fun createEventLoop(): EventLoop

Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

package kotlinx.coroutines.internal

internal class ArrayQueue<T : Any> {
internal open class ArrayQueue<T : Any> {
private var elements = arrayOfNulls<Any>(16)
private var head = 0
private var tail = 0

val isEmpty: Boolean get() = head == tail

public fun addLast(element: T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package kotlinx.coroutines.internal
@UseExperimental(ExperimentalMultiplatform::class)
internal expect annotation class NativeThreadLocal()

internal expect class CommonThreadLocal<T>(supplier: () -> T) {
internal expect class CommonThreadLocal<T>() {
fun get(): T
fun set(value: T)
}
Loading