diff --git a/docs/topics/coroutine-context-and-dispatchers.md b/docs/topics/coroutine-context-and-dispatchers.md index ca5f54cd98..d57739f081 100644 --- a/docs/topics/coroutine-context-and-dispatchers.md +++ b/docs/topics/coroutine-context-and-dispatchers.md @@ -115,7 +115,7 @@ Produces the output: ```text Unconfined : I'm working in thread main main runBlocking: I'm working in thread main -Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor +Unconfined : After delay in thread DefaultDispatcher-worker-1 @coroutine#2 main runBlocking: After delay in thread main ``` diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index 442543840d..1816727034 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -9,6 +9,7 @@ import org.junit.Test import java.util.concurrent.* import java.util.concurrent.CancellationException import java.util.concurrent.atomic.* +import kotlinx.coroutines.testing.CountDownLatch import kotlin.test.* class ListenableFutureTest : TestBase() { diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt index 67d3d16bb1..2afb526932 100644 --- a/kotlinx-coroutines-core/common/src/Delay.kt +++ b/kotlinx-coroutines-core/common/src/Delay.kt @@ -48,6 +48,8 @@ public interface Delay { * Schedules invocation of a specified [block] after a specified delay [timeMillis]. * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] of this invocation * request if it is not needed anymore. + * + * [block] must execute quickly, be non-blocking, and must not throw any exceptions. */ public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = DefaultDelay.invokeOnTimeout(timeMillis, block, context) @@ -115,7 +117,14 @@ public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {} * * Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause. * - * Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context. + * By default, on the JVM and Native, a `Dispatchers.IO` thread is used to calculate when the delay has passed, + * whereas on JS, the `Window.setTimeout` function is used, and on Wasm/WASI, `poll_oneoff` with the monotonic clock + * event type is used. + * It is possible for a [CoroutineDispatcher] to override this behavior and provide its own implementation + * of time tracking. + * For example, [Dispatchers.Main] typically uses the main thread's event loop to track time. + * However, the functionality of defining custom time tracking is not exposed to the public API. + * * @param timeMillis time in milliseconds. */ public suspend fun delay(timeMillis: Long) { @@ -141,7 +150,13 @@ public suspend fun delay(timeMillis: Long) { * * Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause. * - * Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context. + * By default, on the JVM and Native, a `Dispatchers.IO` thread is used to calculate when the delay has passed, + * whereas on JS, the `Window.setTimeout` function is used, and on Wasm/WASI, `poll_oneoff` with the monotonic clock + * event type is used. + * It is possible for a [CoroutineDispatcher] to override this behavior and provide its own implementation + * of time tracking. + * For example, [Dispatchers.Main] typically uses the main thread's event loop to track time. + * However, the functionality of defining custom time tracking is not exposed to the public API. */ public suspend fun delay(duration: Duration): Unit = delay(duration.toDelayMillis()) diff --git a/kotlinx-coroutines-core/common/src/Dispatchers.common.kt b/kotlinx-coroutines-core/common/src/Dispatchers.common.kt index c499a47f92..ae51283a95 100644 --- a/kotlinx-coroutines-core/common/src/Dispatchers.common.kt +++ b/kotlinx-coroutines-core/common/src/Dispatchers.common.kt @@ -71,3 +71,27 @@ public expect object Dispatchers { */ public val Unconfined: CoroutineDispatcher } + +/** + * If a task can no longer run because its dispatcher is closed, it is rescheduled to another dispatcher. + * + * This is required to avoid a situation where some finalizers do not run: + * ``` + * val dispatcher = newSingleThreadContext("test") + * launch(dispatcher) { + * val resource = Resource() + * try { + * // do something `suspending` with resource + * } finally { + * resource.close() + * } + * } + * dispatcher.close() + * ``` + * + * `close` needs to run somewhere, but it can't run on the closed dispatcher. + * + * On the JVM and Native, we reschedule to the thread pool backing `Dispatchers.IO`, + * because an arbitrary task may well have blocking behavior. + */ +internal expect fun rescheduleTaskFromClosedDispatcher(task: Runnable) diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 3c37159556..e17870392d 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -6,6 +6,48 @@ import kotlin.concurrent.Volatile import kotlin.coroutines.* import kotlin.jvm.* +internal interface UnconfinedEventLoop { + /** + * Returns `true` if calling [yield] in a coroutine in this event loop can avoid yielding and continue executing + * due to there being no other tasks in the queue. + * + * This can only be called from the thread that owns this event loop. + */ + val thisLoopsTaskCanAvoidYielding: Boolean + + /** + * Returns `true` if someone (typically a call to [runUnconfinedEventLoop]) is currently processing the tasks, + * so calling [dispatchUnconfined] is guaranteed to be processed eventually. + * + * This can only be called from the thread that owns this event loop. + */ + val isUnconfinedLoopActive: Boolean + + /** + * Executes [initialBlock] and then processes unconfined tasks until there are no more, blocking the current thread. + * + * This can only be called when no other [runUnconfinedEventLoop] is currently active on this event loop. + * + * This can only be called from the thread that owns this event loop. + */ + fun runUnconfinedEventLoop(initialBlock: () -> Unit) + + /** + * Sends the [task] to this event loop for execution. + * + * This method should only be called while [isUnconfinedLoopActive] is `true`. + * Otherwise, the task may be left unprocessed. + * + * This can only be called from the thread that owns this event loop. + */ + fun dispatchUnconfined(task: DispatchedTask<*>) + + /** + * Tries to interpret this event loop for unconfined tasks as a proper event loop and returns it if successful. + */ + fun tryUseAsEventLoop(): EventLoop? +} + /** * Extended by [CoroutineDispatcher] implementations that have event loop inside and can * be asked to process next event from their event queue. @@ -16,7 +58,7 @@ import kotlin.jvm.* * * @suppress **This an internal API and should not be used from general code.** */ -internal abstract class EventLoop : CoroutineDispatcher() { +internal abstract class EventLoop : CoroutineDispatcher(), UnconfinedEventLoop { /** * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop. */ @@ -51,8 +93,6 @@ internal abstract class EventLoop : CoroutineDispatcher() { return 0 } - protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty - protected open val nextTime: Long get() { val queue = unconfinedQueue ?: return Long.MAX_VALUE @@ -66,32 +106,38 @@ internal abstract class EventLoop : CoroutineDispatcher() { return true } + /** + * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context + * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one). + * By default, event loop implementation is thread-local and should not processed in the context + * (current thread's event loop should be processed instead). + */ + open fun shouldBeProcessedFromContext(): Boolean = false + /** * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded] * into the current event loop. */ - fun dispatchUnconfined(task: DispatchedTask<*>) { - val queue = unconfinedQueue ?: - ArrayDeque>().also { unconfinedQueue = it } + override fun dispatchUnconfined(task: DispatchedTask<*>) { + val queue = unconfinedQueue ?: ArrayDeque>().also { unconfinedQueue = it } queue.addLast(task) } val isActive: Boolean get() = useCount > 0 - val isUnconfinedLoopActive: Boolean + override val isUnconfinedLoopActive: Boolean get() = useCount >= delta(unconfined = true) - // May only be used from the event loop's thread - val isUnconfinedQueueEmpty: Boolean - get() = unconfinedQueue?.isEmpty() ?: true + override val thisLoopsTaskCanAvoidYielding: Boolean + get() = unconfinedQueue?.isEmpty() != false private fun delta(unconfined: Boolean) = if (unconfined) (1L shl 32) else 1L fun incrementUseCount(unconfined: Boolean = false) { useCount += delta(unconfined) - if (!unconfined) shared = true + if (!unconfined) shared = true } fun decrementUseCount(unconfined: Boolean = false) { @@ -110,22 +156,37 @@ internal abstract class EventLoop : CoroutineDispatcher() { } open fun shutdown() {} + + override fun runUnconfinedEventLoop(initialBlock: () -> Unit) { + incrementUseCount(unconfined = true) + try { + initialBlock() + while (true) { + // break when all unconfined continuations where executed + if (!processUnconfinedEvent()) break + } + } finally { + decrementUseCount(unconfined = true) + } + } + + override fun tryUseAsEventLoop(): EventLoop? = this } internal object ThreadLocalEventLoop { - private val ref = commonThreadLocal(Symbol("ThreadLocalEventLoop")) + private val ref = commonThreadLocal(Symbol("ThreadLocalEventLoop")) - internal val eventLoop: EventLoop + internal val unconfinedEventLoop: UnconfinedEventLoop get() = ref.get() ?: createEventLoop().also { ref.set(it) } - internal fun currentOrNull(): EventLoop? = + internal fun currentOrNull(): UnconfinedEventLoop? = ref.get() internal fun resetEventLoop() { ref.set(null) } - internal fun setEventLoop(eventLoop: EventLoop) { + internal fun setEventLoop(eventLoop: UnconfinedEventLoop) { ref.set(eventLoop) } } @@ -162,9 +223,6 @@ private typealias Queue = LockFreeTaskQueueCore internal expect abstract class EventLoopImplPlatform() : EventLoop { // Called to unpark this event loop's thread protected fun unpark() - - // Called to reschedule to DefaultExecutor when this event loop is complete - protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) } internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { @@ -179,8 +237,10 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { get() = _isCompleted.value set(value) { _isCompleted.value = value } - override val isEmpty: Boolean get() { - if (!isUnconfinedQueueEmpty) return false + /** + * Checks that at the moment this method is called, there are no tasks in the delayed tasks queue. + */ + protected val delayedQueueIsEmpty: Boolean get() { val delayed = _delayed.value if (delayed != null && !delayed.isEmpty) return false return when (val queue = _queue.value) { @@ -268,7 +328,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { // todo: we should unpark only when this delayed task became first in the queue unpark() } else { - DefaultExecutor.enqueue(task) + rescheduleTaskFromClosedDispatcher(task) } } @@ -379,12 +439,6 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { return delayedTask.scheduleTask(now, delayedQueue, this) } - // It performs "hard" shutdown for test cleanup purposes - protected fun resetAll() { - _queue.value = null - _delayed.value = null - } - // This is a "soft" (normal) shutdown private fun rescheduleAllDelayed() { val now = nanoTime() @@ -401,6 +455,14 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { } } + // Called to reschedule when this event loop is complete + protected open fun reschedule(now: Long, delayedTask: DelayedTask) { + val delayTimeMillis = delayNanosToMillis(delayedTask.nanoTime - now) + DefaultDelay.invokeOnTimeout(delayTimeMillis, Runnable { + rescheduleTaskFromClosedDispatcher(delayedTask) + }, EmptyCoroutineContext) + } + internal abstract class DelayedTask( /** * This field can be only modified in [scheduleTask] before putting this DelayedTask @@ -523,10 +585,6 @@ internal expect fun createEventLoop(): EventLoop internal expect fun nanoTime(): Long -internal expect object DefaultExecutor { - fun enqueue(task: Runnable) -} - /** * Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and * non-Darwin native targets. diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt index 65e68ba299..4099dd79bf 100644 --- a/kotlinx-coroutines-core/common/src/Timeout.kt +++ b/kotlinx-coroutines-core/common/src/Timeout.kt @@ -31,7 +31,7 @@ import kotlin.time.Duration.Companion.milliseconds * [Asynchronous timeout and resources](https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html#asynchronous-timeout-and-resources) * section of the coroutines guide for details. * - * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. + * For a description of how waiting for a specific duration is implemented, see [delay]. * * @param timeMillis timeout time in milliseconds. */ @@ -63,7 +63,7 @@ public suspend fun withTimeout(timeMillis: Long, block: suspend CoroutineSco * [Asynchronous timeout and resources](https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html#asynchronous-timeout-and-resources) * section of the coroutines guide for details. * - * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. + * For a description of how waiting for a specific duration is implemented, see [delay]. */ public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T { contract { diff --git a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt index b9b73603c4..a7e9bb84e2 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt @@ -96,6 +96,8 @@ public fun interface SharingStarted { * * This function throws [IllegalArgumentException] when either [stopTimeoutMillis] or [replayExpirationMillis] * are negative. + * + * For a description of how waiting for a specific duration is implemented, see [delay]. */ @Suppress("FunctionName") public fun WhileSubscribed( @@ -129,6 +131,8 @@ public fun interface SharingStarted { * * This function throws [IllegalArgumentException] when either [stopTimeout] or [replayExpiration] * are negative. + * + * For a description of how waiting for a specific duration is implemented, see [delay]. */ @Suppress("FunctionName") public fun SharingStarted.Companion.WhileSubscribed( diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 2a701c0c12..394ff38760 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -56,6 +56,8 @@ fun main() = runBlocking { * * Note that the resulting flow does not emit anything as long as the original flow emits * items faster than every [timeoutMillis] milliseconds. + * + * For a description of how waiting for a specific duration is implemented, see [delay]. */ @FlowPreview public fun Flow.debounce(timeoutMillis: Long): Flow { @@ -104,6 +106,8 @@ public fun Flow.debounce(timeoutMillis: Long): Flow { * Note that the resulting flow does not emit anything as long as the original flow emits * items faster than every [timeoutMillis] milliseconds. * + * For a description of how waiting for a specific duration is implemented, see [delay]. + * * @param timeoutMillis [T] is the emitted value and the return value is timeout in milliseconds. */ @FlowPreview @@ -142,6 +146,8 @@ public fun Flow.debounce(timeoutMillis: (T) -> Long): Flow = * * Note that the resulting flow does not emit anything as long as the original flow emits * items faster than every [timeout] milliseconds. + * + * For a description of how waiting for a specific duration is implemented, see [delay]. */ @FlowPreview public fun Flow.debounce(timeout: Duration): Flow = @@ -187,6 +193,8 @@ public fun Flow.debounce(timeout: Duration): Flow = * Note that the resulting flow does not emit anything as long as the original flow emits * items faster than every [timeout] unit. * + * For a description of how waiting for a specific duration is implemented, see [delay]. + * * @param timeout [T] is the emitted value and the return value is timeout in [Duration]. */ @FlowPreview @@ -264,6 +272,8 @@ private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl * * * Note that the latest element is not emitted if it does not fit into the sampling window. + * + * For a description of how waiting for a specific duration is implemented, see [delay]. */ @FlowPreview public fun Flow.sample(periodMillis: Long): Flow { @@ -335,6 +345,8 @@ internal fun CoroutineScope.fixedPeriodTicker( * * * Note that the latest element is not emitted if it does not fit into the sampling window. + * + * For a description of how waiting for a specific duration is implemented, see [delay]. */ @FlowPreview public fun Flow.sample(period: Duration): Flow = sample(period.toDelayMillis()) @@ -377,6 +389,8 @@ public fun Flow.sample(period: Duration): Flow = sample(period.toDelay * * Note that delaying on the downstream doesn't trigger the timeout. * + * For a description of how waiting for a specific duration is implemented, see [delay]. + * * @param timeout Timeout duration. If non-positive, the flow is timed out immediately */ @FlowPreview diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index 4c8f54e877..258d9923c9 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -290,14 +290,15 @@ internal fun DispatchedContinuation.yieldUndispatched(): Boolean = * [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty). * Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise. */ +@Suppress("NOTHING_TO_INLINE") private inline fun DispatchedContinuation<*>.executeUnconfined( contState: Any?, mode: Int, doYield: Boolean = false, - block: () -> Unit + noinline block: () -> Unit ): Boolean { assert { mode != MODE_UNINITIALIZED } // invalid execution mode - val eventLoop = ThreadLocalEventLoop.eventLoop + val eventLoop = ThreadLocalEventLoop.unconfinedEventLoop // If we are yielding and unconfined queue is empty, we can bail out as part of fast path - if (doYield && eventLoop.isUnconfinedQueueEmpty) return false + if (doYield && eventLoop.thisLoopsTaskCanAvoidYielding) return false return if (eventLoop.isUnconfinedLoopActive) { // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow _state = contState diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt index ad5fed1205..a619827a96 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt @@ -165,7 +165,7 @@ internal fun DispatchedTask.resume(delegate: Continuation, undispatche } private fun DispatchedTask<*>.resumeUnconfined() { - val eventLoop = ThreadLocalEventLoop.eventLoop + val eventLoop = ThreadLocalEventLoop.unconfinedEventLoop if (eventLoop.isUnconfinedLoopActive) { // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow eventLoop.dispatchUnconfined(this) @@ -177,25 +177,18 @@ private fun DispatchedTask<*>.resumeUnconfined() { } } -internal inline fun DispatchedTask<*>.runUnconfinedEventLoop( - eventLoop: EventLoop, +internal fun DispatchedTask<*>.runUnconfinedEventLoop( + eventLoop: UnconfinedEventLoop, block: () -> Unit ) { - eventLoop.incrementUseCount(unconfined = true) try { - block() - while (true) { - // break when all unconfined continuations where executed - if (!eventLoop.processUnconfinedEvent()) break - } + eventLoop.runUnconfinedEventLoop(block) } catch (e: Throwable) { /* * This exception doesn't happen normally, only if we have a bug in implementation. * Report it as a fatal exception. */ handleFatalException(e) - } finally { - eventLoop.decrementUseCount(unconfined = true) } } diff --git a/kotlinx-coroutines-core/common/src/selects/OnTimeout.kt b/kotlinx-coroutines-core/common/src/selects/OnTimeout.kt index 449972648d..93d57fd1e9 100644 --- a/kotlinx-coroutines-core/common/src/selects/OnTimeout.kt +++ b/kotlinx-coroutines-core/common/src/selects/OnTimeout.kt @@ -7,6 +7,8 @@ import kotlin.time.* * Clause that selects the given [block] after a specified timeout passes. * If timeout is negative or zero, [block] is selected immediately. * + * For a description of how waiting for a specific duration is implemented, see [delay]. + * * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future. * * @param timeMillis timeout time in milliseconds. @@ -20,6 +22,8 @@ public fun SelectBuilder.onTimeout(timeMillis: Long, block: suspend () -> * Clause that selects the given [block] after the specified [timeout] passes. * If timeout is negative or zero, [block] is selected immediately. * + * For a description of how waiting for a specific duration is implemented, see [delay]. + * * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future. */ @ExperimentalCoroutinesApi diff --git a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt index 771768e008..d89685796a 100644 --- a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt +++ b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt @@ -19,7 +19,7 @@ internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : Coroutine */ enclosingScope.launch { while (true) { - val delayNanos = ThreadLocalEventLoop.currentOrNull()?.processNextEvent() + val delayNanos = ThreadLocalEventLoop.currentOrNull()?.tryUseAsEventLoop()?.processNextEvent() ?: error("Event loop is missing, virtual time source works only as part of event loop") if (delayNanos <= 0) continue if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) { diff --git a/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt b/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt index 6fd11ab107..2a15fd8c48 100644 --- a/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt +++ b/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt @@ -33,6 +33,8 @@ import kotlin.jvm.JvmName * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations * in this blocked thread until the completion of this coroutine. * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`. + * If new tasks are submitted to the dispatcher created by [runBlocking] after this function returns, + * they are resubmitted to [Dispatchers.IO]. * * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`, @@ -58,10 +60,10 @@ public fun runBlocking( val newContext: CoroutineContext if (contextInterceptor == null) { // create or use private event loop if no dispatcher is specified - eventLoop = ThreadLocalEventLoop.eventLoop + eventLoop = ThreadLocalEventLoop.unconfinedEventLoop.useAsEventLoopForRunBlockingOrFail() newContext = GlobalScope.newCoroutineContext(context + eventLoop) } else { - eventLoop = ThreadLocalEventLoop.currentOrNull() + eventLoop = ThreadLocalEventLoop.currentOrNull()?.useAsEventLoopForRunBlockingOrFail() newContext = GlobalScope.newCoroutineContext(context) } return runBlockingImpl(newContext, eventLoop, block) @@ -71,3 +73,6 @@ public fun runBlocking( internal expect fun runBlockingImpl( newContext: CoroutineContext, eventLoop: EventLoop?, block: suspend CoroutineScope.() -> T ): T + +private fun UnconfinedEventLoop.useAsEventLoopForRunBlockingOrFail(): EventLoop = + tryUseAsEventLoop() ?: throw IllegalStateException("runBlocking can not be run in direct dispatchers") \ No newline at end of file diff --git a/kotlinx-coroutines-core/concurrent/src/Dispatchers.kt b/kotlinx-coroutines-core/concurrent/src/Dispatchers.concurrent.kt similarity index 56% rename from kotlinx-coroutines-core/concurrent/src/Dispatchers.kt rename to kotlinx-coroutines-core/concurrent/src/Dispatchers.concurrent.kt index d18efdc35f..b32264b02f 100644 --- a/kotlinx-coroutines-core/concurrent/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/concurrent/src/Dispatchers.concurrent.kt @@ -39,4 +39,26 @@ package kotlinx.coroutines @Suppress("EXTENSION_SHADOWED_BY_MEMBER") public expect val Dispatchers.IO: CoroutineDispatcher - +internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) { + /** + * We do not create a separate view of [Dispatchers.IO] for the cleanup needs. + * + * If [Dispatchers.IO] is not flooded with other tasks + the cleanup view does not have more threads than + * [Dispatchers.IO], there is no difference between sending cleanup tasks to [Dispatchers.IO] and creating + * a separate view of [Dispatchers.IO] for cleanup. + * + * If [Dispatchers.IO] is flooded with other tasks, we are at a crossroads: + * - On the one hand, we do not want to create too many threads. + * Some clients are carefully monitoring the number of threads and are relying on it not being larger than + * the system property + the sum of explicit `limitedParallelism` arguments in the system. + * - On the other hand, we don't want to delay productive tasks in favor of cleanup tasks. + * + * The first consideration wins on two accounts: + * - As of writing this, [Dispatchers.IO] has been in use as the cleanup dispatcher for dispatchers obtained + * from JVM executors for years, and this has not caused any issues that we know of. + * - On the other hand, some people likely rely on the number of threads not exceeding the number they control. + * If we were to create a separate view of [Dispatchers.IO] for cleanup, this number would be exceeded, which + * is a regression. + */ + Dispatchers.IO.dispatch(Dispatchers.IO, task) +} diff --git a/kotlinx-coroutines-core/concurrent/test/DefaultDelayTest.kt b/kotlinx-coroutines-core/concurrent/test/DefaultDelayTest.kt new file mode 100644 index 0000000000..3acbb99732 --- /dev/null +++ b/kotlinx-coroutines-core/concurrent/test/DefaultDelayTest.kt @@ -0,0 +1,17 @@ +package kotlinx.coroutines + +import kotlin.test.* +import kotlinx.coroutines.testing.* + +class DefaultDelayTest: TestBase() { + @Test + fun testDelayOnUnconfined() = runTest { + val latch = CountDownLatch(1) + launch(Dispatchers.Unconfined) { + delay(1) + latch.await() + } + delay(10) + latch.countDown() + } +} diff --git a/kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt b/kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt index f4512e52ed..a0d2a70a65 100644 --- a/kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt +++ b/kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt @@ -100,7 +100,7 @@ class RunBlockingTest : TestBase() { } } expectUnreached() - } catch (e: CancellationException) { + } catch (_: CancellationException) { finish(4) } } @@ -195,6 +195,22 @@ class RunBlockingTest : TestBase() { } } + /** Tests that tasks scheduled on a closed `runBlocking` event loop get processed in an I/O thread. */ + @OptIn(ExperimentalStdlibApi::class) + @Test + fun testLeakedEventLoopGetsProcessedInIO() { + val dispatcher = runBlocking { + coroutineContext[CoroutineDispatcher.Key] + }!! + runBlocking { + GlobalScope.launch(dispatcher) { + assertTrue(runningOnIoThread()) + delay(1.milliseconds) + assertTrue(runningOnIoThread()) + }.join() + } + } + /** Will not compile if [runBlocking] doesn't have the "runs exactly once" contract. */ @Test fun testContract() { @@ -205,3 +221,5 @@ class RunBlockingTest : TestBase() { rb.hashCode() // unused } } + +internal expect fun runningOnIoThread(): Boolean diff --git a/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt b/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt index 90549eecf4..d62f96557e 100644 --- a/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt @@ -2,24 +2,23 @@ package kotlinx.coroutines import kotlin.coroutines.* -internal actual fun createEventLoop(): EventLoop = UnconfinedEventLoop() +internal actual fun createEventLoop(): EventLoop = UnconfinedEventLoopImpl() internal actual fun nanoTime(): Long = unsupported() -internal class UnconfinedEventLoop : EventLoop() { +private class UnconfinedEventLoopImpl : EventLoop() { override fun dispatch(context: CoroutineContext, block: Runnable): Unit = unsupported() } internal actual abstract class EventLoopImplPlatform : EventLoop() { protected actual fun unpark(): Unit = unsupported() - protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask): Unit = unsupported() -} - -internal actual object DefaultExecutor { - public actual fun enqueue(task: Runnable): Unit = unsupported() } private fun unsupported(): Nothing = throw UnsupportedOperationException("runBlocking event loop is not supported") internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block() + +internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) { + Dispatchers.Default.dispatch(Dispatchers.Default, task) +} diff --git a/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt index f497dc803c..619d7d6809 100644 --- a/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/src/AbstractTimeSource.kt @@ -10,8 +10,8 @@ internal abstract class AbstractTimeSource { abstract fun currentTimeMillis(): Long abstract fun nanoTime(): Long abstract fun wrapTask(block: Runnable): Runnable - abstract fun trackTask() - abstract fun unTrackTask() + abstract fun trackTask(obj: Any) + abstract fun unTrackTask(obj: Any) abstract fun registerTimeLoopThread() abstract fun unregisterTimeLoopThread() abstract fun parkNanos(blocker: Any, nanos: Long) // should return immediately when nanos <= 0 @@ -27,43 +27,109 @@ internal inline fun mockTimeSource(source: AbstractTimeSource?) { timeSource = source } +/** + * The current system time in milliseconds. + * + * This is only used for automatically-generated tests in place of [System.currentTimeMillis], + * it is not used in production code. + */ @InlineOnly internal inline fun currentTimeMillis(): Long = timeSource?.currentTimeMillis() ?: System.currentTimeMillis() +/** + * The reading from a high-precision monotonic clock used for measuring time intervals. + * Logically equivalent to [kotlin.time.TimeSource.Monotonic.markNow]. + */ @InlineOnly internal actual inline fun nanoTime(): Long = timeSource?.nanoTime() ?: System.nanoTime() +/** + * Calls [trackTask] and returns a wrapped version of the given [block] that calls [unTrackTask] after it completes. + * Is optimized to just returning [block] if [trackTask] and [unTrackTask] are no-ops. + */ @InlineOnly internal inline fun wrapTask(block: Runnable): Runnable = timeSource?.wrapTask(block) ?: block +/** + * Increments the number of tasks not under our control. + * + * Virtual time source uses this to decide whether to jump to the moment of virtual time when the next sleeping thread + * should wake up. + * If there are some uncontrollable tasks, it will not jump to the moment of the next sleeping thread, + * because the uncontrollable tasks may change the shared state in a way that affects the sleeping thread. + * + * Example: + * + * ``` + * thread { // controlled thread + * while (true) { + * if (sharedState == 42) { + * break + * } + * Thread.sleep(1000) + * } + * } + * + * thread { // uncontrolled thread + * sharedState = 42 + * } + * ``` + * + * If the second thread is not tracked, the first thread effectively enters a spin loop until the second thread + * physically changes the shared state. + * + * Every call to [trackTask] must be accompanied by a call to [unTrackTask] with the same [obj], + * but [unTrackTask] can be called even if the corresponding [trackTask] wasn't called. + */ @InlineOnly -internal inline fun trackTask() { - timeSource?.trackTask() +internal inline fun trackTask(obj: Any) { + timeSource?.trackTask(obj) } +/** + * Marks the task [obj] as complete. If [obj] wasn't tracked, does nothing. See [trackTask] for more details. + */ @InlineOnly -internal inline fun unTrackTask() { - timeSource?.unTrackTask() +internal inline fun unTrackTask(obj: Any) { + timeSource?.unTrackTask(obj) } +/** + * Increases the registered number of nested loops of the form + * `while (nanoTime() < deadline) { parkNanos(deadline - nanoTime()) }` corresponding to the object [key] + * and signals that the current thread is in such a loop. + * + * While at least one such loop is running, other threads are allowed to call [unpark] on the current thread + * and wake it up. Before [registerTimeLoopThread] is called, [unpark] is not guaranteed to have any effect. + */ @InlineOnly internal inline fun registerTimeLoopThread() { timeSource?.registerTimeLoopThread() } +/** + * The opposite of [registerTimeLoopThread]. + */ @InlineOnly internal inline fun unregisterTimeLoopThread() { timeSource?.unregisterTimeLoopThread() } +/** + * Waits for either the specified number of nanoseconds to pass or until [unpark] is called. + */ @InlineOnly internal inline fun parkNanos(blocker: Any, nanos: Long) { timeSource?.parkNanos(blocker, nanos) ?: LockSupport.parkNanos(blocker, nanos) } +/** + * Preliminarily unparks the specified thread that's currently parked in [parkNanos]. + * Can be called even before the thread is parked. + */ @InlineOnly internal inline fun unpark(thread: Thread) { timeSource?.unpark(thread) ?: LockSupport.unpark(thread) diff --git a/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt new file mode 100644 index 0000000000..80d878707b --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/DefaultDelay.kt @@ -0,0 +1,174 @@ +@file:JvmName("DefaultExecutorKt") +package kotlinx.coroutines + +import kotlinx.atomicfu.* +import kotlinx.coroutines.internal.* +import kotlinx.coroutines.scheduling.scheduleBackgroundIoTask +import kotlin.coroutines.* +import kotlin.time.Duration + +private val defaultMainDelayOptIn = systemProp("kotlinx.coroutines.main.delay", false) + +@PublishedApi +internal actual val DefaultDelay: Delay = initializeDefaultDelay() + +private fun initializeDefaultDelay(): Delay { + // Opt-out flag + if (!defaultMainDelayOptIn) return DefaultDelayImpl + val main = Dispatchers.Main + /* + * When we already are working with UI and Main threads, it makes + * no sense to create a separate thread with timer that cannot be controller + * by the UI runtime. + */ + return if (main.isMissing() || main !is Delay) DefaultDelayImpl else main +} + +/** + * This method can be invoked after all coroutines are completed to wait for the default delay executor to shut down + * in response to the lack of tasks. + * + * This is only useful in tests to ensure that setting a fresh virtual time source will not confuse the default delay + * still running the previous test. + * + * Does nothing if the default delay executor is not in use. + * + * @throws IllegalStateException if the shutdown process notices new tasks entering the system + * @throws IllegalStateException if the shutdown process times out + */ +internal fun ensureDefaultDelayDeinitialized(timeout: Duration) { + (DefaultDelay as? DefaultDelayImpl)?.shutdownForTests(timeout) +} + +private object DefaultDelayImpl : EventLoopImplBase(), Runnable { + const val THREAD_NAME = "kotlinx.coroutines.DefaultDelay" + + init { + incrementUseCount() // this event loop is never completed + } + + private val _thread = atomic(null) + + /** Can only happen when tests close the default executor */ + override fun reschedule(now: Long, delayedTask: DelayedTask) { + throw IllegalStateException("Attempted to schedule $delayedTask at $now after shutdown") + } + + /** + * All event loops are using DefaultDelay#invokeOnTimeout to avoid livelock on + * ``` + * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } } + * ``` + * + * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), + * but it's not exposed as public API. + */ + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduleInvokeOnTimeout(timeMillis, block) + + override fun run() { + val currentThread = Thread.currentThread() + if (!_thread.compareAndSet(null, currentThread)) return // some other thread won the race to start the thread + val oldName = currentThread.name + currentThread.name = THREAD_NAME + try { + ThreadLocalEventLoop.setEventLoop(DelegatingUnconfinedEventLoop) + registerTimeLoopThread() + unTrackTask(this) /** see the comment in [startThreadOrObtainSleepingThread] */ + try { + while (true) { + Thread.interrupted() // just reset interruption flag + val parkNanos = processNextEvent() + if (parkNanos == Long.MAX_VALUE) break // no more events + if (parkNanos > 0) parkNanos(this@DefaultDelayImpl, parkNanos) + } + } finally { + _thread.value = null + unregisterTimeLoopThread() + ThreadLocalEventLoop.resetEventLoop() + // recheck if queues are empty after _thread reference was set to null (!!!) + if (delayedQueueIsEmpty) { + notifyAboutThreadExiting() + } else { + /* recreate the thread, as there is still work to do, + and `unpark` could have awoken the thread we're currently running on */ + startThreadOrObtainSleepingThread() + } + } + } finally { + currentThread.name = oldName + } + } + + override fun startThreadOrObtainSleepingThread(): Thread? { + // Check if the thread is already running + _thread.value?.let { return it } + /* Now we know that at the moment of this call the thread was not initially running. + This means that whatever thread is going to be running by the end of this function, + it's going to notice the tasks it's supposed to run. + We can return `null` unconditionally. */ + /** If this function is called from a thread that's already registered as a time loop thread, + because a time loop thread is not parked right now, the time source will not advance time *currently*, + but it may do that as soon as the thread calling this is parked, which may happen earlier than the default + delay thread has a chance to run. + Because of that, we notify the time source that something is actually happening right now. + This would work automatically if instead of [scheduleBackgroundIoTask] we used [CoroutineDispatcher.dispatch] on + [Dispatchers.IO], but then, none of the delays would be skipped, as the whole time a [DefaultDelay] thread runs + would be considered as a task. + Therefore, we register a task right now and mark it as completed as soon as a [DefaultDelay] time loop gets + registered. */ + trackTask(this) + scheduleBackgroundIoTask(this) + return null + } + + fun shutdownForTests(timeout: Duration) { + if (_thread.value != null) { + val end = System.currentTimeMillis() + timeout.inWholeMilliseconds + while (true) { + synchronized(this) { + unpark(_thread.value ?: return) + val toWait = end - System.currentTimeMillis() + check(toWait > 0) { "Timeout waiting for DefaultExecutor to shutdown" } + (this as Object).wait(toWait) + } + } + } + } + + private fun notifyAboutThreadExiting() { + synchronized(this) { (this as Object).notifyAll() } + } + + override fun toString(): String = "DefaultDelay" +} + +private object DelegatingUnconfinedEventLoop: UnconfinedEventLoop { + override val thisLoopsTaskCanAvoidYielding: Boolean + get() = defaultDelayRunningUnconfinedLoop() + + override val isUnconfinedLoopActive: Boolean get() = false + + override fun runUnconfinedEventLoop(initialBlock: () -> Unit) { + ioView.dispatch(ioView, Runnable { + ThreadLocalEventLoop.unconfinedEventLoop.runUnconfinedEventLoop(initialBlock) + }) + } + + override fun dispatchUnconfined(task: DispatchedTask<*>) = + defaultDelayRunningUnconfinedLoop() + + override fun tryUseAsEventLoop(): EventLoop? = null +} + +private fun defaultDelayRunningUnconfinedLoop(): Nothing { + throw UnsupportedOperationException( + "This method can only be called from the thread where an unconfined event loop is running, " + + "but no tasks can run on this thread." + ) +} + + +/** A view separate from [Dispatchers.IO]. + * [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */ +private val ioView = Dispatchers.IO.limitedParallelism(Int.MAX_VALUE) diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt deleted file mode 100644 index 3ce7e0d333..0000000000 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ /dev/null @@ -1,194 +0,0 @@ -package kotlinx.coroutines - -import kotlinx.coroutines.internal.* -import java.util.concurrent.* -import kotlin.coroutines.* - -private val defaultMainDelayOptIn = systemProp("kotlinx.coroutines.main.delay", false) - -@PublishedApi -internal actual val DefaultDelay: Delay = initializeDefaultDelay() - -private fun initializeDefaultDelay(): Delay { - // Opt-out flag - if (!defaultMainDelayOptIn) return DefaultExecutor - val main = Dispatchers.Main - /* - * When we already are working with UI and Main threads, it makes - * no sense to create a separate thread with timer that cannot be controller - * by the UI runtime. - */ - return if (main.isMissing() || main !is Delay) DefaultExecutor else main -} - -@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") -internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { - const val THREAD_NAME = "kotlinx.coroutines.DefaultExecutor" - - init { - incrementUseCount() // this event loop is never completed - } - - private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds - - private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos( - try { - java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS) - } catch (e: SecurityException) { - DEFAULT_KEEP_ALIVE_MS - }) - - @Suppress("ObjectPropertyName") - @Volatile - private var _thread: Thread? = null - - override val thread: Thread - get() = _thread ?: createThreadSync() - - private const val FRESH = 0 - private const val ACTIVE = 1 - private const val SHUTDOWN_REQ = 2 - private const val SHUTDOWN_ACK = 3 - private const val SHUTDOWN = 4 - - @Volatile - private var debugStatus: Int = FRESH - - private val isShutDown: Boolean get() = debugStatus == SHUTDOWN - - private val isShutdownRequested: Boolean get() { - val debugStatus = debugStatus - return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK - } - - actual override fun enqueue(task: Runnable) { - if (isShutDown) shutdownError() - super.enqueue(task) - } - - override fun reschedule(now: Long, delayedTask: DelayedTask) { - // Reschedule on default executor can only be invoked after Dispatchers.shutdown - shutdownError() - } - - private fun shutdownError() { - throw RejectedExecutionException("DefaultExecutor was shut down. " + - "This error indicates that Dispatchers.shutdown() was invoked prior to completion of exiting coroutines, leaving coroutines in incomplete state. " + - "Please refer to Dispatchers.shutdown documentation for more details") - } - - override fun shutdown() { - debugStatus = SHUTDOWN - super.shutdown() - } - - /** - * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on - * ``` - * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } } - * ``` - * - * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), - * but it's not exposed as public API. - */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - scheduleInvokeOnTimeout(timeMillis, block) - - override fun run() { - ThreadLocalEventLoop.setEventLoop(this) - registerTimeLoopThread() - try { - var shutdownNanos = Long.MAX_VALUE - if (!notifyStartup()) return - while (true) { - Thread.interrupted() // just reset interruption flag - var parkNanos = processNextEvent() - if (parkNanos == Long.MAX_VALUE) { - // nothing to do, initialize shutdown timeout - val now = nanoTime() - if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS - val tillShutdown = shutdownNanos - now - if (tillShutdown <= 0) return // shut thread down - parkNanos = parkNanos.coerceAtMost(tillShutdown) - } else - shutdownNanos = Long.MAX_VALUE - if (parkNanos > 0) { - // check if shutdown was requested and bail out in this case - if (isShutdownRequested) return - parkNanos(this, parkNanos) - } - } - } finally { - _thread = null // this thread is dead - acknowledgeShutdownIfNeeded() - unregisterTimeLoopThread() - // recheck if queues are empty after _thread reference was set to null (!!!) - if (!isEmpty) thread // recreate thread if it is needed - } - } - - @Synchronized - private fun createThreadSync(): Thread { - return _thread ?: Thread(this, THREAD_NAME).apply { - _thread = this - /* - * `DefaultExecutor` is a global singleton that creates its thread lazily. - * To isolate the classloaders properly, we are inherting the context classloader from - * the singleton itself instead of using parent' thread one - * in order not to accidentally capture temporary application classloader. - */ - contextClassLoader = this@DefaultExecutor.javaClass.classLoader - isDaemon = true - start() - } - } - - // used for tests - @Synchronized - internal fun ensureStarted() { - assert { _thread == null } // ensure we are at a clean state - assert { debugStatus == FRESH || debugStatus == SHUTDOWN_ACK } - debugStatus = FRESH - createThreadSync() // create fresh thread - while (debugStatus == FRESH) (this as Object).wait() - } - - @Synchronized - private fun notifyStartup(): Boolean { - if (isShutdownRequested) return false - debugStatus = ACTIVE - (this as Object).notifyAll() - return true - } - - @Synchronized // used _only_ for tests - fun shutdownForTests(timeout: Long) { - val deadline = System.currentTimeMillis() + timeout - if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ - // loop while there is anything to do immediately or deadline passes - while (debugStatus != SHUTDOWN_ACK && _thread != null) { - _thread?.let { unpark(it) } // wake up thread if present - val remaining = deadline - System.currentTimeMillis() - if (remaining <= 0) break - (this as Object).wait(timeout) - } - // restore fresh status - debugStatus = FRESH - } - - @Synchronized - private fun acknowledgeShutdownIfNeeded() { - if (!isShutdownRequested) return - debugStatus = SHUTDOWN_ACK - resetAll() // clear queues - (this as Object).notifyAll() - } - - // User only for testing and nothing else - internal val isThreadPresent - get() = _thread != null - - override fun toString(): String { - return "DefaultExecutor" - } -} diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt index a6acc129cc..04aa1addea 100644 --- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt @@ -19,7 +19,8 @@ public actual object Dispatchers { public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher @JvmStatic - public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined + public actual val Unconfined: CoroutineDispatcher get() = + kotlinx.coroutines.Unconfined /** * The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads. @@ -66,8 +67,8 @@ public actual object Dispatchers { /** * Shuts down built-in dispatchers, such as [Default] and [IO], * stopping all the threads associated with them and making them reject all new tasks. - * Dispatcher used as a fallback for time-related operations (`delay`, `withTimeout`) - * and to handle rejected tasks from other dispatchers is also shut down. + * Dispatchers used as fallbacks for time-related operations (`delay`, `withTimeout`) + * and to handle rejected tasks from other dispatchers are also shut down. * * This is a **delicate** API. It is not supposed to be called from a general * application-level code and its invocation is irreversible. @@ -85,7 +86,6 @@ public actual object Dispatchers { */ @DelicateCoroutinesApi public fun shutdown() { - DefaultExecutor.shutdown() // Also shuts down Dispatchers.IO DefaultScheduler.shutdown() } diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index 15d4ab5c85..9a2125ba0e 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -1,27 +1,25 @@ package kotlinx.coroutines -import kotlinx.coroutines.Runnable -import kotlinx.coroutines.scheduling.* import kotlinx.coroutines.scheduling.CoroutineScheduler internal actual abstract class EventLoopImplPlatform: EventLoop() { - - protected abstract val thread: Thread + /** Returns `null` if a thread was created and doesn't need to be awoken. + * Returns a thread to awaken if the thread already existed when this method was called. */ + protected abstract fun startThreadOrObtainSleepingThread(): Thread? protected actual fun unpark() { - val thread = thread // atomic read - if (Thread.currentThread() !== thread) - unpark(thread) + startThreadOrObtainSleepingThread()?.let(::unpark) } - protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { - DefaultExecutor.schedule(now, delayedTask) - } } internal class BlockingEventLoop( - override val thread: Thread -) : EventLoopImplBase() + private val thread: Thread +) : EventLoopImplBase() { + override fun startThreadOrObtainSleepingThread(): Thread? = + if (Thread.currentThread() !== thread) thread else null + +} internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread()) @@ -48,7 +46,7 @@ internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.curr public fun processNextEventInCurrentThread(): Long = // This API is used in Ktor for serverless integration where a single thread awaits a blocking call // (and, to avoid actual blocking, does something via this call), see #850 - ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE + ThreadLocalEventLoop.currentOrNull()?.tryUseAsEventLoop()?.processNextEvent() ?: Long.MAX_VALUE internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block() @@ -122,4 +120,3 @@ internal fun Thread.isIoDispatcherThread(): Boolean { if (this !is CoroutineScheduler.Worker) return false return isIo() } - diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 9bd48b1537..fd1db40b79 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -130,9 +130,9 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) try { executor.execute(wrapTask(block)) } catch (e: RejectedExecutionException) { - unTrackTask() + unTrackTask(block) cancelJobOnRejection(context, e) - Dispatchers.IO.dispatch(context, block) + rescheduleTaskFromClosedDispatcher(block) } } @@ -147,15 +147,15 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) continuation.invokeOnCancellation(CancelFutureOnCancel(future)) return } - // Otherwise fallback to default executor - DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation) + // Otherwise fallback to default delay + DefaultDelay.scheduleResumeAfterDelay(timeMillis, continuation) } override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) return when { future != null -> DisposableFutureHandle(future) - else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) + else -> DefaultDelay.invokeOnTimeout(timeMillis, block, context) } } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 3430ebadec..5a4dcf9316 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -391,9 +391,11 @@ internal class CoroutineScheduler( * - Concurrent [close] that effectively shutdowns the worker thread. * Used for [yield]. */ - fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) { - trackTask() // this is needed for virtual time support + fun dispatch( + block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false, track: Boolean = true + ) { val task = createTask(block, taskContext) + if (track) trackTask(task) // this is needed for virtual time support val isBlockingTask = task.isBlocking // Invariant: we increment counter **before** publishing the task // so executing thread can safely decrement the number of blocking tasks @@ -588,7 +590,7 @@ internal class CoroutineScheduler( val thread = Thread.currentThread() thread.uncaughtExceptionHandler.uncaughtException(thread, e) } finally { - unTrackTask() + unTrackTask(task) } } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index 28d5537108..a510f02f46 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -37,11 +37,11 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() { @InternalCoroutinesApi override fun dispatchYield(context: CoroutineContext, block: Runnable) { - DefaultScheduler.dispatchWithContext(block, BlockingContext, true) + DefaultScheduler.dispatchWithContext(block, BlockingContext, fair = true, track = true) } override fun dispatch(context: CoroutineContext, block: Runnable) { - DefaultScheduler.dispatchWithContext(block, BlockingContext, false) + DefaultScheduler.dispatchWithContext(block, BlockingContext, fair = false, track = true) } override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { @@ -58,6 +58,10 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() { } } +internal fun scheduleBackgroundIoTask(block: Runnable) { + DefaultScheduler.dispatchWithContext(block, BlockingContext, fair = false, track = false) +} + // Dispatchers.IO internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor { @@ -126,8 +130,8 @@ internal open class SchedulerCoroutineDispatcher( coroutineScheduler.dispatch(block, fair = true) } - internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) { - coroutineScheduler.dispatch(block, context, fair) + internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean, track: Boolean) { + coroutineScheduler.dispatch(block, context, fair = fair, track = track) } override fun close() { diff --git a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt index 58b8024547..773b518b8b 100644 --- a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt @@ -8,7 +8,7 @@ class DefaultExecutorStressTest : TestBase() { @Test fun testDelay() = runTest { val iterations = 100_000 * stressTestMultiplier - withContext(DefaultExecutor) { + withContext(DefaultDelay as CoroutineDispatcher) { expect(1) var expected = 1 repeat(iterations) { @@ -33,30 +33,4 @@ class DefaultExecutorStressTest : TestBase() { } finish(2 + iterations * 4) } - - @Test - fun testWorkerShutdown() = withVirtualTimeSource { - val iterations = 1_000 * stressTestMultiplier - // wait for the worker to shut down - suspend fun awaitWorkerShutdown() { - val executorTimeoutMs = 1000L - delay(executorTimeoutMs) - while (DefaultExecutor.isThreadPresent) { delay(10) } // hangs if the thread refuses to stop - assertFalse(DefaultExecutor.isThreadPresent) // just to make sure - } - runTest { - awaitWorkerShutdown() // so that the worker shuts down after the initial launch - repeat (iterations) { - val job = launch(Dispatchers.Unconfined) { - // this line runs in the main thread - delay(1) - // this line runs in the DefaultExecutor worker - } - delay(100) // yield the execution, allow the worker to spawn - assertTrue(DefaultExecutor.isThreadPresent) // the worker spawned - job.join() - awaitWorkerShutdown() - } - } - } } diff --git a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt index 32573ca1f6..06560391c1 100644 --- a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt @@ -29,12 +29,12 @@ class DispatchersToStringTest { ) } // Not overridden at all, limited parallelism returns `this` - assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) + assertEquals("DefaultDelay", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) assertEquals("filesDispatcher", Dispatchers.IO.limitedParallelism(1, "filesDispatcher").toString()) assertEquals("json", Dispatchers.Default.limitedParallelism(2, "json").toString()) assertEquals("\uD80C\uDE11", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "\uD80C\uDE11").toString()) - assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) + assertEquals("DefaultDelay", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) val limitedNamed = Dispatchers.IO.limitedParallelism(10, "limited") assertEquals("limited.limitedParallelism(2)", limitedNamed.limitedParallelism(2).toString()) @@ -53,4 +53,4 @@ class DispatchersToStringTest { assertEquals("Named", named.toString()) } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt b/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt index 551d1977c0..c31f6e67bc 100644 --- a/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt @@ -49,25 +49,6 @@ class EventLoopsTest : TestBase() { finish(5) } - @Test - fun testEventLoopInDefaultExecutor() = runTest { - expect(1) - withContext(Dispatchers.Unconfined) { - delay(1) - assertTrue(Thread.currentThread().name.startsWith(DefaultExecutor.THREAD_NAME)) - expect(2) - // now runBlocking inside default executor thread --> should use outer event loop - DefaultExecutor.enqueue(Runnable { - expect(4) // will execute when runBlocking runs loop - }) - expect(3) - runBlocking { - expect(5) - } - } - finish(6) - } - /** * Simple test for [processNextEventInCurrentThread] API use-case. */ @@ -159,4 +140,4 @@ class EventLoopsTest : TestBase() { waitingThread.value = null } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt index 3c60407bb9..19484bdc7b 100644 --- a/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt @@ -2,9 +2,10 @@ package kotlinx.coroutines import kotlinx.coroutines.testing.* import org.junit.Test -import java.util.concurrent.* import kotlin.coroutines.* import kotlin.test.* +import java.util.concurrent.Executors +import java.util.concurrent.RejectedExecutionException class ExecutorsTest : TestBase() { private fun checkThreadName(prefix: String) { @@ -45,7 +46,7 @@ class ExecutorsTest : TestBase() { @Test fun testConvertedDispatcherToExecutor() { - val executor: ExecutorService = Executors.newSingleThreadExecutor { r -> Thread(r, "TestExecutor") } + val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "TestExecutor") } val dispatcher: CoroutineDispatcher = executor.asCoroutineDispatcher() assertSame(executor, dispatcher.asExecutor()) executor.shutdown() diff --git a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt index 144e4e9dc4..5b1fdf24cb 100644 --- a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt +++ b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt @@ -5,9 +5,10 @@ import org.junit.* import org.junit.Test import org.junit.runner.* import org.junit.runners.* -import java.util.concurrent.* +import java.util.concurrent.Executors import kotlin.coroutines.* import kotlin.test.* +import kotlin.time.Duration.Companion.seconds @RunWith(Parameterized::class) class FailingCoroutinesMachineryTest( @@ -139,7 +140,7 @@ class FailingCoroutinesMachineryTest( } private fun checkException() { - latch.await(2, TimeUnit.SECONDS) + latch.await(2.seconds) val e = caught assertNotNull(e) // First condition -- failure in context element diff --git a/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt b/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt index 16fc64e83e..4d80fd28b1 100644 --- a/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JobChildStressTest.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.testing.* import java.util.concurrent.* import java.util.concurrent.atomic.* import kotlin.test.* +import kotlinx.coroutines.testing.CountDownLatch /** * Testing the procedure of attaching a child to the parent job. diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationInvariantStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationInvariantStressTest.kt index ef0d146d7a..4134f21d0d 100644 --- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationInvariantStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationInvariantStressTest.kt @@ -2,7 +2,6 @@ package kotlinx.coroutines import kotlinx.coroutines.testing.* import org.junit.Test -import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicReference import kotlin.coroutines.* diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt index 37a53fc9c3..bbb7062566 100644 --- a/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt @@ -182,3 +182,5 @@ class RunBlockingJvmTest : TestBase() { return result.get().getOrThrow() } } + +internal actual fun runningOnIoThread(): Boolean = Thread.currentThread().isIoDispatcherThread() diff --git a/kotlinx-coroutines-core/jvm/test/ThreadLocalStressTest.kt b/kotlinx-coroutines-core/jvm/test/ThreadLocalStressTest.kt index 63ed3f2300..115ad0ce1a 100644 --- a/kotlinx-coroutines-core/jvm/test/ThreadLocalStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ThreadLocalStressTest.kt @@ -2,7 +2,6 @@ package kotlinx.coroutines import kotlinx.coroutines.testing.* import kotlinx.coroutines.sync.* -import java.util.concurrent.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* import kotlin.test.* @@ -139,7 +138,7 @@ class ThreadLocalStressTest : TestBase() { cancel() semaphore.acquire() } - } catch (e: CancellationException) { + } catch (_: CancellationException) { // Ignore cancellation } } @@ -154,7 +153,7 @@ class ThreadLocalStressTest : TestBase() { cancel() semaphore.acquire() } - } catch (e: CancellationException) { + } catch (_: CancellationException) { // Ignore cancellation } } diff --git a/kotlinx-coroutines-core/jvm/test/UnconfinedConcurrentStressTest.kt b/kotlinx-coroutines-core/jvm/test/UnconfinedConcurrentStressTest.kt index 61aa7daacc..2987b704ef 100644 --- a/kotlinx-coroutines-core/jvm/test/UnconfinedConcurrentStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/UnconfinedConcurrentStressTest.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.testing.* import org.junit.* import org.junit.Test import java.util.concurrent.* +import kotlinx.coroutines.testing.CountDownLatch import kotlin.test.* class UnconfinedConcurrentStressTest : TestBase() { diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt index 8a461087c3..1c139841a9 100644 --- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt @@ -3,18 +3,18 @@ package kotlinx.coroutines import java.io.* import java.util.concurrent.* import java.util.concurrent.locks.* +import kotlin.time.Duration.Companion.seconds -private const val SHUTDOWN_TIMEOUT = 1000L +private val SHUTDOWN_TIMEOUT = 1.seconds internal inline fun withVirtualTimeSource(log: PrintStream? = null, block: () -> Unit) { - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working) + ensureDefaultDelayDeinitialized(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working) val testTimeSource = VirtualTimeSource(log) mockTimeSource(testTimeSource) - DefaultExecutor.ensureStarted() // should start with new time source try { block() } finally { - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) + ensureDefaultDelayDeinitialized(SHUTDOWN_TIMEOUT) testTimeSource.shutdown() mockTimeSource(null) // restore time source } @@ -48,7 +48,7 @@ internal class VirtualTimeSource( @Volatile private var time: Long = 0 - private var trackedTasks = 0 + private val trackedTasks = HashSet() private val threads = ConcurrentHashMap() @@ -56,22 +56,21 @@ internal class VirtualTimeSource( override fun nanoTime(): Long = time override fun wrapTask(block: Runnable): Runnable { - trackTask() + trackTask(block) return Runnable { try { block.run() } - finally { unTrackTask() } + finally { unTrackTask(block) } } } @Synchronized - override fun trackTask() { - trackedTasks++ + override fun trackTask(obj: Any) { + trackedTasks.add(obj) } @Synchronized - override fun unTrackTask() { - assert(trackedTasks > 0) - trackedTasks-- + override fun unTrackTask(obj: Any) { + trackedTasks.remove(obj) } @Synchronized @@ -125,7 +124,7 @@ internal class VirtualTimeSource( return } if (threads[mainThread] == null) return - if (trackedTasks != 0) return + if (trackedTasks.isNotEmpty()) return val minParkedTill = minParkedTill() if (minParkedTill <= time) return time = minParkedTill diff --git a/kotlinx-coroutines-core/jvm/test/channels/InvokeOnCloseStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/InvokeOnCloseStressTest.kt index 19aa0402e2..66eb9c7bd4 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/InvokeOnCloseStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/InvokeOnCloseStressTest.kt @@ -4,7 +4,6 @@ import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test -import java.util.concurrent.* import java.util.concurrent.atomic.* import kotlin.coroutines.* import kotlin.test.* diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt index 65102095b1..f60a32505b 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt @@ -14,7 +14,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() @Parameterized.Parameters(name = "{0}") @JvmStatic fun params(): Collection> = - Channel.values().map { arrayOf(it) } + Channel.entries.map { arrayOf(it) } } enum class Channel { @@ -35,13 +35,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() fun testDelay() = withVirtualTimeSource { runTest { val delayChannel = channelFactory(delay = 10000) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.checkEmpty() delay(5000) delayChannel.checkEmpty() delay(5100) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() delay(5100) @@ -57,13 +57,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() delay(500) delayChannel.checkEmpty() delay(300) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() // Regular delay delay(750) delayChannel.checkEmpty() delay(260) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() } } @@ -72,7 +72,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() fun testReceive() = withVirtualTimeSource { runTest { val delayChannel = channelFactory(delay = 1000) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() var value = withTimeoutOrNull(750) { delayChannel.receive() 1 @@ -93,6 +93,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() fun testComplexOperator() = withVirtualTimeSource { runTest { val producer = GlobalScope.produce { + delay(1) // ensure that the ordering of dispatches doesn't affect the result for (i in 1..7) { send(i) delay(1000) @@ -158,7 +159,7 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() fun ReceiveChannel.checkEmpty() = assertNull(tryReceive().getOrNull()) -fun ReceiveChannel.checkNotEmpty() { - assertNotNull(tryReceive().getOrNull()) +suspend fun ReceiveChannel.receiveSingle() { + receive() assertNull(tryReceive().getOrNull()) } diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt index 051d670743..0f7e6b1bf9 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt @@ -9,15 +9,15 @@ class TickerChannelTest : TestBase() { fun testFixedDelayChannelBackpressure() = withVirtualTimeSource { runTest { val delayChannel = ticker(delayMillis = 1000, initialDelayMillis = 0, mode = TickerMode.FIXED_DELAY) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.checkEmpty() delay(1500) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(500) delayChannel.checkEmpty() delay(520) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() } } @@ -26,17 +26,17 @@ class TickerChannelTest : TestBase() { fun testDelayChannelBackpressure() = withVirtualTimeSource { runTest { val delayChannel = ticker(delayMillis = 1000, initialDelayMillis = 0) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.checkEmpty() delay(1500) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(520) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(500) delayChannel.checkEmpty() delay(520) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() } } @@ -45,17 +45,17 @@ class TickerChannelTest : TestBase() { fun testDelayChannelBackpressure2() = withVirtualTimeSource { runTest { val delayChannel = ticker(delayMillis = 200, initialDelayMillis = 0) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.checkEmpty() delay(500) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(110) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delay(110) delayChannel.checkEmpty() delay(110) - delayChannel.checkNotEmpty() + delayChannel.receiveSingle() delayChannel.cancel() } } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt index 1cf6c2bd4d..1614bceba0 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt @@ -20,7 +20,7 @@ class DispatcherGuideTest { test("ExampleContext02") { kotlinx.coroutines.guide.exampleContext02.main() }.verifyLinesStart( "Unconfined : I'm working in thread main", "main runBlocking: I'm working in thread main", - "Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor", + "Unconfined : After delay in thread DefaultDispatcher-worker-1 @coroutine#2", "main runBlocking: After delay in thread main" ) } diff --git a/kotlinx-coroutines-core/jvm/test/jdk8/future/FutureTest.kt b/kotlinx-coroutines-core/jvm/test/jdk8/future/FutureTest.kt index 81178e193c..49d7a7cb34 100644 --- a/kotlinx-coroutines-core/jvm/test/jdk8/future/FutureTest.kt +++ b/kotlinx-coroutines-core/jvm/test/jdk8/future/FutureTest.kt @@ -14,6 +14,7 @@ import kotlin.concurrent.withLock import kotlin.coroutines.* import kotlin.reflect.* import kotlin.test.* +import kotlinx.coroutines.testing.CountDownLatch class FutureTest : TestBase() { @Before diff --git a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt index d0a5551567..3821cb6824 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt @@ -24,7 +24,6 @@ fun test(name: String, block: () -> R): List = outputException(name) try { captureOutput(name, stdoutEnabled = OUT_ENABLED) { log -> DefaultScheduler.usePrivateScheduler() - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) resetCoroutineId() val threadsBefore = currentThreads() try { @@ -35,9 +34,8 @@ fun test(name: String, block: () -> R): List = outputException(name) } finally { // the shutdown log.println("--- shutting down") - DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) shutdownDispatcherPools(SHUTDOWN_TIMEOUT) - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks + DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks } checkTestThreads(threadsBefore) // check thread if the main completed successfully } @@ -55,7 +53,7 @@ private fun shutdownDispatcherPools(timeout: Long) { (thread.dispatcher.executor as ExecutorService).apply { shutdown() awaitTermination(timeout, TimeUnit.MILLISECONDS) - shutdownNow().forEach { DefaultExecutor.enqueue(it) } + shutdownNow().forEach { rescheduleTaskFromClosedDispatcher(it) } } } } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt index ee21be23fc..398d1150f7 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt @@ -1,6 +1,5 @@ package kotlinx.coroutines.scheduling -import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import org.junit.* import org.junit.Test diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerOversubscriptionTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerOversubscriptionTest.kt index c891878ffb..5f0bdf6c53 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerOversubscriptionTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerOversubscriptionTest.kt @@ -2,9 +2,10 @@ package kotlinx.coroutines.scheduling import kotlinx.coroutines.testing.* import kotlinx.coroutines.* -import org.junit.Test import java.util.concurrent.* import java.util.concurrent.atomic.AtomicInteger +import kotlinx.coroutines.testing.CountDownLatch +import kotlin.test.Test class CoroutineSchedulerOversubscriptionTest : TestBase() { diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerStressTest.kt index 615cdeea7c..3ff423a4bf 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerStressTest.kt @@ -4,9 +4,7 @@ import kotlinx.coroutines.testing.* import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* -import org.junit.* -import org.junit.Test -import java.util.concurrent.* +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.* import kotlin.coroutines.* import kotlin.test.* @@ -27,7 +25,7 @@ class CoroutineSchedulerStressTest : TestBase() { private val processed = AtomicInteger(0) private val finishLatch = CountDownLatch(1) - @After + @AfterTest fun tearDown() { dispatcher.close() } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt index fe09090362..adeec3e35b 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt @@ -1,9 +1,6 @@ package kotlinx.coroutines.scheduling import kotlinx.coroutines.testing.* -import org.junit.Test -import java.lang.Runnable -import java.util.concurrent.* import kotlin.coroutines.* import kotlin.test.* diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/DefaultDispatchersTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/DefaultDispatchersTest.kt index f06f7cbe88..adde1d3b6b 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/DefaultDispatchersTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/DefaultDispatchersTest.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.* import org.junit.Test import java.util.concurrent.* import java.util.concurrent.atomic.* +import kotlinx.coroutines.testing.CountDownLatch import kotlin.test.* class DefaultDispatchersTest : TestBase() { diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt index 33e32838da..81d66422ba 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt @@ -13,33 +13,6 @@ abstract class SchedulerTestBase : TestBase() { companion object { val CORES_COUNT = AVAILABLE_PROCESSORS - /** - * Asserts that [expectedThreadsCount] pool worker threads were created. - * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking - */ - fun checkPoolThreadsCreated(expectedThreadsCount: Int = CORES_COUNT) { - val threadsCount = maxSequenceNumber()!! - assertEquals(expectedThreadsCount, threadsCount, "Expected $expectedThreadsCount pool threads, but has $threadsCount") - } - - /** - * Asserts that any number of pool worker threads in [range] were created. - * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking - */ - fun checkPoolThreadsCreated(range: IntRange, base: Int = CORES_COUNT) { - val maxSequenceNumber = maxSequenceNumber()!! - val r = (range.first)..(range.last + base) - assertTrue( - maxSequenceNumber in r, - "Expected pool threads to be in interval $r, but has $maxSequenceNumber" - ) - } - - private fun maxSequenceNumber(): Int? { - return Thread.getAllStackTraces().keys.asSequence().filter { it is CoroutineScheduler.Worker } - .map { sequenceNumber(it.name) }.maxOrNull() - } - private fun sequenceNumber(threadName: String): Int { val suffix = threadName.substring(threadName.lastIndexOf("-") + 1) val separatorIndex = suffix.indexOf(' ') @@ -49,8 +22,6 @@ abstract class SchedulerTestBase : TestBase() { return suffix.substring(0, separatorIndex).toInt() } - - suspend fun Iterable.joinAll() = forEach { it.join() } } protected var corePoolSize = CORES_COUNT @@ -85,19 +56,43 @@ abstract class SchedulerTestBase : TestBase() { return _dispatcher!!.limitedParallelism(parallelism) } + /** + * Asserts that [expectedThreadsCount] pool worker threads were created. + * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking + */ + fun checkPoolThreadsCreated(expectedThreadsCount: Int = CORES_COUNT) { + val threadsCount = maxSequenceNumber()!! + assertEquals(expectedThreadsCount, threadsCount, "Expected $expectedThreadsCount pool threads, but has $threadsCount") + } + + /** + * Asserts that any number of pool worker threads in [range] were created. + * Note that 'created' doesn't mean 'exists' because pool supports dynamic shrinking + */ + fun checkPoolThreadsCreated(range: IntRange, base: Int = CORES_COUNT) { + val maxSequenceNumber = maxSequenceNumber()!! + val r = (range.first)..(range.last + base) + assertTrue( + maxSequenceNumber in r, + "Expected pool threads to be in interval $r, but has $maxSequenceNumber" + ) + } + + private fun maxSequenceNumber(): Int? { + return Thread.getAllStackTraces().keys.asSequence().filter { + it is CoroutineScheduler.Worker && it.scheduler === _dispatcher?.executor + }.map { sequenceNumber(it.name) }.maxOrNull() + } + @After fun after() { - runBlocking { - withTimeout(5_000) { - _dispatcher?.close() - } - } + _dispatcher?.close() } } /** * Implementation note: - * Our [Dispatcher.IO] is a [limitedParallelism][CoroutineDispatcher.limitedParallelism] dispatcher + * Our [Dispatchers.IO] is a [limitedParallelism][CoroutineDispatcher.limitedParallelism] dispatcher * on top of unbounded scheduler. We want to test this scenario, but on top of non-singleton * scheduler so we can control the number of threads, thus this method. */ @@ -106,11 +101,11 @@ internal fun SchedulerCoroutineDispatcher.blocking(parallelism: Int = 16): Corou @InternalCoroutinesApi override fun dispatchYield(context: CoroutineContext, block: Runnable) { - this@blocking.dispatchWithContext(block, BlockingContext, true) + this@blocking.dispatchWithContext(block, BlockingContext, fair = true, track = true) } override fun dispatch(context: CoroutineContext, block: Runnable) { - this@blocking.dispatchWithContext(block, BlockingContext, false) + this@blocking.dispatchWithContext(block, BlockingContext, fair = false, track = true) } }.limitedParallelism(parallelism) } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt index 1bf2434f16..7c714954be 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueStressTest.kt @@ -1,10 +1,8 @@ package kotlinx.coroutines.scheduling import kotlinx.coroutines.testing.* -import kotlinx.coroutines.* import org.junit.* import org.junit.Test -import java.util.concurrent.* import kotlin.concurrent.* import kotlin.jvm.internal.* import kotlin.test.* diff --git a/kotlinx-coroutines-core/native/src/CoroutineContext.kt b/kotlinx-coroutines-core/native/src/CoroutineContext.kt index 3f4c8d9a01..2334ab7164 100644 --- a/kotlinx-coroutines-core/native/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/native/src/CoroutineContext.kt @@ -3,32 +3,8 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlin.coroutines.* -internal actual object DefaultExecutor : CoroutineDispatcher(), Delay { - - private val delegate = WorkerDispatcher(name = "DefaultExecutor") - - override fun dispatch(context: CoroutineContext, block: Runnable) { - delegate.dispatch(context, block) - } - - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - delegate.scheduleResumeAfterDelay(timeMillis, continuation) - } - - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - return delegate.invokeOnTimeout(timeMillis, block, context) - } - - actual fun enqueue(task: Runnable): Unit { - delegate.dispatch(EmptyCoroutineContext, task) - } -} - internal expect fun createDefaultDispatcher(): CoroutineDispatcher -@PublishedApi -internal actual val DefaultDelay: Delay = DefaultExecutor - public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { val combined = coroutineContext + context return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) diff --git a/kotlinx-coroutines-core/native/src/DefaultDelay.kt b/kotlinx-coroutines-core/native/src/DefaultDelay.kt new file mode 100644 index 0000000000..b4570744e8 --- /dev/null +++ b/kotlinx-coroutines-core/native/src/DefaultDelay.kt @@ -0,0 +1,102 @@ +package kotlinx.coroutines + +import kotlinx.atomicfu.* +import kotlinx.coroutines.internal.* +import kotlin.coroutines.* +import kotlin.native.concurrent.ObsoleteWorkersApi +import kotlin.native.concurrent.Worker + +@PublishedApi +internal actual val DefaultDelay: Delay get() = DefaultDelayImpl + +@OptIn(ObsoleteWorkersApi::class) +private object DefaultDelayImpl : EventLoopImplBase(), Runnable { + init { + incrementUseCount() // this event loop is never completed + } + + private val _thread = atomic(null) + + /** Can only happen when tests close the default executor */ + override fun reschedule(now: Long, delayedTask: DelayedTask) { + throw IllegalStateException("Attempted to schedule $delayedTask at $now after shutdown") + } + + /** + * All event loops are using DefaultDelay#invokeOnTimeout to avoid livelock on + * ``` + * runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } } + * ``` + * + * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), + * but it's not exposed as public API. + */ + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduleInvokeOnTimeout(timeMillis, block) + + override fun run() { + val currentThread = Worker.current + // Identity comparisons do not work for value classes, but comparing `null` with non-null should still work + if (!_thread.compareAndSet(null, currentThread)) return // some other thread won the race to start the thread + ThreadLocalEventLoop.setEventLoop(DelegatingUnconfinedEventLoop) + try { + while (true) { + val parkNanos = processNextEvent() + if (parkNanos == Long.MAX_VALUE) break // no more events + if (parkNanos > 0) currentThread.park(parkNanos / 1000L, true) + } + } finally { + _thread.value = null + ThreadLocalEventLoop.resetEventLoop() + // recheck if queues are empty after _thread reference was set to null (!!!) + if (!delayedQueueIsEmpty) { + /* recreate the thread, as there is still work to do, + and `unpark` could have awoken the thread we're currently running on */ + startThreadOrObtainSleepingThread() + } + } + } + + override fun startThreadOrObtainSleepingThread(): Worker? { + // Check if the thread is already running + _thread.value?.let { return it } + /* Now we know that at the moment of this call the thread was not initially running. + This means that whatever thread is going to be running by the end of this function, + it's going to notice the tasks it's supposed to run. + We can return `null` unconditionally. */ + scheduleBackgroundIoTask(this) + return null + } + + override fun toString(): String = "DefaultDelay" +} + +private object DelegatingUnconfinedEventLoop: UnconfinedEventLoop { + override val thisLoopsTaskCanAvoidYielding: Boolean + get() = defaultDelayRunningUnconfinedLoop() + + override val isUnconfinedLoopActive: Boolean get() = false + + override fun runUnconfinedEventLoop(initialBlock: () -> Unit) { + ioView.dispatch(ioView, Runnable { + ThreadLocalEventLoop.unconfinedEventLoop.runUnconfinedEventLoop(initialBlock) + }) + } + + override fun dispatchUnconfined(task: DispatchedTask<*>) = + defaultDelayRunningUnconfinedLoop() + + override fun tryUseAsEventLoop(): EventLoop? = null +} + +private fun defaultDelayRunningUnconfinedLoop(): Nothing { + throw UnsupportedOperationException( + "This method can only be called from the thread where an unconfined event loop is running, " + + "but no tasks can run on this thread." + ) +} + +/** A view separate from [Dispatchers.IO]. + * [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */ +private val ioView by lazy { Dispatchers.IO.limitedParallelism(Int.MAX_VALUE) } +// Without `lazy`, there is a cycle between `ioView` and `DefaultDelay` initialization, leading to a segfault. diff --git a/kotlinx-coroutines-core/native/src/Dispatchers.kt b/kotlinx-coroutines-core/native/src/Dispatchers.kt index e66c05f61d..0e210e4c52 100644 --- a/kotlinx-coroutines-core/native/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/native/src/Dispatchers.kt @@ -1,6 +1,5 @@ package kotlinx.coroutines -import kotlinx.coroutines.internal.* import kotlin.coroutines.* @@ -41,9 +40,14 @@ internal object DefaultIoScheduler : CoroutineDispatcher() { io.dispatchYield(context, block) } + internal fun dispatchToUnlimitedPool(block: Runnable) { + unlimitedPool.dispatch(EmptyCoroutineContext, block) + } + override fun toString(): String = "Dispatchers.IO" } +internal fun scheduleBackgroundIoTask(block: Runnable) = DefaultIoScheduler.dispatchToUnlimitedPool(block) @Suppress("EXTENSION_SHADOWED_BY_MEMBER") public actual val Dispatchers.IO: CoroutineDispatcher get() = IO diff --git a/kotlinx-coroutines-core/native/src/EventLoop.kt b/kotlinx-coroutines-core/native/src/EventLoop.kt index 58128d52fd..9cc87a48a7 100644 --- a/kotlinx-coroutines-core/native/src/EventLoop.kt +++ b/kotlinx-coroutines-core/native/src/EventLoop.kt @@ -2,30 +2,29 @@ package kotlinx.coroutines -import kotlin.coroutines.* import kotlin.native.concurrent.* import kotlin.time.* internal actual abstract class EventLoopImplPlatform : EventLoop() { - - private val current = Worker.current + /** Returns `null` if a thread was created and doesn't need to be awoken. + * Returns a thread to awaken if the thread already existed when this method was called. */ + protected abstract fun startThreadOrObtainSleepingThread(): Worker? protected actual fun unpark() { - current.executeAfter(0L, {})// send an empty task to unpark the waiting event loop - } - - protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { - val delayTimeMillis = delayNanosToMillis(delayedTask.nanoTime - now) - DefaultExecutor.invokeOnTimeout(delayTimeMillis, delayedTask, EmptyCoroutineContext) + startThreadOrObtainSleepingThread()?.let { + it.executeAfter(0L, {}) + } } } -internal class EventLoopImpl: EventLoopImplBase() { - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - DefaultDelay.invokeOnTimeout(timeMillis, block, context) +internal class BlockingEventLoop( + private val worker: Worker +) : EventLoopImplBase() { + override fun startThreadOrObtainSleepingThread(): Worker? = + if (Worker.current.id != worker.id) worker else null } -internal actual fun createEventLoop(): EventLoop = EventLoopImpl() +internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Worker.current) private val startingPoint = TimeSource.Monotonic.markNow() diff --git a/kotlinx-coroutines-core/native/test/RunBlockingNativeTest.kt b/kotlinx-coroutines-core/native/test/RunBlockingNativeTest.kt new file mode 100644 index 0000000000..f388f90398 --- /dev/null +++ b/kotlinx-coroutines-core/native/test/RunBlockingNativeTest.kt @@ -0,0 +1,7 @@ +package kotlinx.coroutines + +import kotlin.native.concurrent.ObsoleteWorkersApi +import kotlin.native.concurrent.Worker + +@OptIn(ObsoleteWorkersApi::class) +internal actual fun runningOnIoThread(): Boolean = Worker.current.name.startsWith("Dispatchers.IO") diff --git a/kotlinx-coroutines-core/nativeOther/src/Dispatchers.kt b/kotlinx-coroutines-core/nativeOther/src/Dispatchers.kt index 5d200d328a..bc9fe4a019 100644 --- a/kotlinx-coroutines-core/nativeOther/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/nativeOther/src/Dispatchers.kt @@ -8,9 +8,13 @@ internal actual fun createMainDispatcher(default: CoroutineDispatcher): MainCoro internal actual fun createDefaultDispatcher(): CoroutineDispatcher = DefaultDispatcher +/** + * This is not just `private val DefaultDispatcher = newFixedThreadPoolContext(...)` to + * 1. Prevent casting [Dispatchers.Default] to [CloseableCoroutineDispatcher] and closing it + * 2. Make it non-[Delay] + */ private object DefaultDispatcher : CoroutineDispatcher() { // Be consistent with JVM -- at least 2 threads to provide some liveness guarantees in case of improper uses - @OptIn(ExperimentalStdlibApi::class) private val ctx = newFixedThreadPoolContext(Platform.getAvailableProcessors().coerceAtLeast(2), "Dispatchers.Default") override fun dispatch(context: CoroutineContext, block: Runnable) { diff --git a/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt b/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt index a0f392e5b0..1c7ebe44a0 100644 --- a/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt +++ b/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt @@ -13,7 +13,7 @@ private external fun wasiRawClockTimeGet(clockId: Int, precision: Long, resultPt private const val CLOCKID_MONOTONIC = 1 -internal actual fun createEventLoop(): EventLoop = DefaultExecutor +internal actual fun createEventLoop(): EventLoop = GlobalEventLoop internal actual fun nanoTime(): Long = withScopedMemoryAllocator { allocator: MemoryAllocator -> val ptrTo8Bytes = allocator.allocate(8) @@ -38,7 +38,7 @@ private fun sleep(nanos: Long, ptrTo32Bytes: Pointer, ptrTo8Bytes: Pointer, ptrT check(returnCode == 0) { "poll_oneoff failed with the return code $returnCode" } } -internal actual object DefaultExecutor : EventLoopImplBase() { +private object GlobalEventLoop : EventLoopImplBase() { init { if (kotlin.wasm.internal.onExportedFunctionExit == null) { @@ -59,12 +59,6 @@ internal actual abstract class EventLoopImplPlatform : EventLoop() { // do nothing: in WASI, no external callbacks can be invoked while `poll_oneoff` is running, // so it is both impossible and unnecessary to unpark the event loop } - - protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { - // throw; on WASI, the event loop is the default executor, we can't shut it down or reschedule tasks - // to anyone else - throw UnsupportedOperationException("runBlocking event loop is not supported") - } } internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block() @@ -74,7 +68,7 @@ internal fun runEventLoop() { val ptrToSubscription = initializeSubscriptionPtr(allocator) val ptrTo32Bytes = allocator.allocate(32) val ptrTo8Bytes = allocator.allocate(8) - val eventLoop = DefaultExecutor + val eventLoop = GlobalEventLoop eventLoop.incrementUseCount() try { while (true) { @@ -117,4 +111,7 @@ private fun initializeSubscriptionPtr(allocator: MemoryAllocator): Pointer { return ptrToSubscription } -internal actual fun createDefaultDispatcher(): CoroutineDispatcher = DefaultExecutor +internal actual fun createDefaultDispatcher(): CoroutineDispatcher = GlobalEventLoop + +internal actual fun rescheduleTaskFromClosedDispatcher(task: Runnable) = + GlobalEventLoop.enqueue(task) diff --git a/kotlinx-coroutines-debug/test/DebugTestBase.kt b/kotlinx-coroutines-debug/test/DebugTestBase.kt index 93cb2f60bf..720910802e 100644 --- a/kotlinx-coroutines-debug/test/DebugTestBase.kt +++ b/kotlinx-coroutines-debug/test/DebugTestBase.kt @@ -2,7 +2,6 @@ package kotlinx.coroutines.debug import kotlinx.coroutines.testing.* -import kotlinx.coroutines.* import kotlinx.coroutines.debug.junit4.* import org.junit.* diff --git a/kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt b/kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt index bafea1f031..6ce7bd9bf0 100644 --- a/kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt +++ b/kotlinx-coroutines-debug/test/RunningThreadStackMergeTest.kt @@ -6,6 +6,7 @@ import kotlinx.coroutines.debug.internal.* import org.junit.Test import java.util.concurrent.* import kotlin.test.* +import kotlinx.coroutines.testing.CountDownLatch class RunningThreadStackMergeTest : DebugTestBase() { diff --git a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt index b09324cff2..863bf70d3f 100644 --- a/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt @@ -82,7 +82,7 @@ class FlowAsPublisherTest : TestBase() { try { expect(1) publisher.awaitFirstOrNull() - } catch (e: CancellationException) { + } catch (_: CancellationException) { expect(3) } finish(4) diff --git a/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt b/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt index 05bb8789bc..a1254a6036 100644 --- a/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt @@ -6,7 +6,7 @@ import kotlinx.coroutines.flow.* import org.junit.* -class CancelledParentAttachTest : TestBase() {; +class CancelledParentAttachTest : TestBase() { @Test fun testFlow() = runTest { diff --git a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt index 18615e2b36..3cbc48eac0 100644 --- a/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt @@ -4,9 +4,7 @@ import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlinx.coroutines.CancellationException import kotlinx.coroutines.flow.* -import org.junit.Test import org.reactivestreams.* -import java.util.concurrent.* import kotlin.test.* class FlowAsPublisherTest : TestBase() { @@ -157,7 +155,7 @@ class FlowAsPublisherTest : TestBase() { try { expect(1) publisher.awaitFirstOrNull() - } catch (e: CancellationException) { + } catch (_: CancellationException) { expect(3) } finish(4) diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt index 51e9562a4b..fe9ac489c3 100644 --- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt @@ -161,7 +161,7 @@ class IntegrationTest( } }.let { assertTrue("Expected the message to contain '$message', got '${it.message}'") { - it.message?.contains(message) ?: false + it.message?.contains(message) == true } } } @@ -203,7 +203,7 @@ class IntegrationTest( } catch (e: NoSuchElementException) { // intentionally blank } - }.let { assertTrue(it.message?.contains("onSubscribe") ?: false) } + }.let { assertTrue(it.message?.contains("onSubscribe") == true) } } @Test @@ -222,4 +222,3 @@ class IntegrationTest( } } - diff --git a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt index df01e471e5..57492c5d2f 100644 --- a/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/PublishTest.kt @@ -7,9 +7,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.sync.* import kotlinx.coroutines.testing.exceptions.* -import org.junit.Test import org.reactivestreams.* -import java.util.concurrent.* import kotlin.test.* class PublishTest : TestBase() { @@ -137,7 +135,7 @@ class PublishTest : TestBase() { try { expect(2) publisher.awaitFirstOrNull() - } catch (e: CancellationException) { + } catch (_: CancellationException) { expect(5) } finish(6) @@ -229,7 +227,7 @@ class PublishTest : TestBase() { val result: ChannelResult = producerScope!!.trySend(1) val e = result.exceptionOrNull()!! assertIs(e, "The actual error: $e") - assertTrue(producerScope!!.isClosedForSend) + assertTrue(producerScope.isClosedForSend) assertTrue(result.isFailure) } finish(7) @@ -247,7 +245,7 @@ class PublishTest : TestBase() { pub.collect { throw TestException() } - } catch (e: TestException) { + } catch (_: TestException) { finish(3) } } diff --git a/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt b/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt index 9a4fc5230d..7d51c6c00a 100644 --- a/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/ReactiveStreamTckTest.kt @@ -16,10 +16,10 @@ class ReactiveStreamTckTest : TestBase() { } @DataProvider(name = "dispatchers") - public fun dispatchers(): Array> = Dispatcher.values().map { arrayOf(it) }.toTypedArray() + fun dispatchers(): Array> = Dispatcher.values().map { arrayOf(it) }.toTypedArray() - public class ReactiveStreamTckTestSuite( + class ReactiveStreamTckTestSuite( private val dispatcher: Dispatcher ) : PublisherVerification(TestEnvironment(500, 500)) { @@ -34,7 +34,7 @@ class ReactiveStreamTckTest : TestBase() { } @Test - public override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() { + override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() { throw SkipException("Skipped") } diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt index 0a94c8e43f..900e76a166 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt @@ -4,11 +4,9 @@ import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* -import org.junit.Test import org.reactivestreams.* import reactor.core.publisher.* import reactor.util.context.Context -import java.util.concurrent.* import kotlin.test.* @Suppress("ReactiveStreamsSubscriberImplementation") diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt index 634bbcd51e..36ede9aa46 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxTest.kt @@ -133,7 +133,7 @@ class FluxTest : TestBase() { // Test exception is not reported to global handler val flow = flux { throw TestException() }.asFlow() repeat(2000) { - combine(flow, flow) { _, _ -> Unit } + combine(flow, flow) { _, _ -> } .catch {} .collect { } } diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowAsFlowableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowAsFlowableTest.kt index 0bcacef06c..46c9397e2a 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowAsFlowableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowAsFlowableTest.kt @@ -3,9 +3,7 @@ package kotlinx.coroutines.rx2 import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* -import org.junit.Test import org.reactivestreams.* -import java.util.concurrent.* import kotlin.test.* @Suppress("ReactiveStreamsSubscriberImplementation") diff --git a/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt b/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt index c976d9b615..6fb8cb8726 100644 --- a/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/FlowAsObservableTest.kt @@ -5,8 +5,6 @@ import io.reactivex.* import io.reactivex.disposables.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* -import org.junit.Test -import java.util.concurrent.* import kotlin.test.* class FlowAsObservableTest : TestBase() { @@ -109,7 +107,7 @@ class FlowAsObservableTest : TestBase() { expect(3) throw TestException() } - } catch (e: TestException) { + } catch (_: TestException) { finish(5) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt index 4f16966422..9eff4fd820 100644 --- a/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/LeakedExceptionTest.kt @@ -23,7 +23,7 @@ class LeakedExceptionTest : TestBase() { val flow = rxSingle(dispatcher) { throw TestException() }.toFlowable().asFlow() runBlocking { repeat(10000) { - combine(flow, flow) { _, _ -> Unit } + combine(flow, flow) { _, _ -> } .catch {} .collect {} } @@ -39,7 +39,7 @@ class LeakedExceptionTest : TestBase() { .asFlow() runBlocking { repeat(10000) { - combine(flow, flow) { _, _ -> Unit } + combine(flow, flow) { _, _ -> } .catch {} .collect {} } @@ -53,7 +53,7 @@ class LeakedExceptionTest : TestBase() { val flow = rxFlowable(dispatcher) { throw TestException() }.asFlow() runBlocking { repeat(10000) { - combine(flow, flow) { _, _ -> Unit } + combine(flow, flow) { _, _ -> } .catch {} .collect {} } @@ -82,7 +82,7 @@ class LeakedExceptionTest : TestBase() { throw TestException() }.asFlow() runBlocking { - combine(flow, flow) { _, _ -> Unit } + combine(flow, flow) { _, _ -> } .catch {} .collect {} } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt index 0d34a4e5b0..c66ebd3282 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableExceptionHandlingTest.kt @@ -3,14 +3,11 @@ package kotlinx.coroutines.rx2 import kotlinx.coroutines.testing.* import io.reactivex.exceptions.* import kotlinx.coroutines.* -import org.junit.* -import org.junit.Test -import java.util.concurrent.* import kotlin.test.* class ObservableExceptionHandlingTest : TestBase() { - @Before + @BeforeTest fun setup() { ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") } diff --git a/reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt b/reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt index cf163b71de..934b3310b6 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt @@ -7,16 +7,15 @@ import io.reactivex.plugins.* import io.reactivex.schedulers.* import kotlinx.coroutines.* import kotlinx.coroutines.sync.* -import org.junit.* -import org.junit.Test import java.lang.Runnable -import java.util.concurrent.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import kotlin.coroutines.* import kotlin.test.* class SchedulerTest : TestBase() { - @Before + @BeforeTest fun setup() { ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") } @@ -164,7 +163,7 @@ class SchedulerTest : TestBase() { cdl1.countDown() try { cdl2.await() - } catch (e: InterruptedException) { + } catch (_: InterruptedException) { // this is the expected outcome cdl3.countDown() } @@ -490,4 +489,4 @@ class SchedulerTest : TestBase() { } typealias RxSchedulerBlockNoDelay = (Runnable) -> Disposable -typealias RxSchedulerBlockWithDelay = (Runnable, Long, TimeUnit) -> Disposable \ No newline at end of file +typealias RxSchedulerBlockWithDelay = (Runnable, Long, TimeUnit) -> Disposable diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowAsFlowableTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowAsFlowableTest.kt index cc565c832a..7e7ddc5c2a 100644 --- a/reactive/kotlinx-coroutines-rx3/test/FlowAsFlowableTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/FlowAsFlowableTest.kt @@ -3,9 +3,7 @@ package kotlinx.coroutines.rx3 import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* -import org.junit.Test import org.reactivestreams.* -import java.util.concurrent.* import kotlin.test.* @Suppress("ReactiveStreamsSubscriberImplementation") diff --git a/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt b/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt index 8bacb2f249..f3b3740e17 100644 --- a/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/FlowAsObservableTest.kt @@ -5,8 +5,6 @@ import io.reactivex.rxjava3.core.* import io.reactivex.rxjava3.disposables.* import kotlinx.coroutines.* import kotlinx.coroutines.flow.* -import org.junit.Test -import java.util.concurrent.* import kotlin.test.* class FlowAsObservableTest : TestBase() { @@ -109,7 +107,7 @@ class FlowAsObservableTest : TestBase() { expect(3) throw TestException() } - } catch (e: TestException) { + } catch (_: TestException) { finish(5) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/LeakedExceptionTest.kt b/reactive/kotlinx-coroutines-rx3/test/LeakedExceptionTest.kt index 5f9caf5c45..f4391d7a32 100644 --- a/reactive/kotlinx-coroutines-rx3/test/LeakedExceptionTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/LeakedExceptionTest.kt @@ -23,7 +23,7 @@ class LeakedExceptionTest : TestBase() { val flow = rxSingle(dispatcher) { throw TestException() }.toFlowable().asFlow() runBlocking { repeat(10000) { - combine(flow, flow) { _, _ -> Unit } + combine(flow, flow) { _, _ -> } .catch {} .collect {} } @@ -39,7 +39,7 @@ class LeakedExceptionTest : TestBase() { .asFlow() runBlocking { repeat(10000) { - combine(flow, flow) { _, _ -> Unit } + combine(flow, flow) { _, _ -> } .catch {} .collect {} } @@ -53,7 +53,7 @@ class LeakedExceptionTest : TestBase() { val flow = rxFlowable(dispatcher) { throw TestException() }.asFlow() runBlocking { repeat(10000) { - combine(flow, flow) { _, _ -> Unit } + combine(flow, flow) { _, _ -> } .catch {} .collect {} } @@ -82,7 +82,7 @@ class LeakedExceptionTest : TestBase() { throw TestException() }.asFlow() runBlocking { - combine(flow, flow) { _, _ -> Unit } + combine(flow, flow) { _, _ -> } .catch {} .collect {} } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt index 63fb73bf1e..e5854860c1 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableExceptionHandlingTest.kt @@ -3,14 +3,11 @@ package kotlinx.coroutines.rx3 import kotlinx.coroutines.testing.* import io.reactivex.rxjava3.exceptions.* import kotlinx.coroutines.* -import org.junit.* -import org.junit.Test -import java.util.concurrent.* import kotlin.test.* class ObservableExceptionHandlingTest : TestBase() { - @Before + @BeforeTest fun setup() { ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") } diff --git a/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt b/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt index fd59503722..6b0b634874 100644 --- a/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt @@ -7,16 +7,15 @@ import io.reactivex.rxjava3.plugins.* import io.reactivex.rxjava3.schedulers.* import kotlinx.coroutines.* import kotlinx.coroutines.sync.* -import org.junit.* -import org.junit.Test import java.lang.Runnable -import java.util.concurrent.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import kotlin.coroutines.* import kotlin.test.* class SchedulerTest : TestBase() { - @Before + @BeforeTest fun setup() { ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") } @@ -164,7 +163,7 @@ class SchedulerTest : TestBase() { cdl1.countDown() try { cdl2.await() - } catch (e: InterruptedException) { + } catch (_: InterruptedException) { // this is the expected outcome cdl3.countDown() } @@ -490,4 +489,4 @@ class SchedulerTest : TestBase() { } typealias RxSchedulerBlockNoDelay = (Runnable) -> Disposable -typealias RxSchedulerBlockWithDelay = (Runnable, Long, TimeUnit) -> Disposable \ No newline at end of file +typealias RxSchedulerBlockWithDelay = (Runnable, Long, TimeUnit) -> Disposable diff --git a/test-utils/build.gradle.kts b/test-utils/build.gradle.kts index 3461d5292b..2e301cce4a 100644 --- a/test-utils/build.gradle.kts +++ b/test-utils/build.gradle.kts @@ -8,6 +8,10 @@ kotlin { api("org.jetbrains.kotlin:kotlin-test-common:${version("kotlin")}") api("org.jetbrains.kotlin:kotlin-test-annotations-common:${version("kotlin")}") } + val concurrentMain by creating { + configureDirectoryPaths() + dependsOn(commonMain.get()) + } jvmMain.dependencies { api("org.jetbrains.kotlin:kotlin-test:${version("kotlin")}") // Workaround to make addSuppressed work in tests @@ -16,6 +20,8 @@ kotlin { api("org.jetbrains.kotlin:kotlin-test-junit:${version("kotlin")}") api("junit:junit:${version("junit")}") } + jvmMain { dependsOn(concurrentMain) } + nativeMain { dependsOn(concurrentMain) } jsMain.dependencies { api("org.jetbrains.kotlin:kotlin-test-js:${version("kotlin")}") } diff --git a/test-utils/concurrent/src/BlockingStructures.kt b/test-utils/concurrent/src/BlockingStructures.kt new file mode 100644 index 0000000000..b29b15a07e --- /dev/null +++ b/test-utils/concurrent/src/BlockingStructures.kt @@ -0,0 +1,8 @@ +package kotlinx.coroutines.testing + +expect class CountDownLatch(initial: Int) { + fun countDown() + fun await() +} + +expect fun CountDownLatch.await(timeout: kotlin.time.Duration): Boolean diff --git a/test-utils/jvm/src/BlockingStructures.kt b/test-utils/jvm/src/BlockingStructures.kt new file mode 100644 index 0000000000..43af07679a --- /dev/null +++ b/test-utils/jvm/src/BlockingStructures.kt @@ -0,0 +1,6 @@ +package kotlinx.coroutines.testing + +actual typealias CountDownLatch = java.util.concurrent.CountDownLatch + +actual fun CountDownLatch.await(timeout: kotlin.time.Duration): Boolean = + await(timeout.inWholeMilliseconds, java.util.concurrent.TimeUnit.MILLISECONDS) diff --git a/test-utils/jvm/src/TestBase.kt b/test-utils/jvm/src/TestBase.kt index 194e71b474..b531556458 100644 --- a/test-utils/jvm/src/TestBase.kt +++ b/test-utils/jvm/src/TestBase.kt @@ -233,7 +233,6 @@ fun initPoolsBeforeTest() { @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") fun shutdownPoolsAfterTest() { DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) - DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) DefaultScheduler.restore() } diff --git a/test-utils/native/src/BlockingStructures.kt b/test-utils/native/src/BlockingStructures.kt new file mode 100644 index 0000000000..2da155dd72 --- /dev/null +++ b/test-utils/native/src/BlockingStructures.kt @@ -0,0 +1,41 @@ +package kotlinx.coroutines.testing + +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeoutOrNull +import kotlin.time.Duration + +actual class CountDownLatch actual constructor(initial: Int) { + private val counter = atomic(initial) + private val wakeUpSignal = Channel(Channel.CONFLATED) + + actual fun countDown() { + if (counter.decrementAndGet() <= 0) { + wakeUpSignal.trySend(Unit) + } + } + + actual fun await() { + if (counter.value > 0) { + runBlocking { + wakeUpSignal.receive() + wakeUpSignal.trySend(Unit) + } + } + } + + internal fun awaitTimingOut(timeout: Duration): Boolean = counter.value > 0 || runBlocking { + val result = withTimeoutOrNull(timeout) { + wakeUpSignal.receive() + } + if (result == null) { + false + } else { + wakeUpSignal.trySend(Unit) + true + } + } +} + +actual fun CountDownLatch.await(timeout: Duration) = awaitTimingOut(timeout)