Implement internal API for running current event loop #3641

Merged
merged 9 commits into from
Mar 3, 2023
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -310,7 +310,9 @@ public abstract interface class kotlinx/coroutines/DisposableHandle {
}

public final class kotlinx/coroutines/EventLoopKt {
public static final fun isIoDispatcherThread (Ljava/lang/Thread;)Z
public static final fun processNextEventInCurrentThread ()J
public static final fun runSingleTaskFromCurrentSystemDispatcher ()J
}

public final class kotlinx/coroutines/ExceptionsKt {
Expand Down
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt
Expand Up @@ -13,6 +13,16 @@ import kotlin.jvm.*
* The result of .limitedParallelism(x) call, a dispatcher
* that wraps the given dispatcher, but limits the parallelism level, while
* trying to emulate fairness.
*
* ### Implementation details
*
* By design, 'LimitedDispatcher' never [dispatches][CoroutineDispatcher.dispatch] originally sent tasks
* to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and
* dispatches at most [parallelism] "worker-loop" tasks that poll the underlying queue and cooperatively preempt
* in order to avoid starvation of the underlying dispatcher.
*
* Such behavior is crucial to be compatible with any underlying dispatcher implementation without
* direct cooperation.
*/
internal class LimitedDispatcher(
private val dispatcher: CoroutineDispatcher,
Expand Down
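The worker-loop design described in the `LimitedDispatcher` KDoc above can be illustrated with a minimal sketch over plain `java.util.concurrent.Executor`. Everything here is an assumption made for illustration: `LimitedExecutor`, `fairnessBatch` and `maxObservedParallelism` are hypothetical names, and this is not the actual `LimitedDispatcher` code. It only shows the idea that submitted tasks stay in a private queue while at most `parallelism` worker loops are handed to the underlying executor and periodically yield.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified model of the worker-loop idea; NOT the real LimitedDispatcher.
class LimitedExecutor(
    private val underlying: Executor,
    private val parallelism: Int,
    private val fairnessBatch: Int = 16 // assumed yield threshold, chosen for illustration
) : Executor {
    private val queue = ConcurrentLinkedQueue<Runnable>()
    private val runningWorkers = AtomicInteger(0)

    override fun execute(task: Runnable) {
        queue.add(task) // submitted tasks stay in our own queue
        tryStartWorker()
    }

    // Dispatches one more worker loop to the underlying executor unless the limit is reached.
    private fun tryStartWorker() {
        while (true) {
            val current = runningWorkers.get()
            if (current >= parallelism) return
            if (runningWorkers.compareAndSet(current, current + 1)) {
                underlying.execute(Runnable { workerLoop() })
                return
            }
        }
    }

    private fun workerLoop() {
        var executed = 0
        while (true) {
            val task = queue.poll()
            if (task == null) {
                runningWorkers.decrementAndGet()
                // A task may have been enqueued after poll() but before the decrement; re-check.
                if (!queue.isEmpty()) tryStartWorker()
                return
            }
            try {
                task.run()
            } catch (e: Throwable) {
                e.printStackTrace() // keep the worker slot consistent in this sketch
            }
            if (++executed >= fairnessBatch) {
                // Cooperative preemption: give the thread back and continue in a fresh dispatch.
                underlying.execute(Runnable { workerLoop() })
                return
            }
        }
    }
}

// Runs `tasks` short tasks and returns the highest number observed running at once.
fun maxObservedParallelism(parallelism: Int, tasks: Int): Int {
    val pool = Executors.newFixedThreadPool(4)
    val limited = LimitedExecutor(pool, parallelism)
    val active = AtomicInteger(0)
    val maxActive = AtomicInteger(0)
    val done = CountDownLatch(tasks)
    repeat(tasks) {
        limited.execute(Runnable {
            val now = active.incrementAndGet()
            maxActive.updateAndGet { prev -> maxOf(prev, now) }
            Thread.sleep(1)
            active.decrementAndGet()
            done.countDown()
        })
    }
    done.await()
    pool.shutdown()
    return maxActive.get()
}

fun main() {
    println(maxObservedParallelism(parallelism = 2, tasks = 50) <= 2) // prints "true"
}
```

Because only worker loops are dispatched, the wrapper needs nothing from the underlying executor beyond `execute`, which is the compatibility property the KDoc refers to.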
78 changes: 78 additions & 0 deletions kotlinx-coroutines-core/jvm/src/EventLoop.kt
Expand Up @@ -4,6 +4,10 @@

package kotlinx.coroutines

import kotlinx.coroutines.Runnable
import kotlinx.coroutines.scheduling.*
import kotlinx.coroutines.scheduling.CoroutineScheduler

internal actual abstract class EventLoopImplPlatform: EventLoop() {
protected abstract val thread: Thread

Expand Down Expand Up @@ -45,6 +49,80 @@ internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.curr
*/
@InternalCoroutinesApi
public fun processNextEventInCurrentThread(): Long =
// This API is used in Ktor for serverless integration where a single thread awaits a blocking call
// (and, to avoid actual blocking, does something via this call), see #850
ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE

internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()

/**
* Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]).
* Returns `0` if any task was executed, `> 0` for the number of nanoseconds to wait until invoking this method again
* (implying that there will be a task to steal in N nanoseconds), `-1` if there are no tasks in the corresponding dispatcher at all.
*
* ### Invariants
*
* - When invoked from a [Dispatchers.Default] **thread** (even if the actual context is a different dispatcher,
* [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended
* up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO]
* **are not** executed[1].
* - When invoked from a [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only.
*
* [1] -- this is a purely technical limitation: the scheduler does not have a "notify me when CPU token is available" API,
* and we cannot leave this method without leaving the thread in its original state.
*
* ### Rationale
*
* This is an internal API that is intended to replace IDEA's core FJP decomposition.
* The following API is provided by IDEA core:
* ```
* runDecomposedTaskAndJoinIt { // <- non-suspending call
* // spawn as many tasks as needed
* // these tasks can also invoke 'runDecomposedTaskAndJoinIt'
* }
* ```
* The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself,
* thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions,
* 'runDecomposedTaskAndJoinIt' should not just block but also **help** execute the task or other tasks
* until an arbitrary condition is satisfied.
*
* See #3439 for additional details.
*
* ### Limitations and caveats
*
* - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread.
* - Is not 100% effective, because the caller has to "wait" (or do other work) for the returned number of nanoseconds
* even when work arrives immediately after returning from this method.
* - When there is no more work, it's up to the caller to decide what to do. It's important to remember that
* work for the current dispatcher may arrive **later** from external sources [2].
*
* [2] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires
* a tremendous effort.
*
* @throws IllegalStateException if the current thread is not a system dispatcher thread
*/
@InternalCoroutinesApi
@DelicateCoroutinesApi
@PublishedApi
internal fun runSingleTaskFromCurrentSystemDispatcher(): Long {
val thread = Thread.currentThread()
if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread")
return thread.runSingleTask()
}

/**
* Checks whether the given thread belongs to Dispatchers.IO.
* Note that being "part of Dispatchers.IO" is a *dynamic* property, meaning that the thread
* may change this status when switching between tasks.
*
* This function is intended to be used on the result of `Thread.currentThread()` for diagnostic
* purposes, and is declared as an extension only to avoid top-level scope pollution.
*/
@InternalCoroutinesApi
@DelicateCoroutinesApi
@PublishedApi
internal fun Thread.isIoDispatcherThread(): Boolean {
if (this !is CoroutineScheduler.Worker) return false
return isIo()
}
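The return-value contract documented for `runSingleTaskFromCurrentSystemDispatcher` (`0`, a positive delay, or `-1`) suggests a "help until done" loop on the caller's side. The sketch below is only an illustration under stated assumptions: `helpUntil`, `idleParkNanos` and `runDemo` are hypothetical names, and the real function is internal to kotlinx.coroutines, so it is passed in here as a plain `() -> Long` parameter and replaced by a fake queue in the demo.

```kotlin
import java.util.concurrent.locks.LockSupport

// Hypothetical helper. `runSingleTask` stands in for runSingleTaskFromCurrentSystemDispatcher(),
// which is internal and cannot be called directly from outside kotlinx.coroutines.
fun helpUntil(
    isDone: () -> Boolean,
    runSingleTask: () -> Long,
    idleParkNanos: Long = 100_000L // assumed back-off, chosen for illustration
) {
    while (!isDone()) {
        val result = runSingleTask()
        when {
            // 0: a task was executed, so re-check the condition right away.
            result == 0L -> {}
            // > 0: a task becomes stealable in `result` ns; wait, but cap the wait
            // so that work arriving earlier is not missed for too long.
            result > 0L -> LockSupport.parkNanos(minOf(result, idleParkNanos))
            // -1: nothing to run now; work may still arrive later from external sources.
            else -> LockSupport.parkNanos(idleParkNanos)
        }
    }
}

// Demo with a fake single-threaded "dispatcher": a queue of three pending tasks.
fun runDemo(): Int {
    val pending = ArrayDeque<() -> Unit>()
    var completed = 0
    repeat(3) { pending.addLast { completed += 1 } }
    helpUntil(
        isDone = { completed == 3 },
        runSingleTask = {
            val task = pending.removeFirstOrNull()
            if (task == null) -1L else { task(); 0L }
        }
    )
    return completed
}

fun main() {
    println(runDemo()) // prints 3
}
```

Capping the wait is one possible answer to the "not 100% effective" caveat; what to do on `-1` is left to the caller, as the KDoc notes.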

48 changes: 40 additions & 8 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Expand Up @@ -720,6 +720,30 @@ internal class CoroutineScheduler(
tryReleaseCpu(WorkerState.TERMINATED)
}

/**
* See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details.
* This method is tailored to one specific use case and is not expected to be used widely.
*/
fun runSingleTask(): Long {
val stateSnapshot = state
val isCpuThread = state == WorkerState.CPU_ACQUIRED
val task = if (isCpuThread) {
findCpuTask()
} else {
findBlockingTask()
}
if (task == null) {
if (minDelayUntilStealableTaskNs == 0L) return -1L
return minDelayUntilStealableTaskNs
}
runSafely(task)
if (!isCpuThread) decrementBlockingTasks()
assert { state == stateSnapshot }
return 0L
}

fun isIo() = state == WorkerState.BLOCKING

// Counterpart to "tryUnpark"
private fun tryPark() {
if (!inStack()) {
Expand Down Expand Up @@ -879,9 +903,21 @@ internal class CoroutineScheduler(
* * Check if our queue has one (maybe mixed in with CPU tasks)
* * Poll global and try steal
*/
return findBlockingTask()
}

// NB: ONLY for runSingleTask method
private fun findBlockingTask(): Task? {
return localQueue.pollBlocking()
?: globalBlockingQueue.removeFirstOrNull()
?: trySteal(blockingOnly = true)
?: trySteal(STEAL_BLOCKING_ONLY)
}

// NB: ONLY for runSingleTask method
private fun findCpuTask(): Task? {
return localQueue.pollCpu()
?: globalCpuQueue.removeFirstOrNull()
?: trySteal(STEAL_CPU_ONLY)
}

private fun findAnyTask(scanLocalQueue: Boolean): Task? {
Expand All @@ -897,7 +933,7 @@ internal class CoroutineScheduler(
} else {
pollGlobalQueues()?.let { return it }
}
return trySteal(blockingOnly = false)
return trySteal(STEAL_ANY)
}

private fun pollGlobalQueues(): Task? {
Expand All @@ -910,7 +946,7 @@ internal class CoroutineScheduler(
}
}

private fun trySteal(blockingOnly: Boolean): Task? {
private fun trySteal(stealingMode: StealingMode): Task? {
val created = createdWorkers
// 0 to await an initialization and 1 to avoid excess stealing on single-core machines
if (created < 2) {
Expand All @@ -924,11 +960,7 @@ internal class CoroutineScheduler(
if (currentIndex > created) currentIndex = 1
val worker = workers[currentIndex]
if (worker !== null && worker !== this) {
val stealResult = if (blockingOnly) {
worker.localQueue.tryStealBlocking(stolenTask)
} else {
worker.localQueue.trySteal(stolenTask)
}
val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
if (stealResult == TASK_STOLEN) {
val result = stolenTask.element
stolenTask.element = null
Expand Down
69 changes: 50 additions & 19 deletions kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt
Expand Up @@ -16,6 +16,14 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default
internal const val TASK_STOLEN = -1L
internal const val NOTHING_TO_STEAL = -2L

internal typealias StealingMode = Int
internal const val STEAL_ANY: StealingMode = 3
internal const val STEAL_CPU_ONLY: StealingMode = 2
internal const val STEAL_BLOCKING_ONLY: StealingMode = 1

internal inline val Task.maskForStealingMode: Int
get() = if (isBlocking) STEAL_BLOCKING_ONLY else STEAL_CPU_ONLY

/**
* Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity.
* At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue)
Expand Down Expand Up @@ -107,54 +115,75 @@ internal class WorkQueue {
*
* Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least one task was stolen
* or a positive number of nanoseconds that must pass before the head of this queue becomes available to steal.
*
* [StealingMode] controls what tasks to steal:
* * [STEAL_ANY] is the default mode for the scheduler: the task at the head (in FIFO order) is stolen
* * [STEAL_BLOCKING_ONLY] is the mode for stealing *an arbitrary* blocking task, which is used by the scheduler when helping in Dispatchers.IO mode
* * [STEAL_CPU_ONLY] is a kludge for `runSingleTaskFromCurrentSystemDispatcher`
*/
fun trySteal(stolenTaskRef: ObjectRef<Task?>): Long {
val task = pollBuffer()
fun trySteal(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
val task = when (stealingMode) {
STEAL_ANY -> pollBuffer()
else -> stealWithExclusiveMode(stealingMode)
}

if (task != null) {
stolenTaskRef.element = task
return TASK_STOLEN
}
return tryStealLastScheduled(stolenTaskRef, blockingOnly = false)
return tryStealLastScheduled(stealingMode, stolenTaskRef)
}

fun tryStealBlocking(stolenTaskRef: ObjectRef<Task?>): Long {
// Steal only tasks of a particular kind, potentially invoking full queue scan
private fun stealWithExclusiveMode(stealingMode: StealingMode): Task? {
var start = consumerIndex.value
val end = producerIndex.value

while (start != end && blockingTasksInBuffer.value > 0) {
stolenTaskRef.element = tryExtractBlockingTask(start++) ?: continue
return TASK_STOLEN
val onlyBlocking = stealingMode == STEAL_BLOCKING_ONLY
// Bail out if there is no blocking work for us
if (onlyBlocking && blockingTasksInBuffer.value == 0) return null
while (start != end) {
return tryExtractFromTheMiddle(start++, onlyBlocking) ?: continue
}
return tryStealLastScheduled(stolenTaskRef, blockingOnly = true)

return null
}

// Polls for blocking task, invoked only by the owner
fun pollBlocking(): Task? {
// NB: ONLY for runSingleTask method
fun pollBlocking(): Task? = pollWithExclusiveMode(onlyBlocking = true /* only blocking */)

// Polls for CPU task, invoked only by the owner
// NB: ONLY for runSingleTask method
fun pollCpu(): Task? = pollWithExclusiveMode(onlyBlocking = false /* only cpu */)

private fun pollWithExclusiveMode(/* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? {
while (true) { // Poll the slot
val lastScheduled = lastScheduledTask.value ?: break
if (!lastScheduled.isBlocking) break
if (lastScheduled.isBlocking != onlyBlocking) break
if (lastScheduledTask.compareAndSet(lastScheduled, null)) {
return lastScheduled
} // Failed -> someone else stole it
}

// Failed to poll the slot, scan the queue
val start = consumerIndex.value
var end = producerIndex.value

while (start != end && blockingTasksInBuffer.value > 0) {
val task = tryExtractBlockingTask(--end)
// Bail out if there is no blocking work for us
if (onlyBlocking && blockingTasksInBuffer.value == 0) return null
Collaborator

It is still a mystery to me why we don't do it on every iteration any longer. Could you drop a comment in the code?

Member Author

My idea was to check it once and then look for the task. The only scenario where the task is not found is when another thread decides to steal a blocking task from the very same queue, which is not very likely. Effectively, I traded a negligible part of "slow path" ("unlikely event happened") performance for the same negligible benefit on "fast path".

It's unlikely to matter in reality, so readability/maintainability must be preferred; I'll roll back the change

Collaborator

This particular place still has the change.

Collaborator

Also, I would be okay with it if it had a corresponding comment and, preferably, was in a separate commit (this one is a bit loaded with changes already).

Member Author

Oh, I probably have to stop pushing any changes on Friday evenings 😅

Collaborator

Alternatively, you could embrace the flow, go for it, and publish a release today.

while (start != end) {
val task = tryExtractFromTheMiddle(--end, onlyBlocking)
if (task != null) {
return task
}
}
return null
}

private fun tryExtractBlockingTask(index: Int): Task? {
private fun tryExtractFromTheMiddle(index: Int, onlyBlocking: Boolean): Task? {
val arrayIndex = index and MASK
val value = buffer[arrayIndex]
if (value != null && value.isBlocking && buffer.compareAndSet(arrayIndex, value, null)) {
blockingTasksInBuffer.decrementAndGet()
if (value != null && value.isBlocking == onlyBlocking && buffer.compareAndSet(arrayIndex, value, null)) {
if (onlyBlocking) blockingTasksInBuffer.decrementAndGet()
return value
}
return null
Expand All @@ -170,10 +199,12 @@ internal class WorkQueue {
/**
* Contract on return value is the same as for [trySteal]
*/
private fun tryStealLastScheduled(stolenTaskRef: ObjectRef<Task?>, blockingOnly: Boolean): Long {
private fun tryStealLastScheduled(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
while (true) {
val lastScheduled = lastScheduledTask.value ?: return NOTHING_TO_STEAL
if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL
if ((lastScheduled.maskForStealingMode and stealingMode) == 0) {
return NOTHING_TO_STEAL
}

// TODO time wraparound ?
val time = schedulerTimeSource.nanoTime()
Expand Down
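The `StealingMode` constants in this file form a two-bit mask, and the check `(lastScheduled.maskForStealingMode and stealingMode) == 0` in `tryStealLastScheduled` rejects a task whose kind is not covered by the requested mode. The self-contained sketch below reproduces only that check; `SketchTask` and `matches` are hypothetical stand-ins introduced for illustration, while the constant values are copied from the diff.

```kotlin
typealias StealingMode = Int

const val STEAL_ANY: StealingMode = 3           // 0b11: both kinds accepted
const val STEAL_CPU_ONLY: StealingMode = 2      // 0b10
const val STEAL_BLOCKING_ONLY: StealingMode = 1 // 0b01

// Hypothetical stand-in for the scheduler's Task; only the flag that matters here.
class SketchTask(val isBlocking: Boolean)

// Each task maps to exactly one bit, mirroring Task.maskForStealingMode in the diff.
val SketchTask.maskForStealingMode: Int
    get() = if (isBlocking) STEAL_BLOCKING_ONLY else STEAL_CPU_ONLY

// A task may be stolen when its bit is present in the requested mode.
fun matches(task: SketchTask, mode: StealingMode): Boolean =
    (task.maskForStealingMode and mode) != 0

fun main() {
    val cpu = SketchTask(isBlocking = false)
    val blocking = SketchTask(isBlocking = true)
    println(matches(cpu, STEAL_ANY))                // true
    println(matches(blocking, STEAL_ANY))           // true
    println(matches(cpu, STEAL_BLOCKING_ONLY))      // false
    println(matches(blocking, STEAL_CPU_ONLY))      // false
}
```

Encoding the modes as bits lets a single `and` replace the earlier `blockingOnly` boolean while also covering the CPU-only case.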