From 5a3dd35fd0f1ab699f0eb8481fa0a7426ef8a695 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Wed, 12 Dec 2018 23:44:11 +0300 Subject: [PATCH 1/9] EventLoop integration and reuse for runBlocking and Unconfined dispatchers - Event loop that is created by runBlocking or by Unconfined dispatcher is reused across the same thread to prevent blocking of the loop - Semantics of runBlocking and Unconfined are fully retained - DefaultExecutor also registers itself as running event loop and thus cannot be blocked by runBlocking - Consolidates thread-local handling for native - Also fixes thread-local memory leak on JVM (does not use custom class) Fixes #860 --- .../src/Dispatched.kt | 139 ++++++++++-------- .../src/EventLoop.common.kt | 131 +++++++++++++++++ .../src/internal/ThreadLocal.common.kt | 3 +- core/kotlinx-coroutines-core/src/Builders.kt | 56 +++---- .../kotlinx-coroutines-core/src/CommonPool.kt | 12 +- .../src/DefaultExecutor.kt | 25 ++-- core/kotlinx-coroutines-core/src/EventLoop.kt | 104 +++++-------- core/kotlinx-coroutines-core/src/Executors.kt | 11 +- .../src/internal/ThreadLocal.kt | 7 +- .../src/scheduling/Dispatcher.kt | 5 +- .../src/test_/TestCoroutineContext.kt | 16 +- .../test/EventLoopsTest.kt | 71 +++++++++ .../test/VirtualTimeSource.kt | 12 +- .../test/guide/test/TestUtil.kt | 2 +- .../src/EventLoop.kt | 26 ++++ .../src/JSDispatcher.kt | 47 +----- js/kotlinx-coroutines-core-js/src/Window.kt | 1 + .../src/internal/Queue.kt | 51 +++++++ .../src/internal/ThreadLocal.kt | 8 +- .../src/Builders.kt | 57 +++---- .../src/CoroutineContext.kt | 14 +- .../src/EventLoop.kt | 73 +++------ .../src/internal/ThreadLocal.kt | 10 +- 23 files changed, 538 insertions(+), 343 deletions(-) create mode 100644 common/kotlinx-coroutines-core-common/src/EventLoop.common.kt create mode 100644 core/kotlinx-coroutines-core/test/EventLoopsTest.kt create mode 100644 js/kotlinx-coroutines-core-js/src/EventLoop.kt create mode 100644 js/kotlinx-coroutines-core-js/src/internal/Queue.kt diff --git a/common/kotlinx-coroutines-core-common/src/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/Dispatched.kt index cf9198847d..cd5901de01 100644 --- a/common/kotlinx-coroutines-core-common/src/Dispatched.kt +++ b/common/kotlinx-coroutines-core-common/src/Dispatched.kt @@ -12,70 +12,64 @@ import kotlin.jvm.* @SharedImmutable private val UNDEFINED = Symbol("UNDEFINED") -@NativeThreadLocal -internal object UndispatchedEventLoop { - data class EventLoop( - @JvmField var isActive: Boolean = false, - @JvmField val queue: ArrayQueue = ArrayQueue() - ) - - @JvmField - internal val threadLocalEventLoop = CommonThreadLocal { EventLoop() } - - /** - * Executes given [block] as part of current event loop, updating related to block [continuation] - * mode and state if continuation is not resumed immediately. - * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty). - * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise. - */ - inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, - doYield: Boolean = false, block: () -> Unit) : Boolean { - val eventLoop = threadLocalEventLoop.get() - if (eventLoop.isActive) { - // If we are yielding and queue is empty, we can bail out as part of fast path - if (doYield && eventLoop.queue.isEmpty) { - return false - } - - continuation._state = contState - continuation.resumeMode = mode - eventLoop.queue.addLast(continuation) - return true - } - - runEventLoop(eventLoop, block) - return false +/** + * Executes given [block] as part of current event loop, updating related to block [continuation] + * mode and state if continuation is not resumed immediately. + * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty). + * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise. + */ +private inline fun executeUnconfined( + continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, + doYield: Boolean = false, block: () -> Unit +) : Boolean { + val eventLoop = ThreadLocalEventLoop.eventLoop + // If we are yielding and unconfined queue is empty, we can bail out as part of fast path + if (doYield && eventLoop.isEmptyUnconfinedQueue) return false + return if (eventLoop.isUnconfinedLoopActive) { + // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow + continuation._state = contState + continuation.resumeMode = mode + eventLoop.dispatchUnconfined(continuation) + true // queued into the active loop + } else { + // Was not active -- run event loop until unconfined tasks are executed + runUnconfinedEventLoop(eventLoop, block = block) + false } +} - fun resumeUndispatched(task: DispatchedTask<*>): Boolean { - val eventLoop = threadLocalEventLoop.get() - if (eventLoop.isActive) { - eventLoop.queue.addLast(task) - return true +private fun resumeUnconfined(task: DispatchedTask<*>) { + val eventLoop = ThreadLocalEventLoop.eventLoop + if (eventLoop.isUnconfinedLoopActive) { + // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow + eventLoop.dispatchUnconfined(task) + } else { + // Was not active -- run event loop until unconfined tasks are executed + runUnconfinedEventLoop(eventLoop) { + task.resume(task.delegate, MODE_UNDISPATCHED) } - - runEventLoop(eventLoop, { task.resume(task.delegate, MODE_UNDISPATCHED) }) - return false } +} - inline fun runEventLoop(eventLoop: EventLoop, block: () -> Unit) { - try { - eventLoop.isActive = true - block() - while (true) { - val nextEvent = eventLoop.queue.removeFirstOrNull() ?: return - nextEvent.run() - } - } catch (e: Throwable) { - /* - * This exception doesn't happen normally, only if user either submitted throwing runnable - * or if we have a bug in implementation. Anyway, reset state of the dispatcher to the initial. - */ - eventLoop.queue.clear() - throw DispatchException("Unexpected exception in undispatched event loop, clearing pending tasks", e) - } finally { - eventLoop.isActive = false +private inline fun runUnconfinedEventLoop( + eventLoop: EventLoop, + block: () -> Unit +) { + eventLoop.incrementUseCount(unconfined = true) + try { + block() + while (eventLoop.processNextEvent() <= 0) { + // break when all unconfined continuations where executed + if (eventLoop.isEmptyUnconfinedQueue) break } + } catch (e: Throwable) { + /* + * This exception doesn't happen normally, only if user either submitted throwing runnable + * or if we have a bug in implementation. Throw an exception that better explains the problem. + */ + throw DispatchException("Unexpected exception in unconfined event loop", e) + } finally { + eventLoop.decrementUseCount(unconfined = true) } } @@ -109,7 +103,7 @@ internal class DispatchedContinuation( resumeMode = MODE_ATOMIC_DEFAULT dispatcher.dispatch(context, this) } else { - UndispatchedEventLoop.execute(this, state, MODE_ATOMIC_DEFAULT) { + executeUnconfined(this, state, MODE_ATOMIC_DEFAULT) { withCoroutineContext(this.context, countOrElement) { continuation.resumeWith(result) } @@ -124,7 +118,7 @@ internal class DispatchedContinuation( resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this) } else { - UndispatchedEventLoop.execute(this, value, MODE_CANCELLABLE) { + executeUnconfined(this, value, MODE_CANCELLABLE) { if (!resumeCancelled()) { resumeUndispatched(value) } @@ -141,7 +135,7 @@ internal class DispatchedContinuation( resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this) } else { - UndispatchedEventLoop.execute(this, state, MODE_CANCELLABLE) { + executeUnconfined(this, state, MODE_CANCELLABLE) { if (!resumeCancelled()) { resumeUndispatchedWithException(exception) } @@ -206,9 +200,26 @@ internal fun Continuation.resumeDirectWithException(exception: Throwable) else -> resumeWithStackTrace(exception) } +private const val UNCONFINED_TASK_BIT = 1 shl 31 + internal abstract class DispatchedTask( - @JvmField var resumeMode: Int + resumeMode: Int ) : SchedulerTask() { + private var _resumeMode: Int = resumeMode // can have UNCONFINED_TASK_BIT set + + public var resumeMode: Int + get() = _resumeMode and UNCONFINED_TASK_BIT.inv() + set(value) { _resumeMode = value } + + /** + * Set to `true` when this task comes from [Dispatchers.Unconfined] or from another dispatcher + * that returned `false` from [CoroutineDispatcher.isDispatchNeeded], + * but there was event loop running, so it was submitted into that event loop. + */ + public var isUnconfinedTask: Boolean + get() = _resumeMode and UNCONFINED_TASK_BIT != 0 + set(value) { _resumeMode = if (value) resumeMode or UNCONFINED_TASK_BIT else resumeMode } + public abstract val delegate: Continuation public abstract fun takeState(): Any? @@ -248,7 +259,7 @@ internal abstract class DispatchedTask( } internal fun DispatchedContinuation.yieldUndispatched(): Boolean = - UndispatchedEventLoop.execute(this, Unit, MODE_CANCELLABLE, doYield = true) { + executeUnconfined(this, Unit, MODE_CANCELLABLE, doYield = true) { run() } @@ -261,7 +272,7 @@ internal fun DispatchedTask.dispatch(mode: Int = MODE_CANCELLABLE) { if (dispatcher.isDispatchNeeded(context)) { dispatcher.dispatch(context, this) } else { - UndispatchedEventLoop.resumeUndispatched(this) + resumeUnconfined(this) } } else { resume(delegate, mode) diff --git a/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt b/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt new file mode 100644 index 0000000000..9e30fefc7a --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt @@ -0,0 +1,131 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.internal.* +import kotlin.coroutines.* + +/** + * Extended by [CoroutineDispatcher] implementations that have event loop inside and can + * be asked to process next event from their event queue. + * + * It may optionally implement [Delay] interface and support time-scheduled tasks. + * It is created or pigged back onto (see [ThreadLocalEventLoop]) + * by [runBlocking] and by [Dispatchers.Unconfined]. + * + * @suppress **This an internal API and should not be used from general code.** + */ +internal abstract class EventLoop : CoroutineDispatcher() { + /** + * Processes next event in this event loop. + * + * The result of this function is to be interpreted like this: + * * `<= 0` -- there are potentially more events for immediate processing; + * * `> 0` -- a number of nanoseconds to wait for next scheduled event; + * * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread. + */ + public abstract fun processNextEvent(): Long + + public abstract val isEmpty: Boolean + + /** + * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded] + * into the current event loop. + */ + public fun dispatchUnconfined(task: DispatchedTask<*>) { + task.isUnconfinedTask = true + check(enqueue(task)) { "Attempting to dispatchUnconfined into the EventLoop that was shut down"} + queuedUnconfinedTasks++ + } + + public override fun dispatch(context: CoroutineContext, block: Runnable) { + if (block is DispatchedTask<*>) block.isUnconfinedTask = false + enqueue(block) + } + + // returns true if it was successfully enqueued for execution in this event loop, false if got to default executor + public abstract fun enqueue(task: Runnable): Boolean + + protected fun runBlock(block: Runnable) { + try { + block.run() + } finally { + if (block is DispatchedTask<*> && block.isUnconfinedTask) { + check(--queuedUnconfinedTasks >= 0) { "queuedUnconfinedTasks underflow" } + } + } + } + + /** + * Counts the number of nested [runBlocking] and [Dispatchers.Unconfined] that use this event loop. + */ + private var useCount = 0L + + /** + * Set to true on any use by [runBlocking], because it potentially leaks this loop to other threads, so + * this instance must be properly shutdown. We don't need to shutdown event loop that was used solely + * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time. + */ + private var shared = false + + /** + * Counts a number of currently enqueued (but not executed yet) unconfined tasks. + */ + private var queuedUnconfinedTasks = 0 + + public val isActive: Boolean + get() = useCount > 0 + + public val isUnconfinedLoopActive: Boolean + get() = useCount >= increment(unconfined = true) + + public val isEmptyUnconfinedQueue: Boolean + get() = queuedUnconfinedTasks == 0 + + private fun increment(unconfined: Boolean) = + if (unconfined) (1L shl 32) else 1L + + fun incrementUseCount(unconfined: Boolean = false) { + useCount += increment(unconfined) + if (!unconfined) shared = true + } + + fun decrementUseCount(unconfined: Boolean = false) { + useCount -= increment(unconfined) + if (useCount > 0) return + check(useCount == 0L) { "Extra decrementUseCount" } + if (shared) { + // shut it down and remove from ThreadLocalEventLoop + shutdown() + } else { + // it was not shared, so it could not have accumulated any other tasks + check(isEmpty) { "EventLoop that was used only by unconfined tasks should be empty" } + } + } + + protected open fun shutdown() {} +} + +@NativeThreadLocal +internal object ThreadLocalEventLoop { + private val ref = CommonThreadLocal() + + internal val eventLoop: EventLoop + get() = ref.get() ?: createEventLoop().also { ref.set(it) } + + internal fun currentOrNull(): EventLoop? = + ref.get() + + internal fun resetEventLoop() { + ref.set(null) + } + + internal fun setEventLoop(eventLoop: EventLoop) { + ref.set(eventLoop) + } +} + +internal expect fun createEventLoop(): EventLoop + diff --git a/common/kotlinx-coroutines-core-common/src/internal/ThreadLocal.common.kt b/common/kotlinx-coroutines-core-common/src/internal/ThreadLocal.common.kt index 81eb4d725a..ddf29888b2 100644 --- a/common/kotlinx-coroutines-core-common/src/internal/ThreadLocal.common.kt +++ b/common/kotlinx-coroutines-core-common/src/internal/ThreadLocal.common.kt @@ -8,6 +8,7 @@ package kotlinx.coroutines.internal @UseExperimental(ExperimentalMultiplatform::class) internal expect annotation class NativeThreadLocal() -internal expect class CommonThreadLocal(supplier: () -> T) { +internal expect class CommonThreadLocal() { fun get(): T + fun set(value: T) } diff --git a/core/kotlinx-coroutines-core/src/Builders.kt b/core/kotlinx-coroutines-core/src/Builders.kt index e3a7562734..519825ad38 100644 --- a/core/kotlinx-coroutines-core/src/Builders.kt +++ b/core/kotlinx-coroutines-core/src/Builders.kt @@ -35,12 +35,18 @@ import kotlin.coroutines.* public fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T { val currentThread = Thread.currentThread() val contextInterceptor = context[ContinuationInterceptor] - val privateEventLoop = contextInterceptor == null // create private event loop if no dispatcher is specified - val eventLoop = if (privateEventLoop) BlockingEventLoop(currentThread) else contextInterceptor as? EventLoop - val newContext = GlobalScope.newCoroutineContext( - if (privateEventLoop) context + (eventLoop as ContinuationInterceptor) else context - ) - val coroutine = BlockingCoroutine(newContext, currentThread, eventLoop, privateEventLoop) + val eventLoop: EventLoop? + val newContext: CoroutineContext + if (contextInterceptor == null) { + // create/use private event loop if no dispatcher is specified + eventLoop = ThreadLocalEventLoop.eventLoop.also { + newContext = GlobalScope.newCoroutineContext(context + it) + } + } else { + eventLoop = contextInterceptor as? EventLoop + newContext = GlobalScope.newCoroutineContext(context) + } + val coroutine = BlockingCoroutine(newContext, currentThread, eventLoop) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine.joinBlocking() } @@ -48,13 +54,8 @@ public fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, bl private class BlockingCoroutine( parentContext: CoroutineContext, private val blockedThread: Thread, - private val eventLoop: EventLoop?, - private val privateEventLoop: Boolean + private val eventLoop: EventLoop? ) : AbstractCoroutine(parentContext, true) { - init { - if (privateEventLoop) require(eventLoop is BlockingEventLoop) - } - override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) { // wake up blocked thread if (Thread.currentThread() != blockedThread) @@ -64,22 +65,23 @@ private class BlockingCoroutine( @Suppress("UNCHECKED_CAST") fun joinBlocking(): T { timeSource.registerTimeLoopThread() - while (true) { - @Suppress("DEPRECATION") - if (Thread.interrupted()) throw InterruptedException().also { cancel(it) } - val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE - // note: process next even may loose unpark flag, so check if completed before parking - if (isCompleted) break - timeSource.parkNanos(this, parkNanos) - } - // process queued events (that could have been added after last processNextEvent and before cancel - if (privateEventLoop) (eventLoop as BlockingEventLoop).apply { - // We exit the "while" loop above when this coroutine's state "isCompleted", - // Here we should signal that BlockingEventLoop should not accept any more tasks - isCompleted = true - shutdown() + try { + eventLoop?.incrementUseCount() + try { + while (true) { + @Suppress("DEPRECATION") + if (Thread.interrupted()) throw InterruptedException().also { cancel(it) } + val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE + // note: process next even may loose unpark flag, so check if completed before parking + if (isCompleted) break + timeSource.parkNanos(this, parkNanos) + } + } finally { // paranoia + eventLoop?.decrementUseCount() + } + } finally { // paranoia + timeSource.unregisterTimeLoopThread() } - timeSource.unregisterTimeLoopThread() // now return result val state = this.state.unboxState() (state as? CompletedExceptionally)?.let { throw it.cause } diff --git a/core/kotlinx-coroutines-core/src/CommonPool.kt b/core/kotlinx-coroutines-core/src/CommonPool.kt index 75cee921ee..7c8ac7c8eb 100644 --- a/core/kotlinx-coroutines-core/src/CommonPool.kt +++ b/core/kotlinx-coroutines-core/src/CommonPool.kt @@ -98,12 +98,14 @@ internal object CommonPool : ExecutorCoroutineDispatcher() { private fun getOrCreatePoolSync(): Executor = pool ?: createPool().also { pool = it } - override fun dispatch(context: CoroutineContext, block: Runnable) = - try { (pool ?: getOrCreatePoolSync()).execute(timeSource.wrapTask(block)) } - catch (e: RejectedExecutionException) { + override fun dispatch(context: CoroutineContext, block: Runnable) { + try { + (pool ?: getOrCreatePoolSync()).execute(timeSource.wrapTask(block)) + } catch (e: RejectedExecutionException) { timeSource.unTrackTask() - DefaultExecutor.execute(block) + DefaultExecutor.enqueue(block) } + } // used for tests @Synchronized @@ -120,7 +122,7 @@ internal object CommonPool : ExecutorCoroutineDispatcher() { shutdown() if (timeout > 0) awaitTermination(timeout, TimeUnit.MILLISECONDS) - shutdownNow().forEach { DefaultExecutor.execute(it) } + shutdownNow().forEach { DefaultExecutor.enqueue(it) } } pool = Executor { throw RejectedExecutionException("CommonPool was shutdown") } } diff --git a/core/kotlinx-coroutines-core/src/DefaultExecutor.kt b/core/kotlinx-coroutines-core/src/DefaultExecutor.kt index 0b35b1199a..813da13ff5 100644 --- a/core/kotlinx-coroutines-core/src/DefaultExecutor.kt +++ b/core/kotlinx-coroutines-core/src/DefaultExecutor.kt @@ -9,9 +9,12 @@ import java.util.concurrent.* internal actual val DefaultDelay: Delay = DefaultExecutor @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") -internal object DefaultExecutor : EventLoopBase(), Runnable { +internal object DefaultExecutor : EventLoopImplBase(), Runnable { + const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor" - override val isCompleted: Boolean get() = false + init { + incrementUseCount() // this event loop is never completed + } private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds @@ -26,6 +29,9 @@ internal object DefaultExecutor : EventLoopBase(), Runnable { @Volatile private var _thread: Thread? = null + override val thread: Thread + get() = _thread ?: createThreadSync() + private const val FRESH = 0 private const val ACTIVE = 1 private const val SHUTDOWN_REQ = 2 @@ -52,6 +58,7 @@ internal object DefaultExecutor : EventLoopBase(), Runnable { DelayedRunnableTask(timeMillis, block).also { schedule(it) } override fun run() { + ThreadLocalEventLoop.setEventLoop(this) timeSource.registerTimeLoopThread() try { var shutdownNanos = Long.MAX_VALUE @@ -81,27 +88,19 @@ internal object DefaultExecutor : EventLoopBase(), Runnable { acknowledgeShutdownIfNeeded() timeSource.unregisterTimeLoopThread() // recheck if queues are empty after _thread reference was set to null (!!!) - if (!isEmpty) thread() // recreate thread if it is needed + if (!isQueueEmpty) thread // recreate thread if it is needed } } - // ensure that thread is there - private fun thread(): Thread = _thread ?: createThreadSync() - @Synchronized - private fun createThreadSync() = _thread ?: - Thread(this, "kotlinx.coroutines.DefaultExecutor").apply { + private fun createThreadSync(): Thread { + return _thread ?: Thread(this, THREAD_NAME).apply { _thread = this isDaemon = true start() } - - override fun unpark() { - timeSource.unpark(thread()) // as a side effect creates thread if it is not there } - override fun isCorrectThread(): Boolean = true - // used for tests @Synchronized internal fun ensureStarted() { diff --git a/core/kotlinx-coroutines-core/src/EventLoop.kt b/core/kotlinx-coroutines-core/src/EventLoop.kt index bd64d0c69b..792adebf66 100644 --- a/core/kotlinx-coroutines-core/src/EventLoop.kt +++ b/core/kotlinx-coroutines-core/src/EventLoop.kt @@ -6,28 +6,6 @@ package kotlinx.coroutines import kotlinx.atomicfu.* import kotlinx.coroutines.internal.* -import kotlin.coroutines.* - -/** - * Implemented by [CoroutineDispatcher] implementations that have event loop inside and can - * be asked to process next event from their event queue. - * - * It may optionally implement [Delay] interface and support time-scheduled tasks. It is used by [runBlocking] to - * continue processing events when invoked from the event dispatch thread. - * - * @suppress **This an internal API and should not be used from general code.** - */ -internal interface EventLoop: ContinuationInterceptor { - /** - * Processes next event in this event loop. - * - * The result of this function is to be interpreted like this: - * * `<= 0` -- there are potentially more events for immediate processing; - * * `> 0` -- a number of nanoseconds to wait for next scheduled event; - * * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread. - */ - public fun processNextEvent(): Long -} private val DISPOSED_TASK = Symbol("REMOVED_TASK") @@ -53,21 +31,22 @@ private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY") private typealias Queue = LockFreeTaskQueueCore -internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { +internal abstract class EventLoopImplBase: EventLoop(), Delay { // null | CLOSED_EMPTY | task | Queue private val _queue = atomic(null) // Allocated only only once private val _delayed = atomic?>(null) - protected abstract val isCompleted: Boolean - protected abstract fun unpark() - protected abstract fun isCorrectThread(): Boolean + protected abstract val thread: Thread + + @Volatile + private var isCompleted = false - protected val isEmpty: Boolean + override val isEmpty: Boolean get() = isQueueEmpty && isDelayedEmpty - private val isQueueEmpty: Boolean get() { + protected val isQueueEmpty: Boolean get() { val queue = _queue.value return when (queue) { null -> true @@ -95,14 +74,30 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { return (nextDelayedTask.nanoTime - timeSource.nanoTime()).coerceAtLeast(0) } - override fun dispatch(context: CoroutineContext, block: Runnable) = - execute(block) + private fun unpark() { + val thread = thread + if (Thread.currentThread() !== thread) + timeSource.unpark(thread) + } + + override fun shutdown() { + // Clean up thread-local reference here -- this event loop is shutting down + ThreadLocalEventLoop.resetEventLoop() + // We should signal that ThreadEventLoop should not accept any more tasks + // and process queued events (that could have been added after last processNextEvent) + isCompleted = true + closeQueue() + // complete processing of all queued tasks + while (processNextEvent() <= 0) { /* spin */ } + // reschedule the rest of delayed tasks + rescheduleAllDelayed() + } override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) = schedule(DelayedResumeTask(timeMillis, continuation)) override fun processNextEvent(): Long { - if (!isCorrectThread()) return Long.MAX_VALUE + if (Thread.currentThread() !== thread) return Long.MAX_VALUE // queue all delayed tasks that are due to be executed val delayed = _delayed.value if (delayed != null && !delayed.isEmpty) { @@ -120,18 +115,20 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { } } // then process one event from queue - dequeue()?.run() + dequeue()?.let { runBlock(it) } return nextTime } - internal fun execute(task: Runnable) { + // returns true if it was successfully enqueued for execution in this event loop, false if got to default executor + override fun enqueue(task: Runnable): Boolean = if (enqueueImpl(task)) { // todo: we should unpark only when this delayed task became first in the queue unpark() + true } else { - DefaultExecutor.execute(task) + DefaultExecutor.enqueue(task) + false } - } @Suppress("UNCHECKED_CAST") private fun enqueueImpl(task: Runnable): Boolean { @@ -178,7 +175,7 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { } } - protected fun closeQueue() { + private fun closeQueue() { assert(isCompleted) _queue.loop { queue -> when (queue) { @@ -221,10 +218,6 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { return delayedTask.schedule(delayed, this) } - internal fun removeDelayedImpl(delayedTask: DelayedTask) { - _delayed.value?.remove(delayedTask) - } - // It performs "hard" shutdown for test cleanup purposes protected fun resetAll() { _queue.value = null @@ -232,7 +225,7 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { } // This is a "soft" (normal) shutdown - protected fun rescheduleAllDelayed() { + private fun rescheduleAllDelayed() { while (true) { /* * `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not @@ -274,7 +267,7 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L @Synchronized - fun schedule(delayed: ThreadSafeHeap, eventLoop: EventLoopBase): Int { + fun schedule(delayed: ThreadSafeHeap, eventLoop: EventLoopImplBase): Int { if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed return if (delayed.addLastIf(this) { !eventLoop.isCompleted }) SCHEDULE_OK else SCHEDULE_COMPLETED } @@ -318,27 +311,8 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { } } -internal abstract class ThreadEventLoop( - private val thread: Thread -) : EventLoopBase() { - override fun isCorrectThread(): Boolean = Thread.currentThread() === thread - - override fun unpark() { - if (Thread.currentThread() !== thread) - timeSource.unpark(thread) - } +internal class ThreadEventLoop( + override val thread: Thread +) : EventLoopImplBase() - fun shutdown() { - closeQueue() - assert(isCorrectThread()) - // complete processing of all queued tasks - while (processNextEvent() <= 0) { /* spin */ } - // reschedule the rest of delayed tasks - rescheduleAllDelayed() - } -} - -internal class BlockingEventLoop(thread: Thread) : ThreadEventLoop(thread) { - @Volatile - public override var isCompleted: Boolean = false -} +internal actual fun createEventLoop(): EventLoop = ThreadEventLoop(Thread.currentThread()) \ No newline at end of file diff --git a/core/kotlinx-coroutines-core/src/Executors.kt b/core/kotlinx-coroutines-core/src/Executors.kt index 52c8846825..876b44a3fd 100644 --- a/core/kotlinx-coroutines-core/src/Executors.kt +++ b/core/kotlinx-coroutines-core/src/Executors.kt @@ -5,7 +5,6 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* -import java.io.* import java.io.Closeable import java.util.concurrent.* import kotlin.coroutines.* @@ -60,12 +59,14 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa removesFutureOnCancellation = removeFutureOnCancel(executor) } - override fun dispatch(context: CoroutineContext, block: Runnable) = - try { executor.execute(timeSource.wrapTask(block)) } - catch (e: RejectedExecutionException) { + override fun dispatch(context: CoroutineContext, block: Runnable) { + try { + executor.execute(timeSource.wrapTask(block)) + } catch (e: RejectedExecutionException) { timeSource.unTrackTask() - DefaultExecutor.execute(block) + DefaultExecutor.enqueue(block) } + } /* * removesFutureOnCancellation is required to avoid memory leak. diff --git a/core/kotlinx-coroutines-core/src/internal/ThreadLocal.kt b/core/kotlinx-coroutines-core/src/internal/ThreadLocal.kt index 8de2abc328..39c87bed1c 100644 --- a/core/kotlinx-coroutines-core/src/internal/ThreadLocal.kt +++ b/core/kotlinx-coroutines-core/src/internal/ThreadLocal.kt @@ -6,8 +6,5 @@ package kotlinx.coroutines.internal import java.lang.ThreadLocal -internal actual typealias CommonThreadLocal = ThreadLocalWithInitialValue - -internal class ThreadLocalWithInitialValue(private val supplier: () -> T) : ThreadLocal() { - override fun initialValue(): T = supplier() -} +@Suppress("ACTUAL_WITHOUT_EXPECT") // internal visibility +internal actual typealias CommonThreadLocal = ThreadLocal diff --git a/core/kotlinx-coroutines-core/src/scheduling/Dispatcher.kt b/core/kotlinx-coroutines-core/src/scheduling/Dispatcher.kt index b7f6b35e31..ea58adcfa5 100644 --- a/core/kotlinx-coroutines-core/src/scheduling/Dispatcher.kt +++ b/core/kotlinx-coroutines-core/src/scheduling/Dispatcher.kt @@ -101,13 +101,14 @@ open class ExperimentalCoroutineDispatcher( return LimitingDispatcher(this, parallelism, TaskMode.NON_BLOCKING) } - internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean): Unit = + internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) { try { coroutineScheduler.dispatch(block, context, fair) } catch (e: RejectedExecutionException) { // Context shouldn't be lost here to properly invoke before/after task - DefaultExecutor.execute(coroutineScheduler.createTask(block, context)) + DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context)) } + } private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) diff --git a/core/kotlinx-coroutines-core/src/test_/TestCoroutineContext.kt b/core/kotlinx-coroutines-core/src/test_/TestCoroutineContext.kt index bea9e6203c..4f1d37925b 100644 --- a/core/kotlinx-coroutines-core/src/test_/TestCoroutineContext.kt +++ b/core/kotlinx-coroutines-core/src/test_/TestCoroutineContext.kt @@ -181,7 +181,7 @@ class TestCoroutineContext(private val name: String? = null) : CoroutineContext uncaughtExceptions.clear() } - private fun post(block: Runnable) = + private fun enqueue(block: Runnable) = queue.addLast(TimedRunnable(block, counter++)) private fun postDelayed(block: Runnable, delayTime: Long) = @@ -210,8 +210,18 @@ class TestCoroutineContext(private val name: String? = null) : CoroutineContext public override fun toString(): String = name ?: "TestCoroutineContext@$hexAddress" - private inner class Dispatcher : CoroutineDispatcher(), Delay, EventLoop { - override fun dispatch(context: CoroutineContext, block: Runnable) = post(block) + private inner class Dispatcher : EventLoop(), Delay { + init { + incrementUseCount() // this event loop is never completed + } + + override val isEmpty: Boolean + get() = queue.isEmpty + + override fun enqueue(task: Runnable): Boolean { + this@TestCoroutineContext.enqueue(task) + return true + } override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { postDelayed(Runnable { diff --git a/core/kotlinx-coroutines-core/test/EventLoopsTest.kt b/core/kotlinx-coroutines-core/test/EventLoopsTest.kt new file mode 100644 index 0000000000..1a6369b54d --- /dev/null +++ b/core/kotlinx-coroutines-core/test/EventLoopsTest.kt @@ -0,0 +1,71 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +/** + * Tests event loops integration. + * See [https://github.com/Kotlin/kotlinx.coroutines/issues/860]. + */ +class EventLoopsTest : TestBase() { + @Test + fun testNestedRunBlocking() { + runBlocking { // outer event loop + // Produce string "OK" + val ch = produce { send("OK") } + // try receive this string in a blocking way: + assertEquals("OK", runBlocking { ch.receive() }) // it should not hang here + } + } + + @Test + fun testUnconfinedInRunBlocking() { + var completed = false + runBlocking { + launch(Dispatchers.Unconfined) { + completed = true + } + // should not go into runBlocking loop, but complete here + assertTrue(completed) + } + } + + @Test + fun testNestedUnconfined() { + expect(1) + GlobalScope.launch(Dispatchers.Unconfined) { + expect(2) + GlobalScope.launch(Dispatchers.Unconfined) { + // this gets scheduled into outer unconfined loop + expect(4) + } + expect(3) // ^^ executed before the above unconfined + } + finish(5) + } + + @Test + fun testEventLoopInDefaultExecutor() = runTest { + expect(1) + withContext(Dispatchers.Unconfined) { + delay(1) + assertTrue(Thread.currentThread().name.startsWith(DefaultExecutor.THREAD_NAME)) + expect(2) + // now runBlocking inside default executor thread --> should use outer event loop + DefaultExecutor.enqueue(Runnable { + expect(4) // will execute when runBlocking runs loop + }) + expect(3) + runBlocking { + expect(5) + } + } + finish(6) + } +} \ No newline at end of file diff --git a/core/kotlinx-coroutines-core/test/VirtualTimeSource.kt b/core/kotlinx-coroutines-core/test/VirtualTimeSource.kt index 91fb5d99f1..21230629df 100644 --- a/core/kotlinx-coroutines-core/test/VirtualTimeSource.kt +++ b/core/kotlinx-coroutines-core/test/VirtualTimeSource.kt @@ -31,6 +31,7 @@ private class ThreadStatus { var parkedTill = NOT_PARKED @Volatile @JvmField var permit = false + var registered = 0 override fun toString(): String = "parkedTill = ${TimeUnit.NANOSECONDS.toMillis(parkedTill)} ms, permit = $permit" } @@ -79,13 +80,18 @@ internal class VirtualTimeSource( @Synchronized override fun registerTimeLoopThread() { - assert(threads.putIfAbsent(Thread.currentThread(), ThreadStatus()) == null) + val status = threads.getOrPut(Thread.currentThread()) { ThreadStatus() }!! + status.registered++ } @Synchronized override fun unregisterTimeLoopThread() { - assert(threads.remove(Thread.currentThread()) != null) - wakeupAll() + val currentThread = Thread.currentThread() + val status = threads[currentThread]!! + if (--status.registered == 0) { + threads.remove(currentThread) + wakeupAll() + } } override fun parkNanos(blocker: Any, nanos: Long) { diff --git a/core/kotlinx-coroutines-core/test/guide/test/TestUtil.kt b/core/kotlinx-coroutines-core/test/guide/test/TestUtil.kt index 983f6c5164..786338c5a6 100644 --- a/core/kotlinx-coroutines-core/test/guide/test/TestUtil.kt +++ b/core/kotlinx-coroutines-core/test/guide/test/TestUtil.kt @@ -108,7 +108,7 @@ private fun shutdownDispatcherPools(timeout: Long) { (thread.dispatcher.executor as ExecutorService).apply { shutdown() awaitTermination(timeout, TimeUnit.MILLISECONDS) - shutdownNow().forEach { DefaultExecutor.execute(it) } + shutdownNow().forEach { DefaultExecutor.enqueue(it) } } } } diff --git a/js/kotlinx-coroutines-core-js/src/EventLoop.kt b/js/kotlinx-coroutines-core-js/src/EventLoop.kt new file mode 100644 index 0000000000..53fbf07801 --- /dev/null +++ b/js/kotlinx-coroutines-core-js/src/EventLoop.kt @@ -0,0 +1,26 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.internal.* + +internal actual fun createEventLoop(): EventLoop = UnconfinedEventLoop() + +internal class UnconfinedEventLoop : EventLoop() { + private val queue = Queue() + + override val isEmpty: Boolean + get() = queue.isEmpty + + override fun processNextEvent(): Long { + queue.poll()?.run() + return if (queue.isEmpty) Long.MAX_VALUE else 0L + } + + override fun enqueue(task: Runnable): Boolean { + queue.add(task) + return true + } +} diff --git a/js/kotlinx-coroutines-core-js/src/JSDispatcher.kt b/js/kotlinx-coroutines-core-js/src/JSDispatcher.kt index c181131a8a..138eada1b0 100644 --- a/js/kotlinx-coroutines-core-js/src/JSDispatcher.kt +++ b/js/kotlinx-coroutines-core-js/src/JSDispatcher.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlinx.coroutines.internal.* import kotlin.coroutines.* import org.w3c.dom.* @@ -103,52 +104,6 @@ internal abstract class MessageQueue : Queue() { } } -internal open class Queue { - private var queue = arrayOfNulls(8) - private var head = 0 - private var tail = 0 - - val isEmpty get() = head == tail - - fun poll(): T? { - if (isEmpty) return null - val result = queue[head]!! - queue[head] = null - head = head.next() - @Suppress("UNCHECKED_CAST") - return result as T - } - - tailrec fun add(element: T) { - val newTail = tail.next() - if (newTail == head) { - resize() - add(element) // retry with larger size - return - } - queue[tail] = element - tail = newTail - } - - private fun resize() { - var i = head - var j = 0 - val a = arrayOfNulls(queue.size * 2) - while (i != tail) { - a[j++] = queue[i] - i = i.next() - } - queue = a - head = 0 - tail = j - } - - private fun Int.next(): Int { - val j = this + 1 - return if (j == queue.size) 0 else j - } -} - // We need to reference global setTimeout and clearTimeout so that it works on Node.JS as opposed to // using them via "window" (which only works in browser) private external fun setTimeout(handler: dynamic, timeout: Int = definedExternally): Int diff --git a/js/kotlinx-coroutines-core-js/src/Window.kt b/js/kotlinx-coroutines-core-js/src/Window.kt index 42b476ed40..1b1dc1a95f 100644 --- a/js/kotlinx-coroutines-core-js/src/Window.kt +++ b/js/kotlinx-coroutines-core-js/src/Window.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlinx.coroutines.internal.* import org.w3c.dom.Window /** diff --git a/js/kotlinx-coroutines-core-js/src/internal/Queue.kt b/js/kotlinx-coroutines-core-js/src/internal/Queue.kt new file mode 100644 index 0000000000..b48f275054 --- /dev/null +++ b/js/kotlinx-coroutines-core-js/src/internal/Queue.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +internal open class Queue { + private var queue = arrayOfNulls(8) + private var head = 0 + private var tail = 0 + + val isEmpty get() = head == tail + + fun poll(): T? { + if (isEmpty) return null + val result = queue[head]!! + queue[head] = null + head = head.next() + @Suppress("UNCHECKED_CAST") + return result as T + } + + tailrec fun add(element: T) { + val newTail = tail.next() + if (newTail == head) { + resize() + add(element) // retry with larger size + return + } + queue[tail] = element + tail = newTail + } + + private fun resize() { + var i = head + var j = 0 + val a = arrayOfNulls(queue.size * 2) + while (i != tail) { + a[j++] = queue[i] + i = i.next() + } + queue = a + head = 0 + tail = j + } + + private fun Int.next(): Int { + val j = this + 1 + return if (j == queue.size) 0 else j + } +} diff --git a/js/kotlinx-coroutines-core-js/src/internal/ThreadLocal.kt b/js/kotlinx-coroutines-core-js/src/internal/ThreadLocal.kt index 5c6aa80717..2ee0a4fd0d 100644 --- a/js/kotlinx-coroutines-core-js/src/internal/ThreadLocal.kt +++ b/js/kotlinx-coroutines-core-js/src/internal/ThreadLocal.kt @@ -4,7 +4,9 @@ package kotlinx.coroutines.internal -internal actual class CommonThreadLocal actual constructor(supplier: () -> T) { - private val value = supplier() - actual fun get(): T = value +internal actual class CommonThreadLocal actual constructor() { + private var value: T? = null + @Suppress("UNCHECKED_CAST") + actual fun get(): T = value as T + actual fun set(value: T) { this.value = value } } \ No newline at end of file diff --git a/native/kotlinx-coroutines-core-native/src/Builders.kt b/native/kotlinx-coroutines-core-native/src/Builders.kt index 135529d1da..f4ec935be9 100644 --- a/native/kotlinx-coroutines-core-native/src/Builders.kt +++ b/native/kotlinx-coroutines-core-native/src/Builders.kt @@ -30,47 +30,38 @@ import kotlin.coroutines.* */ public fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T { val contextInterceptor = context[ContinuationInterceptor] - val privateEventLoop = contextInterceptor == null // create private event loop if no dispatcher is specified - val eventLoop = if (privateEventLoop) { - //check(currentEventLoop == null) { "Cannot use runBlocking inside runBlocking" } - val newEventLoop = BlockingEventLoop() - currentEventLoop.add(newEventLoop) - newEventLoop - } else - contextInterceptor as? EventLoop - val newContext = GlobalScope.newCoroutineContext( - if (privateEventLoop) context + (eventLoop as ContinuationInterceptor) else context - ) - val coroutine = BlockingCoroutine(newContext, eventLoop, privateEventLoop) - coroutine.start(CoroutineStart.DEFAULT, coroutine, block) - return coroutine.joinBlocking().also { - currentEventLoop.removeAt(currentEventLoop.size - 1) + val eventLoop: EventLoop? + var newContext: CoroutineContext = context // todo: kludge for data flow analysis error + if (contextInterceptor == null) { + // create/use private event loop if no dispatcher is specified + eventLoop = ThreadLocalEventLoop.eventLoop.also { + newContext = GlobalScope.newCoroutineContext(context + it) + } + } else { + eventLoop = contextInterceptor as? EventLoop + newContext = GlobalScope.newCoroutineContext(context) } + val coroutine = BlockingCoroutine(newContext, eventLoop) + coroutine.start(CoroutineStart.DEFAULT, coroutine, block) + return coroutine.joinBlocking() } private class BlockingCoroutine( parentContext: CoroutineContext, - private val eventLoop: EventLoop?, - private val privateEventLoop: Boolean + private val eventLoop: EventLoop? ) : AbstractCoroutine(parentContext, true) { - init { - if (privateEventLoop) require(eventLoop is BlockingEventLoop) - } - @Suppress("UNCHECKED_CAST") fun joinBlocking(): T { - while (true) { - val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE - // note: process next even may loose unpark flag, so check if completed before parking - if (isCompleted) break - // todo: LockSupport.parkNanos(this, parkNanos) - } - // process queued events (that could have been added after last processNextEvent and before cancel - if (privateEventLoop) (eventLoop as BlockingEventLoop).apply { - // We exit the "while" loop above when this coroutine's state "isCompleted", - // Here we should signal that BlockingEventLoop should not accept any more tasks - isCompleted = true - shutdown() + try { + eventLoop?.incrementUseCount() + while (true) { + val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE + // note: process next even may loose unpark flag, so check if completed before parking + if (isCompleted) break + // todo: LockSupport.parkNanos(this, parkNanos) + } + } finally { // paranoia + eventLoop?.decrementUseCount() } // now return result val state = this.state diff --git a/native/kotlinx-coroutines-core-native/src/CoroutineContext.kt b/native/kotlinx-coroutines-core-native/src/CoroutineContext.kt index 1a7628d978..d846569126 100644 --- a/native/kotlinx-coroutines-core-native/src/CoroutineContext.kt +++ b/native/kotlinx-coroutines-core-native/src/CoroutineContext.kt @@ -7,11 +7,9 @@ package kotlinx.coroutines import kotlin.coroutines.* import kotlinx.coroutines.internal.* -@ThreadLocal -internal val currentEventLoop = ArrayList() - -private fun takeEventLoop(): BlockingEventLoop = - currentEventLoop.firstOrNull() ?: error("There is no event loop. Use runBlocking { ... } to start one.") +private fun takeEventLoop(): EventLoopImpl = + ThreadLocalEventLoop.currentOrNull() as EventLoopImpl ?: + error("There is no event loop. Use runBlocking { ... } to start one.") internal object DefaultExecutor : CoroutineDispatcher(), Delay { override fun dispatch(context: CoroutineContext, block: Runnable) = @@ -21,15 +19,15 @@ internal object DefaultExecutor : CoroutineDispatcher(), Delay { override fun invokeOnTimeout(time: Long, block: Runnable): DisposableHandle = takeEventLoop().invokeOnTimeout(time, block) - fun execute(task: Runnable) { + fun enqueue(task: Runnable): Boolean { error("Cannot execute task because event loop was shut down") } - fun schedule(delayedTask: EventLoopBase.DelayedTask) { + fun schedule(delayedTask: EventLoopImpl.DelayedTask) { error("Cannot schedule task because event loop was shut down") } - fun removeDelayedImpl(delayedTask: EventLoopBase.DelayedTask) { + fun removeDelayedImpl(delayedTask: EventLoopImpl.DelayedTask) { error("Cannot happen") } } diff --git a/native/kotlinx-coroutines-core-native/src/EventLoop.kt b/native/kotlinx-coroutines-core-native/src/EventLoop.kt index 2e9efcce76..34c5a16062 100644 --- a/native/kotlinx-coroutines-core-native/src/EventLoop.kt +++ b/native/kotlinx-coroutines-core-native/src/EventLoop.kt @@ -11,34 +11,6 @@ import platform.posix.* import kotlin.coroutines.* import kotlin.system.* -/** - * Implemented by [CoroutineDispatcher] implementations that have event loop inside and can - * be asked to process next event from their event queue. - * - * It may optionally implement [Delay] interface and support time-scheduled tasks. It is used by [runBlocking] to - * continue processing events when invoked from the event dispatch thread. - */ -internal interface EventLoop { - /** - * Processes next event in this event loop. - * - * The result of this function is to be interpreted like this: - * * `<= 0` -- there are potentially more events for immediate processing; - * * `> 0` -- a number of nanoseconds to wait for next scheduled event; - * * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread. - */ - public fun processNextEvent(): Long -} - -/** - * Creates a new event loop. - */ -@Suppress("FunctionName") -internal fun EventLoop(parentJob: Job? = null): CoroutineDispatcher = - EventLoopImpl().apply { - if (parentJob != null) initParentJob(parentJob) - } - private const val DELAYED = 0 private const val REMOVED = 1 private const val RESCHEDULED = 2 @@ -58,16 +30,16 @@ private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY") private typealias Queue = LockFreeMPSCQueueCore -internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { +internal class EventLoopImpl: EventLoop(), Delay { // null | CLOSED_EMPTY | task | Queue private val _queue = atomic(null) // Allocated only once private val _delayed = atomic?>(null) - protected abstract val isCompleted: Boolean + private var isCompleted = false - protected val isEmpty: Boolean + override val isEmpty: Boolean get() = isQueueEmpty && isDelayedEmpty private val isQueueEmpty: Boolean get() { @@ -92,9 +64,6 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0) } - override fun dispatch(context: CoroutineContext, block: Runnable) = - execute(block) - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) = schedule(DelayedResumeTask(timeMillis, continuation)) @@ -119,18 +88,20 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { } } // then process one event from queue - dequeue()?.run() + dequeue()?.let { runBlock(it) } return nextTime } - @Suppress("MemberVisibilityCanBePrivate") // todo: remove suppress when KT-22030 is fixed - internal fun execute(task: Runnable) { + // returns true if it was successfully enqueued for execution in this event loop, false if got to default executor + override fun enqueue(task: Runnable): Boolean = if (enqueueImpl(task)) { // todo: we should unpark only when this delayed task became first in the queue unpark() - } else - DefaultExecutor.execute(task) - } + true + } else { + DefaultExecutor.enqueue(task) + false + } @Suppress("UNCHECKED_CAST") private fun enqueueImpl(task: Runnable): Boolean { @@ -235,7 +206,12 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { } } - fun shutdown() { + override fun shutdown() { + // Clean up thread-local reference here -- this event loop is shutting down + ThreadLocalEventLoop.resetEventLoop() + // We should signal that ThreadEventLoop should not accept any more tasks + // and process queued events (that could have been added after last processNextEvent) + isCompleted = true closeQueue() // complete processing of all queued tasks while (processNextEvent() <= 0) { /* spin */ } @@ -300,20 +276,7 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop { } } -private class EventLoopImpl : EventLoopBase() { - private var parentJob: Job? = null - - override val isCompleted: Boolean get() = parentJob?.isCompleted == true - - fun initParentJob(parentJob: Job) { - require(this.parentJob == null) - this.parentJob = parentJob - } -} - -internal class BlockingEventLoop : EventLoopBase() { - public override var isCompleted: Boolean = false -} +internal actual fun createEventLoop(): EventLoop = EventLoopImpl() private fun nanoTime(): Long { return getTimeNanos() diff --git a/native/kotlinx-coroutines-core-native/src/internal/ThreadLocal.kt b/native/kotlinx-coroutines-core-native/src/internal/ThreadLocal.kt index 5f98f1c255..420e0d2cb0 100644 --- a/native/kotlinx-coroutines-core-native/src/internal/ThreadLocal.kt +++ b/native/kotlinx-coroutines-core-native/src/internal/ThreadLocal.kt @@ -8,7 +8,9 @@ import kotlin.native.concurrent.* @Suppress("ACTUAL_WITHOUT_EXPECT") internal actual typealias NativeThreadLocal = kotlin.native.ThreadLocal -internal actual class CommonThreadLocal actual constructor(supplier: () -> T) { - private val value = supplier() - actual fun get(): T = value -} +internal actual class CommonThreadLocal actual constructor() { + private var value: T? = null + @Suppress("UNCHECKED_CAST") + actual fun get(): T = value as T + actual fun set(value: T) { this.value = value } +} \ No newline at end of file From 639644070cdedd6edb964610dd7fdd7e84436094 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 14 Dec 2018 10:40:12 +0300 Subject: [PATCH 2/9] Restore BlockingEventLoop class name for runBlocking event loop impl It is a name people are used to seen then they example the coroutineContext inside runBlocking. The fact that it is also used for unconfined now is Ok as it is not directly exposed. --- core/kotlinx-coroutines-core/src/EventLoop.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/kotlinx-coroutines-core/src/EventLoop.kt b/core/kotlinx-coroutines-core/src/EventLoop.kt index 792adebf66..0f5e0c3ae2 100644 --- a/core/kotlinx-coroutines-core/src/EventLoop.kt +++ b/core/kotlinx-coroutines-core/src/EventLoop.kt @@ -83,7 +83,7 @@ internal abstract class EventLoopImplBase: EventLoop(), Delay { override fun shutdown() { // Clean up thread-local reference here -- this event loop is shutting down ThreadLocalEventLoop.resetEventLoop() - // We should signal that ThreadEventLoop should not accept any more tasks + // We should signal that this event loop should not accept any more tasks // and process queued events (that could have been added after last processNextEvent) isCompleted = true closeQueue() @@ -311,8 +311,8 @@ internal abstract class EventLoopImplBase: EventLoop(), Delay { } } -internal class ThreadEventLoop( +internal class BlockingEventLoop( override val thread: Thread ) : EventLoopImplBase() -internal actual fun createEventLoop(): EventLoop = ThreadEventLoop(Thread.currentThread()) \ No newline at end of file +internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread()) \ No newline at end of file From 2c1ae01a2eef30b94ec5a0c7706d97e65d71062e Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 14 Dec 2018 13:55:09 +0300 Subject: [PATCH 3/9] Internal API for custom waiting loops To be used by kotlinx-io --- .../kotlinx-coroutines-core.txt | 4 +++ core/kotlinx-coroutines-core/src/EventLoop.kt | 25 ++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index a2000c7640..14c2433222 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -243,6 +243,10 @@ public abstract interface class kotlinx/coroutines/DisposableHandle { public abstract fun dispose ()V } +public final class kotlinx/coroutines/EventLoopKt { + public static final fun processNextEventInCurrentThread ()J +} + public abstract class kotlinx/coroutines/ExecutorCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, java/io/Closeable { public fun ()V public abstract fun close ()V diff --git a/core/kotlinx-coroutines-core/src/EventLoop.kt b/core/kotlinx-coroutines-core/src/EventLoop.kt index 0f5e0c3ae2..1062c6ce97 100644 --- a/core/kotlinx-coroutines-core/src/EventLoop.kt +++ b/core/kotlinx-coroutines-core/src/EventLoop.kt @@ -315,4 +315,27 @@ internal class BlockingEventLoop( override val thread: Thread ) : EventLoopImplBase() -internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread()) \ No newline at end of file +internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread()) + +/** + * Processes next event in the current thread's event loop. + * + * The result of this function is to be interpreted like this: + * * `<= 0` -- there are potentially more events for immediate processing; + * * `> 0` -- a number of nanoseconds to wait for the next scheduled event; + * * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread. + * + * Sample usage of this function: + * + * ``` + * while (waitingCondition) { + * val time = processNextEventInCurrentThread() + * LockSupport.parkNanos(time) + * } + * ``` + * + * @suppress **This an internal API and should not be used from general code.** + */ +@InternalCoroutinesApi +public fun processNextEventInCurrentThread(): Long = + ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE From 5038f92391e4b8dd4eadbca44d9b86c38987b822 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Wed, 19 Dec 2018 18:38:38 +0300 Subject: [PATCH 4/9] Improvements to EventLoops implementation * Unused ArrayQueue is dropped * executeUnconfined and resumeUnconfined are defined as extension funs * EventLoop.increment renamed to delta * Test for EventLoop added Note: This PR also fixes #850 --- .../src/Dispatched.kt | 34 +++++------ .../src/EventLoop.common.kt | 8 +-- .../src/internal/ArrayQueue.kt | 45 -------------- .../test/EventLoopsTest.kt | 58 +++++++++++++++++++ 4 files changed, 79 insertions(+), 66 deletions(-) delete mode 100644 common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt diff --git a/common/kotlinx-coroutines-core-common/src/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/Dispatched.kt index cd5901de01..383b61a427 100644 --- a/common/kotlinx-coroutines-core-common/src/Dispatched.kt +++ b/common/kotlinx-coroutines-core-common/src/Dispatched.kt @@ -13,40 +13,40 @@ import kotlin.jvm.* private val UNDEFINED = Symbol("UNDEFINED") /** - * Executes given [block] as part of current event loop, updating related to block [continuation] + * Executes given [block] as part of current event loop, updating current continuation * mode and state if continuation is not resumed immediately. * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty). * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise. */ -private inline fun executeUnconfined( - continuation: DispatchedContinuation<*>, contState: Any?, mode: Int, - doYield: Boolean = false, block: () -> Unit +private inline fun DispatchedContinuation<*>.executeUnconfined( + contState: Any?, mode: Int, doYield: Boolean = false, + block: () -> Unit ) : Boolean { val eventLoop = ThreadLocalEventLoop.eventLoop // If we are yielding and unconfined queue is empty, we can bail out as part of fast path if (doYield && eventLoop.isEmptyUnconfinedQueue) return false return if (eventLoop.isUnconfinedLoopActive) { // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow - continuation._state = contState - continuation.resumeMode = mode - eventLoop.dispatchUnconfined(continuation) + _state = contState + resumeMode = mode + eventLoop.dispatchUnconfined(this) true // queued into the active loop } else { - // Was not active -- run event loop until unconfined tasks are executed + // Was not active -- run event loop until all unconfined tasks are executed runUnconfinedEventLoop(eventLoop, block = block) false } } -private fun resumeUnconfined(task: DispatchedTask<*>) { +private fun DispatchedTask<*>.resumeUnconfined() { val eventLoop = ThreadLocalEventLoop.eventLoop if (eventLoop.isUnconfinedLoopActive) { // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow - eventLoop.dispatchUnconfined(task) + eventLoop.dispatchUnconfined(this) } else { - // Was not active -- run event loop until unconfined tasks are executed + // Was not active -- run event loop until all unconfined tasks are executed runUnconfinedEventLoop(eventLoop) { - task.resume(task.delegate, MODE_UNDISPATCHED) + resume(delegate, MODE_UNDISPATCHED) } } } @@ -103,7 +103,7 @@ internal class DispatchedContinuation( resumeMode = MODE_ATOMIC_DEFAULT dispatcher.dispatch(context, this) } else { - executeUnconfined(this, state, MODE_ATOMIC_DEFAULT) { + executeUnconfined(state, MODE_ATOMIC_DEFAULT) { withCoroutineContext(this.context, countOrElement) { continuation.resumeWith(result) } @@ -118,7 +118,7 @@ internal class DispatchedContinuation( resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this) } else { - executeUnconfined(this, value, MODE_CANCELLABLE) { + executeUnconfined(value, MODE_CANCELLABLE) { if (!resumeCancelled()) { resumeUndispatched(value) } @@ -135,7 +135,7 @@ internal class DispatchedContinuation( resumeMode = MODE_CANCELLABLE dispatcher.dispatch(context, this) } else { - executeUnconfined(this, state, MODE_CANCELLABLE) { + executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled()) { resumeUndispatchedWithException(exception) } @@ -259,7 +259,7 @@ internal abstract class DispatchedTask( } internal fun DispatchedContinuation.yieldUndispatched(): Boolean = - executeUnconfined(this, Unit, MODE_CANCELLABLE, doYield = true) { + executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) { run() } @@ -272,7 +272,7 @@ internal fun DispatchedTask.dispatch(mode: Int = MODE_CANCELLABLE) { if (dispatcher.isDispatchNeeded(context)) { dispatcher.dispatch(context, this) } else { - resumeUnconfined(this) + resumeUnconfined() } } else { resume(delegate, mode) diff --git a/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt b/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt index 9e30fefc7a..a288018352 100644 --- a/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt +++ b/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt @@ -79,21 +79,21 @@ internal abstract class EventLoop : CoroutineDispatcher() { get() = useCount > 0 public val isUnconfinedLoopActive: Boolean - get() = useCount >= increment(unconfined = true) + get() = useCount >= delta(unconfined = true) public val isEmptyUnconfinedQueue: Boolean get() = queuedUnconfinedTasks == 0 - private fun increment(unconfined: Boolean) = + private fun delta(unconfined: Boolean) = if (unconfined) (1L shl 32) else 1L fun incrementUseCount(unconfined: Boolean = false) { - useCount += increment(unconfined) + useCount += delta(unconfined) if (!unconfined) shared = true } fun decrementUseCount(unconfined: Boolean = false) { - useCount -= increment(unconfined) + useCount -= delta(unconfined) if (useCount > 0) return check(useCount == 0L) { "Extra decrementUseCount" } if (shared) { diff --git a/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt b/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt deleted file mode 100644 index 5cfb8e8df1..0000000000 --- a/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.internal - -internal class ArrayQueue { - private var elements = arrayOfNulls(16) - private var head = 0 - private var tail = 0 - val isEmpty: Boolean get() = head == tail - - public fun addLast(element: T) { - elements[tail] = element - tail = (tail + 1) and elements.size - 1 - if (tail == head) ensureCapacity() - } - - @Suppress("UNCHECKED_CAST") - public fun removeFirstOrNull(): T? { - if (head == tail) return null - val element = elements[head] - elements[head] = null - head = (head + 1) and elements.size - 1 - return element as T - } - - public fun clear() { - head = 0 - tail = 0 - elements = arrayOfNulls(elements.size) - } - - private fun ensureCapacity() { - val currentSize = elements.size - val newCapacity = currentSize shl 1 - val newElements = arrayOfNulls(newCapacity) - val remaining = elements.size - head - arraycopy(elements, head, newElements, 0, remaining) - arraycopy(elements, 0, newElements, remaining, head) - elements = newElements - head = 0 - tail = currentSize - } -} diff --git a/core/kotlinx-coroutines-core/test/EventLoopsTest.kt b/core/kotlinx-coroutines-core/test/EventLoopsTest.kt index 1a6369b54d..fb54dc2419 100644 --- a/core/kotlinx-coroutines-core/test/EventLoopsTest.kt +++ b/core/kotlinx-coroutines-core/test/EventLoopsTest.kt @@ -4,9 +4,11 @@ package kotlinx.coroutines +import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* import org.junit.* import org.junit.Test +import java.util.concurrent.locks.* import kotlin.test.* /** @@ -68,4 +70,60 @@ class EventLoopsTest : TestBase() { } finish(6) } + + /** + * Simple test for [processNextEventInCurrentThread] API use-case. + */ + @Test + fun testProcessNextEventInCurrentThreadSimple() = runTest { + expect(1) + val event = CustomBlockingEvent() + // this coroutine fires event + launch { + expect(3) + event.fireEvent() + } + // main coroutine waits for event (same thread!) + expect(2) + event.blockingAwait() + finish(4) + } + + /** + * Test for [processNextEventInCurrentThread] API use-case with delay. + */ + @Test + fun testProcessNextEventInCurrentThreadDelay() = runTest { + expect(1) + val event = CustomBlockingEvent() + // this coroutine fires event + launch { + expect(3) + delay(100) + event.fireEvent() + } + // main coroutine waits for event (same thread!) + expect(2) + event.blockingAwait() + finish(4) + } + + class CustomBlockingEvent { + private val waitingThread = atomic(null) + private val fired = atomic(false) + + fun fireEvent() { + fired.value = true + waitingThread.value?.let { LockSupport.unpark(it) } + } + + fun blockingAwait() { + check(waitingThread.getAndSet(Thread.currentThread()) == null) + while (!fired.getAndSet(false)) { + val time = processNextEventInCurrentThread() + LockSupport.parkNanos(time) + } + waitingThread.value = null + } + } } \ No newline at end of file From f57ad8877658853968e3e56f116651a18e201b32 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Wed, 19 Dec 2018 20:11:04 +0300 Subject: [PATCH 5/9] Avoid blocking current-thread event loop even with runBlocking(otherLoop) Previous logic of handling runBlocking(context) had a flaw in that the presence of EventLoop-capable dispatcher in the context lead to the blocking of current thread's event loop. Test for this issue is added. --- core/kotlinx-coroutines-core/src/Builders.kt | 5 ++-- .../test/EventLoopsTest.kt | 29 ++++++++++++++++--- .../src/Builders.kt | 5 ++-- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/core/kotlinx-coroutines-core/src/Builders.kt b/core/kotlinx-coroutines-core/src/Builders.kt index 519825ad38..c56d624d80 100644 --- a/core/kotlinx-coroutines-core/src/Builders.kt +++ b/core/kotlinx-coroutines-core/src/Builders.kt @@ -38,12 +38,13 @@ public fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, bl val eventLoop: EventLoop? val newContext: CoroutineContext if (contextInterceptor == null) { - // create/use private event loop if no dispatcher is specified + // create or use private event loop if no dispatcher is specified eventLoop = ThreadLocalEventLoop.eventLoop.also { newContext = GlobalScope.newCoroutineContext(context + it) } } else { - eventLoop = contextInterceptor as? EventLoop + // find existing thread-local event loop if present to avoid blocking it (but don't create one) + eventLoop = ThreadLocalEventLoop.currentOrNull() newContext = GlobalScope.newCoroutineContext(context) } val coroutine = BlockingCoroutine(newContext, currentThread, eventLoop) diff --git a/core/kotlinx-coroutines-core/test/EventLoopsTest.kt b/core/kotlinx-coroutines-core/test/EventLoopsTest.kt index fb54dc2419..835e6b427c 100644 --- a/core/kotlinx-coroutines-core/test/EventLoopsTest.kt +++ b/core/kotlinx-coroutines-core/test/EventLoopsTest.kt @@ -6,9 +6,9 @@ package kotlinx.coroutines import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* -import org.junit.* import org.junit.Test import java.util.concurrent.locks.* +import kotlin.concurrent.* import kotlin.test.* /** @@ -77,7 +77,7 @@ class EventLoopsTest : TestBase() { @Test fun testProcessNextEventInCurrentThreadSimple() = runTest { expect(1) - val event = CustomBlockingEvent() + val event = EventSync() // this coroutine fires event launch { expect(3) @@ -89,13 +89,34 @@ class EventLoopsTest : TestBase() { finish(4) } + @Test + fun testSecondThreadRunBlocking() = runTest { + val testThread = Thread.currentThread() + val testContext = coroutineContext + val event = EventSync() // will signal completion + var thread = thread { + runBlocking { // outer event loop + // Produce string "OK" + val ch = produce { send("OK") } + // try receive this string in a blocking way using test context (another thread) + assertEquals("OK", runBlocking(testContext) { + assertEquals(testThread, Thread.currentThread()) + ch.receive() // it should not hang here + }) + } + event.fireEvent() // done thread + } + event.blockingAwait() // wait for thread to complete + thread.join() // it is safe to join thread now + } + /** * Test for [processNextEventInCurrentThread] API use-case with delay. */ @Test fun testProcessNextEventInCurrentThreadDelay() = runTest { expect(1) - val event = CustomBlockingEvent() + val event = EventSync() // this coroutine fires event launch { expect(3) @@ -108,7 +129,7 @@ class EventLoopsTest : TestBase() { finish(4) } - class CustomBlockingEvent { + class EventSync { private val waitingThread = atomic(null) private val fired = atomic(false) diff --git a/native/kotlinx-coroutines-core-native/src/Builders.kt b/native/kotlinx-coroutines-core-native/src/Builders.kt index f4ec935be9..ca677f2a51 100644 --- a/native/kotlinx-coroutines-core-native/src/Builders.kt +++ b/native/kotlinx-coroutines-core-native/src/Builders.kt @@ -33,12 +33,13 @@ public fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, bl val eventLoop: EventLoop? var newContext: CoroutineContext = context // todo: kludge for data flow analysis error if (contextInterceptor == null) { - // create/use private event loop if no dispatcher is specified + // create or use private event loop if no dispatcher is specified eventLoop = ThreadLocalEventLoop.eventLoop.also { newContext = GlobalScope.newCoroutineContext(context + it) } } else { - eventLoop = contextInterceptor as? EventLoop + // find existing thread-local event loop if present to avoid blocking it (but don't create one) + eventLoop = ThreadLocalEventLoop.currentOrNull() newContext = GlobalScope.newCoroutineContext(context) } val coroutine = BlockingCoroutine(newContext, eventLoop) From d987c2721157bff49bcc201055b9a33ad892c8cd Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Wed, 19 Dec 2018 20:16:59 +0300 Subject: [PATCH 6/9] Clarify EventLoop.processNextEvent contract Must be invoked from the correct thread. --- .../kotlinx-coroutines-core-common/src/EventLoop.common.kt | 5 ++++- core/kotlinx-coroutines-core/src/EventLoop.kt | 3 +-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt b/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt index a288018352..b8e15eba42 100644 --- a/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt +++ b/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt @@ -24,7 +24,10 @@ internal abstract class EventLoop : CoroutineDispatcher() { * The result of this function is to be interpreted like this: * * `<= 0` -- there are potentially more events for immediate processing; * * `> 0` -- a number of nanoseconds to wait for next scheduled event; - * * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread. + * * [Long.MAX_VALUE] -- no more events. + * + * **NOTE**: Must be invoked only from the event loop's thread + * (no check for performance reasons, may be added in the future). */ public abstract fun processNextEvent(): Long diff --git a/core/kotlinx-coroutines-core/src/EventLoop.kt b/core/kotlinx-coroutines-core/src/EventLoop.kt index 1062c6ce97..35ed3520b7 100644 --- a/core/kotlinx-coroutines-core/src/EventLoop.kt +++ b/core/kotlinx-coroutines-core/src/EventLoop.kt @@ -97,7 +97,6 @@ internal abstract class EventLoopImplBase: EventLoop(), Delay { schedule(DelayedResumeTask(timeMillis, continuation)) override fun processNextEvent(): Long { - if (Thread.currentThread() !== thread) return Long.MAX_VALUE // queue all delayed tasks that are due to be executed val delayed = _delayed.value if (delayed != null && !delayed.isEmpty) { @@ -323,7 +322,7 @@ internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.curr * The result of this function is to be interpreted like this: * * `<= 0` -- there are potentially more events for immediate processing; * * `> 0` -- a number of nanoseconds to wait for the next scheduled event; - * * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread. + * * [Long.MAX_VALUE] -- no more events or no thread-local event loop. * * Sample usage of this function: * From fd7a3ded275ac212067c5d0c482a1751556f3a2f Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Thu, 20 Dec 2018 00:05:59 +0300 Subject: [PATCH 7/9] Restored special status of TestCoroutineContext for runBlocking So that runBlocking processes events from TestCoroutineContext --- .../src/EventLoop.common.kt | 8 ++++++++ core/kotlinx-coroutines-core/src/Builders.kt | 6 ++++-- .../src/test_/TestCoroutineContext.kt | 3 +++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt b/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt index b8e15eba42..131d194eab 100644 --- a/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt +++ b/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt @@ -31,6 +31,14 @@ internal abstract class EventLoop : CoroutineDispatcher() { */ public abstract fun processNextEvent(): Long + /** + * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context + * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one). + * By default, event loop implementation is thread-local and should not processed in the context + * (current thread's event loop should be processed instead). + */ + public open fun shouldBeProcessedFromContext(): Boolean = false + public abstract val isEmpty: Boolean /** diff --git a/core/kotlinx-coroutines-core/src/Builders.kt b/core/kotlinx-coroutines-core/src/Builders.kt index c56d624d80..f1f9b89b06 100644 --- a/core/kotlinx-coroutines-core/src/Builders.kt +++ b/core/kotlinx-coroutines-core/src/Builders.kt @@ -43,8 +43,10 @@ public fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, bl newContext = GlobalScope.newCoroutineContext(context + it) } } else { - // find existing thread-local event loop if present to avoid blocking it (but don't create one) - eventLoop = ThreadLocalEventLoop.currentOrNull() + // See if context's interceptor is an event loop that we shall use (to support TestContext) + // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) + eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } + ?: ThreadLocalEventLoop.currentOrNull() newContext = GlobalScope.newCoroutineContext(context) } val coroutine = BlockingCoroutine(newContext, currentThread, eventLoop) diff --git a/core/kotlinx-coroutines-core/src/test_/TestCoroutineContext.kt b/core/kotlinx-coroutines-core/src/test_/TestCoroutineContext.kt index 4f1d37925b..47dfc6c2a5 100644 --- a/core/kotlinx-coroutines-core/src/test_/TestCoroutineContext.kt +++ b/core/kotlinx-coroutines-core/src/test_/TestCoroutineContext.kt @@ -215,6 +215,9 @@ class TestCoroutineContext(private val name: String? = null) : CoroutineContext incrementUseCount() // this event loop is never completed } + // override runBlocking to process this event loop + override fun shouldBeProcessedFromContext(): Boolean = true + override val isEmpty: Boolean get() = queue.isEmpty From 24e5fc46bbfe570af4379de220cdda5062cbb949 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 21 Dec 2018 11:03:44 +0300 Subject: [PATCH 8/9] Unconfined event loop uses thread-local queue for performance * Unconfined tasks take priority over others * Tracking of unconfined tasks simplified * Removed duplication between ArrayQueue/Queue classes in JS --- .../src/Dispatched.kt | 23 +---- .../src/EventLoop.common.kt | 87 +++++++++---------- .../src/internal/ArrayQueue.kt | 46 ++++++++++ core/kotlinx-coroutines-core/src/EventLoop.kt | 24 +++-- .../src/test_/TestCoroutineContext.kt | 12 +-- .../src/EventLoop.kt | 17 +--- .../src/JSDispatcher.kt | 6 +- js/kotlinx-coroutines-core-js/src/Window.kt | 8 +- .../src/internal/Queue.kt | 51 ----------- 9 files changed, 113 insertions(+), 161 deletions(-) create mode 100644 common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt delete mode 100644 js/kotlinx-coroutines-core-js/src/internal/Queue.kt diff --git a/common/kotlinx-coroutines-core-common/src/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/Dispatched.kt index 383b61a427..505955a7a7 100644 --- a/common/kotlinx-coroutines-core-common/src/Dispatched.kt +++ b/common/kotlinx-coroutines-core-common/src/Dispatched.kt @@ -24,7 +24,7 @@ private inline fun DispatchedContinuation<*>.executeUnconfined( ) : Boolean { val eventLoop = ThreadLocalEventLoop.eventLoop // If we are yielding and unconfined queue is empty, we can bail out as part of fast path - if (doYield && eventLoop.isEmptyUnconfinedQueue) return false + if (doYield && eventLoop.isUnconfinedQueueEmpty) return false return if (eventLoop.isUnconfinedLoopActive) { // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow _state = contState @@ -60,7 +60,7 @@ private inline fun runUnconfinedEventLoop( block() while (eventLoop.processNextEvent() <= 0) { // break when all unconfined continuations where executed - if (eventLoop.isEmptyUnconfinedQueue) break + if (eventLoop.isUnconfinedQueueEmpty) break } } catch (e: Throwable) { /* @@ -200,26 +200,9 @@ internal fun Continuation.resumeDirectWithException(exception: Throwable) else -> resumeWithStackTrace(exception) } -private const val UNCONFINED_TASK_BIT = 1 shl 31 - internal abstract class DispatchedTask( - resumeMode: Int + @JvmField public var resumeMode: Int ) : SchedulerTask() { - private var _resumeMode: Int = resumeMode // can have UNCONFINED_TASK_BIT set - - public var resumeMode: Int - get() = _resumeMode and UNCONFINED_TASK_BIT.inv() - set(value) { _resumeMode = value } - - /** - * Set to `true` when this task comes from [Dispatchers.Unconfined] or from another dispatcher - * that returned `false` from [CoroutineDispatcher.isDispatchNeeded], - * but there was event loop running, so it was submitted into that event loop. - */ - public var isUnconfinedTask: Boolean - get() = _resumeMode and UNCONFINED_TASK_BIT != 0 - set(value) { _resumeMode = if (value) resumeMode or UNCONFINED_TASK_BIT else resumeMode } - public abstract val delegate: Continuation public abstract fun takeState(): Any? diff --git a/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt b/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt index 131d194eab..ecbc85d9b3 100644 --- a/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt +++ b/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt @@ -5,7 +5,6 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* -import kotlin.coroutines.* /** * Extended by [CoroutineDispatcher] implementations that have event loop inside and can @@ -18,6 +17,24 @@ import kotlin.coroutines.* * @suppress **This an internal API and should not be used from general code.** */ internal abstract class EventLoop : CoroutineDispatcher() { + /** + * Counts the number of nested [runBlocking] and [Dispatchers.Unconfined] that use this event loop. + */ + private var useCount = 0L + + /** + * Set to true on any use by [runBlocking], because it potentially leaks this loop to other threads, so + * this instance must be properly shutdown. We don't need to shutdown event loop that was used solely + * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time. + */ + private var shared = false + + /** + * Queue used by [Dispatchers.Unconfined] tasks. + * These tasks are thread-local for performance and take precedence over the rest of the queue. + */ + private var unconfinedQueue: ArrayQueue>? = null + /** * Processes next event in this event loop. * @@ -29,8 +46,23 @@ internal abstract class EventLoop : CoroutineDispatcher() { * **NOTE**: Must be invoked only from the event loop's thread * (no check for performance reasons, may be added in the future). */ - public abstract fun processNextEvent(): Long + public open fun processNextEvent(): Long { + if (!processUnconfinedEvent()) return Long.MAX_VALUE + return nextTime + } + protected open val nextTime: Long + get() { + val queue = unconfinedQueue ?: return Long.MAX_VALUE + return if (queue.isEmpty) Long.MAX_VALUE else 0L + } + + protected fun processUnconfinedEvent(): Boolean { + val queue = unconfinedQueue ?: return false + val task = queue.removeFirstOrNull() ?: return false + task.run() + return true + } /** * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one). @@ -39,61 +71,25 @@ internal abstract class EventLoop : CoroutineDispatcher() { */ public open fun shouldBeProcessedFromContext(): Boolean = false - public abstract val isEmpty: Boolean - /** * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded] * into the current event loop. */ public fun dispatchUnconfined(task: DispatchedTask<*>) { - task.isUnconfinedTask = true - check(enqueue(task)) { "Attempting to dispatchUnconfined into the EventLoop that was shut down"} - queuedUnconfinedTasks++ + val queue = unconfinedQueue ?: + ArrayQueue>().also { unconfinedQueue = it } + queue.addLast(task) } - public override fun dispatch(context: CoroutineContext, block: Runnable) { - if (block is DispatchedTask<*>) block.isUnconfinedTask = false - enqueue(block) - } - - // returns true if it was successfully enqueued for execution in this event loop, false if got to default executor - public abstract fun enqueue(task: Runnable): Boolean - - protected fun runBlock(block: Runnable) { - try { - block.run() - } finally { - if (block is DispatchedTask<*> && block.isUnconfinedTask) { - check(--queuedUnconfinedTasks >= 0) { "queuedUnconfinedTasks underflow" } - } - } - } - - /** - * Counts the number of nested [runBlocking] and [Dispatchers.Unconfined] that use this event loop. - */ - private var useCount = 0L - - /** - * Set to true on any use by [runBlocking], because it potentially leaks this loop to other threads, so - * this instance must be properly shutdown. We don't need to shutdown event loop that was used solely - * by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time. - */ - private var shared = false - - /** - * Counts a number of currently enqueued (but not executed yet) unconfined tasks. - */ - private var queuedUnconfinedTasks = 0 - public val isActive: Boolean get() = useCount > 0 public val isUnconfinedLoopActive: Boolean get() = useCount >= delta(unconfined = true) - public val isEmptyUnconfinedQueue: Boolean - get() = queuedUnconfinedTasks == 0 + // May only be used from the event loop's thread + public val isUnconfinedQueueEmpty: Boolean + get() = unconfinedQueue?.isEmpty ?: false private fun delta(unconfined: Boolean) = if (unconfined) (1L shl 32) else 1L @@ -110,9 +106,6 @@ internal abstract class EventLoop : CoroutineDispatcher() { if (shared) { // shut it down and remove from ThreadLocalEventLoop shutdown() - } else { - // it was not shared, so it could not have accumulated any other tasks - check(isEmpty) { "EventLoop that was used only by unconfined tasks should be empty" } } } diff --git a/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt b/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt new file mode 100644 index 0000000000..759c0dbc64 --- /dev/null +++ b/common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt @@ -0,0 +1,46 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +internal open class ArrayQueue { + private var elements = arrayOfNulls(16) + private var head = 0 + private var tail = 0 + + val isEmpty: Boolean get() = head == tail + + public fun addLast(element: T) { + elements[tail] = element + tail = (tail + 1) and elements.size - 1 + if (tail == head) ensureCapacity() + } + + @Suppress("UNCHECKED_CAST") + public fun removeFirstOrNull(): T? { + if (head == tail) return null + val element = elements[head] + elements[head] = null + head = (head + 1) and elements.size - 1 + return element as T + } + + public fun clear() { + head = 0 + tail = 0 + elements = arrayOfNulls(elements.size) + } + + private fun ensureCapacity() { + val currentSize = elements.size + val newCapacity = currentSize shl 1 + val newElements = arrayOfNulls(newCapacity) + val remaining = elements.size - head + arraycopy(elements, head, newElements, 0, remaining) + arraycopy(elements, 0, newElements, remaining, head) + elements = newElements + head = 0 + tail = currentSize + } +} diff --git a/core/kotlinx-coroutines-core/src/EventLoop.kt b/core/kotlinx-coroutines-core/src/EventLoop.kt index 35ed3520b7..883600e993 100644 --- a/core/kotlinx-coroutines-core/src/EventLoop.kt +++ b/core/kotlinx-coroutines-core/src/EventLoop.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines import kotlinx.atomicfu.* import kotlinx.coroutines.internal.* +import kotlin.coroutines.* private val DISPOSED_TASK = Symbol("REMOVED_TASK") @@ -43,9 +44,6 @@ internal abstract class EventLoopImplBase: EventLoop(), Delay { @Volatile private var isCompleted = false - override val isEmpty: Boolean - get() = isQueueEmpty && isDelayedEmpty - protected val isQueueEmpty: Boolean get() { val queue = _queue.value return when (queue) { @@ -55,13 +53,9 @@ internal abstract class EventLoopImplBase: EventLoop(), Delay { } } - private val isDelayedEmpty: Boolean get() { - val delayed = _delayed.value - return delayed == null || delayed.isEmpty - } - - private val nextTime: Long + protected override val nextTime: Long get() { + if (super.nextTime == 0L) return 0L val queue = _queue.value when { queue === null -> {} // empty queue -- proceed @@ -97,6 +91,8 @@ internal abstract class EventLoopImplBase: EventLoop(), Delay { schedule(DelayedResumeTask(timeMillis, continuation)) override fun processNextEvent(): Long { + // unconfined events take priority + if (processUnconfinedEvent()) return nextTime // queue all delayed tasks that are due to be executed val delayed = _delayed.value if (delayed != null && !delayed.isEmpty) { @@ -114,20 +110,20 @@ internal abstract class EventLoopImplBase: EventLoop(), Delay { } } // then process one event from queue - dequeue()?.let { runBlock(it) } + dequeue()?.run() return nextTime } - // returns true if it was successfully enqueued for execution in this event loop, false if got to default executor - override fun enqueue(task: Runnable): Boolean = + public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block) + + public fun enqueue(task: Runnable) { if (enqueueImpl(task)) { // todo: we should unpark only when this delayed task became first in the queue unpark() - true } else { DefaultExecutor.enqueue(task) - false } + } @Suppress("UNCHECKED_CAST") private fun enqueueImpl(task: Runnable): Boolean { diff --git a/core/kotlinx-coroutines-core/src/test_/TestCoroutineContext.kt b/core/kotlinx-coroutines-core/src/test_/TestCoroutineContext.kt index 47dfc6c2a5..983af6d17d 100644 --- a/core/kotlinx-coroutines-core/src/test_/TestCoroutineContext.kt +++ b/core/kotlinx-coroutines-core/src/test_/TestCoroutineContext.kt @@ -215,17 +215,13 @@ class TestCoroutineContext(private val name: String? = null) : CoroutineContext incrementUseCount() // this event loop is never completed } + override fun dispatch(context: CoroutineContext, block: Runnable) { + this@TestCoroutineContext.enqueue(block) + } + // override runBlocking to process this event loop override fun shouldBeProcessedFromContext(): Boolean = true - override val isEmpty: Boolean - get() = queue.isEmpty - - override fun enqueue(task: Runnable): Boolean { - this@TestCoroutineContext.enqueue(task) - return true - } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { postDelayed(Runnable { with(continuation) { resumeUndispatched(Unit) } diff --git a/js/kotlinx-coroutines-core-js/src/EventLoop.kt b/js/kotlinx-coroutines-core-js/src/EventLoop.kt index 53fbf07801..7a27fe6aef 100644 --- a/js/kotlinx-coroutines-core-js/src/EventLoop.kt +++ b/js/kotlinx-coroutines-core-js/src/EventLoop.kt @@ -4,23 +4,12 @@ package kotlinx.coroutines -import kotlinx.coroutines.internal.* +import kotlin.coroutines.* internal actual fun createEventLoop(): EventLoop = UnconfinedEventLoop() internal class UnconfinedEventLoop : EventLoop() { - private val queue = Queue() - - override val isEmpty: Boolean - get() = queue.isEmpty - - override fun processNextEvent(): Long { - queue.poll()?.run() - return if (queue.isEmpty) Long.MAX_VALUE else 0L - } - - override fun enqueue(task: Runnable): Boolean { - queue.add(task) - return true + override fun dispatch(context: CoroutineContext, block: Runnable) { + throw UnsupportedOperationException("runBlocking event loop is not supported") } } diff --git a/js/kotlinx-coroutines-core-js/src/JSDispatcher.kt b/js/kotlinx-coroutines-core-js/src/JSDispatcher.kt index 138eada1b0..f2e3b9026a 100644 --- a/js/kotlinx-coroutines-core-js/src/JSDispatcher.kt +++ b/js/kotlinx-coroutines-core-js/src/JSDispatcher.kt @@ -72,7 +72,7 @@ internal class WindowDispatcher(private val window: Window) : CoroutineDispatche } } -internal abstract class MessageQueue : Queue() { +internal abstract class MessageQueue : ArrayQueue() { val yieldEvery = 16 // yield to JS event loop after this many processed messages private var scheduled = false @@ -80,7 +80,7 @@ internal abstract class MessageQueue : Queue() { abstract fun schedule() fun enqueue(element: Runnable) { - add(element) + addLast(element) if (!scheduled) { scheduled = true schedule() @@ -91,7 +91,7 @@ internal abstract class MessageQueue : Queue() { try { // limit number of processed messages repeat(yieldEvery) { - val element = poll() ?: return@process + val element = removeFirstOrNull() ?: return@process element.run() } } finally { diff --git a/js/kotlinx-coroutines-core-js/src/Window.kt b/js/kotlinx-coroutines-core-js/src/Window.kt index 1b1dc1a95f..3c1cdb42a6 100644 --- a/js/kotlinx-coroutines-core-js/src/Window.kt +++ b/js/kotlinx-coroutines-core-js/src/Window.kt @@ -35,12 +35,12 @@ private fun Window.asWindowAnimationQueue(): WindowAnimationQueue = private class WindowAnimationQueue(private val window: Window) { private val dispatcher = window.asCoroutineDispatcher() private var scheduled = false - private var current = Queue>() - private var next = Queue>() + private var current = ArrayQueue>() + private var next = ArrayQueue>() private var timestamp = 0.0 fun enqueue(cont: CancellableContinuation) { - next.add(cont) + next.addLast(cont) if (!scheduled) { scheduled = true window.requestAnimationFrame { ts -> @@ -56,7 +56,7 @@ private class WindowAnimationQueue(private val window: Window) { fun process() { while(true) { - val element = current.poll() ?: return + val element = current.removeFirstOrNull() ?: return with(element) { dispatcher.resumeUndispatched(timestamp) } } } diff --git a/js/kotlinx-coroutines-core-js/src/internal/Queue.kt b/js/kotlinx-coroutines-core-js/src/internal/Queue.kt deleted file mode 100644 index b48f275054..0000000000 --- a/js/kotlinx-coroutines-core-js/src/internal/Queue.kt +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.internal - -internal open class Queue { - private var queue = arrayOfNulls(8) - private var head = 0 - private var tail = 0 - - val isEmpty get() = head == tail - - fun poll(): T? { - if (isEmpty) return null - val result = queue[head]!! - queue[head] = null - head = head.next() - @Suppress("UNCHECKED_CAST") - return result as T - } - - tailrec fun add(element: T) { - val newTail = tail.next() - if (newTail == head) { - resize() - add(element) // retry with larger size - return - } - queue[tail] = element - tail = newTail - } - - private fun resize() { - var i = head - var j = 0 - val a = arrayOfNulls(queue.size * 2) - while (i != tail) { - a[j++] = queue[i] - i = i.next() - } - queue = a - head = 0 - tail = j - } - - private fun Int.next(): Int { - val j = this + 1 - return if (j == queue.size) 0 else j - } -} From 2a66caedce23233751984d806d12bf293f823b7e Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 21 Dec 2018 15:46:42 +0300 Subject: [PATCH 9/9] Further improvements for unconfined event loop perf & native --- .../src/Dispatched.kt | 4 +-- .../src/EventLoop.common.kt | 6 ++-- core/kotlinx-coroutines-core/src/Builders.kt | 5 ++- .../src/DefaultExecutor.kt | 2 +- core/kotlinx-coroutines-core/src/EventLoop.kt | 5 ++- .../src/Builders.kt | 11 +++--- .../src/EventLoop.kt | 36 ++++++++++--------- 7 files changed, 39 insertions(+), 30 deletions(-) diff --git a/common/kotlinx-coroutines-core-common/src/Dispatched.kt b/common/kotlinx-coroutines-core-common/src/Dispatched.kt index 505955a7a7..eb6e02f375 100644 --- a/common/kotlinx-coroutines-core-common/src/Dispatched.kt +++ b/common/kotlinx-coroutines-core-common/src/Dispatched.kt @@ -58,9 +58,9 @@ private inline fun runUnconfinedEventLoop( eventLoop.incrementUseCount(unconfined = true) try { block() - while (eventLoop.processNextEvent() <= 0) { + while (true) { // break when all unconfined continuations where executed - if (eventLoop.isUnconfinedQueueEmpty) break + if (!eventLoop.processUnconfinedEvent()) break } } catch (e: Throwable) { /* diff --git a/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt b/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt index ecbc85d9b3..3565292e52 100644 --- a/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt +++ b/common/kotlinx-coroutines-core-common/src/EventLoop.common.kt @@ -51,13 +51,15 @@ internal abstract class EventLoop : CoroutineDispatcher() { return nextTime } + protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty + protected open val nextTime: Long get() { val queue = unconfinedQueue ?: return Long.MAX_VALUE return if (queue.isEmpty) Long.MAX_VALUE else 0L } - protected fun processUnconfinedEvent(): Boolean { + public fun processUnconfinedEvent(): Boolean { val queue = unconfinedQueue ?: return false val task = queue.removeFirstOrNull() ?: return false task.run() @@ -89,7 +91,7 @@ internal abstract class EventLoop : CoroutineDispatcher() { // May only be used from the event loop's thread public val isUnconfinedQueueEmpty: Boolean - get() = unconfinedQueue?.isEmpty ?: false + get() = unconfinedQueue?.isEmpty ?: true private fun delta(unconfined: Boolean) = if (unconfined) (1L shl 32) else 1L diff --git a/core/kotlinx-coroutines-core/src/Builders.kt b/core/kotlinx-coroutines-core/src/Builders.kt index f1f9b89b06..033452a3a1 100644 --- a/core/kotlinx-coroutines-core/src/Builders.kt +++ b/core/kotlinx-coroutines-core/src/Builders.kt @@ -39,9 +39,8 @@ public fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, bl val newContext: CoroutineContext if (contextInterceptor == null) { // create or use private event loop if no dispatcher is specified - eventLoop = ThreadLocalEventLoop.eventLoop.also { - newContext = GlobalScope.newCoroutineContext(context + it) - } + eventLoop = ThreadLocalEventLoop.eventLoop + newContext = GlobalScope.newCoroutineContext(context + eventLoop) } else { // See if context's interceptor is an event loop that we shall use (to support TestContext) // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) diff --git a/core/kotlinx-coroutines-core/src/DefaultExecutor.kt b/core/kotlinx-coroutines-core/src/DefaultExecutor.kt index 813da13ff5..8358dcc4d2 100644 --- a/core/kotlinx-coroutines-core/src/DefaultExecutor.kt +++ b/core/kotlinx-coroutines-core/src/DefaultExecutor.kt @@ -88,7 +88,7 @@ internal object DefaultExecutor : EventLoopImplBase(), Runnable { acknowledgeShutdownIfNeeded() timeSource.unregisterTimeLoopThread() // recheck if queues are empty after _thread reference was set to null (!!!) - if (!isQueueEmpty) thread // recreate thread if it is needed + if (!isEmpty) thread // recreate thread if it is needed } } diff --git a/core/kotlinx-coroutines-core/src/EventLoop.kt b/core/kotlinx-coroutines-core/src/EventLoop.kt index 883600e993..5d214d148f 100644 --- a/core/kotlinx-coroutines-core/src/EventLoop.kt +++ b/core/kotlinx-coroutines-core/src/EventLoop.kt @@ -44,7 +44,10 @@ internal abstract class EventLoopImplBase: EventLoop(), Delay { @Volatile private var isCompleted = false - protected val isQueueEmpty: Boolean get() { + override val isEmpty: Boolean get() { + if (!isUnconfinedQueueEmpty) return false + val delayed = _delayed.value + if (delayed != null && !delayed.isEmpty) return false val queue = _queue.value return when (queue) { null -> true diff --git a/native/kotlinx-coroutines-core-native/src/Builders.kt b/native/kotlinx-coroutines-core-native/src/Builders.kt index ca677f2a51..c53831ac11 100644 --- a/native/kotlinx-coroutines-core-native/src/Builders.kt +++ b/native/kotlinx-coroutines-core-native/src/Builders.kt @@ -34,12 +34,13 @@ public fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, bl var newContext: CoroutineContext = context // todo: kludge for data flow analysis error if (contextInterceptor == null) { // create or use private event loop if no dispatcher is specified - eventLoop = ThreadLocalEventLoop.eventLoop.also { - newContext = GlobalScope.newCoroutineContext(context + it) - } + eventLoop = ThreadLocalEventLoop.eventLoop + newContext = GlobalScope.newCoroutineContext(context + eventLoop) } else { - // find existing thread-local event loop if present to avoid blocking it (but don't create one) - eventLoop = ThreadLocalEventLoop.currentOrNull() + // See if context's interceptor is an event loop that we shall use (to support TestContext) + // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) + eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } + ?: ThreadLocalEventLoop.currentOrNull() newContext = GlobalScope.newCoroutineContext(context) } val coroutine = BlockingCoroutine(newContext, eventLoop) diff --git a/native/kotlinx-coroutines-core-native/src/EventLoop.kt b/native/kotlinx-coroutines-core-native/src/EventLoop.kt index 34c5a16062..37f5442059 100644 --- a/native/kotlinx-coroutines-core-native/src/EventLoop.kt +++ b/native/kotlinx-coroutines-core-native/src/EventLoop.kt @@ -39,10 +39,10 @@ internal class EventLoopImpl: EventLoop(), Delay { private var isCompleted = false - override val isEmpty: Boolean - get() = isQueueEmpty && isDelayedEmpty - - private val isQueueEmpty: Boolean get() { + override val isEmpty: Boolean get() { + if (!isUnconfinedQueueEmpty) return false + val delayed = _delayed.value + if (delayed != null && !delayed.isEmpty) return false val queue = _queue.value return when (queue) { null -> true @@ -51,14 +51,16 @@ internal class EventLoopImpl: EventLoop(), Delay { } } - private val isDelayedEmpty: Boolean get() { - val delayed = _delayed.value - return delayed == null || delayed.isEmpty - } - - private val nextTime: Long + protected override val nextTime: Long get() { - if (!isQueueEmpty) return 0 + if (super.nextTime == 0L) return 0L + val queue = _queue.value + when { + queue === null -> {} // empty queue -- proceed + queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue + queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed + else -> return 0 // non-empty queue + } val delayed = _delayed.value ?: return Long.MAX_VALUE val nextDelayedTask = delayed.peek() ?: return Long.MAX_VALUE return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0) @@ -71,6 +73,8 @@ internal class EventLoopImpl: EventLoop(), Delay { DelayedRunnableTask(timeMillis, block).also { schedule(it) } override fun processNextEvent(): Long { + // unconfined events take priority + if (processUnconfinedEvent()) return nextTime // queue all delayed tasks that are due to be executed val delayed = _delayed.value if (delayed != null && !delayed.isEmpty) { @@ -88,20 +92,20 @@ internal class EventLoopImpl: EventLoop(), Delay { } } // then process one event from queue - dequeue()?.let { runBlock(it) } + dequeue()?.run() return nextTime } - // returns true if it was successfully enqueued for execution in this event loop, false if got to default executor - override fun enqueue(task: Runnable): Boolean = + public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block) + + public fun enqueue(task: Runnable) { if (enqueueImpl(task)) { // todo: we should unpark only when this delayed task became first in the queue unpark() - true } else { DefaultExecutor.enqueue(task) - false } + } @Suppress("UNCHECKED_CAST") private fun enqueueImpl(task: Runnable): Boolean {