From 20daaa7c04ddebe16a1b3350d9d8e399bda72328 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 22 Nov 2022 11:44:30 +0100 Subject: [PATCH 01/14] Introduce a separate slot for stealing tasks into in CoroutineScheduler It solves two problems: * Stealing into exclusively owned local queue does no longer require and CAS'es or atomic operations where they were previously not needed. It should save a few cycles on the stealing code path * The overall timing perturbations should be slightly better now: previously it was possible for the stolen task to be immediately got stolen again from the stealer thread because it was actually published to owner's queue, but its submission time was never updated Fixes #3416 --- .../jvm/src/scheduling/CoroutineScheduler.kt | 17 ++++++++++---- .../jvm/src/scheduling/WorkQueue.kt | 22 +++++++++---------- .../test/scheduling/WorkQueueStressTest.kt | 19 +++++++++------- .../jvm/test/scheduling/WorkQueueTest.kt | 22 ++++++++++++------- 4 files changed, 49 insertions(+), 31 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index e08d1cef90..ab6e2957be 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.internal.* import java.io.* import java.util.concurrent.* import java.util.concurrent.locks.* +import kotlin.jvm.internal.Ref.ObjectRef import kotlin.math.* import kotlin.random.* @@ -598,6 +599,12 @@ internal class CoroutineScheduler( @JvmField val localQueue: WorkQueue = WorkQueue() + /** + * Slot that is used to steal tasks into to avoid re-adding them + * to the local queue. See [trySteal] + */ + private val stolenTask: ObjectRef = ObjectRef() + /** * Worker state. **Updated only by this worker thread**. * By default, worker is in DORMANT state in the case when it was created, but all CPU tokens or tasks were taken. @@ -617,7 +624,7 @@ internal class CoroutineScheduler( /** * It is set to the termination deadline when started doing [park] and it reset - * when there is a task. It servers as protection against spurious wakeups of parkNanos. + * when there is a task. It serves as protection against spurious wakeups of parkNanos. */ private var terminationDeadline = 0L @@ -920,12 +927,14 @@ internal class CoroutineScheduler( if (worker !== null && worker !== this) { assert { localQueue.size == 0 } val stealResult = if (blockingOnly) { - localQueue.tryStealBlockingFrom(victim = worker.localQueue) + localQueue.tryStealBlockingFrom(victim = worker.localQueue, stolenTask) } else { - localQueue.tryStealFrom(victim = worker.localQueue) + localQueue.tryStealFrom(victim = worker.localQueue, stolenTask) } if (stealResult == TASK_STOLEN) { - return localQueue.poll() + val result = stolenTask.element + stolenTask.element = null + return result } else if (stealResult > 0) { minDelay = min(minDelay, stealResult) } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index 6a9a8a5a31..4ee53afdcf 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.scheduling import kotlinx.atomicfu.* import kotlinx.coroutines.* import java.util.concurrent.atomic.* +import kotlin.jvm.internal.Ref.ObjectRef internal const val BUFFER_CAPACITY_BASE = 7 internal const val BUFFER_CAPACITY = 1 shl BUFFER_CAPACITY_BASE @@ -31,7 +32,7 @@ internal const val NOTHING_TO_STEAL = -2L * (scheduler workers without a CPU permit steal blocking tasks via this mechanism). Such property enforces us to use CAS in * order to properly claim value from the buffer. * Moreover, [Task] objects are reusable, so it may seem that this queue is prone to ABA problem. - * Indeed it formally has ABA-problem, but the whole processing logic is written in the way that such ABA is harmless. + * Indeed, it formally has ABA-problem, but the whole processing logic is written in the way that such ABA is harmless. * I have discovered a truly marvelous proof of this, which this KDoc is too narrow to contain. */ internal class WorkQueue { @@ -100,23 +101,22 @@ internal class WorkQueue { } /** - * Tries stealing from [victim] queue into this queue. + * Tries stealing from [victim] queue into the [stolenTaskRef] argument. * * Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least task was stolen * or positive value of how many nanoseconds should pass until the head of this queue will be available to steal. */ - fun tryStealFrom(victim: WorkQueue): Long { + fun tryStealFrom(victim: WorkQueue, stolenTaskRef: ObjectRef): Long { assert { bufferSize == 0 } val task = victim.pollBuffer() if (task != null) { - val notAdded = add(task) - assert { notAdded == null } + stolenTaskRef.element = task return TASK_STOLEN } - return tryStealLastScheduled(victim, blockingOnly = false) + return tryStealLastScheduled(victim, stolenTaskRef, blockingOnly = false) } - fun tryStealBlockingFrom(victim: WorkQueue): Long { + fun tryStealBlockingFrom(victim: WorkQueue, stolenTaskRef: ObjectRef): Long { assert { bufferSize == 0 } var start = victim.consumerIndex.value val end = victim.producerIndex.value @@ -128,13 +128,13 @@ internal class WorkQueue { val value = buffer[index] if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) { victim.blockingTasksInBuffer.decrementAndGet() - add(value) + stolenTaskRef.element = value return TASK_STOLEN } else { ++start } } - return tryStealLastScheduled(victim, blockingOnly = true) + return tryStealLastScheduled(victim, stolenTaskRef, blockingOnly = true) } fun offloadAllWorkTo(globalQueue: GlobalQueue) { @@ -147,7 +147,7 @@ internal class WorkQueue { /** * Contract on return value is the same as for [tryStealFrom] */ - private fun tryStealLastScheduled(victim: WorkQueue, blockingOnly: Boolean): Long { + private fun tryStealLastScheduled(victim: WorkQueue, stolenTaskRef: ObjectRef, blockingOnly: Boolean): Long { while (true) { val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL @@ -164,7 +164,7 @@ internal class WorkQueue { * and dispatched another one. In the latter case we should retry to avoid missing task. */ if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) { - add(lastScheduled) + stolenTaskRef.element = lastScheduled return TASK_STOLEN } continue diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt index 5e170c9f6b..0ee5565317 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt @@ -9,6 +9,7 @@ import org.junit.* import org.junit.Test import java.util.concurrent.* import kotlin.concurrent.* +import kotlin.jvm.internal.* import kotlin.test.* class WorkQueueStressTest : TestBase() { @@ -52,17 +53,18 @@ class WorkQueueStressTest : TestBase() { for (i in 0 until stealersCount) { threads += thread(name = "stealer $i") { + val ref = Ref.ObjectRef() val myQueue = WorkQueue() startLatch.await() while (!producerFinished || producerQueue.size != 0) { - stolenTasks[i].addAll(myQueue.drain().map { task(it) }) - myQueue.tryStealFrom(victim = producerQueue) + stolenTasks[i].addAll(myQueue.drain(ref).map { task(it) }) + myQueue.tryStealFrom(victim = producerQueue, ref) } // Drain last element which is not counted in buffer - stolenTasks[i].addAll(myQueue.drain().map { task(it) }) - myQueue.tryStealFrom(producerQueue) - stolenTasks[i].addAll(myQueue.drain().map { task(it) }) + stolenTasks[i].addAll(myQueue.drain(ref).map { task(it) }) + myQueue.tryStealFrom(producerQueue, ref) + stolenTasks[i].addAll(myQueue.drain(ref).map { task(it) }) } } @@ -89,13 +91,14 @@ class WorkQueueStressTest : TestBase() { val stolen = GlobalQueue() threads += thread(name = "stealer") { val myQueue = WorkQueue() + val ref = Ref.ObjectRef() startLatch.await() while (stolen.size != offerIterations) { - if (myQueue.tryStealFrom(producerQueue) != NOTHING_TO_STEAL) { - stolen.addAll(myQueue.drain().map { task(it) }) + if (myQueue.tryStealFrom(producerQueue, ref) != NOTHING_TO_STEAL) { + stolen.addAll(myQueue.drain(ref).map { task(it) }) } } - stolen.addAll(myQueue.drain().map { task(it) }) + stolen.addAll(myQueue.drain(ref).map { task(it) }) } startLatch.countDown() diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt index 7acd1620f4..dbd7db98d0 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt @@ -7,6 +7,7 @@ package kotlinx.coroutines.scheduling import kotlinx.coroutines.* import org.junit.* import org.junit.Test +import kotlin.jvm.internal.Ref.ObjectRef import kotlin.test.* class WorkQueueTest : TestBase() { @@ -27,7 +28,7 @@ class WorkQueueTest : TestBase() { fun testLastScheduledComesFirst() { val queue = WorkQueue() (1L..4L).forEach { queue.add(task(it)) } - assertEquals(listOf(4L, 1L, 2L, 3L), queue.drain()) + assertEquals(listOf(4L, 1L, 2L, 3L), queue.drain(ObjectRef())) } @Test @@ -38,9 +39,9 @@ class WorkQueueTest : TestBase() { (0 until size).forEach { queue.add(task(it))?.let { t -> offload.addLast(t) } } val expectedResult = listOf(129L) + (0L..126L).toList() - val actualResult = queue.drain() + val actualResult = queue.drain(ObjectRef()) assertEquals(expectedResult, actualResult) - assertEquals((0L until size).toSet().minus(expectedResult), offload.drain().toSet()) + assertEquals((0L until size).toSet().minus(expectedResult.toSet()), offload.drain().toSet()) } @Test @@ -61,23 +62,28 @@ class WorkQueueTest : TestBase() { timeSource.step(3) val stealer = WorkQueue() - assertEquals(TASK_STOLEN, stealer.tryStealFrom(victim)) - assertEquals(arrayListOf(1L), stealer.drain()) + val ref = ObjectRef() + assertEquals(TASK_STOLEN, stealer.tryStealFrom(victim, ref)) + assertEquals(arrayListOf(1L), stealer.drain(ref)) - assertEquals(TASK_STOLEN, stealer.tryStealFrom(victim)) - assertEquals(arrayListOf(2L), stealer.drain()) + assertEquals(TASK_STOLEN, stealer.tryStealFrom(victim, ref)) + assertEquals(arrayListOf(2L), stealer.drain(ref)) } } internal fun task(n: Long) = TaskImpl(Runnable {}, n, NonBlockingContext) -internal fun WorkQueue.drain(): List { +internal fun WorkQueue.drain(ref: ObjectRef): List { var task: Task? = poll() val result = arrayListOf() while (task != null) { result += task.submissionTime task = poll() } + if (ref.element != null) { + result += ref.element!!.submissionTime + ref.element = null + } return result } From 54f5766aeca4f617b7f40cce5e18280ad86c0eec Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 22 Nov 2022 15:34:26 +0100 Subject: [PATCH 02/14] Move victim argument in WorkQueue into receiver position to simplify the overall code structure --- .../jvm/src/scheduling/CoroutineScheduler.kt | 8 ++--- .../jvm/src/scheduling/WorkQueue.kt | 31 +++++++++---------- .../test/scheduling/WorkQueueStressTest.kt | 6 ++-- .../jvm/test/scheduling/WorkQueueTest.kt | 4 +-- 4 files changed, 23 insertions(+), 26 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index ab6e2957be..b12da580e0 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -265,7 +265,7 @@ internal class CoroutineScheduler( /** * Long describing state of workers in this pool. - * Currently includes created, CPU-acquired and blocking workers each occupying [BLOCKING_SHIFT] bits. + * Currently, includes created, CPU-acquired and blocking workers each occupying [BLOCKING_SHIFT] bits. */ private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT) private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt() @@ -273,7 +273,7 @@ internal class CoroutineScheduler( private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt() private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt() - public inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt() + inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt() // Guarded by synchronization private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet()) @@ -927,9 +927,9 @@ internal class CoroutineScheduler( if (worker !== null && worker !== this) { assert { localQueue.size == 0 } val stealResult = if (blockingOnly) { - localQueue.tryStealBlockingFrom(victim = worker.localQueue, stolenTask) + worker.localQueue.tryStealBlocking(stolenTask) } else { - localQueue.tryStealFrom(victim = worker.localQueue, stolenTask) + worker.localQueue.trySteal(stolenTask) } if (stealResult == TASK_STOLEN) { val result = stolenTask.element diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index 4ee53afdcf..0692866d2d 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -101,40 +101,37 @@ internal class WorkQueue { } /** - * Tries stealing from [victim] queue into the [stolenTaskRef] argument. + * Tries stealing from this queue into the [stolenTaskRef] argument. * * Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least task was stolen * or positive value of how many nanoseconds should pass until the head of this queue will be available to steal. */ - fun tryStealFrom(victim: WorkQueue, stolenTaskRef: ObjectRef): Long { - assert { bufferSize == 0 } - val task = victim.pollBuffer() + fun trySteal(stolenTaskRef: ObjectRef): Long { + val task = pollBuffer() if (task != null) { stolenTaskRef.element = task return TASK_STOLEN } - return tryStealLastScheduled(victim, stolenTaskRef, blockingOnly = false) + return tryStealLastScheduled(stolenTaskRef, blockingOnly = false) } - fun tryStealBlockingFrom(victim: WorkQueue, stolenTaskRef: ObjectRef): Long { - assert { bufferSize == 0 } - var start = victim.consumerIndex.value - val end = victim.producerIndex.value - val buffer = victim.buffer + fun tryStealBlocking(stolenTaskRef: ObjectRef): Long { + var start = consumerIndex.value + val end = producerIndex.value while (start != end) { val index = start and MASK - if (victim.blockingTasksInBuffer.value == 0) break + if (blockingTasksInBuffer.value == 0) break val value = buffer[index] if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) { - victim.blockingTasksInBuffer.decrementAndGet() + blockingTasksInBuffer.decrementAndGet() stolenTaskRef.element = value return TASK_STOLEN } else { ++start } } - return tryStealLastScheduled(victim, stolenTaskRef, blockingOnly = true) + return tryStealLastScheduled(stolenTaskRef, blockingOnly = true) } fun offloadAllWorkTo(globalQueue: GlobalQueue) { @@ -145,11 +142,11 @@ internal class WorkQueue { } /** - * Contract on return value is the same as for [tryStealFrom] + * Contract on return value is the same as for [trySteal] */ - private fun tryStealLastScheduled(victim: WorkQueue, stolenTaskRef: ObjectRef, blockingOnly: Boolean): Long { + private fun tryStealLastScheduled(stolenTaskRef: ObjectRef, blockingOnly: Boolean): Long { while (true) { - val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL + val lastScheduled = lastScheduledTask.value ?: return NOTHING_TO_STEAL if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL // TODO time wraparound ? @@ -163,7 +160,7 @@ internal class WorkQueue { * If CAS has failed, either someone else had stolen this task or the owner executed this task * and dispatched another one. In the latter case we should retry to avoid missing task. */ - if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) { + if (lastScheduledTask.compareAndSet(lastScheduled, null)) { stolenTaskRef.element = lastScheduled return TASK_STOLEN } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt index 0ee5565317..5891ee875e 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt @@ -58,12 +58,12 @@ class WorkQueueStressTest : TestBase() { startLatch.await() while (!producerFinished || producerQueue.size != 0) { stolenTasks[i].addAll(myQueue.drain(ref).map { task(it) }) - myQueue.tryStealFrom(victim = producerQueue, ref) + producerQueue.trySteal(ref) } // Drain last element which is not counted in buffer stolenTasks[i].addAll(myQueue.drain(ref).map { task(it) }) - myQueue.tryStealFrom(producerQueue, ref) + producerQueue.trySteal(ref) stolenTasks[i].addAll(myQueue.drain(ref).map { task(it) }) } } @@ -94,7 +94,7 @@ class WorkQueueStressTest : TestBase() { val ref = Ref.ObjectRef() startLatch.await() while (stolen.size != offerIterations) { - if (myQueue.tryStealFrom(producerQueue, ref) != NOTHING_TO_STEAL) { + if (producerQueue.trySteal(ref) != NOTHING_TO_STEAL) { stolen.addAll(myQueue.drain(ref).map { task(it) }) } } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt index dbd7db98d0..a4ceef23dc 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt @@ -63,10 +63,10 @@ class WorkQueueTest : TestBase() { val stealer = WorkQueue() val ref = ObjectRef() - assertEquals(TASK_STOLEN, stealer.tryStealFrom(victim, ref)) + assertEquals(TASK_STOLEN, victim.trySteal(ref)) assertEquals(arrayListOf(1L), stealer.drain(ref)) - assertEquals(TASK_STOLEN, stealer.tryStealFrom(victim, ref)) + assertEquals(TASK_STOLEN, victim.trySteal(ref)) assertEquals(arrayListOf(2L), stealer.drain(ref)) } } From ca035a0643411caf355c5425c7ed9d3fab1b619f Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 22 Nov 2022 17:52:23 +0100 Subject: [PATCH 03/14] Fix oversubscription in CoroutineScheduler (-> Dispatchers.Default) Previously, a worker thread unconditionally processed tasks from its own local queue, even if tasks were CPU-intensive, but CPU token was not acquired. Fixes #3418 --- .../jvm/src/scheduling/CoroutineScheduler.kt | 14 +-- .../CoroutineSchedulerOversubscriptionTest.kt | 89 +++++++++++++++++++ 2 files changed, 92 insertions(+), 11 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerOversubscriptionTest.kt diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index b12da580e0..4f403c861c 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -726,7 +726,6 @@ internal class CoroutineScheduler( parkedWorkersStackPush(this) return } - assert { localQueue.size == 0 } workerCtl.value = PARKED // Update value once /* * inStack() prevents spurious wakeups, while workerCtl.value == PARKED @@ -873,15 +872,10 @@ internal class CoroutineScheduler( } } - fun findTask(scanLocalQueue: Boolean): Task? { - if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue) + fun findTask(mayHaveLocalTasks: Boolean): Task? { + if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks) // If we can't acquire a CPU permit -- attempt to find blocking task - val task = if (scanLocalQueue) { - localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull() - } else { - globalBlockingQueue.removeFirstOrNull() - } - return task ?: trySteal(blockingOnly = true) + return globalBlockingQueue.removeFirstOrNull() ?: trySteal(blockingOnly = true) } private fun findAnyTask(scanLocalQueue: Boolean): Task? { @@ -911,7 +905,6 @@ internal class CoroutineScheduler( } private fun trySteal(blockingOnly: Boolean): Task? { - assert { localQueue.size == 0 } val created = createdWorkers // 0 to await an initialization and 1 to avoid excess stealing on single-core machines if (created < 2) { @@ -925,7 +918,6 @@ internal class CoroutineScheduler( if (currentIndex > created) currentIndex = 1 val worker = workers[currentIndex] if (worker !== null && worker !== this) { - assert { localQueue.size == 0 } val stealResult = if (blockingOnly) { worker.localQueue.tryStealBlocking(stolenTask) } else { diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerOversubscriptionTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerOversubscriptionTest.kt new file mode 100644 index 0000000000..0fd6159f9e --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerOversubscriptionTest.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.scheduling + +import kotlinx.coroutines.* +import org.junit.Test +import java.util.concurrent.* +import java.util.concurrent.atomic.AtomicInteger + +class CoroutineSchedulerOversubscriptionTest : TestBase() { + + private val inDefault = AtomicInteger(0) + + private fun CountDownLatch.runAndCheck() { + if (inDefault.incrementAndGet() > CORE_POOL_SIZE) { + error("Oversubscription detected") + } + + await() + inDefault.decrementAndGet() + } + + @Test + fun testOverSubscriptionDeterministic() = runTest { + val barrier = CountDownLatch(1) + val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE) + // All threads but one + repeat(CORE_POOL_SIZE - 1) { + launch(Dispatchers.Default) { + threadsOccupiedBarrier.await() + barrier.runAndCheck() + } + } + threadsOccupiedBarrier.await() + withContext(Dispatchers.Default) { + // Put a task in a local queue, it will be stolen + launch(Dispatchers.Default) { + barrier.runAndCheck() + } + // Put one more task to trick the local queue check + launch(Dispatchers.Default) { + barrier.runAndCheck() + } + + withContext(Dispatchers.IO) { + try { + // Release the thread + delay(100) + } finally { + barrier.countDown() + } + } + } + } + + @Test + fun testOverSubscriptionStress() = repeat(1000 * stressTestMultiplierSqrt) { + inDefault.set(0) + runTest { + val barrier = CountDownLatch(1) + val threadsOccupiedBarrier = CyclicBarrier(CORE_POOL_SIZE) + // All threads but one + repeat(CORE_POOL_SIZE - 1) { + launch(Dispatchers.Default) { + threadsOccupiedBarrier.await() + barrier.runAndCheck() + } + } + threadsOccupiedBarrier.await() + withContext(Dispatchers.Default) { + // Put a task in a local queue + launch(Dispatchers.Default) { + barrier.runAndCheck() + } + // Put one more task to trick the local queue check + launch(Dispatchers.Default) { + barrier.runAndCheck() + } + + withContext(Dispatchers.IO) { + yield() + barrier.countDown() + } + } + } + } +} From 86c39d6b0df83e6b510c703e51efa8caa569711a Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 28 Dec 2022 16:31:36 +0100 Subject: [PATCH 04/14] Fix oversubscription in CoroutineScheduler (-> Dispatchers.Default) Previously, a worker thread unconditionally processed tasks from its own local queue, even if tasks were CPU-intensive, but CPU token was not acquired. Fixes #3418 --- .../jvm/src/scheduling/CoroutineScheduler.kt | 10 ++++-- .../jvm/src/scheduling/WorkQueue.kt | 34 +++++++++++++++++-- .../test/scheduling/WorkQueueStressTest.kt | 4 +-- .../jvm/test/scheduling/WorkQueueTest.kt | 11 ++++++ 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 4f403c861c..829daffc5a 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -874,8 +874,14 @@ internal class CoroutineScheduler( fun findTask(mayHaveLocalTasks: Boolean): Task? { if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks) - // If we can't acquire a CPU permit -- attempt to find blocking task - return globalBlockingQueue.removeFirstOrNull() ?: trySteal(blockingOnly = true) + /* + * If we can't acquire a CPU permit, attempt to find blocking task: + * * Check if our queue has one (maybe mixed in with CPU tasks) + * * Poll global and try steal + */ + return localQueue.pollBlocking() + ?: globalBlockingQueue.removeFirstOrNull() + ?: trySteal(blockingOnly = true) } private fun findAnyTask(scanLocalQueue: Boolean): Task? { diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index 0692866d2d..da8a5443b6 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -47,10 +47,12 @@ internal class WorkQueue { * [T2] changeProducerIndex (3) * [T3] changeConsumerIndex (4) * - * Which can lead to resulting size bigger than actual size at any moment of time. - * This is in general harmless because steal will be blocked by timer + * Which can lead to resulting size being negative or bigger than actual size at any moment of time. + * This is in general harmless because steal will be blocked by timer. + * Negative sizes can be observed only when non-owner reads the size, which happens only + * for diagnostic toString(). */ - internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value + private val bufferSize: Int get() = producerIndex.value - consumerIndex.value internal val size: Int get() = if (lastScheduledTask.value != null) bufferSize + 1 else bufferSize private val buffer: AtomicReferenceArray = AtomicReferenceArray(BUFFER_CAPACITY) private val lastScheduledTask = atomic(null) @@ -134,6 +136,32 @@ internal class WorkQueue { return tryStealLastScheduled(stolenTaskRef, blockingOnly = true) } + // Polls for blocking task, invoked only by the owner + fun pollBlocking(): Task? { + while (true) { // Poll the slot + val lastScheduled = lastScheduledTask.value ?: break + if (!lastScheduled.isBlocking) break + if (lastScheduledTask.compareAndSet(lastScheduled, null)) { + return lastScheduled + } // Failed -> someone else stole it + } + + val start = consumerIndex.value + var end = producerIndex.value + + while (start != end) { + --end + val index = end and MASK + if (blockingTasksInBuffer.value == 0) break + val value = buffer[index] + if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) { + blockingTasksInBuffer.decrementAndGet() + return value + } + } + return null + } + fun offloadAllWorkTo(globalQueue: GlobalQueue) { lastScheduledTask.getAndSet(null)?.let { globalQueue.addLast(it) } while (pollTo(globalQueue)) { diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt index 5891ee875e..e2562b57ba 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt @@ -41,7 +41,7 @@ class WorkQueueStressTest : TestBase() { threads += thread(name = "producer") { startLatch.await() for (i in 1..offerIterations) { - while (producerQueue.bufferSize > BUFFER_CAPACITY / 2) { + while (producerQueue.size > BUFFER_CAPACITY / 2) { Thread.yield() } @@ -79,7 +79,7 @@ class WorkQueueStressTest : TestBase() { threads += thread(name = "producer") { startLatch.await() for (i in 1..offerIterations) { - while (producerQueue.bufferSize == BUFFER_CAPACITY - 1) { + while (producerQueue.size == BUFFER_CAPACITY - 1) { Thread.yield() } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt index a4ceef23dc..864e65a3a9 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt @@ -69,9 +69,20 @@ class WorkQueueTest : TestBase() { assertEquals(TASK_STOLEN, victim.trySteal(ref)) assertEquals(arrayListOf(2L), stealer.drain(ref)) } + + @Test + fun testPollBlocking() { + val queue = WorkQueue() + assertNull(queue.pollBlocking()) + val blockingTask = blockingTask(1L) + queue.add(blockingTask) + queue.add(task(1L)) + assertSame(blockingTask, queue.pollBlocking()) + } } internal fun task(n: Long) = TaskImpl(Runnable {}, n, NonBlockingContext) +internal fun blockingTask(n: Long) = TaskImpl(Runnable {}, n, BlockingContext) internal fun WorkQueue.drain(ref: ObjectRef): List { var task: Task? = poll() From b814035899e7f5d8a63f8a9ac94d9186ade1c91e Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 28 Dec 2022 18:37:36 +0300 Subject: [PATCH 05/14] Update kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt Co-authored-by: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> --- .../jvm/src/scheduling/CoroutineScheduler.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 829daffc5a..67abe907a7 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -265,7 +265,7 @@ internal class CoroutineScheduler( /** * Long describing state of workers in this pool. - * Currently, includes created, CPU-acquired and blocking workers each occupying [BLOCKING_SHIFT] bits. + * Currently includes created, CPU-acquired, and blocking workers, each occupying [BLOCKING_SHIFT] bits. */ private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT) private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt() From 59c84ec013910c8cca1784ed898fbf53b23181c2 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 28 Dec 2022 18:37:41 +0300 Subject: [PATCH 06/14] Update kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt Co-authored-by: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com> --- .../jvm/src/scheduling/CoroutineScheduler.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 67abe907a7..e476088fa7 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -264,7 +264,7 @@ internal class CoroutineScheduler( val workers = ResizableAtomicArray(corePoolSize + 1) /** - * Long describing state of workers in this pool. + * The `Long` value describing the state of workers in this pool. * Currently includes created, CPU-acquired, and blocking workers, each occupying [BLOCKING_SHIFT] bits. */ private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT) From 3e59c5767e310c8dbaa43605c5019431a41396ed Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 28 Dec 2022 16:45:05 +0100 Subject: [PATCH 07/14] ~extract method --- .../jvm/src/scheduling/WorkQueue.kt | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index da8a5443b6..3b4ea05075 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -150,18 +150,25 @@ internal class WorkQueue { var end = producerIndex.value while (start != end) { - --end - val index = end and MASK - if (blockingTasksInBuffer.value == 0) break - val value = buffer[index] - if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) { - blockingTasksInBuffer.decrementAndGet() - return value + val task = tryExtractBlockingTask(--end) + if (task != null) { + return task } } return null } + private fun tryExtractBlockingTask(index: Int): Task? { + if (blockingTasksInBuffer.value == 0) return null + val arrayIndex = index and MASK + val value = buffer[arrayIndex] + if (value != null && value.isBlocking && buffer.compareAndSet(arrayIndex, value, null)) { + blockingTasksInBuffer.decrementAndGet() + return value + } + return null + } + fun offloadAllWorkTo(globalQueue: GlobalQueue) { lastScheduledTask.getAndSet(null)?.let { globalQueue.addLast(it) } while (pollTo(globalQueue)) { From ffd9fd65b9dbb8917ef3190850d97b88c795e037 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 29 Dec 2022 13:18:04 +0100 Subject: [PATCH 08/14] ~ --- .../jvm/src/scheduling/WorkQueue.kt | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index 3b4ea05075..ae5aff4fc3 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -122,16 +122,8 @@ internal class WorkQueue { val end = producerIndex.value while (start != end) { - val index = start and MASK - if (blockingTasksInBuffer.value == 0) break - val value = buffer[index] - if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) { - blockingTasksInBuffer.decrementAndGet() - stolenTaskRef.element = value - return TASK_STOLEN - } else { - ++start - } + stolenTaskRef.element = tryExtractBlockingTask(start++) ?: continue + return TASK_STOLEN } return tryStealLastScheduled(stolenTaskRef, blockingOnly = true) } From 200622224ab336641d31ee8af4f0ae759bbd9736 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 3 Jan 2023 12:07:33 +0100 Subject: [PATCH 09/14] ~properly short-circuit 0 blocking tasks --- kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index ae5aff4fc3..0a93265dfe 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -121,7 +121,7 @@ internal class WorkQueue { var start = consumerIndex.value val end = producerIndex.value - while (start != end) { + while (start != end && blockingTasksInBuffer.value > 0) { stolenTaskRef.element = tryExtractBlockingTask(start++) ?: continue return TASK_STOLEN } @@ -141,7 +141,7 @@ internal class WorkQueue { val start = consumerIndex.value var end = producerIndex.value - while (start != end) { + while (start != end && blockingTasksInBuffer.value > 0) { val task = tryExtractBlockingTask(--end) if (task != null) { return task @@ -151,7 +151,6 @@ internal class WorkQueue { } private fun tryExtractBlockingTask(index: Int): Task? { - if (blockingTasksInBuffer.value == 0) return null val arrayIndex = index and MASK val value = buffer[arrayIndex] if (value != null && value.isBlocking && buffer.compareAndSet(arrayIndex, value, null)) { From 8cee1fdb2d806b43764d0b8aecb53b6429b9428b Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 3 Jan 2023 14:54:24 +0100 Subject: [PATCH 10/14] Preliminary prototype of #3439 --- .../common/src/internal/LimitedDispatcher.kt | 10 +++ kotlinx-coroutines-core/jvm/src/EventLoop.kt | 60 +++++++++++++++ .../jvm/src/scheduling/CoroutineScheduler.kt | 44 +++++++++-- .../jvm/src/scheduling/WorkQueue.kt | 73 ++++++++++++++----- 4 files changed, 162 insertions(+), 25 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt index 214480ea70..9b7821f2ed 100644 --- a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -13,6 +13,16 @@ import kotlin.jvm.* * The result of .limitedParallelism(x) call, a dispatcher * that wraps the given dispatcher, but limits the parallelism level, while * trying to emulate fairness. + * + * ### Implementation details + * + * By design, 'LimitedDispatcher' never [dispatches][CoroutineDispatcher.dispatch] originally sent tasks + * to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and + * dispatches at most [parallelism] "worker-loop" tasks that poll the underlying queue and cooperatively preempt + * in order to avoid starvation of the underlying dispatcher. + * + * Such invariant is crucial in order to be compatible with any underlying dispatcher implementation without + * direct cooperation. */ internal class LimitedDispatcher( private val dispatcher: CoroutineDispatcher, diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index 1ee651aa41..e7d1095460 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -4,6 +4,10 @@ package kotlinx.coroutines +import kotlinx.coroutines.Runnable +import kotlinx.coroutines.scheduling.* +import kotlinx.coroutines.scheduling.CoroutineScheduler + internal actual abstract class EventLoopImplPlatform: EventLoop() { protected abstract val thread: Thread @@ -45,6 +49,62 @@ internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.curr */ @InternalCoroutinesApi public fun processNextEventInCurrentThread(): Long = + // This API is used in Ktor for serverless integration where a single thread awaits a blocking call + // (and, to avoid actual blocking, does something via this call), see #850 ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block() + +/** + * Retrieves and executes a single task from the current system dispatcher ([Dispatchers.Default] or [Dispatchers.IO]). + * Returns `0` if any task was executed, `>= 0` for number of nanoseconds to wait until invoking this method again + * (implying that there will be a task to steal in N nanoseconds), `-1` if there is no tasks in the corresponding dispatcher at all. + * + * ### Invariants + * + * - When invoked from [Dispatchers.Default] **thread** (even if the actual context is different dispatcher, + * [CoroutineDispatcher.limitedParallelism] or any in-place wrapper), it runs an arbitrary task that ended + * up being scheduled to [Dispatchers.Default] or its counterpart. Tasks scheduled to [Dispatchers.IO] + * **are not** executed[1]. + * - When invoked from [Dispatchers.IO] thread, the same rules apply, but for blocking tasks only. + * + * [1] -- this is purely technical limitation: the scheduler does not have "notify me when CPU token is available" API, + * and we cannot leave this method without leaving thread in its original state. + * + * ### Rationale + * + * This is an internal API that is intended to replace IDEA's core FJP decomposition. + * The following API is provided by IDEA core: + * ``` + * runDecomposedTaskAndJoinIt { // <- non-suspending call + * // spawn as many task as needed + * // these tasks can also invoke 'runDecomposedTaskAndJoinIt' + * } + * ``` + * The key observation here is that 'runDecomposedTaskAndJoinIt' can be invoked from `Dispatchers.Default` itself, + * thus blocking at least one thread. To avoid deadlocks and starvation during large hierarchical decompositions, + * 'runDecomposedTaskAndJoinIt' should not only block, but also **help** executing the task or other tasks + * until an arbitrary condition is satisfied. + * + * See #3439 for additional details. + * + * ### Limitations and caveats + * + * - Executes tasks in-place, thus potentially leaking irrelevant thread-locals from the current thread + * - Is not 100% effective, it waits for [Long] returned nanoseconds even when work arrives immediately after returning + * from this method + * - When there is no more work, it's up to the caller to decide what to do. It's important to account that + * work to current dispatcher may arrive **later** from external sources [1] + * + * [1] -- this is also a technicality that can be solved in kotlinx.coroutines itself, but unfortunately requires + * a tremendous effort. + * + * @throws IllegalStateException if the current thread is not system dispatcher thread + */ +@InternalCoroutinesApi +@DelicateCoroutinesApi +public fun runSingleTaskFromCurrentSystemDispatcher(): Long { + val thread = Thread.currentThread() + if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread") + return thread.runSingleTask() +} diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index e476088fa7..bad4f322a6 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -720,6 +720,28 @@ internal class CoroutineScheduler( tryReleaseCpu(WorkerState.TERMINATED) } + /** + * See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details. + * This is a fine-tailored method for a specific use-case not expected to be used widely. + */ + fun runSingleTask(): Long { + val stateSnapshot = state + val isCpuThread = state == WorkerState.CPU_ACQUIRED + val task = if (isCpuThread) { + findCpuTask() + } else { + findBlockingTask() + } + if (task == null) { + if (minDelayUntilStealableTaskNs == 0L) return -1L + return minDelayUntilStealableTaskNs + } + runSafely(task) + if (!isCpuThread) decrementBlockingTasks() + assert { state == stateSnapshot} + return 0L + } + // Counterpart to "tryUnpark" private fun tryPark() { if (!inStack()) { @@ -879,9 +901,19 @@ internal class CoroutineScheduler( * * Check if our queue has one (maybe mixed in with CPU tasks) * * Poll global and try steal */ + return findBlockingTask() + } + + private fun findBlockingTask(): Task? { return localQueue.pollBlocking() ?: globalBlockingQueue.removeFirstOrNull() - ?: trySteal(blockingOnly = true) + ?: trySteal(STEAL_BLOCKING_ONLY) + } + + private fun findCpuTask(): Task? { + return localQueue.pollCpu() + ?: globalBlockingQueue.removeFirstOrNull() + ?: trySteal(STEAL_CPU_ONLY) } private fun findAnyTask(scanLocalQueue: Boolean): Task? { @@ -897,7 +929,7 @@ internal class CoroutineScheduler( } else { pollGlobalQueues()?.let { return it } } - return trySteal(blockingOnly = false) + return trySteal(STEAL_ANY) } private fun pollGlobalQueues(): Task? { @@ -910,7 +942,7 @@ internal class CoroutineScheduler( } } - private fun trySteal(blockingOnly: Boolean): Task? { + private fun trySteal(stealingMode: StealingMode): Task? { val created = createdWorkers // 0 to await an initialization and 1 to avoid excess stealing on single-core machines if (created < 2) { @@ -924,11 +956,7 @@ internal class CoroutineScheduler( if (currentIndex > created) currentIndex = 1 val worker = workers[currentIndex] if (worker !== null && worker !== this) { - val stealResult = if (blockingOnly) { - worker.localQueue.tryStealBlocking(stolenTask) - } else { - worker.localQueue.trySteal(stolenTask) - } + val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask) if (stealResult == TASK_STOLEN) { val result = stolenTask.element stolenTask.element = null diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index 0a93265dfe..d1e7f7a9a5 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -16,6 +16,11 @@ internal const val MASK = BUFFER_CAPACITY - 1 // 128 by default internal const val TASK_STOLEN = -1L internal const val NOTHING_TO_STEAL = -2L +internal typealias StealingMode = Int +internal const val STEAL_ANY: StealingMode = -1 +internal const val STEAL_CPU_ONLY: StealingMode = 0 +internal const val STEAL_BLOCKING_ONLY: StealingMode = 1 + /** * Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity. * At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue) @@ -108,24 +113,34 @@ internal class WorkQueue { * Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least task was stolen * or positive value of how many nanoseconds should pass until the head of this queue will be available to steal. */ - fun trySteal(stolenTaskRef: ObjectRef): Long { - val task = pollBuffer() + // TODO move it to tests where appropriate + fun trySteal(stolenTaskRef: ObjectRef): Long = trySteal(STEAL_ANY, stolenTaskRef) + + fun trySteal(stealingMode: StealingMode, stolenTaskRef: ObjectRef): Long { + val task = when (stealingMode) { + STEAL_ANY -> pollBuffer() + else -> stealWithExclusiveMode(stealingMode) + } + if (task != null) { stolenTaskRef.element = task return TASK_STOLEN } - return tryStealLastScheduled(stolenTaskRef, blockingOnly = false) + return tryStealLastScheduled(stealingMode, stolenTaskRef) } - fun tryStealBlocking(stolenTaskRef: ObjectRef): Long { + // Steal only tasks of a particular kind, potentially invoking full queue scan + private fun stealWithExclusiveMode(stealingMode: StealingMode): Task? { var start = consumerIndex.value val end = producerIndex.value - - while (start != end && blockingTasksInBuffer.value > 0) { - stolenTaskRef.element = tryExtractBlockingTask(start++) ?: continue - return TASK_STOLEN + val onlyBlocking = stealingMode == STEAL_BLOCKING_ONLY + // CPU or (BLOCKING & hasBlocking) + val shouldProceed = !onlyBlocking || blockingTasksInBuffer.value > 0 + while (start != end && shouldProceed) { + return tryExtractFromTheMiddle(start++, onlyBlocking) ?: continue } - return tryStealLastScheduled(stolenTaskRef, blockingOnly = true) + + return null } // Polls for blocking task, invoked only by the owner @@ -138,11 +153,28 @@ internal class WorkQueue { } // Failed -> someone else stole it } + return pollWithMode(onlyBlocking = true /* only blocking */) + } + + fun pollCpu(): Task? { + while (true) { // Poll the slot + val lastScheduled = lastScheduledTask.value ?: break + if (lastScheduled.isBlocking) break + if (lastScheduledTask.compareAndSet(lastScheduled, null)) { + return lastScheduled + } // Failed -> someone else stole it + } + + return pollWithMode(onlyBlocking = false /* only cpu */) + } + + private fun pollWithMode(/* Only blocking OR only CPU */onlyBlocking: Boolean): Task? { val start = consumerIndex.value var end = producerIndex.value - - while (start != end && blockingTasksInBuffer.value > 0) { - val task = tryExtractBlockingTask(--end) + // CPU or (BLOCKING & hasBlocking) + val shouldProceed = !onlyBlocking || blockingTasksInBuffer.value > 0 + while (start != end && shouldProceed) { + val task = tryExtractFromTheMiddle(--end, onlyBlocking) if (task != null) { return task } @@ -150,11 +182,12 @@ internal class WorkQueue { return null } - private fun tryExtractBlockingTask(index: Int): Task? { + private fun tryExtractFromTheMiddle(index: Int, onlyBlocking: Boolean): Task? { + if (onlyBlocking && blockingTasksInBuffer.value == 0) return null val arrayIndex = index and MASK val value = buffer[arrayIndex] - if (value != null && value.isBlocking && buffer.compareAndSet(arrayIndex, value, null)) { - blockingTasksInBuffer.decrementAndGet() + if (value != null && value.isBlocking == onlyBlocking && buffer.compareAndSet(arrayIndex, value, null)) { + if (onlyBlocking) blockingTasksInBuffer.decrementAndGet() return value } return null @@ -170,10 +203,16 @@ internal class WorkQueue { /** * Contract on return value is the same as for [trySteal] */ - private fun tryStealLastScheduled(stolenTaskRef: ObjectRef, blockingOnly: Boolean): Long { + private fun tryStealLastScheduled(stealingMode: StealingMode, stolenTaskRef: ObjectRef): Long { while (true) { val lastScheduled = lastScheduledTask.value ?: return NOTHING_TO_STEAL - if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL + if (lastScheduled.isBlocking) { + if (stealingMode == STEAL_CPU_ONLY) { + return NOTHING_TO_STEAL + } + } else if (stealingMode == STEAL_BLOCKING_ONLY) { + return NOTHING_TO_STEAL + } // TODO time wraparound ? val time = schedulerTimeSource.nanoTime() From fc3c97c974c5306d510fa914bcec2636c55b1cd8 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 9 Jan 2023 11:52:05 +0100 Subject: [PATCH 11/14] ~introduce isIoDispatcherThread for additional diagnostics --- .../api/kotlinx-coroutines-core.api | 2 ++ kotlinx-coroutines-core/jvm/src/EventLoop.kt | 16 ++++++++++++++++ .../jvm/src/scheduling/CoroutineScheduler.kt | 2 ++ 3 files changed, 20 insertions(+) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 3a2d08f428..fe7653e9c5 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -309,7 +309,9 @@ public abstract interface class kotlinx/coroutines/DisposableHandle { } public final class kotlinx/coroutines/EventLoopKt { + public static final fun isIoDispatcherThread (Ljava/lang/Thread;)Z public static final fun processNextEventInCurrentThread ()J + public static final fun runSingleTaskFromCurrentSystemDispatcher ()J } public final class kotlinx/coroutines/ExceptionsKt { diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index e7d1095460..be270d8e46 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -108,3 +108,19 @@ public fun runSingleTaskFromCurrentSystemDispatcher(): Long { if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread") return thread.runSingleTask() } + +/** + * Checks whether the given thread belongs to Dispatchers.IO. + * Note that feature "is part of the Dispatchers.IO" is *dynamic*, meaning that the thread + * may change this status when switching between tasks. + * + * This function is inteded to be used on the result of `Thread.currentThread()` for diagnostic + * purposes, and is declared as an extension only to avoid top-level scope pollution. + */ +@InternalCoroutinesApi +@DelicateCoroutinesApi +public fun Thread.isIoDispatcherThread(): Boolean { + if (this !is CoroutineScheduler.Worker) return false + return isIo() +} + diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index bad4f322a6..dc9a7120fd 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -742,6 +742,8 @@ internal class CoroutineScheduler( return 0L } + fun isIo() = state == WorkerState.BLOCKING + // Counterpart to "tryUnpark" private fun tryPark() { if (!inStack()) { From 1c5315802f880c05a96baa00a6204217d502e0da Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 10 Jan 2023 16:14:19 +0100 Subject: [PATCH 12/14] ~ --- gradle.properties | 2 +- gradle/compile-native-multiplatform.gradle | 28 +++---- kotlinx-coroutines-core/jvm/test/TestBase.kt | 2 +- ...CoroutineSchedulerInternalApiStressTest.kt | 80 +++++++++++++++++++ 4 files changed, 96 insertions(+), 16 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt diff --git a/gradle.properties b/gradle.properties index fb2d529443..08246a2b22 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,7 +3,7 @@ # # Kotlin -version=1.6.4-SNAPSHOT +version=1.6.4-seb-qww1 group=org.jetbrains.kotlinx kotlin_version=1.7.21 diff --git a/gradle/compile-native-multiplatform.gradle b/gradle/compile-native-multiplatform.gradle index 1aeb8d2c54..aa9f455997 100644 --- a/gradle/compile-native-multiplatform.gradle +++ b/gradle/compile-native-multiplatform.gradle @@ -16,20 +16,20 @@ kotlin { targets { addTarget(presets.linuxX64) - addTarget(presets.iosArm64) - addTarget(presets.iosArm32) - addTarget(presets.iosX64) - addTarget(presets.macosX64) - addTarget(presets.mingwX64) - addTarget(presets.tvosArm64) - addTarget(presets.tvosX64) - addTarget(presets.watchosArm32) - addTarget(presets.watchosArm64) - addTarget(presets.watchosX86) - addTarget(presets.watchosX64) - addTarget(presets.iosSimulatorArm64) - addTarget(presets.watchosSimulatorArm64) - addTarget(presets.tvosSimulatorArm64) +// addTarget(presets.iosArm64) +// addTarget(presets.iosArm32) +// addTarget(presets.iosX64) +// addTarget(presets.macosX64) +// addTarget(presets.mingwX64) +// addTarget(presets.tvosArm64) +// addTarget(presets.tvosX64) +// addTarget(presets.watchosArm32) +// addTarget(presets.watchosArm64) +// addTarget(presets.watchosX86) +// addTarget(presets.watchosX64) +// addTarget(presets.iosSimulatorArm64) +// addTarget(presets.watchosSimulatorArm64) +// addTarget(presets.tvosSimulatorArm64) addTarget(presets.macosArm64) } diff --git a/kotlinx-coroutines-core/jvm/test/TestBase.kt b/kotlinx-coroutines-core/jvm/test/TestBase.kt index 6a013fa1da..ab9cc13b44 100644 --- a/kotlinx-coroutines-core/jvm/test/TestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/TestBase.kt @@ -56,7 +56,7 @@ public actual typealias TestResult = Unit */ public actual open class TestBase(private var disableOutCheck: Boolean) { - actual constructor(): this(false) + actual constructor(): this(true) public actual val isBoundByJsTestTimeout = false private var actionIndex = AtomicInteger() diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt new file mode 100644 index 0000000000..5e7ba55b57 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt @@ -0,0 +1,80 @@ +/* + * Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.scheduling + +import kotlinx.coroutines.* +import kotlinx.coroutines.internal.AVAILABLE_PROCESSORS +import org.junit.Test +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.atomic.AtomicInteger +import kotlin.random.* +import kotlin.random.Random +import kotlin.test.* +import kotlin.time.* + +class CoroutineSchedulerInternalApiStressTest : TestBase() { + + @Test + fun testHelpDefaultIoIsIsolated() { + repeat(10) { + runTest { + val jobToComplete = Job() + val tasksToCompleteJob = AtomicInteger(200) + + val observedIoThreads = Collections.newSetFromMap(ConcurrentHashMap()) + val observedDefaultThreads = Collections.newSetFromMap(ConcurrentHashMap()) + + val barrier = CyclicBarrier(AVAILABLE_PROCESSORS) + repeat(AVAILABLE_PROCESSORS - 1) { + // Launch CORES - 1 spawners + launch(Dispatchers.Default) { + barrier.await() + while (!jobToComplete.isCompleted) { + launch { + observedDefaultThreads.add(Thread.currentThread()) + val tasksLeft = tasksToCompleteJob.decrementAndGet() + if (tasksLeft == 0) { + // Verify threads first + try { + assertFalse(observedIoThreads.containsAll(observedDefaultThreads)) + } finally { + jobToComplete.complete() + } + } + } + + // Sometimes launch an IO task + if (Random.nextInt(0..9) == 0) { + launch(Dispatchers.IO) { + observedIoThreads.add(Thread.currentThread()) + assertTrue(Thread.currentThread().isIoDispatcherThread()) + } + } + } + } + } + + withContext(Dispatchers.Default) { + barrier.await() + + while (!jobToComplete.isCompleted) { + val result = runSingleTaskFromCurrentSystemDispatcher() + if (result == 0L) { + continue + } else if (result >= 0L) { + delay(result.toDuration(DurationUnit.NANOSECONDS)) + } else { + delay(10) + } + } + assertTrue(Thread.currentThread() in observedDefaultThreads) + } + coroutineContext.job.children.toList().joinAll() + } + } + } +} From 17056f8af4babce4b5cb774beeb6a1ab32c152a7 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 23 Feb 2023 19:49:15 +0100 Subject: [PATCH 13/14] ~ --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index cccdea51ce..cb1ff7b9e6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,7 +3,7 @@ # # Kotlin -version=1.6.4-seb-qww1 +version=1.6.4-SNAPSHOT group=org.jetbrains.kotlinx kotlin_version=1.8.10 From 4a6a0f3fdfcf8b06ee23f6fdd796ef7643f0de8d Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Sat, 25 Feb 2023 17:02:55 +0100 Subject: [PATCH 14/14] ~ --- kotlinx-coroutines-core/jvm/src/EventLoop.kt | 6 +- .../jvm/src/scheduling/CoroutineScheduler.kt | 2 + .../jvm/src/scheduling/WorkQueue.kt | 10 +- ...CoroutineSchedulerInternalApiStressTest.kt | 104 ++++++++++-------- .../jvm/test/scheduling/WorkQueueTest.kt | 2 + 5 files changed, 73 insertions(+), 51 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index be270d8e46..aeb4f58a15 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -103,7 +103,8 @@ internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit */ @InternalCoroutinesApi @DelicateCoroutinesApi -public fun runSingleTaskFromCurrentSystemDispatcher(): Long { +@PublishedApi +internal fun runSingleTaskFromCurrentSystemDispatcher(): Long { val thread = Thread.currentThread() if (thread !is CoroutineScheduler.Worker) throw IllegalStateException("Expected CoroutineScheduler.Worker, but got $thread") return thread.runSingleTask() @@ -119,7 +120,8 @@ public fun runSingleTaskFromCurrentSystemDispatcher(): Long { */ @InternalCoroutinesApi @DelicateCoroutinesApi -public fun Thread.isIoDispatcherThread(): Boolean { +@PublishedApi +internal fun Thread.isIoDispatcherThread(): Boolean { if (this !is CoroutineScheduler.Worker) return false return isIo() } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 3e7e5e30ff..e08f3deedd 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -906,12 +906,14 @@ internal class CoroutineScheduler( return findBlockingTask() } + // NB: ONLY for runSingleTask method private fun findBlockingTask(): Task? { return localQueue.pollBlocking() ?: globalBlockingQueue.removeFirstOrNull() ?: trySteal(STEAL_BLOCKING_ONLY) } + // NB: ONLY for runSingleTask method private fun findCpuTask(): Task? { return localQueue.pollCpu() ?: globalBlockingQueue.removeFirstOrNull() diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index fb77dd7be4..3b1e6b5b0b 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -112,10 +112,12 @@ internal class WorkQueue { * * Returns [NOTHING_TO_STEAL] if queue has nothing to steal, [TASK_STOLEN] if at least task was stolen * or positive value of how many nanoseconds should pass until the head of this queue will be available to steal. + * + * [StealingMode] controls what tasks to steal: + * * [STEAL_ANY] is default mode for scheduler, task from the head (in FIFO order) is stolen + * * [STEAL_BLOCKING_ONLY] is mode for stealing *an arbitrary* blocking task which is used by scheduler when helping in Dispatchers.IO mode + * * [STEAL_CPU_ONLY] is a kludge for `runSingleTaskFromCurrentSystemDispatcher` */ - // TODO move it to tests where appropriate - fun trySteal(stolenTaskRef: ObjectRef): Long = trySteal(STEAL_ANY, stolenTaskRef) - fun trySteal(stealingMode: StealingMode, stolenTaskRef: ObjectRef): Long { val task = when (stealingMode) { STEAL_ANY -> pollBuffer() @@ -168,7 +170,7 @@ internal class WorkQueue { return pollWithMode(onlyBlocking = false /* only cpu */) } - private fun pollWithMode(/* Only blocking OR only CPU */onlyBlocking: Boolean): Task? { + private fun pollWithMode(/* Only blocking OR only CPU */ onlyBlocking: Boolean): Task? { val start = consumerIndex.value var end = producerIndex.value // CPU or (BLOCKING & hasBlocking) diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt index 5e7ba55b57..83be993573 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.internal.AVAILABLE_PROCESSORS import org.junit.Test import java.util.* import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CountDownLatch import java.util.concurrent.CyclicBarrier import java.util.concurrent.atomic.AtomicInteger import kotlin.random.* @@ -18,63 +19,76 @@ import kotlin.time.* class CoroutineSchedulerInternalApiStressTest : TestBase() { - @Test - fun testHelpDefaultIoIsIsolated() { - repeat(10) { - runTest { - val jobToComplete = Job() - val tasksToCompleteJob = AtomicInteger(200) + @Test(timeout = 120_000L) + fun testHelpDefaultIoIsIsolated() = repeat(100 * stressTestMultiplierSqrt) { + val ioTaskMarker = ThreadLocal.withInitial { false } + var threadThatShould = Thread.currentThread() + runTest { + val jobToComplete = Job() + val expectedIterations = 100 + val completionLatch = CountDownLatch(1) + val tasksToCompleteJob = AtomicInteger(expectedIterations) + val observedIoThreads = Collections.newSetFromMap(ConcurrentHashMap()) + val observedDefaultThreads = Collections.newSetFromMap(ConcurrentHashMap()) - val observedIoThreads = Collections.newSetFromMap(ConcurrentHashMap()) - val observedDefaultThreads = Collections.newSetFromMap(ConcurrentHashMap()) - - val barrier = CyclicBarrier(AVAILABLE_PROCESSORS) - repeat(AVAILABLE_PROCESSORS - 1) { - // Launch CORES - 1 spawners - launch(Dispatchers.Default) { - barrier.await() - while (!jobToComplete.isCompleted) { - launch { - observedDefaultThreads.add(Thread.currentThread()) - val tasksLeft = tasksToCompleteJob.decrementAndGet() - if (tasksLeft == 0) { - // Verify threads first - try { - assertFalse(observedIoThreads.containsAll(observedDefaultThreads)) - } finally { - jobToComplete.complete() - } + val barrier = CyclicBarrier(AVAILABLE_PROCESSORS) + val spawners = ArrayList() + repeat(AVAILABLE_PROCESSORS - 1) { + // Launch CORES - 1 spawners + spawners += launch(Dispatchers.Default) { + barrier.await() + repeat(expectedIterations) { + launch { + val tasksLeft = tasksToCompleteJob.decrementAndGet() + if (tasksLeft < 0) return@launch // Leftovers are being executed all over the place + if (threadThatShould !== Thread.currentThread()) { + val a = 2 + } + observedDefaultThreads.add(Thread.currentThread()) + if (tasksLeft == 0) { + // Verify threads first + try { + assertFalse(observedIoThreads.containsAll(observedDefaultThreads)) + } finally { + jobToComplete.complete() } } + } - // Sometimes launch an IO task - if (Random.nextInt(0..9) == 0) { - launch(Dispatchers.IO) { - observedIoThreads.add(Thread.currentThread()) - assertTrue(Thread.currentThread().isIoDispatcherThread()) - } + // Sometimes launch an IO task to mess with a scheduler + if (Random.nextInt(0..9) == 0) { + launch(Dispatchers.IO) { + ioTaskMarker.set(true) + observedIoThreads.add(Thread.currentThread()) + assertTrue(Thread.currentThread().isIoDispatcherThread()) } } } + completionLatch.await() } + } - withContext(Dispatchers.Default) { - barrier.await() - - while (!jobToComplete.isCompleted) { - val result = runSingleTaskFromCurrentSystemDispatcher() - if (result == 0L) { - continue - } else if (result >= 0L) { - delay(result.toDuration(DurationUnit.NANOSECONDS)) - } else { - delay(10) - } + withContext(Dispatchers.Default) { + threadThatShould = Thread.currentThread() + barrier.await() + var timesHelped = 0 + while (!jobToComplete.isCompleted) { + val result = runSingleTaskFromCurrentSystemDispatcher() + assertFalse(ioTaskMarker.get()) + if (result == 0L) { + ++timesHelped + continue + } else if (result >= 0L) { + Thread.sleep(result.toDuration(DurationUnit.NANOSECONDS).toDelayMillis()) + } else { + Thread.sleep(10) } - assertTrue(Thread.currentThread() in observedDefaultThreads) } - coroutineContext.job.children.toList().joinAll() + completionLatch.countDown() +// assertEquals(100, timesHelped) +// assertTrue(Thread.currentThread() in observedDefaultThreads, observedDefaultThreads.toString()) } } } } + diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt index 864e65a3a9..f690d3882f 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt @@ -107,3 +107,5 @@ internal fun GlobalQueue.drain(): List { } return result } + +internal fun WorkQueue.trySteal(stolenTaskRef: ObjectRef): Long = trySteal(STEAL_ANY, stolenTaskRef)