diff --git a/integration-testing/build.gradle.kts b/integration-testing/build.gradle.kts index dc68f14d36..78a3e32b65 100644 --- a/integration-testing/build.gradle.kts +++ b/integration-testing/build.gradle.kts @@ -134,6 +134,15 @@ sourceSets { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion") } } + + create("javaConsumersTest") { + compileClasspath += sourceSets.test.get().runtimeClasspath + runtimeClasspath += sourceSets.test.get().runtimeClasspath + + dependencies { + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion") + } + } } kotlin { @@ -199,6 +208,12 @@ tasks { classpath = sourceSet.runtimeClasspath } + create("javaConsumersTest") { + val sourceSet = sourceSets[name] + testClassesDirs = sourceSet.output.classesDirs + classpath = sourceSet.runtimeClasspath + } + check { dependsOn( "jvmCoreTest", @@ -206,9 +221,10 @@ tasks { "mavenTest", "debugAgentTest", "coreAgentTest", + "javaConsumersTest", ":jpmsTest:check", "smokeTest:build", - "java8Test:check" + "java8Test:check", ) } diff --git a/integration-testing/src/javaConsumersTest/java/RunBlockingJavaTest.java b/integration-testing/src/javaConsumersTest/java/RunBlockingJavaTest.java new file mode 100644 index 0000000000..49294b4d53 --- /dev/null +++ b/integration-testing/src/javaConsumersTest/java/RunBlockingJavaTest.java @@ -0,0 +1,21 @@ +import kotlinx.coroutines.BuildersKt; +import kotlinx.coroutines.Dispatchers; +import org.junit.Test; +import org.junit.Assert; + +public class RunBlockingJavaTest { + Boolean entered = false; + + /** This code will not compile if `runBlocking` doesn't declare `@Throws(InterruptedException::class)` */ + @Test + public void testRunBlocking() { + try { + BuildersKt.runBlocking(Dispatchers.getIO(), (scope, continuation) -> { + entered = true; + return null; + }); + } catch (InterruptedException e) { + } + Assert.assertTrue(entered); + } +} diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 0e35d5fb38..c8f1e550eb 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -27,6 +27,8 @@ public final class kotlinx/coroutines/BuildersKt { public static synthetic fun launch$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/Job; public static final fun runBlocking (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object; public static synthetic fun runBlocking$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/lang/Object; + public static final fun runBlockingK (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object; + public static synthetic fun runBlockingK$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/lang/Object; public static final fun withContext (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 84291a1b69..3c37159556 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -65,13 +65,6 @@ internal abstract class EventLoop : CoroutineDispatcher() { task.run() return true } - /** - * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context - * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one). - * By default, event loop implementation is thread-local and should not processed in the context - * (current thread's event loop should be processed instead). - */ - open fun shouldBeProcessedFromContext(): Boolean = false /** * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded] diff --git a/kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt b/kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt index 0be8a104db..72dd3ef834 100644 --- a/kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt @@ -1,9 +1,6 @@ package kotlinx.coroutines.internal -internal expect class ReentrantLock() { - fun tryLock(): Boolean - fun unlock() -} +internal expect class ReentrantLock() internal expect inline fun ReentrantLock.withLock(action: () -> T): T diff --git a/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt b/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt index 7c0581b9d9..6fd11ab107 100644 --- a/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt +++ b/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt @@ -1,6 +1,15 @@ +@file:Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND") +@file:JvmMultifileClass +@file:JvmName("BuildersKt") + package kotlinx.coroutines +import kotlin.contracts.ExperimentalContracts +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract import kotlin.coroutines.* +import kotlin.jvm.JvmMultifileClass +import kotlin.jvm.JvmName /** * Runs a new coroutine and **blocks** the current thread until its completion. @@ -20,5 +29,45 @@ import kotlin.coroutines.* * * Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will * block, potentially leading to thread starvation issues. + * + * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations + * in this blocked thread until the completion of this coroutine. + * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`. + * + * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of + * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`, + * then this invocation uses the outer event loop. + * + * If this blocked thread is interrupted (see `Thread.interrupt`), then the coroutine job is cancelled and + * this `runBlocking` invocation throws `InterruptedException`. + * + * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available + * for a newly created coroutine. + * + * @param context the context of the coroutine. The default value is an event loop on the current thread. + * @param block the coroutine code. */ -public expect fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T +@OptIn(ExperimentalContracts::class) +@JvmName("runBlockingK") +public fun runBlocking( + context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T +): T { + contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } + val contextInterceptor = context[ContinuationInterceptor] + val eventLoop: EventLoop? + val newContext: CoroutineContext + if (contextInterceptor == null) { + // create or use private event loop if no dispatcher is specified + eventLoop = ThreadLocalEventLoop.eventLoop + newContext = GlobalScope.newCoroutineContext(context + eventLoop) + } else { + eventLoop = ThreadLocalEventLoop.currentOrNull() + newContext = GlobalScope.newCoroutineContext(context) + } + return runBlockingImpl(newContext, eventLoop, block) +} + +/** We can't inline it, because an `expect fun` can't have contracts. */ +internal expect fun runBlockingImpl( + newContext: CoroutineContext, eventLoop: EventLoop?, block: suspend CoroutineScope.() -> T +): T diff --git a/kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt b/kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt index 43f7976ffa..f4512e52ed 100644 --- a/kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt +++ b/kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt @@ -194,4 +194,14 @@ class RunBlockingTest : TestBase() { } } } + + /** Will not compile if [runBlocking] doesn't have the "runs exactly once" contract. */ + @Test + fun testContract() { + val rb: Int + runBlocking { + rb = 42 + } + rb.hashCode() // unused + } } diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index 8f72e28606..91a6972d23 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -1,71 +1,31 @@ @file:JvmMultifileClass @file:JvmName("BuildersKt") -@file:OptIn(ExperimentalContracts::class) -@file:Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND") package kotlinx.coroutines -import java.util.concurrent.locks.* -import kotlin.contracts.* import kotlin.coroutines.* /** - * Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion. + * The same as [runBlocking], but for consumption from Java. + * From Kotlin's point of view, this function has the exact same signature as the regular [runBlocking]. + * This is done so that it can not be called from Kotlin, despite the fact that it is public. * - * It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in - * `main` functions and in tests. + * We do not expose this [runBlocking] in the documentation, because it is not supposed to be used from Kotlin. * - * Calling [runBlocking] from a suspend function is redundant. - * For example, the following code is incorrect: - * ``` - * suspend fun loadConfiguration() { - * // DO NOT DO THIS: - * val data = runBlocking { // <- redundant and blocks the thread, do not do that - * fetchConfigurationData() // suspending function - * } - * ``` - * - * Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will - * block, potentially leading to thread starvation issues. - * - * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations - * in this blocked thread until the completion of this coroutine. - * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`. - * - * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of - * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`, - * then this invocation uses the outer event loop. - * - * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and - * this `runBlocking` invocation throws [InterruptedException]. - * - * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available - * for a newly created coroutine. - * - * @param context the context of the coroutine. The default value is an event loop on the current thread. - * @param block the coroutine code. + * @suppress */ @Throws(InterruptedException::class) -public actual fun runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T { - contract { - callsInPlace(block, InvocationKind.EXACTLY_ONCE) - } - val currentThread = Thread.currentThread() - val contextInterceptor = context[ContinuationInterceptor] - val eventLoop: EventLoop? - val newContext: CoroutineContext - if (contextInterceptor == null) { - // create or use private event loop if no dispatcher is specified - eventLoop = ThreadLocalEventLoop.eventLoop - newContext = GlobalScope.newCoroutineContext(context + eventLoop) - } else { - // See if context's interceptor is an event loop that we shall use (to support TestContext) - // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) - eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } - ?: ThreadLocalEventLoop.currentOrNull() - newContext = GlobalScope.newCoroutineContext(context) - } - val coroutine = BlockingCoroutine(newContext, currentThread, eventLoop) +@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") +@kotlin.internal.LowPriorityInOverloadResolution +public fun runBlocking( + context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T +): T = runBlocking(context, block) + +@Throws(InterruptedException::class) +internal actual fun runBlockingImpl( + newContext: CoroutineContext, eventLoop: EventLoop?, block: suspend CoroutineScope.() -> T +): T { + val coroutine = BlockingCoroutine(newContext, Thread.currentThread(), eventLoop) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine.joinBlocking() } diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testSendToChannel.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testSendToChannel.txt index af6e564210..dd51f04c39 100644 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testSendToChannel.txt +++ b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testSendToChannel.txt @@ -15,6 +15,8 @@ Caused by: java.util.concurrent.CancellationException: Channel was cancelled at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt) at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt) at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt) - at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt) - at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source) + at kotlinx.coroutines.BuildersKt__BuildersKt.runBlockingImpl(Builders.kt) + at kotlinx.coroutines.BuildersKt.runBlockingImpl(Unknown Source) + at kotlinx.coroutines.BuildersKt__Builders_concurrentKt.runBlockingK(Builders.concurrent.kt) + at kotlinx.coroutines.BuildersKt.runBlockingK(Unknown Source) at kotlinx.coroutines.testing.TestBase.runTest(TestBase.kt) diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt index dcb908cc3a..37a53fc9c3 100644 --- a/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt @@ -8,14 +8,6 @@ import kotlin.test.* import kotlin.time.Duration class RunBlockingJvmTest : TestBase() { - @Test - fun testContract() { - val rb: Int - runBlocking { - rb = 42 - } - rb.hashCode() // unused - } /** Tests that the [runBlocking] coroutine runs to completion even it was interrupted. */ @Test diff --git a/kotlinx-coroutines-core/native/src/Builders.kt b/kotlinx-coroutines-core/native/src/Builders.kt index 4f94f19b53..58c33ada3e 100644 --- a/kotlinx-coroutines-core/native/src/Builders.kt +++ b/kotlinx-coroutines-core/native/src/Builders.kt @@ -1,101 +1,49 @@ -@file:OptIn(ExperimentalContracts::class, ObsoleteWorkersApi::class) -@file:Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND") +@file:OptIn(ObsoleteWorkersApi::class) package kotlinx.coroutines -import kotlinx.cinterop.* -import kotlin.contracts.* import kotlin.coroutines.* import kotlin.native.concurrent.* -/** - * Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion. - * - * It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in - * `main` functions and in tests. - * - * Calling [runBlocking] from a suspend function is redundant. - * For example, the following code is incorrect: - * ``` - * suspend fun loadConfiguration() { - * // DO NOT DO THIS: - * val data = runBlocking { // <- redundant and blocks the thread, do not do that - * fetchConfigurationData() // suspending function - * } - * ``` - * - * Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will - * block, potentially leading to thread starvation issues. - * - * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations - * in this blocked thread until the completion of this coroutine. - * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`. - * - * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of - * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`, - * then this invocation uses the outer event loop. - * - * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and - * this `runBlocking` invocation throws [InterruptedException]. - * - * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available - * for a newly created coroutine. - * - * @param context the context of the coroutine. The default value is an event loop on the current thread. - * @param block the coroutine code. - */ -public actual fun runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T { - contract { - callsInPlace(block, InvocationKind.EXACTLY_ONCE) - } - val contextInterceptor = context[ContinuationInterceptor] - val eventLoop: EventLoop? - val newContext: CoroutineContext - if (contextInterceptor == null) { - // create or use private event loop if no dispatcher is specified - eventLoop = ThreadLocalEventLoop.eventLoop - newContext = GlobalScope.newCoroutineContext(context + eventLoop) - } else { - // See if context's interceptor is an event loop that we shall use (to support TestContext) - // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) - eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } - ?: ThreadLocalEventLoop.currentOrNull() - newContext = GlobalScope.newCoroutineContext(context) - } - val coroutine = BlockingCoroutine(newContext, eventLoop) - var completed = false - ThreadLocalKeepAlive.addCheck { !completed } +internal actual fun runBlockingImpl( + newContext: CoroutineContext, eventLoop: EventLoop?, block: suspend CoroutineScope.() -> T +): T { + val coroutine = BlockingCoroutine(newContext, Worker.current, eventLoop) + ThreadLocalKeepAlive.registerUsage() try { coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine.joinBlocking() } finally { - completed = true + ThreadLocalKeepAlive.unregisterUsage() } } @ThreadLocal private object ThreadLocalKeepAlive { - /** If any of these checks passes, this means this [Worker] is still used. */ - private var checks = mutableListOf<() -> Boolean>() + /** If larger than 0, this means this [Worker] is still used. */ + private var usages = 0 /** Whether the worker currently tries to keep itself alive. */ private var keepAliveLoopActive = false - /** Adds another stopgap that must be passed before the [Worker] can be terminated. */ - fun addCheck(terminationForbidden: () -> Boolean) { - checks.add(terminationForbidden) + /** Ensure that the worker is kept alive until the matching [unregisterUsage] is called. */ + fun registerUsage() { + usages++ if (!keepAliveLoopActive) keepAlive() } + /** Undo [registerUsage]. */ + fun unregisterUsage() { + usages-- + } + /** * Send a ping to the worker to prevent it from terminating while this coroutine is running, * ensuring that continuations don't get dropped and forgotten. */ private fun keepAlive() { - // only keep the checks that still forbid the termination - checks = checks.filter { it() }.toMutableList() // if there are no checks left, we no longer keep the worker alive, it can be terminated - keepAliveLoopActive = checks.isNotEmpty() + keepAliveLoopActive = usages > 0 if (keepAliveLoopActive) { Worker.current.executeAfter(afterMicroseconds = 100_000) { keepAlive() @@ -106,9 +54,9 @@ private object ThreadLocalKeepAlive { private class BlockingCoroutine( parentContext: CoroutineContext, + private val joinWorker: Worker, private val eventLoop: EventLoop? ) : AbstractCoroutine(parentContext, true, true) { - private val joinWorker = Worker.current override val isScopedCoroutine: Boolean get() = true