Skip to content

Preliminary prototype of #3439 #3572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ public abstract interface class kotlinx/coroutines/DisposableHandle {
}

public final class kotlinx/coroutines/EventLoopKt {
public static final fun isIoDispatcherThread (Ljava/lang/Thread;)Z
public static final fun processNextEventInCurrentThread ()J
public static final fun runSingleTaskFromCurrentSystemDispatcher ()J
}

public final class kotlinx/coroutines/ExceptionsKt {
Expand Down
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ import kotlin.jvm.*
* The result of .limitedParallelism(x) call, a dispatcher
* that wraps the given dispatcher, but limits the parallelism level, while
* trying to emulate fairness.
*
* ### Implementation details
*
* By design, 'LimitedDispatcher' never [dispatches][CoroutineDispatcher.dispatch] originally sent tasks
* to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and
* dispatches at most [parallelism] "worker-loop" tasks that poll the underlying queue and cooperatively preempt
* in order to avoid starvation of the underlying dispatcher.
*
* Such invariant is crucial in order to be compatible with any underlying dispatcher implementation without
* direct cooperation.
*/
internal class LimitedDispatcher(
private val dispatcher: CoroutineDispatcher,
Expand Down
78 changes: 78 additions & 0 deletions kotlinx-coroutines-core/jvm/src/EventLoop.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

package kotlinx.coroutines

import kotlinx.coroutines.Runnable
import kotlinx.coroutines.scheduling.*
import kotlinx.coroutines.scheduling.CoroutineScheduler

internal actual abstract class EventLoopImplPlatform: EventLoop() {
protected abstract val thread: Thread

Expand Down Expand Up @@ -45,6 +49,80 @@ internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.curr
*/
@InternalCoroutinesApi
public fun processNextEventInCurrentThread(): Long =
// This API is used in Ktor for serverless integration where a single thread awaits a blocking call
// (and, to avoid actual blocking, does something via this call), see #850
ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE

internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()

/**
* Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]).
* Returns `0` if any task was executed, `>= 0` for number of nanoseconds to wait until invoking this method again
* (implying that there will be a task to steal in N nanoseconds), `-1` if there is no tasks in the corresponding dispatcher at all.
*
* ### Invariants
*
* - When invoked from [Dispatchers.Default] **thread** (even if the actual context is different dispatcher,
* [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended
* up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO]
* **are not** executed[1].
* - When invoked from [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only.
*
* [1] -- this is purely technical limitation: the scheduler does not have "notify me when CPU token is available" API,
* and we cannot leave this method without leaving thread in its original state.
*
* ### Rationale
*
* This is an internal API that is intended to replace IDEA's core FJP decomposition.
* The following API is provided by IDEA core:
* ```
* runDecomposedTaskAndJoinIt { // <- non-suspending call
* // spawn as many task as needed
* // these tasks can also invoke 'runDecomposedTaskAndJoinIt'
* }
* ```
* The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself,
* thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions,
* 'runDecomposedTaskAndJoinIt' should not only block, but also **help** executing the task or other tasks
* until an arbitrary condition is satisfied.
*
* See #3439 for additional details.
*
* ### Limitations and caveats
*
* - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread
* - Is not 100% effective, it waits for [Long] returned nanoseconds even when work arrives immediately after returning
* from this method
* - When there is no more work, it's up to the caller to decide what to do. It's important to account that
* work to current dispatcher may arrive **later** from external sources [1]
*
* [1] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires
* a tremendous effort.
*
* @throws IllegalStateException if the current thread is not system dispatcher thread
*/
@InternalCoroutinesApi
@DelicateCoroutinesApi
@PublishedApi
internal fun runSingleTaskFromCurrentSystemDispatcher(): Long {
val thread = Thread.currentThread()
if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread")
return thread.runSingleTask()
}

/**
* Checks whether the given thread belongs to Dispatchers.IO.
* Note that feature "is part of the Dispatchers.IO" is *dynamic*, meaning that the thread
* may change this status when switching between tasks.
*
* This function is inteded to be used on the result of `Thread.currentThread()` for diagnostic
* purposes, and is declared as an extension only to avoid top-level scope pollution.
*/
@InternalCoroutinesApi
@DelicateCoroutinesApi
@PublishedApi
internal fun Thread.isIoDispatcherThread(): Boolean {
if (this !is CoroutineScheduler.Worker) return false
return isIo()
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks very similar to the functions at the bottom of CoroutineScheduler.kt and would probably enjoy their company.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've put it here solely because processNextEventInCurrentThread and I wanted to keep these "internal event loop spinners" in a single place to keep track of


48 changes: 40 additions & 8 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,30 @@ internal class CoroutineScheduler(
tryReleaseCpu(WorkerState.TERMINATED)
}

/**
* See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details.
* This is a fine-tailored method for a specific use-case not expected to be used widely.
*/
fun runSingleTask(): Long {
val stateSnapshot = state
val isCpuThread = state == WorkerState.CPU_ACQUIRED
val task = if (isCpuThread) {
findCpuTask()
} else {
findBlockingTask()
}
if (task == null) {
if (minDelayUntilStealableTaskNs == 0L) return -1L
return minDelayUntilStealableTaskNs
}
runSafely(task)
if (!isCpuThread) decrementBlockingTasks()
assert { state == stateSnapshot}
return 0L
}

fun isIo() = state == WorkerState.BLOCKING

// Counterpart to "tryUnpark"
private fun tryPark() {
if (!inStack()) {
Expand Down Expand Up @@ -879,9 +903,21 @@ internal class CoroutineScheduler(
* * Check if our queue has one (maybe mixed in with CPU tasks)
* * Poll global and try steal
*/
return findBlockingTask()
}

// NB: ONLY for runSingleTask method
private fun findBlockingTask(): Task? {
return localQueue.pollBlocking()
?: globalBlockingQueue.removeFirstOrNull()
?: trySteal(blockingOnly = true)
?: trySteal(STEAL_BLOCKING_ONLY)
}

// NB: ONLY for runSingleTask method
private fun findCpuTask(): Task? {
return localQueue.pollCpu()
?: globalBlockingQueue.removeFirstOrNull()
?: trySteal(STEAL_CPU_ONLY)
}

private fun findAnyTask(scanLocalQueue: Boolean): Task? {
Expand All @@ -897,7 +933,7 @@ internal class CoroutineScheduler(
} else {
pollGlobalQueues()?.let { return it }
}
return trySteal(blockingOnly = false)
return trySteal(STEAL_ANY)
}

private fun pollGlobalQueues(): Task? {
Expand All @@ -910,7 +946,7 @@ internal class CoroutineScheduler(
}
}

private fun trySteal(blockingOnly: Boolean): Task? {
private fun trySteal(stealingMode: StealingMode): Task? {
val created = createdWorkers
// 0 to await an initialization and 1 to avoid excess stealing on single-core machines
if (created < 2) {
Expand All @@ -924,11 +960,7 @@ internal class CoroutineScheduler(
if (currentIndex > created) currentIndex = 1
val worker = workers[currentIndex]
if (worker !== null && worker !== this) {
val stealResult = if (blockingOnly) {
worker.localQueue.tryStealBlocking(stolenTask)
} else {
worker.localQueue.trySteal(stolenTask)
}
val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
if (stealResult == TASK_STOLEN) {
val result = stolenTask.element
stolenTask.element = null
Expand Down
75 changes: 58 additions & 17 deletions kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default
internal const val TASK_STOLEN = -1L
internal const val NOTHING_TO_STEAL = -2L

internal typealias StealingMode = Int
internal const val STEAL_ANY: StealingMode = -1
internal const val STEAL_CPU_ONLY: StealingMode = 0
internal const val STEAL_BLOCKING_ONLY: StealingMode = 1

/**
* Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity.
* At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue)
Expand Down Expand Up @@ -107,25 +112,37 @@ internal class WorkQueue {
*
* Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least task was stolen
* or positive value of how many nanoseconds should pass until the head of this queue will be available to steal.
*
* [StealingMode] controls what tasks to steal:
* * [STEAL_ANY] is default mode for scheduler, task from the head (in FIFO order) is stolen
* * [STEAL_BLOCKING_ONLY] is mode for stealing *an arbitrary* blocking task which is used by scheduler when helping in Dispatchers.IO mode
* * [STEAL_CPU_ONLY] is a kludge for `runSingleTaskFromCurrentSystemDispatcher`
*/
fun trySteal(stolenTaskRef: ObjectRef<Task?>): Long {
val task = pollBuffer()
fun trySteal(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
val task = when (stealingMode) {
STEAL_ANY -> pollBuffer()
else -> stealWithExclusiveMode(stealingMode)
}

if (task != null) {
stolenTaskRef.element = task
return TASK_STOLEN
}
return tryStealLastScheduled(stolenTaskRef, blockingOnly = false)
return tryStealLastScheduled(stealingMode, stolenTaskRef)
}

fun tryStealBlocking(stolenTaskRef: ObjectRef<Task?>): Long {
// Steal only tasks of a particular kind, potentially invoking full queue scan
private fun stealWithExclusiveMode(stealingMode: StealingMode): Task? {
var start = consumerIndex.value
val end = producerIndex.value

while (start != end && blockingTasksInBuffer.value > 0) {
stolenTaskRef.element = tryExtractBlockingTask(start++) ?: continue
return TASK_STOLEN
val onlyBlocking = stealingMode == STEAL_BLOCKING_ONLY
// CPU or (BLOCKING & hasBlocking)
val shouldProceed = !onlyBlocking || blockingTasksInBuffer.value > 0
while (start != end && shouldProceed) {
return tryExtractFromTheMiddle(start++, onlyBlocking) ?: continue
}
return tryStealLastScheduled(stolenTaskRef, blockingOnly = true)

return null
}

// Polls for blocking task, invoked only by the owner
Expand All @@ -138,23 +155,41 @@ internal class WorkQueue {
} // Failed -> someone else stole it
}

return pollWithMode(onlyBlocking = true /* only blocking */)
}

fun pollCpu(): Task? {
while (true) { // Poll the slot
val lastScheduled = lastScheduledTask.value ?: break
if (lastScheduled.isBlocking) break
if (lastScheduledTask.compareAndSet(lastScheduled, null)) {
return lastScheduled
} // Failed -> someone else stole it
}

return pollWithMode(onlyBlocking = false /* only cpu */)
}

private fun pollWithMode(/* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? {
val start = consumerIndex.value
var end = producerIndex.value

while (start != end && blockingTasksInBuffer.value > 0) {
val task = tryExtractBlockingTask(--end)
// CPU or (BLOCKING & hasBlocking)
val shouldProceed = !onlyBlocking || blockingTasksInBuffer.value > 0
while (start != end && shouldProceed) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldProceed doesn't update. It doesn't feel like it's intentional, but if it is, it can be moved to an if wrapping the while.

val task = tryExtractFromTheMiddle(--end, onlyBlocking)
if (task != null) {
return task
}
}
return null
}

private fun tryExtractBlockingTask(index: Int): Task? {
private fun tryExtractFromTheMiddle(index: Int, onlyBlocking: Boolean): Task? {
if (onlyBlocking && blockingTasksInBuffer.value == 0) return null
val arrayIndex = index and MASK
val value = buffer[arrayIndex]
if (value != null && value.isBlocking && buffer.compareAndSet(arrayIndex, value, null)) {
blockingTasksInBuffer.decrementAndGet()
if (value != null && value.isBlocking == onlyBlocking && buffer.compareAndSet(arrayIndex, value, null)) {
if (onlyBlocking) blockingTasksInBuffer.decrementAndGet()
return value
}
return null
Expand All @@ -170,10 +205,16 @@ internal class WorkQueue {
/**
* Contract on return value is the same as for [trySteal]
*/
private fun tryStealLastScheduled(stolenTaskRef: ObjectRef<Task?>, blockingOnly: Boolean): Long {
private fun tryStealLastScheduled(stealingMode: StealingMode, stolenTaskRef: ObjectRef<Task?>): Long {
while (true) {
val lastScheduled = lastScheduledTask.value ?: return NOTHING_TO_STEAL
if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL
if (lastScheduled.isBlocking) {
if (stealingMode == STEAL_CPU_ONLY) {
return NOTHING_TO_STEAL
}
} else if (stealingMode == STEAL_BLOCKING_ONLY) {
return NOTHING_TO_STEAL
}

// TODO time wraparound ?
val time = schedulerTimeSource.nanoTime()
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/jvm/test/TestBase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public actual typealias TestResult = Unit
*/
public actual open class TestBase(private var disableOutCheck: Boolean) {

actual constructor(): this(false)
actual constructor(): this(true)

public actual val isBoundByJsTestTimeout = false
private var actionIndex = AtomicInteger()
Expand Down
Loading