diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index e880916cc6..848efc0ce0 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -310,7 +310,9 @@ public abstract interface class kotlinx/coroutines/DisposableHandle { } public final class kotlinx/coroutines/EventLoopKt { + public static final fun isIoDispatcherThread (Ljava/lang/Thread;)Z public static final fun processNextEventInCurrentThread ()J + public static final fun runSingleTaskFromCurrentSystemDispatcher ()J } public final class kotlinx/coroutines/ExceptionsKt { diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt index 214480ea70..8d814d566d 100644 --- a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -13,6 +13,16 @@ import kotlin.jvm.* * The result of .limitedParallelism(x) call, a dispatcher * that wraps the given dispatcher, but limits the parallelism level, while * trying to emulate fairness. + * + * ### Implementation details + * + * By design, 'LimitedDispatcher' never [dispatches][CoroutineDispatcher.dispatch] originally sent tasks + * to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and + * dispatches at most [parallelism] "worker-loop" tasks that poll the underlying queue and cooperatively preempt + * in order to avoid starvation of the underlying dispatcher. + * + * Such behavior is crucial to be compatible with any underlying dispatcher implementation without + * direct cooperation. */ internal class LimitedDispatcher( private val dispatcher: CoroutineDispatcher, diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index 1ee651aa41..7d1078cf6f 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -4,6 +4,10 @@ package kotlinx.coroutines +import kotlinx.coroutines.Runnable +import kotlinx.coroutines.scheduling.* +import kotlinx.coroutines.scheduling.CoroutineScheduler + internal actual abstract class EventLoopImplPlatform: EventLoop() { protected abstract val thread: Thread @@ -45,6 +49,80 @@ internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.curr */ @InternalCoroutinesApi public fun processNextEventInCurrentThread(): Long = + // This API is used in Ktor for serverless integration where a single thread awaits a blocking call + // (and, to avoid actual blocking, does something via this call), see #850 ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block() + +/** + * Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]). + * Returns `0` if any task was executed, `>= 0` for number of nanoseconds to wait until invoking this method again + * (implying that there will be a task to steal in N nanoseconds), `-1` if there is no tasks in the corresponding dispatcher at all. + * + * ### Invariants + * + * - When invoked from [Dispatchers.Default] **thread** (even if the actual context is different dispatcher, + * [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended + * up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO] + * **are not** executed[1]. + * - When invoked from [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only. + * + * [1] -- this is purely technical limitation: the scheduler does not have "notify me when CPU token is available" API, + * and we cannot leave this method without leaving thread in its original state. + * + * ### Rationale + * + * This is an internal API that is intended to replace IDEA's core FJP decomposition. + * The following API is provided by IDEA core: + * ``` + * runDecomposedTaskAndJoinIt { // <- non-suspending call + * // spawn as many tasks as needed + * // these tasks can also invoke 'runDecomposedTaskAndJoinIt' + * } + * ``` + * The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself, + * thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions, + * 'runDecomposedTaskAndJoinIt' should not just block but also **help** execute the task or other tasks + * until an arbitrary condition is satisfied. + * + * See #3439 for additional details. + * + * ### Limitations and caveats + * + * - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread + * - Is not 100% effective, because the caller should somehow "wait" (or do other work) for [Long] returned nanoseconds + * even when work arrives immediately after returning from this method. + * - When there is no more work, it's up to the caller to decide what to do. It's important to remember that + * work to current dispatcher may arrive **later** from external sources [1] + * + * [1] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires + * a tremendous effort. + * + * @throws IllegalStateException if the current thread is not system dispatcher thread + */ +@InternalCoroutinesApi +@DelicateCoroutinesApi +@PublishedApi +internal fun runSingleTaskFromCurrentSystemDispatcher(): Long { + val thread = Thread.currentThread() + if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread") + return thread.runSingleTask() +} + +/** + * Checks whether the given thread belongs to Dispatchers.IO. + * Note that feature "is part of the Dispatchers.IO" is *dynamic*, meaning that the thread + * may change this status when switching between tasks. + * + * This function is inteded to be used on the result of `Thread.currentThread()` for diagnostic + * purposes, and is declared as an extension only to avoid top-level scope pollution. + */ +@InternalCoroutinesApi +@DelicateCoroutinesApi +@PublishedApi +internal fun Thread.isIoDispatcherThread(): Boolean { + if (this !is CoroutineScheduler.Worker) return false + return isIo() +} + diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 2bad139fe6..e08f3deedd 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -720,6 +720,30 @@ internal class CoroutineScheduler( tryReleaseCpu(WorkerState.TERMINATED) } + /** + * See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details. + * This is a fine-tailored method for a specific use-case not expected to be used widely. + */ + fun runSingleTask(): Long { + val stateSnapshot = state + val isCpuThread = state == WorkerState.CPU_ACQUIRED + val task = if (isCpuThread) { + findCpuTask() + } else { + findBlockingTask() + } + if (task == null) { + if (minDelayUntilStealableTaskNs == 0L) return -1L + return minDelayUntilStealableTaskNs + } + runSafely(task) + if (!isCpuThread) decrementBlockingTasks() + assert { state == stateSnapshot} + return 0L + } + + fun isIo() = state == WorkerState.BLOCKING + // Counterpart to "tryUnpark" private fun tryPark() { if (!inStack()) { @@ -879,9 +903,21 @@ internal class CoroutineScheduler( * * Check if our queue has one (maybe mixed in with CPU tasks) * * Poll global and try steal */ + return findBlockingTask() + } + + // NB: ONLY for runSingleTask method + private fun findBlockingTask(): Task? { return localQueue.pollBlocking() ?: globalBlockingQueue.removeFirstOrNull() - ?: trySteal(blockingOnly = true) + ?: trySteal(STEAL_BLOCKING_ONLY) + } + + // NB: ONLY for runSingleTask method + private fun findCpuTask(): Task? { + return localQueue.pollCpu() + ?: globalBlockingQueue.removeFirstOrNull() + ?: trySteal(STEAL_CPU_ONLY) } private fun findAnyTask(scanLocalQueue: Boolean): Task? { @@ -897,7 +933,7 @@ internal class CoroutineScheduler( } else { pollGlobalQueues()?.let { return it } } - return trySteal(blockingOnly = false) + return trySteal(STEAL_ANY) } private fun pollGlobalQueues(): Task? { @@ -910,7 +946,7 @@ internal class CoroutineScheduler( } } - private fun trySteal(blockingOnly: Boolean): Task? { + private fun trySteal(stealingMode: StealingMode): Task? { val created = createdWorkers // 0 to await an initialization and 1 to avoid excess stealing on single-core machines if (created < 2) { @@ -924,11 +960,7 @@ internal class CoroutineScheduler( if (currentIndex > created) currentIndex = 1 val worker = workers[currentIndex] if (worker !== null && worker !== this) { - val stealResult = if (blockingOnly) { - worker.localQueue.tryStealBlocking(stolenTask) - } else { - worker.localQueue.trySteal(stolenTask) - } + val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask) if (stealResult == TASK_STOLEN) { val result = stolenTask.element stolenTask.element = null diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index 611f2b108a..a185410ab2 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -16,6 +16,14 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default internal const val TASK_STOLEN = -1L internal const val NOTHING_TO_STEAL = -2L +internal typealias StealingMode = Int +internal const val STEAL_ANY: StealingMode = 3 +internal const val STEAL_CPU_ONLY: StealingMode = 2 +internal const val STEAL_BLOCKING_ONLY: StealingMode = 1 + +internal inline val Task.maskForStealingMode: Int + get() = if (isBlocking) STEAL_BLOCKING_ONLY else STEAL_CPU_ONLY + /** * Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity. * At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue) @@ -107,42 +115,63 @@ internal class WorkQueue { * * Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least task was stolen * or positive value of how many nanoseconds should pass until the head of this queue will be available to steal. + * + * [StealingMode] controls what tasks to steal: + * * [STEAL_ANY] is default mode for scheduler, task from the head (in FIFO order) is stolen + * * [STEAL_BLOCKING_ONLY] is mode for stealing *an arbitrary* blocking task, which is used by the scheduler when helping in Dispatchers.IO mode + * * [STEAL_CPU_ONLY] is a kludge for `runSingleTaskFromCurrentSystemDispatcher` */ - fun trySteal(stolenTaskRef: ObjectRef): Long { - val task = pollBuffer() + fun trySteal(stealingMode: StealingMode, stolenTaskRef: ObjectRef): Long { + val task = when (stealingMode) { + STEAL_ANY -> pollBuffer() + else -> stealWithExclusiveMode(stealingMode) + } + if (task != null) { stolenTaskRef.element = task return TASK_STOLEN } - return tryStealLastScheduled(stolenTaskRef, blockingOnly = false) + return tryStealLastScheduled(stealingMode, stolenTaskRef) } - fun tryStealBlocking(stolenTaskRef: ObjectRef): Long { + // Steal only tasks of a particular kind, potentially invoking full queue scan + private fun stealWithExclusiveMode(stealingMode: StealingMode): Task? { var start = consumerIndex.value val end = producerIndex.value - - while (start != end && blockingTasksInBuffer.value > 0) { - stolenTaskRef.element = tryExtractBlockingTask(start++) ?: continue - return TASK_STOLEN + val onlyBlocking = stealingMode == STEAL_BLOCKING_ONLY + // Bail out if there is no blocking work for us + while (start != end) { + if (onlyBlocking && blockingTasksInBuffer.value == 0) return null + return tryExtractFromTheMiddle(start++, onlyBlocking) ?: continue } - return tryStealLastScheduled(stolenTaskRef, blockingOnly = true) + + return null } // Polls for blocking task, invoked only by the owner - fun pollBlocking(): Task? { + // NB: ONLY for runSingleTask method + fun pollBlocking(): Task? = pollWithExclusiveMode(onlyBlocking = true /* only blocking */) + + // Polls for CPU task, invoked only by the owner + // NB: ONLY for runSingleTask method + fun pollCpu(): Task? = pollWithExclusiveMode(onlyBlocking = false /* only cpu */) + + private fun pollWithExclusiveMode(/* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? { while (true) { // Poll the slot val lastScheduled = lastScheduledTask.value ?: break - if (!lastScheduled.isBlocking) break + if (lastScheduled.isBlocking != onlyBlocking) break if (lastScheduledTask.compareAndSet(lastScheduled, null)) { return lastScheduled } // Failed -> someone else stole it } + // Failed to poll the slot, scan the queue val start = consumerIndex.value var end = producerIndex.value - - while (start != end && blockingTasksInBuffer.value > 0) { - val task = tryExtractBlockingTask(--end) + // Bail out if there is no blocking work for us + while (start != end) { + if (onlyBlocking && blockingTasksInBuffer.value == 0) return null + val task = tryExtractFromTheMiddle(--end, onlyBlocking) if (task != null) { return task } @@ -150,11 +179,11 @@ internal class WorkQueue { return null } - private fun tryExtractBlockingTask(index: Int): Task? { + private fun tryExtractFromTheMiddle(index: Int, onlyBlocking: Boolean): Task? { val arrayIndex = index and MASK val value = buffer[arrayIndex] - if (value != null && value.isBlocking && buffer.compareAndSet(arrayIndex, value, null)) { - blockingTasksInBuffer.decrementAndGet() + if (value != null && value.isBlocking == onlyBlocking && buffer.compareAndSet(arrayIndex, value, null)) { + if (onlyBlocking) blockingTasksInBuffer.decrementAndGet() return value } return null @@ -170,10 +199,12 @@ internal class WorkQueue { /** * Contract on return value is the same as for [trySteal] */ - private fun tryStealLastScheduled(stolenTaskRef: ObjectRef, blockingOnly: Boolean): Long { + private fun tryStealLastScheduled(stealingMode: StealingMode, stolenTaskRef: ObjectRef): Long { while (true) { val lastScheduled = lastScheduledTask.value ?: return NOTHING_TO_STEAL - if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL + if ((lastScheduled.maskForStealingMode and stealingMode) == 0) { + return NOTHING_TO_STEAL + } // TODO time wraparound ? val time = schedulerTimeSource.nanoTime() diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt new file mode 100644 index 0000000000..22b9b02916 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.scheduling + +import kotlinx.coroutines.* +import kotlinx.coroutines.internal.AVAILABLE_PROCESSORS +import org.junit.Test +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CountDownLatch +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.atomic.AtomicInteger +import kotlin.random.* +import kotlin.random.Random +import kotlin.test.* +import kotlin.time.* + +class CoroutineSchedulerInternalApiStressTest : TestBase() { + + @Test(timeout = 120_000L) + fun testHelpDefaultIoIsIsolated() = repeat(100 * stressTestMultiplierSqrt) { + val ioTaskMarker = ThreadLocal.withInitial { false } + runTest { + val jobToComplete = Job() + val expectedIterations = 100 + val completionLatch = CountDownLatch(1) + val tasksToCompleteJob = AtomicInteger(expectedIterations) + val observedIoThreads = Collections.newSetFromMap(ConcurrentHashMap()) + val observedDefaultThreads = Collections.newSetFromMap(ConcurrentHashMap()) + + val barrier = CyclicBarrier(AVAILABLE_PROCESSORS) + val spawners = ArrayList() + repeat(AVAILABLE_PROCESSORS - 1) { + // Launch CORES - 1 spawners + spawners += launch(Dispatchers.Default) { + barrier.await() + repeat(expectedIterations) { + launch { + val tasksLeft = tasksToCompleteJob.decrementAndGet() + if (tasksLeft < 0) return@launch // Leftovers are being executed all over the place + observedDefaultThreads.add(Thread.currentThread()) + if (tasksLeft == 0) { + // Verify threads first + try { + assertFalse(observedIoThreads.containsAll(observedDefaultThreads)) + } finally { + jobToComplete.complete() + } + } + } + + // Sometimes launch an IO task to mess with a scheduler + if (Random.nextInt(0..9) == 0) { + launch(Dispatchers.IO) { + ioTaskMarker.set(true) + observedIoThreads.add(Thread.currentThread()) + assertTrue(Thread.currentThread().isIoDispatcherThread()) + } + } + } + completionLatch.await() + } + } + + withContext(Dispatchers.Default) { + barrier.await() + var timesHelped = 0 + while (!jobToComplete.isCompleted) { + val result = runSingleTaskFromCurrentSystemDispatcher() + assertFalse(ioTaskMarker.get()) + if (result == 0L) { + ++timesHelped + continue + } else if (result >= 0L) { + Thread.sleep(result.toDuration(DurationUnit.NANOSECONDS).toDelayMillis()) + } else { + Thread.sleep(10) + } + } + completionLatch.countDown() +// assertEquals(100, timesHelped) +// assertTrue(Thread.currentThread() in observedDefaultThreads, observedDefaultThreads.toString()) + } + } + } +} + diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt index 864e65a3a9..f690d3882f 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt @@ -107,3 +107,5 @@ internal fun GlobalQueue.drain(): List { } return result } + +internal fun WorkQueue.trySteal(stolenTaskRef: ObjectRef): Long = trySteal(STEAL_ANY, stolenTaskRef)