From a73ddcf8e6113836db1a5655fdc0c0856ef9aa1e Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Thu, 11 Apr 2024 17:11:22 +0200 Subject: [PATCH 01/22] IntelliJ patches base --- IntelliJ-patches.md | 2 ++ README.md | 12 +++++++++++- RELEASE.md | 22 ++++++++++++++++++++++ gradle.properties | 2 +- 4 files changed, 36 insertions(+), 2 deletions(-) create mode 100644 IntelliJ-patches.md diff --git a/IntelliJ-patches.md b/IntelliJ-patches.md new file mode 100644 index 0000000000..c8b8750acb --- /dev/null +++ b/IntelliJ-patches.md @@ -0,0 +1,2 @@ +# Included IntelliJ-related patches +- TODO \ No newline at end of file diff --git a/README.md b/README.md index 373912bff8..d4053e45d5 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,14 @@ -# kotlinx.coroutines +# kotlinx.coroutines with IntelliJ patches + +This repository is a fork of the original [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines) library that includes +several patches (see [IntelliJ-patches.md](IntelliJ-patches.md)). + +**Important:** +- Only JVM build target is fully supported and used + +Release instructions are [here](RELEASE.md). + +--- [![Kotlin Stable](https://kotl.in/badges/stable.svg)](https://kotlinlang.org/docs/components-stability.html) [![JetBrains official project](https://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub) diff --git a/RELEASE.md b/RELEASE.md index 4a793bff6d..a821e591e4 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,3 +1,25 @@ +# com.intellij.platform:kotlinx-coroutines-* release + +``` +# update branches +# git remote add upstream https://github.com/Kotlin/kotlinx.coroutines.git +git checkout master +git fetch upstream +git fetch origin + +# prepare new master with patches +git reset --hard upstream/master +git rebase intellij/patch-base +git rebase intellij/whatever-patches-we-have, see IntelliJ-patches.md for the list of branches + +# Remember to change the version in `gradle.properties` to something like `1.8.4-intellij-SNAPSHOT` +# commit version change + +git push origin master --force +``` + +--- + # kotlinx.coroutines release checklist To release a new `` of `kotlinx-coroutines`: diff --git a/gradle.properties b/gradle.properties index 0aa26b50db..a71bf76e63 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ # Kotlin version=1.8.0-SNAPSHOT -group=org.jetbrains.kotlinx +group=com.intellij.platform kotlin_version=1.9.21 # DO NOT rename this property without adapting kotlinx.train build chain: atomicfu_version=0.23.1 From 3dab3259ed256505b4c4c17ff1f7e8013ac16978 Mon Sep 17 00:00:00 2001 From: "Vladislav.Yaroshchuk" Date: Wed, 8 May 2024 12:06:01 +0400 Subject: [PATCH 02/22] IJI-1751 Parametrise Maven publication Space URL (cherry picked from commit e53add81abd25c716bf132efc2e5d77ea2634a2e) --- buildSrc/src/main/kotlin/Publishing.kt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/buildSrc/src/main/kotlin/Publishing.kt b/buildSrc/src/main/kotlin/Publishing.kt index 2e5028a828..e63b48314e 100644 --- a/buildSrc/src/main/kotlin/Publishing.kt +++ b/buildSrc/src/main/kotlin/Publishing.kt @@ -50,9 +50,10 @@ fun MavenPom.configureMavenCentralMetadata(project: Project) { */ private val spacePublicationEnabled = System.getenv("libs.space.pub")?.equals("true") ?: false -fun mavenRepositoryUri(): URI { +fun Project.mavenRepositoryUri(): URI { if (spacePublicationEnabled) { - return URI("https://maven.pkg.jetbrains.space/public/p/kotlinx-coroutines/maven") + val spaceRepoUrl = getSensitiveProperty("libs.space.url") + return URI(spaceRepoUrl ?: "https://maven.pkg.jetbrains.space/public/p/kotlinx-coroutines/maven") } val repositoryId: String? = System.getenv("libs.repository.id") @@ -65,7 +66,7 @@ fun mavenRepositoryUri(): URI { fun configureMavenPublication(rh: RepositoryHandler, project: Project) { rh.maven { - url = mavenRepositoryUri() + url = project.mavenRepositoryUri() credentials { if (spacePublicationEnabled) { // Configure space credentials From b49f2aad0768a6fde104dab0ea3f19ae6412a26e Mon Sep 17 00:00:00 2001 From: "Vladislav.Yaroshchuk" Date: Wed, 8 May 2024 12:30:30 +0400 Subject: [PATCH 03/22] IJI-1751 Allow setting `libs.space.pub` as Gradle property (cherry picked from commit e7bf19be53d95030ff08a3d48226b29e508ebee5) --- buildSrc/src/main/kotlin/Publishing.kt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/buildSrc/src/main/kotlin/Publishing.kt b/buildSrc/src/main/kotlin/Publishing.kt index e63b48314e..ecc79bd157 100644 --- a/buildSrc/src/main/kotlin/Publishing.kt +++ b/buildSrc/src/main/kotlin/Publishing.kt @@ -48,7 +48,8 @@ fun MavenPom.configureMavenCentralMetadata(project: Project) { * dev build into 'https://maven.pkg.jetbrains.space/public/p/kotlinx-coroutines/maven' Maven repository. * In order to use it, pass the corresponding ENV to the TC 'Deploy' task. */ -private val spacePublicationEnabled = System.getenv("libs.space.pub")?.equals("true") ?: false +private val Project.spacePublicationEnabled: Boolean + get() = getSensitiveProperty("libs.space.pub")?.equals("true") ?: false fun Project.mavenRepositoryUri(): URI { if (spacePublicationEnabled) { @@ -68,7 +69,7 @@ fun configureMavenPublication(rh: RepositoryHandler, project: Project) { rh.maven { url = project.mavenRepositoryUri() credentials { - if (spacePublicationEnabled) { + if (project.spacePublicationEnabled) { // Configure space credentials username = project.getSensitiveProperty("libs.space.user") password = project.getSensitiveProperty("libs.space.password") From efabd2589d02877fcc99494a489f65cccea87899 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Tue, 26 Mar 2024 20:33:46 +0100 Subject: [PATCH 04/22] Switch Worker into a Blocking mode if it tries to run runBlocking with a CPU permit And reacquire CPU permit after runBlocking finishes. This should resolve Dispatchers.Default starvation in cases where runBlocking is used to run suspend functions from non-suspend execution context. #3983 / IJPL-721 --- kotlinx-coroutines-core/jvm/src/Builders.kt | 10 ++++ .../jvm/src/scheduling/CoroutineScheduler.kt | 46 +++++++++++++++++++ .../jvm/src/scheduling/Tasks.kt | 5 ++ .../BlockingCoroutineDispatcherTest.kt | 17 ++++++- 4 files changed, 77 insertions(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index d2249bfdd0..7984c58b1a 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -4,6 +4,8 @@ package kotlinx.coroutines +import kotlinx.coroutines.scheduling.* +import kotlinx.coroutines.scheduling.CoroutineScheduler import java.util.concurrent.locks.* import kotlin.contracts.* import kotlin.coroutines.* @@ -86,6 +88,7 @@ private class BlockingCoroutine( @Suppress("UNCHECKED_CAST") fun joinBlocking(): T { registerTimeLoopThread() + var cpuPermitReleased: Boolean? = null try { eventLoop?.incrementUseCount() try { @@ -95,6 +98,10 @@ private class BlockingCoroutine( val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE // note: process next even may loose unpark flag, so check if completed before parking if (isCompleted) break + if (parkNanos > 0 && cpuPermitReleased == null) { + val worker = Thread.currentThread() as? CoroutineScheduler.Worker + cpuPermitReleased = worker?.releaseCpu() ?: false + } parkNanos(this, parkNanos) } } finally { // paranoia @@ -102,6 +109,9 @@ private class BlockingCoroutine( } } finally { // paranoia unregisterTimeLoopThread() + if (cpuPermitReleased == true) { + (Thread.currentThread() as CoroutineScheduler.Worker).reacquireCpu() + } } // now return result val state = this.state.unboxState() diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index e9d11d354f..16e8bfc8b8 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -690,6 +690,52 @@ internal class CoroutineScheduler( return hadCpu } + /** only for [runBlocking] */ + fun releaseCpu(): Boolean { + assert { state == WorkerState.CPU_ACQUIRED || state == WorkerState.BLOCKING } + return tryReleaseCpu(WorkerState.BLOCKING).also { released -> + if (released) incrementBlockingTasks() + } + } + + /** only for [runBlocking] */ + fun reacquireCpu() { + assert { state == WorkerState.BLOCKING } + decrementBlockingTasks() + if (tryAcquireCpuPermit()) return + class CpuPermitTransfer { + private val status = atomic(false) + fun check(): Boolean = status.value + fun complete(): Boolean = status.compareAndSet(false, true) + } + val permitTransfer = CpuPermitTransfer() + val blockedWorker = this@Worker + scheduler.dispatch(Runnable { + // this code runs in a different worker thread that holds a CPU token + val cpuHolder = currentThread() as Worker + assert { cpuHolder.state == WorkerState.CPU_ACQUIRED } + if (permitTransfer.complete()) { + cpuHolder.state = WorkerState.BLOCKING + LockSupport.unpark(blockedWorker) + } + }, taskContext = NonBlockingContext) + while (true) { + if (permitTransfer.check()) { + state = WorkerState.CPU_ACQUIRED + break + } + if (tryAcquireCpuPermit()) { + if (!permitTransfer.complete()) { + // race: transfer was completed by another thread + releaseCpuPermit() + } + assert { state == WorkerState.CPU_ACQUIRED } + break + } + LockSupport.parkNanos(RUN_BLOCKING_CPU_REACQUIRE_PARK_NS) + } + } + override fun run() = runWorker() @JvmField diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt index eefccd514a..c03ca30d3e 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt @@ -19,6 +19,11 @@ internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp( "kotlinx.coroutines.scheduler.resolution.ns", 100000L ) +@JvmField +internal val RUN_BLOCKING_CPU_REACQUIRE_PARK_NS = systemProp( + "kotlinx.coroutines.scheduler.runBlocking.cpu.reacquire.ns", 250L * 1000 * 1000 +) + /** * The maximum number of threads allocated for CPU-bound tasks at the default set of dispatchers. * diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt index 49510cde06..9bf220c7b3 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt @@ -1,6 +1,5 @@ package kotlinx.coroutines.scheduling -import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import org.junit.* import org.junit.rules.* @@ -171,4 +170,20 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { fun testZeroParallelism() { blockingDispatcher(0) } + + @Test + fun testNoCpuStarvationWithDeepRunBlocking() { + val maxDepth = CORES_COUNT * 3 + 3 + fun body(depth: Int) { + if (depth == maxDepth) return + runBlocking(dispatcher) { + launch(dispatcher) { + body(depth + 1) + } + } + } + + body(1) + checkPoolThreadsCreated(maxDepth..maxDepth + 1) + } } From 94571f6b9ad3874d5fc239ecad87efbffd21aaf0 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Wed, 27 Mar 2024 13:03:26 +0100 Subject: [PATCH 05/22] Introduce withUnlimitedIOScheduler utility method to use instead of direct interaction with Worker #3983 / IJPL-721 --- kotlinx-coroutines-core/jvm/src/Builders.kt | 13 +++------- .../jvm/src/scheduling/CoroutineScheduler.kt | 25 ++++++++++++++++--- .../jvm/src/scheduling/Tasks.kt | 4 +-- 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index 7984c58b1a..692983b336 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -5,7 +5,6 @@ package kotlinx.coroutines import kotlinx.coroutines.scheduling.* -import kotlinx.coroutines.scheduling.CoroutineScheduler import java.util.concurrent.locks.* import kotlin.contracts.* import kotlin.coroutines.* @@ -88,7 +87,6 @@ private class BlockingCoroutine( @Suppress("UNCHECKED_CAST") fun joinBlocking(): T { registerTimeLoopThread() - var cpuPermitReleased: Boolean? = null try { eventLoop?.incrementUseCount() try { @@ -98,20 +96,17 @@ private class BlockingCoroutine( val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE // note: process next even may loose unpark flag, so check if completed before parking if (isCompleted) break - if (parkNanos > 0 && cpuPermitReleased == null) { - val worker = Thread.currentThread() as? CoroutineScheduler.Worker - cpuPermitReleased = worker?.releaseCpu() ?: false + if (parkNanos > 0) { + withUnlimitedIOScheduler { + parkNanos(this, parkNanos) + } } - parkNanos(this, parkNanos) } } finally { // paranoia eventLoop?.decrementUseCount() } } finally { // paranoia unregisterTimeLoopThread() - if (cpuPermitReleased == true) { - (Thread.currentThread() as CoroutineScheduler.Worker).reacquireCpu() - } } // now return result val state = this.state.unboxState() diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 16e8bfc8b8..e4f9d1566e 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -690,7 +690,7 @@ internal class CoroutineScheduler( return hadCpu } - /** only for [runBlocking] */ + /** only for [withoutCpuPermit] */ fun releaseCpu(): Boolean { assert { state == WorkerState.CPU_ACQUIRED || state == WorkerState.BLOCKING } return tryReleaseCpu(WorkerState.BLOCKING).also { released -> @@ -698,7 +698,7 @@ internal class CoroutineScheduler( } } - /** only for [runBlocking] */ + /** only for [withoutCpuPermit] */ fun reacquireCpu() { assert { state == WorkerState.BLOCKING } decrementBlockingTasks() @@ -732,7 +732,7 @@ internal class CoroutineScheduler( assert { state == WorkerState.CPU_ACQUIRED } break } - LockSupport.parkNanos(RUN_BLOCKING_CPU_REACQUIRE_PARK_NS) + LockSupport.parkNanos(CPU_REACQUIRE_PARK_NS) } } @@ -1085,3 +1085,22 @@ internal fun isSchedulerWorker(thread: Thread) = thread is CoroutineScheduler.Wo @JvmName("mayNotBlock") internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker && thread.state == CoroutineScheduler.WorkerState.CPU_ACQUIRED + +/** + * Emulates dispatch to [UnlimitedIoScheduler] in a blocking context. + */ +internal fun withUnlimitedIOScheduler(blocking: () -> Unit) { + withoutCpuPermit { + blocking() + } +} + +private fun withoutCpuPermit(body: () -> Unit) { + val worker = Thread.currentThread() as? CoroutineScheduler.Worker ?: return body() + val releasedPermit = worker.releaseCpu() + try { + return body() + } finally { + if (releasedPermit) worker.reacquireCpu() + } +} diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt index c03ca30d3e..b681efa98b 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt @@ -20,8 +20,8 @@ internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp( ) @JvmField -internal val RUN_BLOCKING_CPU_REACQUIRE_PARK_NS = systemProp( - "kotlinx.coroutines.scheduler.runBlocking.cpu.reacquire.ns", 250L * 1000 * 1000 +internal val CPU_REACQUIRE_PARK_NS = systemProp( + "kotlinx.coroutines.scheduler.cpu.reacquire.ns", 250L * 1000 * 1000 ) /** From f54d9d0d78f7e9e8b97dc6b9612f1287f66e45ec Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Wed, 27 Mar 2024 13:04:41 +0100 Subject: [PATCH 06/22] Cleanup: rename test --- .../jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt index 9bf220c7b3..6491bd3fcb 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt @@ -172,7 +172,7 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { } @Test - fun testNoCpuStarvationWithDeepRunBlocking() { + fun testNoCpuStarvationRunBlockingOnDefaultDispatcherThread() { val maxDepth = CORES_COUNT * 3 + 3 fun body(depth: Int) { if (depth == maxDepth) return From 275a0ad1c3a424d3d6dd4576a923187e64f0ece1 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Wed, 27 Mar 2024 16:34:13 +0100 Subject: [PATCH 07/22] Implement permit release also for LimitedDispatcher PermitTransfer is extracted to be used both in CoroutineScheduler and in LimitedDispatcher. BlockingDispatchAware interface is introduced for LimitedDispatcher.Worker to be accounted by CoroutineScheduler. #3983 / IJPL-721 --- .../src/internal/BlockingDispatchAware.kt | 7 +++ .../common/src/internal/LimitedDispatcher.kt | 26 ++++++++- .../common/src/internal/PermitTransfer.kt | 51 +++++++++++++++++ .../PermitTransfer.jsAndWasmShared.kt | 3 + .../jvm/src/internal/PermitTransfer.jvm.kt | 37 ++++++++++++ .../jvm/src/scheduling/CoroutineScheduler.kt | 56 +++++++++---------- .../jvm/src/scheduling/Tasks.kt | 4 -- .../BlockingCoroutineDispatcherTest.kt | 30 ++++++++++ .../src/internal/PermitTransfer.native.kt | 3 + 9 files changed, 184 insertions(+), 33 deletions(-) create mode 100644 kotlinx-coroutines-core/common/src/internal/BlockingDispatchAware.kt create mode 100644 kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt create mode 100644 kotlinx-coroutines-core/jsAndWasmShared/src/internal/PermitTransfer.jsAndWasmShared.kt create mode 100644 kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt create mode 100644 kotlinx-coroutines-core/native/src/internal/PermitTransfer.native.kt diff --git a/kotlinx-coroutines-core/common/src/internal/BlockingDispatchAware.kt b/kotlinx-coroutines-core/common/src/internal/BlockingDispatchAware.kt new file mode 100644 index 0000000000..930e9a348b --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/BlockingDispatchAware.kt @@ -0,0 +1,7 @@ +package kotlinx.coroutines.internal + + +internal interface BlockingDispatchAware { + fun beforeDispatchElsewhere() + fun afterDispatchBack() +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt index 0c8e0778b1..cc593b4ae4 100644 --- a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -103,12 +103,14 @@ internal class LimitedDispatcher( * actual tasks are done, nothing prevents the user from closing the dispatcher and making it incorrect to * perform any more dispatches. */ - private inner class Worker(private var currentTask: Runnable) : Runnable { + private inner class Worker(private var currentTask: Runnable) : Runnable, BlockingDispatchAware { override fun run() { var fairnessCounter = 0 while (true) { try { currentTask.run() + } catch (e: WorkerPermitTransferCompleted) { + return } catch (e: Throwable) { handleCoroutineException(EmptyCoroutineContext, e) } @@ -122,7 +124,29 @@ internal class LimitedDispatcher( } } } + + override fun beforeDispatchElsewhere() { + // compensate while we are blocked + val newWorker = Worker(Runnable {}) + dispatcher.dispatch(this@LimitedDispatcher, newWorker) + } + + override fun afterDispatchBack() { + if (tryAllocateWorker()) return + val permitTransfer = PermitTransfer() + queue.addLast( + permitTransfer.releaseFun { throw WorkerPermitTransferCompleted } + .let { Runnable { it() } } + ) + permitTransfer.acquire( + tryAllocatePermit = ::tryAllocateWorker, + deallocatePermit = { runningWorkers.decrementAndGet() } + ) + } + } + + private object WorkerPermitTransferCompleted : Throwable() } // Save a few bytecode ops diff --git a/kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt b/kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt new file mode 100644 index 0000000000..db9b3cda13 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt @@ -0,0 +1,51 @@ +package kotlinx.coroutines.internal + +import kotlinx.atomicfu.* +import kotlin.jvm.* + +@JvmField +internal val PERMIT_ACQUIRE_PARK_NS = systemProp( + "kotlinx.coroutines.permit.acquire.ns", 250L * 1000 * 1000 // 250ms +) + +internal class PermitTransferStatus { + private val status = atomic(false) + fun check(): Boolean = status.value + fun complete(): Boolean = status.compareAndSet(false, true) +} + +internal expect class PermitTransfer constructor() { + /** + * [releasePermit] may throw + */ + fun releaseFun(releasePermit: () -> Unit): () -> Unit + + /** + * [tryAllocatePermit] and [deallocatePermit] must not throw + */ + fun acquire(tryAllocatePermit: () -> Boolean, deallocatePermit: () -> Unit) +} + +internal class BusyPermitTransfer { + private val status = PermitTransferStatus() + + fun releaseFun(releasePermit: () -> Unit): () -> Unit = { + if (status.complete()) { + releasePermit() + } + } + + fun acquire(tryAllocatePermit: () -> Boolean, deallocatePermit: () -> Unit) { + while (true) { + if (status.check()) { + return + } + if (tryAllocatePermit()) { + if (!status.complete()) { // race: transfer was completed first by releaseFun + deallocatePermit() + } + return + } + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/PermitTransfer.jsAndWasmShared.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/PermitTransfer.jsAndWasmShared.kt new file mode 100644 index 0000000000..80d18490c3 --- /dev/null +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/PermitTransfer.jsAndWasmShared.kt @@ -0,0 +1,3 @@ +package kotlinx.coroutines.internal + +internal actual typealias PermitTransfer = BusyPermitTransfer // TODO \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt b/kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt new file mode 100644 index 0000000000..c520aea25e --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt @@ -0,0 +1,37 @@ +package kotlinx.coroutines.internal + +import java.util.concurrent.locks.* + +internal actual class PermitTransfer { + private val status = PermitTransferStatus() + + public actual fun releaseFun(releasePermit: () -> Unit): () -> Unit { + val blockedThread = Thread.currentThread() + return { + if (status.complete()) { + try { + releasePermit() + } finally { + LockSupport.unpark(blockedThread) + } + } + } + } + + public actual fun acquire(tryAllocatePermit: () -> Boolean, deallocatePermit: () -> Unit) { + while (true) { + if (status.check()) { + return + } + if (tryAllocatePermit()) { + if (!status.complete()) { // race: transfer was completed first by releaseFun + deallocatePermit() + } + return + } + LockSupport.parkNanos( + PERMIT_ACQUIRE_PARK_NS // todo: not sure if it's needed at all, I mean that it is < Long.MAX_VALUE, but at least this way it's safer + ) + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index e4f9d1566e..2fea576629 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -703,37 +703,18 @@ internal class CoroutineScheduler( assert { state == WorkerState.BLOCKING } decrementBlockingTasks() if (tryAcquireCpuPermit()) return - class CpuPermitTransfer { - private val status = atomic(false) - fun check(): Boolean = status.value - fun complete(): Boolean = status.compareAndSet(false, true) - } - val permitTransfer = CpuPermitTransfer() - val blockedWorker = this@Worker - scheduler.dispatch(Runnable { + val permitTransfer = PermitTransfer() + scheduler.dispatch(permitTransfer.releaseFun { // this code runs in a different worker thread that holds a CPU token val cpuHolder = currentThread() as Worker assert { cpuHolder.state == WorkerState.CPU_ACQUIRED } - if (permitTransfer.complete()) { - cpuHolder.state = WorkerState.BLOCKING - LockSupport.unpark(blockedWorker) - } + cpuHolder.state = WorkerState.BLOCKING }, taskContext = NonBlockingContext) - while (true) { - if (permitTransfer.check()) { - state = WorkerState.CPU_ACQUIRED - break - } - if (tryAcquireCpuPermit()) { - if (!permitTransfer.complete()) { - // race: transfer was completed by another thread - releaseCpuPermit() - } - assert { state == WorkerState.CPU_ACQUIRED } - break - } - LockSupport.parkNanos(CPU_REACQUIRE_PARK_NS) - } + permitTransfer.acquire( + tryAllocatePermit = this@CoroutineScheduler::tryAcquireCpuPermit, + deallocatePermit = ::releaseCpuPermit + ) + state = WorkerState.CPU_ACQUIRED } override fun run() = runWorker() @@ -841,14 +822,20 @@ internal class CoroutineScheduler( private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK + private var currentTask: Task? = null + private fun executeTask(task: Task) { val taskMode = task.mode idleReset(taskMode) beforeTask(taskMode) + currentTask = task runSafely(task) + currentTask = null afterTask(taskMode) } + internal fun getCurrentTaskImpl(): TaskImpl? = currentTask as? TaskImpl + private fun beforeTask(taskMode: Int) { if (taskMode == TASK_NON_BLOCKING) return // Always notify about new work when releasing CPU-permit to execute some blocking task @@ -1091,7 +1078,9 @@ internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker & */ internal fun withUnlimitedIOScheduler(blocking: () -> Unit) { withoutCpuPermit { - blocking() + withTaskBlockingDispatch { + blocking() + } } } @@ -1104,3 +1093,14 @@ private fun withoutCpuPermit(body: () -> Unit) { if (releasedPermit) worker.reacquireCpu() } } + +private fun withTaskBlockingDispatch(body: () -> Unit) { + val worker = Thread.currentThread() as? CoroutineScheduler.Worker ?: return body() + val dispatchAware = worker.getCurrentTaskImpl()?.block as? BlockingDispatchAware ?: return body() + dispatchAware.beforeDispatchElsewhere() + try { + return body() + } finally { + dispatchAware.afterDispatchBack() + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt index b681efa98b..8cf7578a32 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt @@ -19,10 +19,6 @@ internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp( "kotlinx.coroutines.scheduler.resolution.ns", 100000L ) -@JvmField -internal val CPU_REACQUIRE_PARK_NS = systemProp( - "kotlinx.coroutines.scheduler.cpu.reacquire.ns", 250L * 1000 * 1000 -) /** * The maximum number of threads allocated for CPU-bound tasks at the default set of dispatchers. diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt index 6491bd3fcb..7addca0cc6 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.* import org.junit.* import org.junit.rules.* import java.util.concurrent.* +import java.util.concurrent.atomic.* class BlockingCoroutineDispatcherTest : SchedulerTestBase() { @@ -186,4 +187,33 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { body(1) checkPoolThreadsCreated(maxDepth..maxDepth + 1) } + + @Test + fun testNoStarvationOfLimitedDispatcherWithRunBlocking() { + val taskCount = 5 + val dispatcher = blockingDispatcher(2) + val barrier = CompletableDeferred() + val count = AtomicInteger(0) + fun blockingCode() { + runBlocking { + count.incrementAndGet() + barrier.await() + count.decrementAndGet() + } + } + runBlocking { + repeat(taskCount) { + launch(dispatcher) { + blockingCode() + } + } + while (count.get() != taskCount) { + Thread.sleep(1) + } + barrier.complete(Unit) + while (count.get() != 0) { + Thread.sleep(1) + } + } + } } diff --git a/kotlinx-coroutines-core/native/src/internal/PermitTransfer.native.kt b/kotlinx-coroutines-core/native/src/internal/PermitTransfer.native.kt new file mode 100644 index 0000000000..80d18490c3 --- /dev/null +++ b/kotlinx-coroutines-core/native/src/internal/PermitTransfer.native.kt @@ -0,0 +1,3 @@ +package kotlinx.coroutines.internal + +internal actual typealias PermitTransfer = BusyPermitTransfer // TODO \ No newline at end of file From bdc99301f00b3516d227732d79c89e5bf6703bfd Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Thu, 28 Mar 2024 11:18:11 +0100 Subject: [PATCH 08/22] PermitTransfer: remove PERMIT_ACQUIRE_PARK_NS and make park() indefinite because preceding unpark() guarantees it to be non-blocking --- .../common/src/internal/PermitTransfer.kt | 5 ----- .../jvm/src/internal/PermitTransfer.jvm.kt | 4 +--- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt b/kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt index db9b3cda13..a38c48f643 100644 --- a/kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt +++ b/kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt @@ -3,11 +3,6 @@ package kotlinx.coroutines.internal import kotlinx.atomicfu.* import kotlin.jvm.* -@JvmField -internal val PERMIT_ACQUIRE_PARK_NS = systemProp( - "kotlinx.coroutines.permit.acquire.ns", 250L * 1000 * 1000 // 250ms -) - internal class PermitTransferStatus { private val status = atomic(false) fun check(): Boolean = status.value diff --git a/kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt b/kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt index c520aea25e..7a35159dcf 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt @@ -29,9 +29,7 @@ internal actual class PermitTransfer { } return } - LockSupport.parkNanos( - PERMIT_ACQUIRE_PARK_NS // todo: not sure if it's needed at all, I mean that it is < Long.MAX_VALUE, but at least this way it's safer - ) + LockSupport.park() } } } \ No newline at end of file From 44b596377ec67366f309e7b50c48ffa2e9b5a48f Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Thu, 28 Mar 2024 11:29:07 +0100 Subject: [PATCH 09/22] Clean up: CoroutineScheduler.Worker.getCurrentTaskImpl -> getCurrentTask --- kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt | 1 - .../jvm/src/scheduling/CoroutineScheduler.kt | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt b/kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt index a38c48f643..0eb86abba4 100644 --- a/kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt +++ b/kotlinx-coroutines-core/common/src/internal/PermitTransfer.kt @@ -1,7 +1,6 @@ package kotlinx.coroutines.internal import kotlinx.atomicfu.* -import kotlin.jvm.* internal class PermitTransferStatus { private val status = atomic(false) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 2fea576629..7332b7c58f 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -834,7 +834,7 @@ internal class CoroutineScheduler( afterTask(taskMode) } - internal fun getCurrentTaskImpl(): TaskImpl? = currentTask as? TaskImpl + internal fun getCurrentTask(): Task? = currentTask private fun beforeTask(taskMode: Int) { if (taskMode == TASK_NON_BLOCKING) return @@ -1096,7 +1096,7 @@ private fun withoutCpuPermit(body: () -> Unit) { private fun withTaskBlockingDispatch(body: () -> Unit) { val worker = Thread.currentThread() as? CoroutineScheduler.Worker ?: return body() - val dispatchAware = worker.getCurrentTaskImpl()?.block as? BlockingDispatchAware ?: return body() + val dispatchAware = (worker.getCurrentTask() as? TaskImpl)?.block as? BlockingDispatchAware ?: return body() dispatchAware.beforeDispatchElsewhere() try { return body() From ce9036d6d0f6d6a8791c16eea8e6ed09f07c4e83 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Thu, 28 Mar 2024 11:29:22 +0100 Subject: [PATCH 10/22] Add doc for BlockingDispatchAware --- .../src/internal/BlockingDispatchAware.kt | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/internal/BlockingDispatchAware.kt b/kotlinx-coroutines-core/common/src/internal/BlockingDispatchAware.kt index 930e9a348b..84af5dc653 100644 --- a/kotlinx-coroutines-core/common/src/internal/BlockingDispatchAware.kt +++ b/kotlinx-coroutines-core/common/src/internal/BlockingDispatchAware.kt @@ -1,7 +1,22 @@ package kotlinx.coroutines.internal - +/** + * [Runnables][kotlinx.coroutines.Runnable] that are dispatched on [CoroutineDispatcher][kotlinx.coroutines.CoroutineDispatcher] + * may optionally implement this interface to be notified of dispatch emulation in blocking mode. + * + * This may be needed for controlling dispatcher to release/acquire a permit of the worker that currently + * executes the dispatched Runnable. + * @see LimitedDispatcher.Worker + * @see kotlinx.coroutines.scheduling.withUnlimitedIOScheduler + */ internal interface BlockingDispatchAware { + /** + * Must not throw + */ fun beforeDispatchElsewhere() + + /** + * Must not throw + */ fun afterDispatchBack() } \ No newline at end of file From 435663252fcea58cbc81cbdca11ea723f8cd6b43 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Thu, 28 Mar 2024 12:20:11 +0100 Subject: [PATCH 11/22] Add a test for Default dispatcher liveness and fix a related bug in Worker.releaseCpu --- .../jvm/src/scheduling/CoroutineScheduler.kt | 5 ++++- .../BlockingCoroutineDispatcherTest.kt | 22 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 7332b7c58f..6d31452501 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -694,7 +694,10 @@ internal class CoroutineScheduler( fun releaseCpu(): Boolean { assert { state == WorkerState.CPU_ACQUIRED || state == WorkerState.BLOCKING } return tryReleaseCpu(WorkerState.BLOCKING).also { released -> - if (released) incrementBlockingTasks() + if (released) { + val state = incrementBlockingTasks() + signalBlockingWork(state, false) + } } } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt index 7addca0cc6..cdea267fb9 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt @@ -217,3 +217,25 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { } } } + +class BlockingCoroutineDispatcherTestCorePoolSize1 : SchedulerTestBase() { + init { + corePoolSize = 1 + } + + @Test + fun testLivenessOfDefaultDispatcher(): Unit = runBlocking { + val barrier = CyclicBarrier(2) + val barrier2 = CompletableDeferred() + launch(dispatcher) { + barrier.await() + runBlocking { + barrier2.await() + } + } + val task = async(dispatcher) { 42 } + barrier.await() + task.join() + barrier2.complete(Unit) + } +} From bd29ff6a47d557133f6bf24bddb154cbcf417949 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Sun, 31 Mar 2024 14:58:37 +0200 Subject: [PATCH 12/22] Improve Default dispatcher liveness stress test with corePoolSize=1 --- ...ngCoroutineDispatcherLivenessStressTest.kt | 60 +++++++++++++++++++ .../BlockingCoroutineDispatcherTest.kt | 22 ------- 2 files changed, 60 insertions(+), 22 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt index 20748a2d34..4a65c3c425 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt @@ -4,6 +4,10 @@ import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test +import org.junit.runner.* +import org.junit.runners.* +import java.util.LinkedList +import java.util.concurrent.* import java.util.concurrent.atomic.* import kotlin.test.* @@ -59,3 +63,59 @@ class BlockingCoroutineDispatcherLivenessStressTest : SchedulerTestBase() { assertEquals(2 * iterations, completed.get()) } } + +@RunWith(Parameterized::class) +class BlockingCoroutineDispatcherTestCorePoolSize1(private val yieldMask: Int) : SchedulerTestBase() { + init { + corePoolSize = 1 + } + + companion object { + @JvmStatic + @Parameterized.Parameters + fun data(): Array> { + return Array(10 * stressTestMultiplierSqrt) { arrayOf(it) } + } + } + + @Test + fun testLivenessOfDefaultDispatcher(): Unit = runBlocking { + val oldRunBlockings = LinkedList() + var maxOldRunBlockings = 0 + var busyWaits = 0 + repeat(5000 * stressTestMultiplierSqrt) { + if (it % 1000 == 0) { + System.err.println("======== $it, rb=${oldRunBlockings.size}, max rb=${maxOldRunBlockings}, busy=$busyWaits") + } + val barrier = CyclicBarrier(2) + val barrier2 = CompletableDeferred() + val blocking = launch(dispatcher) { + barrier.await() + runBlocking { + if ((yieldMask and 1) != 0) yield() + barrier2.await() + if ((yieldMask and 2) != 0) yield() + } + } + oldRunBlockings.addLast(blocking) + val task = async(dispatcher) { + if ((yieldMask and 4) != 0) yield() + 42.also { + if ((yieldMask and 8) != 0) yield() + } + } + barrier.await() + task.join() + barrier2.complete(Unit) + + oldRunBlockings.removeIf(Job::isCompleted) + while (oldRunBlockings.size > 5) { + busyWaits++ + oldRunBlockings.removeIf(Job::isCompleted) + } + if (oldRunBlockings.size > maxOldRunBlockings) { + maxOldRunBlockings = oldRunBlockings.size + } + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt index cdea267fb9..7addca0cc6 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt @@ -217,25 +217,3 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { } } } - -class BlockingCoroutineDispatcherTestCorePoolSize1 : SchedulerTestBase() { - init { - corePoolSize = 1 - } - - @Test - fun testLivenessOfDefaultDispatcher(): Unit = runBlocking { - val barrier = CyclicBarrier(2) - val barrier2 = CompletableDeferred() - launch(dispatcher) { - barrier.await() - runBlocking { - barrier2.await() - } - } - val task = async(dispatcher) { 42 } - barrier.await() - task.join() - barrier2.complete(Unit) - } -} From 183d7f859d8f19fa789c11f96118183a958afa55 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Sun, 31 Mar 2024 15:00:05 +0200 Subject: [PATCH 13/22] Worker should give away local tasks upon requested CPU permit release, otherwise they may be missed by other workers --- .../jvm/src/internal/PermitTransfer.jvm.kt | 2 +- .../jvm/src/scheduling/CoroutineScheduler.kt | 46 +++++++++++++------ 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt b/kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt index 7a35159dcf..769090872f 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/PermitTransfer.jvm.kt @@ -29,7 +29,7 @@ internal actual class PermitTransfer { } return } - LockSupport.park() + LockSupport.park(this) } } } \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 6d31452501..068fb59e26 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -695,8 +695,8 @@ internal class CoroutineScheduler( assert { state == WorkerState.CPU_ACQUIRED || state == WorkerState.BLOCKING } return tryReleaseCpu(WorkerState.BLOCKING).also { released -> if (released) { - val state = incrementBlockingTasks() - signalBlockingWork(state, false) + incrementBlockingTasks() + signalCpuWork() } } } @@ -704,13 +704,16 @@ internal class CoroutineScheduler( /** only for [withoutCpuPermit] */ fun reacquireCpu() { assert { state == WorkerState.BLOCKING } - decrementBlockingTasks() - if (tryAcquireCpuPermit()) return + if (tryAcquireCpuPermit()) { + decrementBlockingTasks() + return + } val permitTransfer = PermitTransfer() scheduler.dispatch(permitTransfer.releaseFun { // this code runs in a different worker thread that holds a CPU token val cpuHolder = currentThread() as Worker assert { cpuHolder.state == WorkerState.CPU_ACQUIRED } + cpuHolder.giveAwayLocalTasks() // TODO probably we can move CPU tasks straight into acquiring worker's local queue cpuHolder.state = WorkerState.BLOCKING }, taskContext = NonBlockingContext) permitTransfer.acquire( @@ -718,6 +721,18 @@ internal class CoroutineScheduler( deallocatePermit = ::releaseCpuPermit ) state = WorkerState.CPU_ACQUIRED + decrementBlockingTasks() + } + + fun giveAwayLocalTasks() { + stolenTask.element?.let { task -> + addToGlobalQueue(task) + stolenTask.element = null + } + while (true) { + val task = localQueue.poll() ?: return + addToGlobalQueue(task) + } } override fun run() = runWorker() @@ -1080,26 +1095,29 @@ internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker & * Emulates dispatch to [UnlimitedIoScheduler] in a blocking context. */ internal fun withUnlimitedIOScheduler(blocking: () -> Unit) { - withoutCpuPermit { - withTaskBlockingDispatch { - blocking() + val worker = Thread.currentThread() as? CoroutineScheduler.Worker + ?: return blocking() + with(worker) { + giveAwayLocalTasks() + withoutCpuPermit { + withTaskBlockingDispatch { + blocking() + } } } } -private fun withoutCpuPermit(body: () -> Unit) { - val worker = Thread.currentThread() as? CoroutineScheduler.Worker ?: return body() - val releasedPermit = worker.releaseCpu() +private fun CoroutineScheduler.Worker.withoutCpuPermit(body: () -> Unit) { + val releasedPermit = releaseCpu() try { return body() } finally { - if (releasedPermit) worker.reacquireCpu() + if (releasedPermit) reacquireCpu() } } -private fun withTaskBlockingDispatch(body: () -> Unit) { - val worker = Thread.currentThread() as? CoroutineScheduler.Worker ?: return body() - val dispatchAware = (worker.getCurrentTask() as? TaskImpl)?.block as? BlockingDispatchAware ?: return body() +private fun CoroutineScheduler.Worker.withTaskBlockingDispatch(body: () -> Unit) { + val dispatchAware = (getCurrentTask() as? TaskImpl)?.block as? BlockingDispatchAware ?: return body() dispatchAware.beforeDispatchElsewhere() try { return body() From 811cbd12d234ebc6a0d0f457d085ec9fd9095337 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Sun, 31 Mar 2024 15:00:24 +0200 Subject: [PATCH 14/22] Fix a race in CoroutineScheduler tryPark --- .../jvm/src/scheduling/CoroutineScheduler.kt | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 068fb59e26..a7fb60fe80 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -832,7 +832,20 @@ internal class CoroutineScheduler( */ while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups if (isTerminated || state == WorkerState.TERMINATED) break - tryReleaseCpu(WorkerState.PARKING) + val hadCpu = tryReleaseCpu(WorkerState.PARKING) + if (hadCpu && !globalCpuQueue.isEmpty) { + /* + * Prevents the following race: consider corePoolSize = 1 + * - T_CPU holds the only CPU permit, scans the tasks, doesn't find anything, places itself on a stack + * - T_CPU scans again, doesn't find anything again, suspends at tryPark() + * - T_B (or several workers in BLOCKING mode) also put themselves on the stack, on top of the T_CPU + * - T* (not a worker) dispatches CPU tasks, wakes up T_B + * - T_B can't acquire a CPU permit, scans blocking queue, doesn't find anything, parks + * - T_CPU releases the CPU permit, parks + * - there are tasks in the CPU queue, but all workers are parked, so the scheduler won't make progress until there is another dispatch + */ + break + } interrupted() // Cleanup interruptions park() } From 4c440356999b7e6411bff2e3fe1a241060b15d63 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Sun, 31 Mar 2024 15:04:16 +0200 Subject: [PATCH 15/22] clean up --- kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt index 8cf7578a32..eefccd514a 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt @@ -19,7 +19,6 @@ internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp( "kotlinx.coroutines.scheduler.resolution.ns", 100000L ) - /** * The maximum number of threads allocated for CPU-bound tasks at the default set of dispatchers. * From c7a0fc3766435c95e70c06d4e7f186c17b2a11e4 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Tue, 2 Apr 2024 16:29:54 +0200 Subject: [PATCH 16/22] Extract and rename test --- ...ngCoroutineDispatcherLivenessStressTest.kt | 56 ---------------- ...kingDefaultDispatcherLivenessStressTest.kt | 66 +++++++++++++++++++ 2 files changed, 66 insertions(+), 56 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingDefaultDispatcherLivenessStressTest.kt diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt index 4a65c3c425..5cecf1bee1 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt @@ -63,59 +63,3 @@ class BlockingCoroutineDispatcherLivenessStressTest : SchedulerTestBase() { assertEquals(2 * iterations, completed.get()) } } - -@RunWith(Parameterized::class) -class BlockingCoroutineDispatcherTestCorePoolSize1(private val yieldMask: Int) : SchedulerTestBase() { - init { - corePoolSize = 1 - } - - companion object { - @JvmStatic - @Parameterized.Parameters - fun data(): Array> { - return Array(10 * stressTestMultiplierSqrt) { arrayOf(it) } - } - } - - @Test - fun testLivenessOfDefaultDispatcher(): Unit = runBlocking { - val oldRunBlockings = LinkedList() - var maxOldRunBlockings = 0 - var busyWaits = 0 - repeat(5000 * stressTestMultiplierSqrt) { - if (it % 1000 == 0) { - System.err.println("======== $it, rb=${oldRunBlockings.size}, max rb=${maxOldRunBlockings}, busy=$busyWaits") - } - val barrier = CyclicBarrier(2) - val barrier2 = CompletableDeferred() - val blocking = launch(dispatcher) { - barrier.await() - runBlocking { - if ((yieldMask and 1) != 0) yield() - barrier2.await() - if ((yieldMask and 2) != 0) yield() - } - } - oldRunBlockings.addLast(blocking) - val task = async(dispatcher) { - if ((yieldMask and 4) != 0) yield() - 42.also { - if ((yieldMask and 8) != 0) yield() - } - } - barrier.await() - task.join() - barrier2.complete(Unit) - - oldRunBlockings.removeIf(Job::isCompleted) - while (oldRunBlockings.size > 5) { - busyWaits++ - oldRunBlockings.removeIf(Job::isCompleted) - } - if (oldRunBlockings.size > maxOldRunBlockings) { - maxOldRunBlockings = oldRunBlockings.size - } - } - } -} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingDefaultDispatcherLivenessStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingDefaultDispatcherLivenessStressTest.kt new file mode 100644 index 0000000000..7024f96e80 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingDefaultDispatcherLivenessStressTest.kt @@ -0,0 +1,66 @@ +package scheduling + +import kotlinx.coroutines.* +import kotlinx.coroutines.scheduling.* +import kotlinx.coroutines.testing.* +import org.junit.* +import org.junit.runner.* +import org.junit.runners.* +import java.util.* +import java.util.concurrent.* + +@RunWith(Parameterized::class) +class RunBlockingDefaultDispatcherLivenessStressTest(private val yieldMask: Int) : SchedulerTestBase() { + init { + corePoolSize = 1 + } + + companion object { + @JvmStatic + @Parameterized.Parameters + fun data(): Array> { + return Array(32 * stressTestMultiplierSqrt) { arrayOf(it) } + } + } + + @Test + fun testLivenessOfDefaultDispatcher(): Unit = runBlocking { + val oldRunBlockings = LinkedList() + var maxOldRunBlockings = 0 + var busyWaits = 0 + repeat(5000 * stressTestMultiplierSqrt) { + if (it % 1000 == 0) { + System.err.println("======== $it, rb=${oldRunBlockings.size}, max rb=${maxOldRunBlockings}, busy=$busyWaits") + } + val barrier = CyclicBarrier(2) + val barrier2 = CompletableDeferred() + val blocking = launch(dispatcher) { + barrier.await() + runBlocking { + if ((yieldMask and 1) != 0) yield() + barrier2.await() + if ((yieldMask and 2) != 0) yield() + } + } + oldRunBlockings.addLast(blocking) + val task = async(dispatcher) { + if ((yieldMask and 4) != 0) yield() + 42.also { + if ((yieldMask and 8) != 0) yield() + } + } + barrier.await() + task.join() + barrier2.complete(Unit) + + oldRunBlockings.removeIf(Job::isCompleted) + while (oldRunBlockings.size > 5) { + busyWaits++ + oldRunBlockings.removeIf(Job::isCompleted) + } + if (oldRunBlockings.size > maxOldRunBlockings) { + maxOldRunBlockings = oldRunBlockings.size + } + } + } +} \ No newline at end of file From de0646686714e70db9bfdde82a505aaa41f46251 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Tue, 2 Apr 2024 17:22:16 +0200 Subject: [PATCH 17/22] Make LimitedDispatcher's Worker respect BlockingDispatchAware on its inner tasks also fix a probable worker deallocation race issue --- .../common/src/internal/LimitedDispatcher.kt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt index cc593b4ae4..36170dafd6 100644 --- a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -110,7 +110,7 @@ internal class LimitedDispatcher( try { currentTask.run() } catch (e: WorkerPermitTransferCompleted) { - return + if (!tryAllocateWorker()) return } catch (e: Throwable) { handleCoroutineException(EmptyCoroutineContext, e) } @@ -126,12 +126,14 @@ internal class LimitedDispatcher( } override fun beforeDispatchElsewhere() { - // compensate while we are blocked + // compensate while we are blocked and consider that we gave away our permit to a new worker val newWorker = Worker(Runnable {}) dispatcher.dispatch(this@LimitedDispatcher, newWorker) + (currentTask as? BlockingDispatchAware)?.beforeDispatchElsewhere() } override fun afterDispatchBack() { + (currentTask as? BlockingDispatchAware)?.afterDispatchBack() if (tryAllocateWorker()) return val permitTransfer = PermitTransfer() queue.addLast( From adbb5b4c9cc15656b72e89318b0e4585e7c8195f Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Tue, 2 Apr 2024 17:54:05 +0200 Subject: [PATCH 18/22] Extract a base runBlocking liveness test --- ...ckingCoroutineSchedulerLivenessTestBase.kt | 49 +++++++++++++++++++ ...kingDefaultDispatcherLivenessStressTest.kt | 46 +---------------- 2 files changed, 51 insertions(+), 44 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineSchedulerLivenessTestBase.kt diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineSchedulerLivenessTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineSchedulerLivenessTestBase.kt new file mode 100644 index 0000000000..e77bc4734b --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineSchedulerLivenessTestBase.kt @@ -0,0 +1,49 @@ +package scheduling + +import kotlinx.coroutines.* +import kotlinx.coroutines.scheduling.* +import kotlinx.coroutines.testing.* +import java.util.* +import java.util.concurrent.* + +open class RunBlockingCoroutineSchedulerLivenessTestBase : SchedulerTestBase() { + protected fun testSchedulerLiveness(targetDispatcher: CoroutineDispatcher, yieldMask: Int = 0b1111): Unit = runBlocking { + val oldRunBlockings = LinkedList() + var maxOldRunBlockings = 0 + var busyWaits = 0 + repeat(5000 * stressTestMultiplierSqrt) { + if (it % 1000 == 0) { + System.err.println("======== $it, rb=${oldRunBlockings.size}, max rb=${maxOldRunBlockings}, busy=$busyWaits") + } + val barrier = CyclicBarrier(2) + val barrier2 = CompletableDeferred() + val blocking = launch(targetDispatcher) { + barrier.await() + runBlocking { + if ((yieldMask and 1) != 0) yield() + barrier2.await() + if ((yieldMask and 2) != 0) yield() + } + } + oldRunBlockings.addLast(blocking) + val task = async(targetDispatcher) { + if ((yieldMask and 4) != 0) yield() + 42.also { + if ((yieldMask and 8) != 0) yield() + } + } + barrier.await() + task.join() + barrier2.complete(Unit) + + oldRunBlockings.removeIf(Job::isCompleted) + while (oldRunBlockings.size > 5) { + busyWaits++ + oldRunBlockings.removeIf(Job::isCompleted) + } + if (oldRunBlockings.size > maxOldRunBlockings) { + maxOldRunBlockings = oldRunBlockings.size + } + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingDefaultDispatcherLivenessStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingDefaultDispatcherLivenessStressTest.kt index 7024f96e80..f2ded4993a 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingDefaultDispatcherLivenessStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingDefaultDispatcherLivenessStressTest.kt @@ -1,16 +1,12 @@ package scheduling -import kotlinx.coroutines.* -import kotlinx.coroutines.scheduling.* import kotlinx.coroutines.testing.* import org.junit.* import org.junit.runner.* import org.junit.runners.* -import java.util.* -import java.util.concurrent.* @RunWith(Parameterized::class) -class RunBlockingDefaultDispatcherLivenessStressTest(private val yieldMask: Int) : SchedulerTestBase() { +class RunBlockingDefaultDispatcherLivenessStressTest(private val yieldMask: Int) : RunBlockingCoroutineSchedulerLivenessTestBase() { init { corePoolSize = 1 } @@ -24,43 +20,5 @@ class RunBlockingDefaultDispatcherLivenessStressTest(private val yieldMask: Int) } @Test - fun testLivenessOfDefaultDispatcher(): Unit = runBlocking { - val oldRunBlockings = LinkedList() - var maxOldRunBlockings = 0 - var busyWaits = 0 - repeat(5000 * stressTestMultiplierSqrt) { - if (it % 1000 == 0) { - System.err.println("======== $it, rb=${oldRunBlockings.size}, max rb=${maxOldRunBlockings}, busy=$busyWaits") - } - val barrier = CyclicBarrier(2) - val barrier2 = CompletableDeferred() - val blocking = launch(dispatcher) { - barrier.await() - runBlocking { - if ((yieldMask and 1) != 0) yield() - barrier2.await() - if ((yieldMask and 2) != 0) yield() - } - } - oldRunBlockings.addLast(blocking) - val task = async(dispatcher) { - if ((yieldMask and 4) != 0) yield() - 42.also { - if ((yieldMask and 8) != 0) yield() - } - } - barrier.await() - task.join() - barrier2.complete(Unit) - - oldRunBlockings.removeIf(Job::isCompleted) - while (oldRunBlockings.size > 5) { - busyWaits++ - oldRunBlockings.removeIf(Job::isCompleted) - } - if (oldRunBlockings.size > maxOldRunBlockings) { - maxOldRunBlockings = oldRunBlockings.size - } - } - } + fun testLivenessOfDefaultDispatcher(): Unit = testSchedulerLiveness(dispatcher, yieldMask) } \ No newline at end of file From 3561c1364605df54158a23b9f233f9a4cc8e7da6 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Tue, 2 Apr 2024 17:54:21 +0200 Subject: [PATCH 19/22] Add tests for runBlocking with LimitedDispatcher --- ...kingLimitedDispatcherLivenessStressTest.kt | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingLimitedDispatcherLivenessStressTest.kt diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingLimitedDispatcherLivenessStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingLimitedDispatcherLivenessStressTest.kt new file mode 100644 index 0000000000..e594b5dc50 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingLimitedDispatcherLivenessStressTest.kt @@ -0,0 +1,33 @@ +package scheduling + +import kotlinx.coroutines.testing.* +import org.junit.* +import org.junit.runner.* +import org.junit.runners.* + +@RunWith(Parameterized::class) +class RunBlockingLimitedDispatcherLivenessStressTest(private val yieldMask: Int) : RunBlockingCoroutineSchedulerLivenessTestBase() { + init { + corePoolSize = 1 + } + + companion object { + @JvmStatic + @Parameterized.Parameters + fun data(): Array> { + return Array(32 * stressTestMultiplierSqrt) { arrayOf(it) } + } + } + + @Test + fun testLivenessOfLimitedDispatcherOnTopOfDefaultDispatcher() = + testSchedulerLiveness(dispatcher.limitedParallelism(1), yieldMask) + + @Test + fun testLivenessOfLimitedDispatcherOnTopOfIoDispatcher() = testSchedulerLiveness( + // Important: inner limitedDispatcher will be on top of this LimitedDispatcher, so there are two Workers from + // two different LimitedDispatchers that must coordinate their permits, not just one. + // In other words, LimitedDispatcher's Worker should also respect BlockingDispatchAware on its inner tasks + blockingDispatcher.value.limitedParallelism(1), yieldMask + ) +} \ No newline at end of file From c702d2a9b572f7732f355585ae5403c0f0589cde Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Tue, 2 Apr 2024 21:06:43 +0200 Subject: [PATCH 20/22] Move tests around also fix wrong package names --- ...ngCoroutineDispatcherLivenessStressTest.kt | 4 -- .../BlockingCoroutineDispatcherTest.kt | 47 +-------------- .../RunBlockingCoroutineDispatcherTest.kt | 59 +++++++++++++++++++ ...ngCoroutineSchedulerLivenessStressTest.kt} | 41 +++++++++++-- ...kingDefaultDispatcherLivenessStressTest.kt | 24 -------- ...kingLimitedDispatcherLivenessStressTest.kt | 33 ----------- 6 files changed, 97 insertions(+), 111 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineDispatcherTest.kt rename kotlinx-coroutines-core/jvm/test/scheduling/{RunBlockingCoroutineSchedulerLivenessTestBase.kt => RunBlockingCoroutineSchedulerLivenessStressTest.kt} (50%) delete mode 100644 kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingDefaultDispatcherLivenessStressTest.kt delete mode 100644 kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingLimitedDispatcherLivenessStressTest.kt diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt index 5cecf1bee1..20748a2d34 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherLivenessStressTest.kt @@ -4,10 +4,6 @@ import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test -import org.junit.runner.* -import org.junit.runners.* -import java.util.LinkedList -import java.util.concurrent.* import java.util.concurrent.atomic.* import kotlin.test.* diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt index 7addca0cc6..49510cde06 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt @@ -1,10 +1,10 @@ package kotlinx.coroutines.scheduling +import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import org.junit.* import org.junit.rules.* import java.util.concurrent.* -import java.util.concurrent.atomic.* class BlockingCoroutineDispatcherTest : SchedulerTestBase() { @@ -171,49 +171,4 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { fun testZeroParallelism() { blockingDispatcher(0) } - - @Test - fun testNoCpuStarvationRunBlockingOnDefaultDispatcherThread() { - val maxDepth = CORES_COUNT * 3 + 3 - fun body(depth: Int) { - if (depth == maxDepth) return - runBlocking(dispatcher) { - launch(dispatcher) { - body(depth + 1) - } - } - } - - body(1) - checkPoolThreadsCreated(maxDepth..maxDepth + 1) - } - - @Test - fun testNoStarvationOfLimitedDispatcherWithRunBlocking() { - val taskCount = 5 - val dispatcher = blockingDispatcher(2) - val barrier = CompletableDeferred() - val count = AtomicInteger(0) - fun blockingCode() { - runBlocking { - count.incrementAndGet() - barrier.await() - count.decrementAndGet() - } - } - runBlocking { - repeat(taskCount) { - launch(dispatcher) { - blockingCode() - } - } - while (count.get() != taskCount) { - Thread.sleep(1) - } - barrier.complete(Unit) - while (count.get() != 0) { - Thread.sleep(1) - } - } - } } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineDispatcherTest.kt new file mode 100644 index 0000000000..e3806dc7d7 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineDispatcherTest.kt @@ -0,0 +1,59 @@ +package kotlinx.coroutines.scheduling + +import kotlinx.coroutines.* +import org.junit.* +import java.util.concurrent.atomic.* + +class RunBlockingCoroutineDispatcherTest : SchedulerTestBase() { + @Test + fun testRecursiveRunBlockingCanExceedDefaultDispatcherLimit() { + val maxDepth = CORES_COUNT * 3 + 3 + fun body(depth: Int) { + if (depth == maxDepth) return + runBlocking(Dispatchers.Default) { + launch(Dispatchers.Default) { + body(depth + 1) + } + } + } + + body(1) + checkPoolThreadsCreated(maxDepth..maxDepth + 1) + } + + @Test + fun testNoDefaultDispatcherStarvationWithRunBlocking() = testRunBlockingCanExceedDispatchersLimit(dispatcher, CORE_POOL_SIZE * 3 + 3) + + @Test + fun testNoIoDispatcherStarvationWithRunBlocking() = testRunBlockingCanExceedDispatchersLimit(blockingDispatcher(2), 5) + + private fun testRunBlockingCanExceedDispatchersLimit(targetDispatcher: CoroutineDispatcher, threadsToReach: Int) { + val barrier = CompletableDeferred() + val count = AtomicInteger(0) + fun blockingCode() { + runBlocking { + count.incrementAndGet() + barrier.await() + count.decrementAndGet() + } + } + runBlocking { + repeat(threadsToReach) { + launch(targetDispatcher) { + blockingCode() + } + } + while (count.get() != threadsToReach) { + Thread.sleep(1) + } + async(targetDispatcher) { + yield() + 42 + }.join() + barrier.complete(Unit) + while (count.get() != 0) { + Thread.sleep(1) + } + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineSchedulerLivenessTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineSchedulerLivenessStressTest.kt similarity index 50% rename from kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineSchedulerLivenessTestBase.kt rename to kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineSchedulerLivenessStressTest.kt index e77bc4734b..ba7d55297f 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineSchedulerLivenessTestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingCoroutineSchedulerLivenessStressTest.kt @@ -1,13 +1,46 @@ -package scheduling +package kotlinx.coroutines.scheduling import kotlinx.coroutines.* -import kotlinx.coroutines.scheduling.* import kotlinx.coroutines.testing.* +import org.junit.* +import org.junit.runner.* +import org.junit.runners.* import java.util.* import java.util.concurrent.* -open class RunBlockingCoroutineSchedulerLivenessTestBase : SchedulerTestBase() { - protected fun testSchedulerLiveness(targetDispatcher: CoroutineDispatcher, yieldMask: Int = 0b1111): Unit = runBlocking { +@RunWith(Parameterized::class) +class RunBlockingCoroutineSchedulerLivenessStressTest(private val yieldMask: Int) : SchedulerTestBase() { + init { + corePoolSize = 1 + } + + companion object { + @JvmStatic + @Parameterized.Parameters + fun data(): Array> { + return Array(16 * stressTestMultiplierSqrt) { arrayOf(it) } + } + } + + @Test + fun testLivenessOfDefaultDispatcher(): Unit = testSchedulerLiveness(dispatcher, yieldMask) + + @Test + fun testLivenessOfIoDispatcher(): Unit = testSchedulerLiveness(blockingDispatcher(1), yieldMask) + + @Test + fun testLivenessOfLimitedDispatcherOnTopOfDefaultDispatcher() = + testSchedulerLiveness(dispatcher.limitedParallelism(1), yieldMask) + + @Test + fun testLivenessOfLimitedDispatcherOnTopOfIoDispatcher() = testSchedulerLiveness( + // Important: inner limitedDispatcher will be on top of this LimitedDispatcher, so there are two Workers from + // two different LimitedDispatchers that must coordinate their permits, not just one. + // In other words, LimitedDispatcher's Worker should also respect BlockingDispatchAware on its inner tasks + blockingDispatcher.value.limitedParallelism(1), yieldMask + ) + + private fun testSchedulerLiveness(targetDispatcher: CoroutineDispatcher, yieldMask: Int = 0b1111): Unit = runBlocking { val oldRunBlockings = LinkedList() var maxOldRunBlockings = 0 var busyWaits = 0 diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingDefaultDispatcherLivenessStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingDefaultDispatcherLivenessStressTest.kt deleted file mode 100644 index f2ded4993a..0000000000 --- a/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingDefaultDispatcherLivenessStressTest.kt +++ /dev/null @@ -1,24 +0,0 @@ -package scheduling - -import kotlinx.coroutines.testing.* -import org.junit.* -import org.junit.runner.* -import org.junit.runners.* - -@RunWith(Parameterized::class) -class RunBlockingDefaultDispatcherLivenessStressTest(private val yieldMask: Int) : RunBlockingCoroutineSchedulerLivenessTestBase() { - init { - corePoolSize = 1 - } - - companion object { - @JvmStatic - @Parameterized.Parameters - fun data(): Array> { - return Array(32 * stressTestMultiplierSqrt) { arrayOf(it) } - } - } - - @Test - fun testLivenessOfDefaultDispatcher(): Unit = testSchedulerLiveness(dispatcher, yieldMask) -} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingLimitedDispatcherLivenessStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingLimitedDispatcherLivenessStressTest.kt deleted file mode 100644 index e594b5dc50..0000000000 --- a/kotlinx-coroutines-core/jvm/test/scheduling/RunBlockingLimitedDispatcherLivenessStressTest.kt +++ /dev/null @@ -1,33 +0,0 @@ -package scheduling - -import kotlinx.coroutines.testing.* -import org.junit.* -import org.junit.runner.* -import org.junit.runners.* - -@RunWith(Parameterized::class) -class RunBlockingLimitedDispatcherLivenessStressTest(private val yieldMask: Int) : RunBlockingCoroutineSchedulerLivenessTestBase() { - init { - corePoolSize = 1 - } - - companion object { - @JvmStatic - @Parameterized.Parameters - fun data(): Array> { - return Array(32 * stressTestMultiplierSqrt) { arrayOf(it) } - } - } - - @Test - fun testLivenessOfLimitedDispatcherOnTopOfDefaultDispatcher() = - testSchedulerLiveness(dispatcher.limitedParallelism(1), yieldMask) - - @Test - fun testLivenessOfLimitedDispatcherOnTopOfIoDispatcher() = testSchedulerLiveness( - // Important: inner limitedDispatcher will be on top of this LimitedDispatcher, so there are two Workers from - // two different LimitedDispatchers that must coordinate their permits, not just one. - // In other words, LimitedDispatcher's Worker should also respect BlockingDispatchAware on its inner tasks - blockingDispatcher.value.limitedParallelism(1), yieldMask - ) -} \ No newline at end of file From 85683463c16f8718d843304d817e9427eb45b175 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Fri, 10 May 2024 19:57:58 +0200 Subject: [PATCH 21/22] add signalCpuWork() --- .../jvm/src/scheduling/CoroutineScheduler.kt | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index a7fb60fe80..c9b500b00e 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -713,7 +713,8 @@ internal class CoroutineScheduler( // this code runs in a different worker thread that holds a CPU token val cpuHolder = currentThread() as Worker assert { cpuHolder.state == WorkerState.CPU_ACQUIRED } - cpuHolder.giveAwayLocalTasks() // TODO probably we can move CPU tasks straight into acquiring worker's local queue + val releasedTasks = cpuHolder.giveAwayLocalTasks() + if (releasedTasks) signalCpuWork() cpuHolder.state = WorkerState.BLOCKING }, taskContext = NonBlockingContext) permitTransfer.acquire( @@ -724,14 +725,18 @@ internal class CoroutineScheduler( decrementBlockingTasks() } - fun giveAwayLocalTasks() { + fun giveAwayLocalTasks(): Boolean { + // probably the right way would be to signalCpuWork() on each task, but it should be fine without it + var givenAwayAny: Boolean = false stolenTask.element?.let { task -> addToGlobalQueue(task) stolenTask.element = null + givenAwayAny = true } while (true) { - val task = localQueue.poll() ?: return + val task = localQueue.poll() ?: return givenAwayAny addToGlobalQueue(task) + givenAwayAny = true } } From 0e1a97d5c1a50fea324b6b950af6c8dc0baeac26 Mon Sep 17 00:00:00 2001 From: Vadim Salavatov Date: Fri, 10 May 2024 20:17:43 +0200 Subject: [PATCH 22/22] add description to IntelliJ-patches.md --- IntelliJ-patches.md | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/IntelliJ-patches.md b/IntelliJ-patches.md index c8b8750acb..91c0aa2f2d 100644 --- a/IntelliJ-patches.md +++ b/IntelliJ-patches.md @@ -1,2 +1,20 @@ # Included IntelliJ-related patches -- TODO \ No newline at end of file + +## `runBlocking` without Dispatcher starvation + +[IJPL-721](https://youtrack.jetbrains.com/issue/IJPL-721), [#3983](https://github.com/Kotlin/kotlinx.coroutines/issues/3983) + +### Description: +`runBlocking` with its default semantics may cause dispatcher starvation if it is called on a worker thread. +For example, if `runBlocking` happens to block all `Dispatchers.Default` workers, it may lead to a deadlock in the application: +there may be tasks in the CPU queue that `runBlocking`s await, but there are none CPU workers available to run them. + +This patch changes the behavior of `runBlocking` so that it always releases associated computation permits before it parks, +and reacquires them after unpark. It works for every `CoroutineDispatcher` that is built using library primitives: +plain `Dispatcher.*` objects or `.limitedParallelism` limited dispatchers that are on top of them. + +This change in behavior comes with a cost. Permit reacquiring mechanism _may_ need an additional thread park/unpark. +Worker threads that release their computational permits always let go of the local task queue, which means less benefit +from locality, higher contention and transactional costs at the very least. + +This patch doesn't change the fact that `runBlocking` should still be used carefully. \ No newline at end of file