Skip to content

EventLoop integration and reuse for runBlocking and Unconfined dispatchers #889

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Dec 21, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ public abstract interface class kotlinx/coroutines/DisposableHandle {
public abstract fun dispose ()V
}

public final class kotlinx/coroutines/EventLoopKt {
public static final fun processNextEventInCurrentThread ()J
}

public abstract class kotlinx/coroutines/ExecutorCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, java/io/Closeable {
public fun <init> ()V
public abstract fun close ()V
Expand Down
139 changes: 75 additions & 64 deletions common/kotlinx-coroutines-core-common/src/Dispatched.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,70 +12,64 @@ import kotlin.jvm.*
@SharedImmutable
private val UNDEFINED = Symbol("UNDEFINED")

@NativeThreadLocal
internal object UndispatchedEventLoop {
data class EventLoop(
@JvmField var isActive: Boolean = false,
@JvmField val queue: ArrayQueue<Runnable> = ArrayQueue()
)

@JvmField
internal val threadLocalEventLoop = CommonThreadLocal { EventLoop() }

/**
* Executes given [block] as part of current event loop, updating related to block [continuation]
* mode and state if continuation is not resumed immediately.
* [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
* Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
*/
inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int,
doYield: Boolean = false, block: () -> Unit) : Boolean {
val eventLoop = threadLocalEventLoop.get()
if (eventLoop.isActive) {
// If we are yielding and queue is empty, we can bail out as part of fast path
if (doYield && eventLoop.queue.isEmpty) {
return false
}

continuation._state = contState
continuation.resumeMode = mode
eventLoop.queue.addLast(continuation)
return true
}

runEventLoop(eventLoop, block)
return false
/**
* Executes given [block] as part of current event loop, updating related to block [continuation]
* mode and state if continuation is not resumed immediately.
* [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
* Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
*/
private inline fun executeUnconfined(
continuation: DispatchedContinuation<*>, contState: Any?, mode: Int,
doYield: Boolean = false, block: () -> Unit
) : Boolean {
val eventLoop = ThreadLocalEventLoop.eventLoop
// If we are yielding and unconfined queue is empty, we can bail out as part of fast path
if (doYield && eventLoop.isEmptyUnconfinedQueue) return false
return if (eventLoop.isUnconfinedLoopActive) {
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
continuation._state = contState
continuation.resumeMode = mode
eventLoop.dispatchUnconfined(continuation)
true // queued into the active loop
} else {
// Was not active -- run event loop until unconfined tasks are executed
runUnconfinedEventLoop(eventLoop, block = block)
false
}
}

fun resumeUndispatched(task: DispatchedTask<*>): Boolean {
val eventLoop = threadLocalEventLoop.get()
if (eventLoop.isActive) {
eventLoop.queue.addLast(task)
return true
private fun resumeUnconfined(task: DispatchedTask<*>) {
val eventLoop = ThreadLocalEventLoop.eventLoop
if (eventLoop.isUnconfinedLoopActive) {
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
eventLoop.dispatchUnconfined(task)
} else {
// Was not active -- run event loop until unconfined tasks are executed
runUnconfinedEventLoop(eventLoop) {
task.resume(task.delegate, MODE_UNDISPATCHED)
}

runEventLoop(eventLoop, { task.resume(task.delegate, MODE_UNDISPATCHED) })
return false
}
}

inline fun runEventLoop(eventLoop: EventLoop, block: () -> Unit) {
try {
eventLoop.isActive = true
block()
while (true) {
val nextEvent = eventLoop.queue.removeFirstOrNull() ?: return
nextEvent.run()
}
} catch (e: Throwable) {
/*
* This exception doesn't happen normally, only if user either submitted throwing runnable
* or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial.
*/
eventLoop.queue.clear()
throw DispatchException("Unexpected exception in undispatched event loop, clearing pending tasks", e)
} finally {
eventLoop.isActive = false
private inline fun runUnconfinedEventLoop(
eventLoop: EventLoop,
block: () -> Unit
) {
eventLoop.incrementUseCount(unconfined = true)
try {
block()
while (eventLoop.processNextEvent() <= 0) {
// break when all unconfined continuations where executed
if (eventLoop.isEmptyUnconfinedQueue) break
}
} catch (e: Throwable) {
/*
* This exception doesn't happen normally, only if user either submitted throwing runnable
* or if we have a bug in implementation. Throw an exception that better explains the problem.
*/
throw DispatchException("Unexpected exception in unconfined event loop", e)
} finally {
eventLoop.decrementUseCount(unconfined = true)
}
}

Expand Down Expand Up @@ -109,7 +103,7 @@ internal class DispatchedContinuation<in T>(
resumeMode = MODE_ATOMIC_DEFAULT
dispatcher.dispatch(context, this)
} else {
UndispatchedEventLoop.execute(this, state, MODE_ATOMIC_DEFAULT) {
executeUnconfined(this, state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
Expand All @@ -124,7 +118,7 @@ internal class DispatchedContinuation<in T>(
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
UndispatchedEventLoop.execute(this, value, MODE_CANCELLABLE) {
executeUnconfined(this, value, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatched(value)
}
Expand All @@ -141,7 +135,7 @@ internal class DispatchedContinuation<in T>(
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
UndispatchedEventLoop.execute(this, state, MODE_CANCELLABLE) {
executeUnconfined(this, state, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatchedWithException(exception)
}
Expand Down Expand Up @@ -206,9 +200,26 @@ internal fun <T> Continuation<T>.resumeDirectWithException(exception: Throwable)
else -> resumeWithStackTrace(exception)
}

private const val UNCONFINED_TASK_BIT = 1 shl 31

internal abstract class DispatchedTask<in T>(
@JvmField var resumeMode: Int
resumeMode: Int
) : SchedulerTask() {
private var _resumeMode: Int = resumeMode // can have UNCONFINED_TASK_BIT set

public var resumeMode: Int
get() = _resumeMode and UNCONFINED_TASK_BIT.inv()
set(value) { _resumeMode = value }

/**
* Set to `true` when this task comes from [Dispatchers.Unconfined] or from another dispatcher
* that returned `false` from [CoroutineDispatcher.isDispatchNeeded],
* but there was event loop running, so it was submitted into that event loop.
*/
public var isUnconfinedTask: Boolean
get() = _resumeMode and UNCONFINED_TASK_BIT != 0
set(value) { _resumeMode = if (value) resumeMode or UNCONFINED_TASK_BIT else resumeMode }

public abstract val delegate: Continuation<T>

public abstract fun takeState(): Any?
Expand Down Expand Up @@ -248,7 +259,7 @@ internal abstract class DispatchedTask<in T>(
}

internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
UndispatchedEventLoop.execute(this, Unit, MODE_CANCELLABLE, doYield = true) {
executeUnconfined(this, Unit, MODE_CANCELLABLE, doYield = true) {
run()
}

Expand All @@ -261,7 +272,7 @@ internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
if (dispatcher.isDispatchNeeded(context)) {
dispatcher.dispatch(context, this)
} else {
UndispatchedEventLoop.resumeUndispatched(this)
resumeUnconfined(this)
}
} else {
resume(delegate, mode)
Expand Down
131 changes: 131 additions & 0 deletions common/kotlinx-coroutines-core-common/src/EventLoop.common.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlin.coroutines.*

/**
* Extended by [CoroutineDispatcher] implementations that have event loop inside and can
* be asked to process next event from their event queue.
*
* It may optionally implement [Delay] interface and support time-scheduled tasks.
* It is created or pigged back onto (see [ThreadLocalEventLoop])
* by [runBlocking] and by [Dispatchers.Unconfined].
*
* @suppress **This an internal API and should not be used from general code.**
*/
internal abstract class EventLoop : CoroutineDispatcher() {
/**
* Processes next event in this event loop.
*
* The result of this function is to be interpreted like this:
* * `<= 0` -- there are potentially more events for immediate processing;
* * `> 0` -- a number of nanoseconds to wait for next scheduled event;
* * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread.
*/
public abstract fun processNextEvent(): Long

public abstract val isEmpty: Boolean

/**
* Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
* into the current event loop.
*/
public fun dispatchUnconfined(task: DispatchedTask<*>) {
task.isUnconfinedTask = true
check(enqueue(task)) { "Attempting to dispatchUnconfined into the EventLoop that was shut down"}
queuedUnconfinedTasks++
}

public override fun dispatch(context: CoroutineContext, block: Runnable) {
if (block is DispatchedTask<*>) block.isUnconfinedTask = false
enqueue(block)
}

// returns true if it was successfully enqueued for execution in this event loop, false if got to default executor
public abstract fun enqueue(task: Runnable): Boolean

protected fun runBlock(block: Runnable) {
try {
block.run()
} finally {
if (block is DispatchedTask<*> && block.isUnconfinedTask) {
check(--queuedUnconfinedTasks >= 0) { "queuedUnconfinedTasks underflow" }
}
}
}

/**
* Counts the number of nested [runBlocking] and [Dispatchers.Unconfined] that use this event loop.
*/
private var useCount = 0L

/**
* Set to true on any use by [runBlocking], because it potentially leaks this loop to other threads, so
* this instance must be properly shutdown. We don't need to shutdown event loop that was used solely
* by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
*/
private var shared = false

/**
* Counts a number of currently enqueued (but not executed yet) unconfined tasks.
*/
private var queuedUnconfinedTasks = 0

public val isActive: Boolean
get() = useCount > 0

public val isUnconfinedLoopActive: Boolean
get() = useCount >= increment(unconfined = true)

public val isEmptyUnconfinedQueue: Boolean
get() = queuedUnconfinedTasks == 0

private fun increment(unconfined: Boolean) =
if (unconfined) (1L shl 32) else 1L

fun incrementUseCount(unconfined: Boolean = false) {
useCount += increment(unconfined)
if (!unconfined) shared = true
}

fun decrementUseCount(unconfined: Boolean = false) {
useCount -= increment(unconfined)
if (useCount > 0) return
check(useCount == 0L) { "Extra decrementUseCount" }
if (shared) {
// shut it down and remove from ThreadLocalEventLoop
shutdown()
} else {
// it was not shared, so it could not have accumulated any other tasks
check(isEmpty) { "EventLoop that was used only by unconfined tasks should be empty" }
}
}

protected open fun shutdown() {}
}

@NativeThreadLocal
internal object ThreadLocalEventLoop {
private val ref = CommonThreadLocal<EventLoop?>()

internal val eventLoop: EventLoop
get() = ref.get() ?: createEventLoop().also { ref.set(it) }

internal fun currentOrNull(): EventLoop? =
ref.get()

internal fun resetEventLoop() {
ref.set(null)
}

internal fun setEventLoop(eventLoop: EventLoop) {
ref.set(eventLoop)
}
}

internal expect fun createEventLoop(): EventLoop

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package kotlinx.coroutines.internal
@UseExperimental(ExperimentalMultiplatform::class)
internal expect annotation class NativeThreadLocal()

internal expect class CommonThreadLocal<T>(supplier: () -> T) {
internal expect class CommonThreadLocal<T>() {
fun get(): T
fun set(value: T)
}
Loading