diff --git a/docs/topics/cancellation-and-timeouts.md b/docs/topics/cancellation-and-timeouts.md index ac99d5fee3..56524f5200 100644 --- a/docs/topics/cancellation-and-timeouts.md +++ b/docs/topics/cancellation-and-timeouts.md @@ -322,7 +322,7 @@ It produces the following output: I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ... -Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms +Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1.3s ``` diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 0e35d5fb38..60e8b75052 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -297,20 +297,21 @@ public final class kotlinx/coroutines/Deferred$DefaultImpls { } public abstract interface class kotlinx/coroutines/Delay { - public abstract fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public abstract fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public abstract fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public abstract fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V + public abstract fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; } public final class kotlinx/coroutines/Delay$DefaultImpls { - public static fun delay (Lkotlinx/coroutines/Delay;JLkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public static fun invokeOnTimeout-KLykuaI (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public static fun timeoutMessage-LRDsOJo (Lkotlinx/coroutines/Delay;J)Ljava/lang/String; } public final class kotlinx/coroutines/DelayKt { public static final fun awaitCancellation (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun delay-VtjQ1oo (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun toDelayMillis-LRDsOJo (J)J } public abstract interface annotation class kotlinx/coroutines/DelicateCoroutinesApi : java/lang/annotation/Annotation { diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api index 373a1eee52..8b2a0eeb4e 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api @@ -293,9 +293,9 @@ abstract interface kotlinx.coroutines/CoroutineScope { // kotlinx.coroutines/Cor } abstract interface kotlinx.coroutines/Delay { // kotlinx.coroutines/Delay|null[0] - abstract fun scheduleResumeAfterDelay(kotlin/Long, kotlinx.coroutines/CancellableContinuation) // kotlinx.coroutines/Delay.scheduleResumeAfterDelay|scheduleResumeAfterDelay(kotlin.Long;kotlinx.coroutines.CancellableContinuation){}[0] - open fun invokeOnTimeout(kotlin/Long, kotlinx.coroutines/Runnable, kotlin.coroutines/CoroutineContext): kotlinx.coroutines/DisposableHandle // kotlinx.coroutines/Delay.invokeOnTimeout|invokeOnTimeout(kotlin.Long;kotlinx.coroutines.Runnable;kotlin.coroutines.CoroutineContext){}[0] - open suspend fun delay(kotlin/Long) // kotlinx.coroutines/Delay.delay|delay(kotlin.Long){}[0] + abstract fun scheduleResumeAfterDelay(kotlin.time/Duration, kotlinx.coroutines/CancellableContinuation) // kotlinx.coroutines/Delay.scheduleResumeAfterDelay|scheduleResumeAfterDelay(kotlin.time.Duration;kotlinx.coroutines.CancellableContinuation){}[0] + open fun invokeOnTimeout(kotlin.time/Duration, kotlinx.coroutines/Runnable, kotlin.coroutines/CoroutineContext): kotlinx.coroutines/DisposableHandle // kotlinx.coroutines/Delay.invokeOnTimeout|invokeOnTimeout(kotlin.time.Duration;kotlinx.coroutines.Runnable;kotlin.coroutines.CoroutineContext){}[0] + open fun timeoutMessage(kotlin.time/Duration): kotlin/String // kotlinx.coroutines/Delay.timeoutMessage|timeoutMessage(kotlin.time.Duration){}[0] } abstract interface kotlinx.coroutines/Job : kotlin.coroutines/CoroutineContext.Element { // kotlinx.coroutines/Job|null[0] @@ -758,6 +758,7 @@ final fun (kotlin.coroutines/CoroutineContext).kotlinx.coroutines/ensureActive() final fun (kotlin.coroutines/CoroutineContext).kotlinx.coroutines/newCoroutineContext(kotlin.coroutines/CoroutineContext): kotlin.coroutines/CoroutineContext // kotlinx.coroutines/newCoroutineContext|newCoroutineContext@kotlin.coroutines.CoroutineContext(kotlin.coroutines.CoroutineContext){}[0] final fun (kotlin.ranges/IntRange).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow // kotlinx.coroutines.flow/asFlow|asFlow@kotlin.ranges.IntRange(){}[0] final fun (kotlin.ranges/LongRange).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow // kotlinx.coroutines.flow/asFlow|asFlow@kotlin.ranges.LongRange(){}[0] +final fun (kotlin.time/Duration).kotlinx.coroutines/toDelayMillis(): kotlin/Long // kotlinx.coroutines/toDelayMillis|toDelayMillis@kotlin.time.Duration(){}[0] final fun (kotlin/IntArray).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow // kotlinx.coroutines.flow/asFlow|asFlow@kotlin.IntArray(){}[0] final fun (kotlin/LongArray).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow // kotlinx.coroutines.flow/asFlow|asFlow@kotlin.LongArray(){}[0] final fun (kotlinx.coroutines.channels/ReceiveChannel<*>).kotlinx.coroutines.channels/cancelConsumed(kotlin/Throwable?) // kotlinx.coroutines.channels/cancelConsumed|cancelConsumed@kotlinx.coroutines.channels.ReceiveChannel<*>(kotlin.Throwable?){}[0] diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt index 67d3d16bb1..a546b1232f 100644 --- a/kotlinx-coroutines-core/common/src/Delay.kt +++ b/kotlinx-coroutines-core/common/src/Delay.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.selects.* import kotlin.coroutines.* import kotlin.time.* import kotlin.time.Duration.Companion.nanoseconds +import kotlin.time.Duration.Companion.milliseconds /** * This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support @@ -16,19 +17,8 @@ import kotlin.time.Duration.Companion.nanoseconds */ @InternalCoroutinesApi public interface Delay { - - /** @suppress **/ - @Deprecated( - message = "Deprecated without replacement as an internal method never intended for public use", - level = DeprecationLevel.ERROR - ) // Error since 1.6.0 - public suspend fun delay(time: Long) { - if (time <= 0) return // don't delay - return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, it) } - } - /** - * Schedules resume of a specified [continuation] after a specified delay [timeMillis]. + * Schedules resume of a specified [continuation] after a specified delay [time]. * * Continuation **must be scheduled** to resume even if it is already cancelled, because a cancellation is just * an exception that the coroutine that used `delay` might wanted to catch and process. It might @@ -42,28 +32,20 @@ public interface Delay { * with(continuation) { resumeUndispatchedWith(Unit) } * ``` */ - public fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) + public fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) /** - * Schedules invocation of a specified [block] after a specified delay [timeMillis]. + * Schedules invocation of a specified [block] after a specified delay [timeout]. * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] of this invocation * request if it is not needed anymore. */ - public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - DefaultDelay.invokeOnTimeout(timeMillis, block, context) -} + public fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + DefaultDelay.invokeOnTimeout(timeout, block, context) -/** - * Enhanced [Delay] interface that provides additional diagnostics for [withTimeout]. - * Is going to be removed once there is proper JVM-default support. - * Then we'll be able put this function into [Delay] without breaking binary compatibility. - */ -@InternalCoroutinesApi -internal interface DelayWithTimeoutDiagnostics : Delay { /** * Returns a string that explains that the timeout has occurred, and explains what can be done about it. */ - fun timeoutMessage(timeout: Duration): String + public fun timeoutMessage(timeout: Duration): String = "Timed out waiting for $timeout" } /** @@ -103,8 +85,8 @@ internal interface DelayWithTimeoutDiagnostics : Delay { public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {} /** - * Delays coroutine for at least the given time without blocking a thread and resumes it after a specified time. - * If the given [timeMillis] is non-positive, this function returns immediately. + * Delays coroutine for at least the given [duration] without blocking a thread and resumes it after the specified time. + * If the given [duration] is non-positive, this function returns immediately. * * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this * suspending function is waiting, this function immediately resumes with [CancellationException]. @@ -116,21 +98,20 @@ public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {} * Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause. * * Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context. - * @param timeMillis time in milliseconds. */ -public suspend fun delay(timeMillis: Long) { - if (timeMillis <= 0) return // don't delay +public suspend fun delay(duration: Duration) { + if (duration <= Duration.ZERO) return // don't delay return suspendCancellableCoroutine sc@ { cont: CancellableContinuation -> - // if timeMillis == Long.MAX_VALUE then just wait forever like awaitCancellation, don't schedule. - if (timeMillis < Long.MAX_VALUE) { - cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont) + // instead of actually waiting for an infinite time, just wait forever like awaitCancellation, don't schedule. + if (duration.isFinite()) { + cont.context.delay.scheduleResumeAfterDelay(duration, cont) } } } /** - * Delays coroutine for at least the given [duration] without blocking a thread and resumes it after the specified time. - * If the given [duration] is non-positive, this function returns immediately. + * Delays coroutine for at least the given time without blocking a thread and resumes it after a specified time. + * If the given [timeMillis] is non-positive, this function returns immediately. * * This suspending function is cancellable: if the [Job] of the current coroutine is cancelled while this * suspending function is waiting, this function immediately resumes with [CancellationException]. @@ -142,8 +123,11 @@ public suspend fun delay(timeMillis: Long) { * Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause. * * Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context. + * @param timeMillis time in milliseconds. */ -public suspend fun delay(duration: Duration): Unit = delay(duration.toDelayMillis()) +public suspend fun delay(timeMillis: Long) { + delay(timeMillis.milliseconds) +} /** Returns [Delay] implementation of the given context */ internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay @@ -152,6 +136,7 @@ internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) * Convert this duration to its millisecond value. Durations which have a nanosecond component less than * a single millisecond will be rounded up to the next largest millisecond. */ +@PublishedApi internal fun Duration.toDelayMillis(): Long = when (isPositive()) { true -> plus(999_999L.nanoseconds).inWholeMilliseconds false -> 0L diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 84291a1b69..da55f940b6 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.internal.* import kotlin.concurrent.Volatile import kotlin.coroutines.* import kotlin.jvm.* +import kotlin.time.Duration /** * Extended by [CoroutineDispatcher] implementations that have event loop inside and can @@ -144,24 +145,12 @@ private const val SCHEDULE_OK = 0 private const val SCHEDULE_COMPLETED = 1 private const val SCHEDULE_DISPOSED = 2 -private const val MS_TO_NS = 1_000_000L -private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS - /** * First-line overflow protection -- limit maximal delay. * Delays longer than this one (~146 years) are considered to be delayed "forever". */ private const val MAX_DELAY_NS = Long.MAX_VALUE / 2 -internal fun delayToNanos(timeMillis: Long): Long = when { - timeMillis <= 0 -> 0L - timeMillis >= MAX_MS -> Long.MAX_VALUE - else -> timeMillis * MS_TO_NS -} - -internal fun delayNanosToMillis(timeNanos: Long): Long = - timeNanos / MS_TO_NS - private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY") private typealias Queue = LockFreeTaskQueueCore @@ -224,8 +213,8 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { rescheduleAllDelayed() } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val timeNanos = delayToNanos(timeMillis) + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + val timeNanos = time.inWholeNanoseconds if (timeNanos < MAX_DELAY_NS) { val now = nanoTime() DelayedResumeTask(now + timeNanos, continuation).also { task -> @@ -240,8 +229,8 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { } } - protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { - val timeNanos = delayToNanos(timeMillis) + protected fun scheduleInvokeOnTimeout(timeout: Duration, block: Runnable): DisposableHandle { + val timeNanos = timeout.inWholeNanoseconds return if (timeNanos < MAX_DELAY_NS) { val now = nanoTime() DelayedRunnableTask(now + timeNanos, block).also { task -> diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt index 65e68ba299..bbb961ec3c 100644 --- a/kotlinx-coroutines-core/common/src/Timeout.kt +++ b/kotlinx-coroutines-core/common/src/Timeout.kt @@ -14,9 +14,9 @@ import kotlin.time.* import kotlin.time.Duration.Companion.milliseconds /** - * Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws + * Runs a given suspending [block] of code inside a coroutine with the specified [timeout] and throws * a [TimeoutCancellationException] if the timeout was exceeded. - * If the given [timeMillis] is non-positive, [TimeoutCancellationException] is thrown immediately. + * If the given [timeout] is non-positive, [TimeoutCancellationException] is thrown immediately. * * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of * the cancellable suspending function inside the block throws a [TimeoutCancellationException]. @@ -32,23 +32,21 @@ import kotlin.time.Duration.Companion.milliseconds * section of the coroutines guide for details. * * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. - * - * @param timeMillis timeout time in milliseconds. */ -public suspend fun withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T { +public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } - if (timeMillis <= 0L) throw TimeoutCancellationException("Timed out immediately") + if (timeout <= Duration.ZERO) throw TimeoutCancellationException("Timed out immediately") return suspendCoroutineUninterceptedOrReturn { uCont -> - setupTimeout(TimeoutCoroutine(timeMillis, uCont), block) + setupTimeout(TimeoutCoroutine(timeout, uCont), block) } } /** - * Runs a given suspending [block] of code inside a coroutine with the specified [timeout] and throws + * Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws * a [TimeoutCancellationException] if the timeout was exceeded. - * If the given [timeout] is non-positive, [TimeoutCancellationException] is thrown immediately. + * If the given [timeMillis] is non-positive, [TimeoutCancellationException] is thrown immediately. * * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of * the cancellable suspending function inside the block throws a [TimeoutCancellationException]. @@ -64,18 +62,20 @@ public suspend fun withTimeout(timeMillis: Long, block: suspend CoroutineSco * section of the coroutines guide for details. * * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. + * + * @param timeMillis timeout time in milliseconds. */ -public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T { +public suspend fun withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T { contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } - return withTimeout(timeout.toDelayMillis(), block) + return withTimeout(timeMillis.milliseconds, block) } /** - * Runs a given suspending block of code inside a coroutine with a specified [timeout][timeMillis] and returns + * Runs a given suspending block of code inside a coroutine with the specified [timeout] and returns * `null` if this timeout was exceeded. - * If the given [timeMillis] is non-positive, `null` is returned immediately. + * If the given [timeout] is non-positive, `null` is returned immediately. * * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of * cancellable suspending function inside the block throws a [TimeoutCancellationException]. @@ -91,16 +91,14 @@ public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineSc * section of the coroutines guide for details. * * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. - * - * @param timeMillis timeout time in milliseconds. */ -public suspend fun withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T? { - if (timeMillis <= 0L) return null +public suspend fun withTimeoutOrNull(timeout: Duration, block: suspend CoroutineScope.() -> T): T? { + if (timeout <= Duration.ZERO) return null var coroutine: TimeoutCoroutine? = null try { return suspendCoroutineUninterceptedOrReturn { uCont -> - val timeoutCoroutine = TimeoutCoroutine(timeMillis, uCont) + val timeoutCoroutine = TimeoutCoroutine(timeout, uCont) coroutine = timeoutCoroutine setupTimeout(timeoutCoroutine, block) } @@ -114,9 +112,9 @@ public suspend fun withTimeoutOrNull(timeMillis: Long, block: suspend Corout } /** - * Runs a given suspending block of code inside a coroutine with the specified [timeout] and returns + * Runs a given suspending block of code inside a coroutine with a specified [timeout][timeMillis] and returns * `null` if this timeout was exceeded. - * If the given [timeout] is non-positive, `null` is returned immediately. + * If the given [timeMillis] is non-positive, `null` is returned immediately. * * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of * cancellable suspending function inside the block throws a [TimeoutCancellationException]. @@ -132,9 +130,11 @@ public suspend fun withTimeoutOrNull(timeMillis: Long, block: suspend Corout * section of the coroutines guide for details. * * > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. + * + * @param timeMillis timeout time in milliseconds. */ -public suspend fun withTimeoutOrNull(timeout: Duration, block: suspend CoroutineScope.() -> T): T? = - withTimeoutOrNull(timeout.toDelayMillis(), block) +public suspend fun withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T? = + withTimeoutOrNull(timeMillis.milliseconds, block) private fun setupTimeout( coroutine: TimeoutCoroutine, @@ -150,11 +150,11 @@ private fun setupTimeout( } private class TimeoutCoroutine( - @JvmField val time: Long, + val time: Duration, uCont: Continuation // unintercepted continuation ) : ScopeCoroutine(uCont.context, uCont), Runnable { override fun run() { - cancelCoroutine(TimeoutCancellationException(time, context.delay, this)) + cancelCoroutine(TimeoutCancellationException(context.delay.timeoutMessage(time), this)) } override fun nameString(): String = @@ -163,6 +163,18 @@ private class TimeoutCoroutine( /** * This exception is thrown by [withTimeout] to indicate timeout. + * + * **Pitfall**: This exception is an instance of [CancellationException] and inherits its behavior. + * In particular, if this exception is not caught, it cancels the coroutine it's thrown from. + * ``` + * // This coroutine will simply be cancelled, without any errors being printed + * launch { + * withTimeout(1.seconds) { + * delay(100.seconds) + * } // will throw TimeoutCancellationException + * error("Will not be printed") + * } + * ``` */ public class TimeoutCancellationException internal constructor( message: String, @@ -178,13 +190,3 @@ public class TimeoutCancellationException internal constructor( override fun createCopy(): TimeoutCancellationException = TimeoutCancellationException(message ?: "", coroutine).also { it.initCause(this) } } - -internal fun TimeoutCancellationException( - time: Long, - delay: Delay, - coroutine: Job -) : TimeoutCancellationException { - val message = (delay as? DelayWithTimeoutDiagnostics)?.timeoutMessage(time.milliseconds) - ?: "Timed out waiting for $time ms" - return TimeoutCancellationException(message, coroutine) -} diff --git a/kotlinx-coroutines-core/common/src/Unconfined.kt b/kotlinx-coroutines-core/common/src/Unconfined.kt index 2e16f951b8..d5def2082a 100644 --- a/kotlinx-coroutines-core/common/src/Unconfined.kt +++ b/kotlinx-coroutines-core/common/src/Unconfined.kt @@ -33,7 +33,7 @@ internal object Unconfined : CoroutineDispatcher() { /** * Used to detect calls to [Unconfined.dispatch] from [yield] function. */ -@PublishedApi +@PublishedApi // for `kotlinx-coroutines-test` internal class YieldContext : AbstractCoroutineContextElement(Key) { companion object Key : CoroutineContext.Key diff --git a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt index b9b73603c4..d1abe2df26 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharingStarted.kt @@ -3,6 +3,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.internal.IgnoreJreRequirement import kotlin.time.* +import kotlin.time.Duration.Companion.milliseconds /** * A command emitted by [SharingStarted] implementations to control the sharing coroutine in @@ -102,7 +103,7 @@ public fun interface SharingStarted { stopTimeoutMillis: Long = 0, replayExpirationMillis: Long = Long.MAX_VALUE ): SharingStarted = - StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis) + StartedWhileSubscribed(stopTimeoutMillis.milliseconds, replayExpirationMillis.milliseconds) } /** @@ -135,7 +136,7 @@ public fun SharingStarted.Companion.WhileSubscribed( stopTimeout: Duration = Duration.ZERO, replayExpiration: Duration = Duration.INFINITE ): SharingStarted = - StartedWhileSubscribed(stopTimeout.inWholeMilliseconds, replayExpiration.inWholeMilliseconds) + StartedWhileSubscribed(stopTimeout, replayExpiration) // -------------------------------- implementation -------------------------------- @@ -160,12 +161,12 @@ private class StartedLazily : SharingStarted { } private class StartedWhileSubscribed( - private val stopTimeout: Long, - private val replayExpiration: Long + private val stopTimeout: Duration, + private val replayExpiration: Duration, ) : SharingStarted { init { - require(stopTimeout >= 0) { "stopTimeout($stopTimeout ms) cannot be negative" } - require(replayExpiration >= 0) { "replayExpiration($replayExpiration ms) cannot be negative" } + require(stopTimeout >= Duration.ZERO) { "stopTimeout($stopTimeout) cannot be negative" } + require(replayExpiration >= Duration.ZERO) { "replayExpiration($replayExpiration) cannot be negative" } } override fun command(subscriptionCount: StateFlow): Flow = subscriptionCount @@ -174,7 +175,7 @@ private class StartedWhileSubscribed( emit(SharingCommand.START) } else { delay(stopTimeout) - if (replayExpiration > 0) { + if (replayExpiration > Duration.ZERO) { emit(SharingCommand.STOP) delay(replayExpiration) } @@ -187,8 +188,8 @@ private class StartedWhileSubscribed( @OptIn(ExperimentalStdlibApi::class) override fun toString(): String { val params = buildList(2) { - if (stopTimeout > 0) add("stopTimeout=${stopTimeout}ms") - if (replayExpiration < Long.MAX_VALUE) add("replayExpiration=${replayExpiration}ms") + if (stopTimeout > Duration.ZERO) add("stopTimeout=${stopTimeout}ms") + if (replayExpiration < Duration.INFINITE) add("replayExpiration=${replayExpiration}ms") } return "SharingStarted.WhileSubscribed(${params.joinToString()})" } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 2a701c0c12..b832f6feb5 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.selects.* import kotlin.jvm.* import kotlin.time.* +import kotlin.time.Duration.Companion.milliseconds /* Scaffolding for Knit code examples @@ -27,7 +28,7 @@ fun main() = runBlocking { /** * Returns a flow that mirrors the original flow, but filters out values - * that are followed by the newer values within the given [timeout][timeoutMillis]. + * that are followed by the newer values within the given [timeout]. * The latest value is always emitted. * * Example: @@ -35,17 +36,17 @@ fun main() = runBlocking { * ```kotlin * flow { * emit(1) - * delay(90) + * delay(90.milliseconds) * emit(2) - * delay(90) + * delay(90.milliseconds) * emit(3) - * delay(1010) + * delay(1010.milliseconds) * emit(4) - * delay(1010) + * delay(1010.milliseconds) * emit(5) - * }.debounce(1000) + * }.debounce(1000.milliseconds) * ``` - * + * * * produces the following emissions * @@ -55,18 +56,18 @@ fun main() = runBlocking { * * * Note that the resulting flow does not emit anything as long as the original flow emits - * items faster than every [timeoutMillis] milliseconds. + * items faster than every [timeout] milliseconds. */ @FlowPreview -public fun Flow.debounce(timeoutMillis: Long): Flow { - require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" } - if (timeoutMillis == 0L) return this - return debounceInternal { timeoutMillis } +public fun Flow.debounce(timeout: Duration): Flow { + require(!timeout.isNegative()) { "Debounce timeout should not be negative" } + if (timeout == Duration.ZERO) return this + return debounceInternal { timeout } } /** * Returns a flow that mirrors the original flow, but filters out values - * that are followed by the newer values within the given [timeout][timeoutMillis]. + * that are followed by the newer values within the given [timeout]. * The latest value is always emitted. * * A variation of [debounce] that allows specifying the timeout value dynamically. @@ -76,23 +77,23 @@ public fun Flow.debounce(timeoutMillis: Long): Flow { * ```kotlin * flow { * emit(1) - * delay(90) + * delay(90.milliseconds) * emit(2) - * delay(90) + * delay(90.milliseconds) * emit(3) - * delay(1010) + * delay(1010.milliseconds) * emit(4) - * delay(1010) + * delay(1010.milliseconds) * emit(5) * }.debounce { * if (it == 1) { - * 0L + * 0.milliseconds * } else { - * 1000L + * 1000.milliseconds * } * } * ``` - * + * * * produces the following emissions * @@ -102,18 +103,19 @@ public fun Flow.debounce(timeoutMillis: Long): Flow { * * * Note that the resulting flow does not emit anything as long as the original flow emits - * items faster than every [timeoutMillis] milliseconds. + * items faster than every [timeout] unit. * - * @param timeoutMillis [T] is the emitted value and the return value is timeout in milliseconds. + * @param timeout [T] is the emitted value and the return value is timeout in [Duration]. */ @FlowPreview +@JvmName("debounceDuration") @OverloadResolutionByLambdaReturnType -public fun Flow.debounce(timeoutMillis: (T) -> Long): Flow = - debounceInternal(timeoutMillis) +public fun Flow.debounce(timeout: (T) -> Duration): Flow = + debounceInternal(timeout) /** * Returns a flow that mirrors the original flow, but filters out values - * that are followed by the newer values within the given [timeout]. + * that are followed by the newer values within the given [timeout][timeoutMillis]. * The latest value is always emitted. * * Example: @@ -121,17 +123,17 @@ public fun Flow.debounce(timeoutMillis: (T) -> Long): Flow = * ```kotlin * flow { * emit(1) - * delay(90.milliseconds) + * delay(90) * emit(2) - * delay(90.milliseconds) + * delay(90) * emit(3) - * delay(1010.milliseconds) + * delay(1010) * emit(4) - * delay(1010.milliseconds) + * delay(1010) * emit(5) - * }.debounce(1000.milliseconds) + * }.debounce(1000) * ``` - * + * * * produces the following emissions * @@ -141,15 +143,15 @@ public fun Flow.debounce(timeoutMillis: (T) -> Long): Flow = * * * Note that the resulting flow does not emit anything as long as the original flow emits - * items faster than every [timeout] milliseconds. + * items faster than every [timeoutMillis] milliseconds. */ @FlowPreview -public fun Flow.debounce(timeout: Duration): Flow = - debounce(timeout.toDelayMillis()) +public fun Flow.debounce(timeoutMillis: Long): Flow = + debounce(timeoutMillis.milliseconds) /** * Returns a flow that mirrors the original flow, but filters out values - * that are followed by the newer values within the given [timeout]. + * that are followed by the newer values within the given [timeout][timeoutMillis]. * The latest value is always emitted. * * A variation of [debounce] that allows specifying the timeout value dynamically. @@ -159,23 +161,23 @@ public fun Flow.debounce(timeout: Duration): Flow = * ```kotlin * flow { * emit(1) - * delay(90.milliseconds) + * delay(90) * emit(2) - * delay(90.milliseconds) + * delay(90) * emit(3) - * delay(1010.milliseconds) + * delay(1010) * emit(4) - * delay(1010.milliseconds) + * delay(1010) * emit(5) * }.debounce { * if (it == 1) { - * 0.milliseconds + * 0L * } else { - * 1000.milliseconds + * 1000L * } * } * ``` - * + * * * produces the following emissions * @@ -185,19 +187,16 @@ public fun Flow.debounce(timeout: Duration): Flow = * * * Note that the resulting flow does not emit anything as long as the original flow emits - * items faster than every [timeout] unit. + * items faster than every [timeoutMillis] milliseconds. * - * @param timeout [T] is the emitted value and the return value is timeout in [Duration]. + * @param timeoutMillis [T] is the emitted value and the return value is timeout in milliseconds. */ @FlowPreview -@JvmName("debounceDuration") @OverloadResolutionByLambdaReturnType -public fun Flow.debounce(timeout: (T) -> Duration): Flow = - debounceInternal { emittedItem -> - timeout(emittedItem).toDelayMillis() - } +public fun Flow.debounce(timeoutMillis: (T) -> Long): Flow = + debounceInternal { emittedItem -> timeoutMillis(emittedItem).milliseconds } -private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Flow = +private fun Flow.debounceInternal(timeoutSelector: (T) -> Duration): Flow = scopedFlow { downstream -> // Produce the values using the default (rendezvous) channel val values = produce { @@ -206,23 +205,23 @@ private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl // Now consume the values var lastValue: Any? = null while (lastValue !== DONE) { - var timeoutMillis = 0L // will be always computed when lastValue != null + var timeout = Duration.ZERO // will be always computed when lastValue != null // Compute timeout for this value if (lastValue != null) { - timeoutMillis = timeoutMillisSelector(NULL.unbox(lastValue)) - require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" } - if (timeoutMillis == 0L) { + timeout = timeoutSelector(NULL.unbox(lastValue)) + require(!timeout.isNegative()) { "Debounce timeout should not be negative" } + if (timeout == Duration.ZERO) { downstream.emit(NULL.unbox(lastValue)) lastValue = null // Consume the value } } // assert invariant: lastValue != null implies timeoutMillis > 0 - assert { lastValue == null || timeoutMillis > 0 } + assert { lastValue == null || timeout.isPositive() } // wait for the next value with timeout select { // Set timeout when lastValue exists and is not consumed yet if (lastValue != null) { - onTimeout(timeoutMillis) { + onTimeout(timeout) { downstream.emit(NULL.unbox(lastValue)) lastValue = null // Consume the value } @@ -242,7 +241,7 @@ private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl } /** - * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis]. + * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period]. * * Example: * @@ -250,11 +249,11 @@ private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl * flow { * repeat(10) { * emit(it) - * delay(110) + * delay(110.milliseconds) * } - * }.sample(200) + * }.sample(200.milliseconds) * ``` - * + * * * produces the following emissions * @@ -266,14 +265,14 @@ private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long): Fl * Note that the latest element is not emitted if it does not fit into the sampling window. */ @FlowPreview -public fun Flow.sample(periodMillis: Long): Flow { - require(periodMillis > 0) { "Sample period should be positive" } +public fun Flow.sample(period: Duration): Flow { + require(period.isPositive()) { "Sample period should be positive" } return scopedFlow { downstream -> val values = produce(capacity = Channel.CONFLATED) { collect { value -> send(value ?: NULL) } } var lastValue: Any? = null - val ticker = fixedPeriodTicker(periodMillis) + val ticker = fixedPeriodTicker(period) while (lastValue !== DONE) { select { values.onReceiveCatching { result -> @@ -301,19 +300,19 @@ public fun Flow.sample(periodMillis: Long): Flow { * TODO this design (and design of the corresponding operator) depends on #540 */ internal fun CoroutineScope.fixedPeriodTicker( - delayMillis: Long, + period: Duration, ): ReceiveChannel { return produce(capacity = 0) { - delay(delayMillis) + delay(period) while (true) { channel.send(Unit) - delay(delayMillis) + delay(period) } } } /** - * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period]. + * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis]. * * Example: * @@ -321,11 +320,11 @@ internal fun CoroutineScope.fixedPeriodTicker( * flow { * repeat(10) { * emit(it) - * delay(110.milliseconds) + * delay(110) * } - * }.sample(200.milliseconds) + * }.sample(200) * ``` - * + * * * produces the following emissions * @@ -337,7 +336,7 @@ internal fun CoroutineScope.fixedPeriodTicker( * Note that the latest element is not emitted if it does not fit into the sampling window. */ @FlowPreview -public fun Flow.sample(period: Duration): Flow = sample(period.toDelayMillis()) +public fun Flow.sample(periodMillis: Long): Flow = sample(periodMillis.milliseconds) /** * Returns a flow that will emit a [TimeoutCancellationException] if the upstream doesn't emit an item within the given time. @@ -391,13 +390,11 @@ private fun Flow.timeoutInternal( val values = buffer(Channel.RENDEZVOUS).produceIn(this) whileSelect { values.onReceiveCatching { value -> - value.onSuccess { + !value.onSuccess { downStream.emit(it) }.onClosed { it?.let { throw it } - return@onReceiveCatching false - } - return@onReceiveCatching true + }.isClosed } onTimeout(timeout) { throw TimeoutCancellationException("Timed out waiting for $timeout") diff --git a/kotlinx-coroutines-core/common/src/selects/OnTimeout.kt b/kotlinx-coroutines-core/common/src/selects/OnTimeout.kt index 449972648d..8b4e50b3c9 100644 --- a/kotlinx-coroutines-core/common/src/selects/OnTimeout.kt +++ b/kotlinx-coroutines-core/common/src/selects/OnTimeout.kt @@ -2,6 +2,7 @@ package kotlinx.coroutines.selects import kotlinx.coroutines.* import kotlin.time.* +import kotlin.time.Duration.Companion.milliseconds /** * Clause that selects the given [block] after a specified timeout passes. @@ -14,7 +15,7 @@ import kotlin.time.* @ExperimentalCoroutinesApi @Suppress("EXTENSION_SHADOWED_BY_MEMBER") public fun SelectBuilder.onTimeout(timeMillis: Long, block: suspend () -> R): Unit = - OnTimeout(timeMillis).selectClause.invoke(block) + onTimeout(timeMillis.milliseconds, block) /** * Clause that selects the given [block] after the specified [timeout] passes. @@ -24,15 +25,15 @@ public fun SelectBuilder.onTimeout(timeMillis: Long, block: suspend () -> */ @ExperimentalCoroutinesApi public fun SelectBuilder.onTimeout(timeout: Duration, block: suspend () -> R): Unit = - onTimeout(timeout.toDelayMillis(), block) + OnTimeout(timeout).selectClause.invoke(block) /** * We implement [SelectBuilder.onTimeout] as a clause, so each invocation creates * an instance of [OnTimeout] that specifies the registration part according to - * the [timeout][timeMillis] parameter. + * the [timeout] parameter. */ private class OnTimeout( - private val timeMillis: Long + private val timeout: Duration ) { @Suppress("UNCHECKED_CAST") val selectClause: SelectClause0 @@ -44,7 +45,7 @@ private class OnTimeout( @Suppress("UNUSED_PARAMETER") private fun register(select: SelectInstance<*>, ignoredParam: Any?) { // Should this clause complete immediately? - if (timeMillis <= 0) { + if (timeout <= Duration.ZERO) { select.selectInRegistrationPhase(Unit) return } @@ -54,7 +55,7 @@ private class OnTimeout( } select as SelectImplementation<*> val context = select.context - val disposableHandle = context.delay.invokeOnTimeout(timeMillis, action, context) + val disposableHandle = context.delay.invokeOnTimeout(timeout, action, context) // Do not forget to clean-up when this `select` is completed or cancelled. select.disposeOnCompletion(disposableHandle) } diff --git a/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt b/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt index 855b00f2c7..e7f9fdd330 100644 --- a/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt +++ b/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt @@ -123,7 +123,7 @@ class WithTimeoutDurationTest : TestBase() { "OK" } } catch (e: CancellationException) { - assertEquals("Timed out waiting for 100 ms", e.message) + assertEquals("Timed out waiting for 100ms", e.message) finish(3) } } diff --git a/kotlinx-coroutines-core/common/test/WithTimeoutTest.kt b/kotlinx-coroutines-core/common/test/WithTimeoutTest.kt index 5f2690c198..5e00e31327 100644 --- a/kotlinx-coroutines-core/common/test/WithTimeoutTest.kt +++ b/kotlinx-coroutines-core/common/test/WithTimeoutTest.kt @@ -115,7 +115,7 @@ class WithTimeoutTest : TestBase() { "OK" } } catch (e: CancellationException) { - assertEquals("Timed out waiting for 100 ms", e.message) + assertEquals("Timed out waiting for 100ms", e.message) finish(3) } } diff --git a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt index 771768e008..6dc03217c7 100644 --- a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt +++ b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt @@ -1,15 +1,25 @@ package kotlinx.coroutines +import kotlinx.atomicfu.atomic +import kotlinx.coroutines.internal.ThreadSafeHeap +import kotlinx.coroutines.internal.ThreadSafeHeapNode import kotlinx.coroutines.testing.* import kotlin.coroutines.* import kotlin.jvm.* +import kotlin.time.* +import kotlin.time.Duration.Companion.nanoseconds internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay { private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher - private val heap = ArrayList() // TODO use MPP heap/ordered set implementation (commonize ThreadSafeHeap) + private val heap = ThreadSafeHeap() - var currentTime = 0L - private set + /** This counter establishes some order on the events that happen at the same virtual time. */ + private val count = atomic(0) + + private val timeSource = TestTimeSource() + + val currentTime: ComparableTimeMark + get() = timeSource.markNow() init { /* @@ -24,22 +34,20 @@ internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : Coroutine if (delayNanos <= 0) continue if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) { if (usesSharedEventLoop) { - val targetTime = currentTime + delayNanos + val targetTime = currentTime + delayNanos.nanoseconds while (currentTime < targetTime) { - val nextTask = heap.minByOrNull { it.deadline } ?: break - if (nextTask.deadline > targetTime) break - heap.remove(nextTask) - currentTime = nextTask.deadline + val nextTask = heap.removeFirstIf { it.deadline <= targetTime } ?: break + timeSource += nextTask.deadline - currentTime nextTask.run() } - currentTime = maxOf(currentTime, targetTime) + if (targetTime > currentTime) timeSource += targetTime - currentTime } else { error("Unexpected external delay: $delayNanos") } } - val nextTask = heap.minByOrNull { it.deadline } ?: return@launch + val nextTask = heap.removeFirstOrNull() ?: return@launch heap.remove(nextTask) - currentTime = nextTask.deadline + timeSource += nextTask.deadline - currentTime nextTask.run() } } @@ -47,11 +55,17 @@ internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : Coroutine private inner class TimedTask( private val runnable: Runnable, - @JvmField val deadline: Long - ) : DisposableHandle, Runnable by runnable { + val deadline: ComparableTimeMark, + @JvmField val count: Int, + ) : DisposableHandle, Runnable by runnable, Comparable, ThreadSafeHeapNode { + override var heap: ThreadSafeHeap<*>? = null + override var index: Int = 0 + + override fun compareTo(other: TimedTask): Int = + compareValuesBy(this, other, TimedTask::deadline, TimedTask::count) override fun dispose() { - heap.remove(this) + this@VirtualTimeDispatcher.heap.remove(this) } } @@ -61,20 +75,18 @@ internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : Coroutine override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context) - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val task = TimedTask(block, deadline(timeMillis)) - heap += task - return task - } + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + schedule(timeout, block) - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val task = TimedTask(Runnable { with(continuation) { resumeUndispatched(Unit) } }, deadline(timeMillis)) - heap += task - continuation.invokeOnCancellation { task.dispose() } + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + schedule(time, Runnable { + with(continuation) { resumeUndispatched(Unit) } + }).also { continuation.disposeOnCancellation(it) } } - private fun deadline(timeMillis: Long) = - if (timeMillis == Long.MAX_VALUE) Long.MAX_VALUE else currentTime + timeMillis + private fun schedule(time: Duration, block: Runnable): DisposableHandle = + TimedTask(block, currentTime + time, count.getAndIncrement()).also { heap.addLast(it) } + } /** diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/SharingStartedTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/SharingStartedTest.kt index 09450a1c50..67a17eee84 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/SharingStartedTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/SharingStartedTest.kt @@ -4,6 +4,8 @@ import kotlinx.coroutines.testing.* import kotlinx.coroutines.* import kotlin.coroutines.* import kotlin.test.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds /** * Functional tests for [SharingStarted] using [withVirtualTime] and a DSL to describe @@ -55,7 +57,7 @@ class SharingStartedTest : TestBase() { subscriptions(1) // resubscribe again rampUpAndDown() subscriptions(0) - afterTime(100, SharingCommand.STOP) + afterTime(100.milliseconds, SharingCommand.STOP) delay(100) } @@ -69,7 +71,7 @@ class SharingStartedTest : TestBase() { subscriptions(1, SharingCommand.START) rampUpAndDown() subscriptions(0, SharingCommand.STOP) - afterTime(200, SharingCommand.STOP_AND_RESET_REPLAY_CACHE) + afterTime(200.milliseconds, SharingCommand.STOP_AND_RESET_REPLAY_CACHE) } @Test @@ -82,13 +84,13 @@ class SharingStartedTest : TestBase() { subscriptions(1) rampUpAndDown() subscriptions(0) - afterTime(400, SharingCommand.STOP) + afterTime(400.milliseconds, SharingCommand.STOP) delay(250) // don't give it time to reset cache subscriptions(1, SharingCommand.START) rampUpAndDown() subscriptions(0) - afterTime(400, SharingCommand.STOP) - afterTime(300, SharingCommand.STOP_AND_RESET_REPLAY_CACHE) + afterTime(400.milliseconds, SharingCommand.STOP) + afterTime(300.milliseconds, SharingCommand.STOP_AND_RESET_REPLAY_CACHE) delay(100) } @@ -129,9 +131,8 @@ class SharingStartedTest : TestBase() { val subscriptionCount = MutableStateFlow(0) var previousCommand: SharingCommand? = null var expectedCommand: SharingCommand? = initialCommand - var expectedTime = 0L - val dispatcher = coroutineContext[ContinuationInterceptor] as VirtualTimeDispatcher + var expectedTime = dispatcher.currentTime val scope = CoroutineScope(coroutineContext + Job()) suspend fun launch() { @@ -153,28 +154,28 @@ class SharingStartedTest : TestBase() { expectedTime = dispatcher.currentTime subscriptionCount.value = count if (command != null) { - afterTime(0, command) + afterTime(Duration.ZERO, command) } else { letItRun() } } - suspend fun afterTime(time: Long = 0, command: SharingCommand) { + suspend fun afterTime(time: Duration, command: SharingCommand) { expectedCommand = command - val remaining = (time - 1).coerceAtLeast(0) // previous letItRun delayed 1ms + val remaining = (time - 1.milliseconds).coerceAtLeast(Duration.ZERO) // previous letItRun delayed 1ms expectedTime += remaining delay(remaining) letItRun() } private suspend fun letItRun() { - delay(1) + delay(1.milliseconds) assertEquals(expectedCommand, previousCommand) // make sure expected command was emitted - expectedTime++ // make one more time tick we've delayed + expectedTime += 1.milliseconds // make one more time tick we've delayed } fun stop() { scope.cancel() } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jsAndWasmJsShared/src/internal/JSDispatcher.kt b/kotlinx-coroutines-core/jsAndWasmJsShared/src/internal/JSDispatcher.kt index c98a0672c9..5da69764ca 100644 --- a/kotlinx-coroutines-core/jsAndWasmJsShared/src/internal/JSDispatcher.kt +++ b/kotlinx-coroutines-core/jsAndWasmJsShared/src/internal/JSDispatcher.kt @@ -2,6 +2,8 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlin.coroutines.* +import kotlin.time.Duration +import kotlin.time.DurationUnit internal expect abstract class W3CWindow internal expect fun w3cSetTimeout(window: W3CWindow, handler: () -> Unit, timeout: Int): Int @@ -20,11 +22,6 @@ internal expect class WindowMessageQueue(window: W3CWindow) : MessageQueue { override fun reschedule() } -private const val MAX_DELAY = Int.MAX_VALUE.toLong() - -private fun delayToInt(timeMillis: Long): Int = - timeMillis.coerceIn(0, MAX_DELAY).toInt() - internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay { internal val messageQueue = ScheduledMessageQueue(this) @@ -39,13 +36,14 @@ internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay messageQueue.enqueue(block) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val handle = w3cSetTimeout({ block.run() }, delayToInt(timeMillis)) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val handle = w3cSetTimeout({ block.run() }, timeout.toInt(DurationUnit.MILLISECONDS)) return ClearTimeout(handle) } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val handle = w3cSetTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis)) + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + val handle = + w3cSetTimeout({ with(continuation) { resumeUndispatched(Unit) } }, time.toInt(DurationUnit.MILLISECONDS)) continuation.invokeOnCancellation(handler = ClearTimeout(handle)) } } @@ -55,13 +53,17 @@ internal class WindowDispatcher(private val window: W3CWindow) : CoroutineDispat override fun dispatch(context: CoroutineContext, block: Runnable) = queue.enqueue(block) - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val handle = w3cSetTimeout(window, { with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis)) + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + val handle = w3cSetTimeout( + window, + { with(continuation) { resumeUndispatched(Unit) } }, + time.toInt(DurationUnit.MILLISECONDS) + ) continuation.invokeOnCancellation(handler = WindowClearTimeout(handle)) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val handle = w3cSetTimeout(window, block::run, delayToInt(timeMillis)) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val handle = w3cSetTimeout(window, block::run, timeout.toInt(DurationUnit.MILLISECONDS)) return WindowClearTimeout(handle) } diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index 3ce7e0d333..e8372089ca 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -3,6 +3,7 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import java.util.concurrent.* import kotlin.coroutines.* +import kotlin.time.Duration private val defaultMainDelayOptIn = systemProp("kotlinx.coroutines.main.delay", false) @@ -91,8 +92,8 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), * but it's not exposed as public API. */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - scheduleInvokeOnTimeout(timeMillis, block) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduleInvokeOnTimeout(timeout, block) override fun run() { ThreadLocalEventLoop.setEventLoop(this) diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index 15d4ab5c85..75f38ccbad 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -1,7 +1,5 @@ package kotlinx.coroutines -import kotlinx.coroutines.Runnable -import kotlinx.coroutines.scheduling.* import kotlinx.coroutines.scheduling.CoroutineScheduler internal actual abstract class EventLoopImplPlatform: EventLoop() { @@ -122,4 +120,3 @@ internal fun Thread.isIoDispatcherThread(): Boolean { if (this !is CoroutineScheduler.Worker) return false return isIo() } - diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index bdfbe6dbbc..c7eef2f13c 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -5,6 +5,7 @@ import java.io.Closeable import java.util.concurrent.* import kotlin.coroutines.* import kotlin.AutoCloseable +import kotlin.time.Duration /** * [CoroutineDispatcher] that has underlying [Executor] for dispatching tasks. @@ -135,11 +136,11 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) } } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { val future = (executor as? ScheduledExecutorService)?.scheduleBlock( ResumeUndispatchedRunnable(this, continuation), continuation.context, - timeMillis + time ) // If everything went fine and the scheduling attempt was not rejected -- use it if (future != null) { @@ -147,20 +148,20 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) return } // Otherwise fallback to default executor - DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation) + DefaultExecutor.scheduleResumeAfterDelay(time, continuation) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeout) return when { future != null -> DisposableFutureHandle(future) - else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) + else -> DefaultExecutor.invokeOnTimeout(timeout, block, context) } } - private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { + private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, time: Duration): ScheduledFuture<*>? { return try { - schedule(block, timeMillis, TimeUnit.MILLISECONDS) + schedule(block, time.inWholeNanoseconds, TimeUnit.NANOSECONDS) } catch (e: RejectedExecutionException) { cancelJobOnRejection(context, e) null diff --git a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt index c07e68dc4e..f30e2b2f3d 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/TickerChannels.kt @@ -2,6 +2,9 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* import kotlin.coroutines.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.nanoseconds /** * Mode for [ticker] function. @@ -61,24 +64,26 @@ public fun ticker( context: CoroutineContext = EmptyCoroutineContext, mode: TickerMode = TickerMode.FIXED_PERIOD ): ReceiveChannel { - require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" } - require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" } + val delay = delayMillis.milliseconds + val initialDelay = initialDelayMillis.milliseconds + require(delayMillis >= 0) { "Expected non-negative delay, but got $delay" } + require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but got $initialDelay" } return GlobalScope.produce(Dispatchers.Unconfined + context, capacity = 0) { when (mode) { - TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delayMillis, initialDelayMillis, channel) - TickerMode.FIXED_DELAY -> fixedDelayTicker(delayMillis, initialDelayMillis, channel) + TickerMode.FIXED_PERIOD -> fixedPeriodTicker(delay, initialDelay, channel) + TickerMode.FIXED_DELAY -> fixedDelayTicker(delay, initialDelay, channel) } } } private suspend fun fixedPeriodTicker( - delayMillis: Long, - initialDelayMillis: Long, + delay: Duration, + initialDelay: Duration, channel: SendChannel ) { - var deadline = nanoTime() + delayToNanos(initialDelayMillis) - delay(initialDelayMillis) - val delayNs = delayToNanos(delayMillis) + var deadline = nanoTime() + initialDelay.inWholeNanoseconds + delay(initialDelay) + val delayNs = delay.inWholeNanoseconds while (true) { deadline += delayNs channel.send(Unit) @@ -87,21 +92,21 @@ private suspend fun fixedPeriodTicker( if (nextDelay == 0L && delayNs != 0L) { val adjustedDelay = delayNs - (now - deadline) % delayNs deadline = now + adjustedDelay - delay(delayNanosToMillis(adjustedDelay)) + delay(adjustedDelay.nanoseconds) } else { - delay(delayNanosToMillis(nextDelay)) + delay(nextDelay.nanoseconds) } } } private suspend fun fixedDelayTicker( - delayMillis: Long, - initialDelayMillis: Long, + delay: Duration, + initialDelay: Duration, channel: SendChannel ) { - delay(initialDelayMillis) + delay(initialDelay) while (true) { channel.send(Unit) - delay(delayMillis) + delay(delay) } } diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index 0cace58cf9..1cf273ed99 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -3,6 +3,7 @@ package kotlinx.coroutines.internal import kotlinx.coroutines.* import java.util.* import kotlin.coroutines.* +import kotlin.time.Duration /** * Name of the boolean property that enables using of [FastServiceLoader]. @@ -94,13 +95,13 @@ private class MissingMainCoroutineDispatcher( override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher = missing() - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = missing() override fun dispatch(context: CoroutineContext, block: Runnable) = missing() - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) = + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) = missing() private fun missing(): Nothing { diff --git a/kotlinx-coroutines-core/jvm/src/time/Time.kt b/kotlinx-coroutines-core/jvm/src/time/Time.kt index 3bb1d054e6..a604848a21 100644 --- a/kotlinx-coroutines-core/jvm/src/time/Time.kt +++ b/kotlinx-coroutines-core/jvm/src/time/Time.kt @@ -8,29 +8,30 @@ import kotlinx.coroutines.selects.* import java.time.* import java.time.temporal.* import kotlin.contracts.* +import kotlin.time.toKotlinDuration /** * "java.time" adapter method for [kotlinx.coroutines.delay]. */ -public suspend fun delay(duration: Duration): Unit = delay(duration.coerceToMillis()) +public suspend fun delay(duration: Duration): Unit = delay(duration.toKotlinDuration()) /** * "java.time" adapter method for [kotlinx.coroutines.flow.debounce]. */ @FlowPreview -public fun Flow.debounce(timeout: Duration): Flow = debounce(timeout.coerceToMillis()) +public fun Flow.debounce(timeout: Duration): Flow = debounce(timeout.toKotlinDuration()) /** * "java.time" adapter method for [kotlinx.coroutines.flow.sample]. */ @FlowPreview -public fun Flow.sample(period: Duration): Flow = sample(period.coerceToMillis()) +public fun Flow.sample(period: Duration): Flow = sample(period.toKotlinDuration()) /** * "java.time" adapter method for [SelectBuilder.onTimeout]. */ public fun SelectBuilder.onTimeout(duration: Duration, block: suspend () -> R): Unit = - onTimeout(duration.coerceToMillis(), block) + onTimeout(duration.toKotlinDuration(), block) /** * "java.time" adapter method for [kotlinx.coroutines.withTimeout]. @@ -39,35 +40,11 @@ public suspend fun withTimeout(duration: Duration, block: suspend CoroutineS contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } - return kotlinx.coroutines.withTimeout(duration.coerceToMillis(), block) + return withTimeout(duration.toKotlinDuration(), block) } /** * "java.time" adapter method for [kotlinx.coroutines.withTimeoutOrNull]. */ public suspend fun withTimeoutOrNull(duration: Duration, block: suspend CoroutineScope.() -> T): T? = - kotlinx.coroutines.withTimeoutOrNull(duration.coerceToMillis(), block) - -/** - * Coerces the given [Duration] to a millisecond delay. - * Negative values are coerced to zero, values that cannot - * be represented in milliseconds as long ("infinite" duration) are coerced to [Long.MAX_VALUE] - * and durations lesser than a millisecond are coerced to 1 millisecond. - * - * The rationale of coercion: - * 1) Too large durations typically indicate infinity and Long.MAX_VALUE is the - * best approximation of infinity we can provide. - * 2) Coercing too small durations to 1 instead of 0 is crucial for two patterns: - * - Programming with deadlines and delays - * - Non-suspending fast-paths (e.g. `withTimeout(1 nanosecond) { 42 }` should not throw) - */ -private fun Duration.coerceToMillis(): Long { - if (this <= Duration.ZERO) return 0 - if (this <= ChronoUnit.MILLIS.duration) return 1 - - // Maximum scalar values of Duration.ofMillis(Long.MAX_VALUE) - val maxSeconds = 9223372036854775 - val maxNanos = 807000000 - return if (seconds < maxSeconds || seconds == maxSeconds && nano < maxNanos) toMillis() - else Long.MAX_VALUE -} + withTimeoutOrNull(duration.toKotlinDuration(), block) diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromLexicalBlockWhenTriggeredByChild.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromLexicalBlockWhenTriggeredByChild.txt index ac40dc152b..b00d67c793 100644 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromLexicalBlockWhenTriggeredByChild.txt +++ b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromLexicalBlockWhenTriggeredByChild.txt @@ -1,7 +1,6 @@ -kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms +kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200ms at _COROUTINE._BOUNDARY._(CoroutineDebugging.kt) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest.outerChildWithTimeout(StackTraceRecoveryWithTimeoutTest.kt:48) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest$testStacktraceIsRecoveredFromLexicalBlockWhenTriggeredByChild$1.invokeSuspend(StackTraceRecoveryWithTimeoutTest.kt:40) -Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms - at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:116) +Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200ms at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:86) diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPoint.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPoint.txt index 9d5ddb6621..5e53e59f08 100644 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPoint.txt +++ b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPoint.txt @@ -1,10 +1,9 @@ -kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms +kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200ms at _COROUTINE._BOUNDARY._(CoroutineDebugging.kt) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest.suspendForever(StackTraceRecoveryWithTimeoutTest.kt:42) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest$outerWithTimeout$2.invokeSuspend(StackTraceRecoveryWithTimeoutTest.kt:32) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest.outerWithTimeout(StackTraceRecoveryWithTimeoutTest.kt:31) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest$testStacktraceIsRecoveredFromSuspensionPoint$1.invokeSuspend(StackTraceRecoveryWithTimeoutTest.kt:19) -Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms - at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:116) +Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200ms at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:86) at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:492) diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPointWithChild.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPointWithChild.txt index 6f21cc6b30..5155f4890a 100644 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPointWithChild.txt +++ b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/timeout/testStacktraceIsRecoveredFromSuspensionPointWithChild.txt @@ -1,9 +1,8 @@ -kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms +kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200ms at _COROUTINE._BOUNDARY._(CoroutineDebugging.kt) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest.suspendForever(StackTraceRecoveryWithTimeoutTest.kt:92) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest$outerChild$2.invokeSuspend(StackTraceRecoveryWithTimeoutTest.kt:78) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest.outerChild(StackTraceRecoveryWithTimeoutTest.kt:74) at kotlinx.coroutines.exceptions.StackTraceRecoveryWithTimeoutTest$testStacktraceIsRecoveredFromSuspensionPointWithChild$1.invokeSuspend(StackTraceRecoveryWithTimeoutTest.kt:66) -Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200 ms - at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:116) +Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 200ms at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:86) diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt index 819b05e9ed..6fc6028c9a 100644 --- a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt @@ -5,6 +5,7 @@ import org.junit.Test import java.lang.Runnable import java.util.concurrent.* import kotlin.test.* +import kotlin.time.Duration.Companion.seconds class ExecutorAsCoroutineDispatcherDelayTest : TestBase() { @@ -45,13 +46,13 @@ class ExecutorAsCoroutineDispatcherDelayTest : TestBase() { launch(start = CoroutineStart.UNDISPATCHED) { suspendCancellableCoroutine { cont -> expect(1) - (executor.asCoroutineDispatcher() as Delay).scheduleResumeAfterDelay(1_000_000, cont) + (executor.asCoroutineDispatcher() as Delay).scheduleResumeAfterDelay(1_000.seconds, cont) cont.cancel() expect(2) } } expect(3) - assertTrue(executor.getQueue().isEmpty()) + assertTrue(executor.queue.isEmpty()) executor.shutdown() finish(4) } diff --git a/kotlinx-coroutines-core/jvm/test/examples/test/FlowDelayTest.kt b/kotlinx-coroutines-core/jvm/test/examples/test/FlowDelayTest.kt index b0bc05fc8b..31ae908cda 100644 --- a/kotlinx-coroutines-core/jvm/test/examples/test/FlowDelayTest.kt +++ b/kotlinx-coroutines-core/jvm/test/examples/test/FlowDelayTest.kt @@ -6,43 +6,43 @@ import org.junit.Test class FlowDelayTest { @Test - fun testExampleDelay01() { - test("ExampleDelay01") { kotlinx.coroutines.examples.exampleDelay01.main() }.verifyLines( + fun testExampleDelayDuration01() { + test("ExampleDelayDuration01") { kotlinx.coroutines.examples.exampleDelayDuration01.main() }.verifyLines( "3, 4, 5" ) } @Test - fun testExampleDelay02() { - test("ExampleDelay02") { kotlinx.coroutines.examples.exampleDelay02.main() }.verifyLines( + fun testExampleDelayDuration02() { + test("ExampleDelayDuration02") { kotlinx.coroutines.examples.exampleDelayDuration02.main() }.verifyLines( "1, 3, 4, 5" ) } @Test - fun testExampleDelayDuration01() { - test("ExampleDelayDuration01") { kotlinx.coroutines.examples.exampleDelayDuration01.main() }.verifyLines( + fun testExampleDelay01() { + test("ExampleDelay01") { kotlinx.coroutines.examples.exampleDelay01.main() }.verifyLines( "3, 4, 5" ) } @Test - fun testExampleDelayDuration02() { - test("ExampleDelayDuration02") { kotlinx.coroutines.examples.exampleDelayDuration02.main() }.verifyLines( + fun testExampleDelay02() { + test("ExampleDelay02") { kotlinx.coroutines.examples.exampleDelay02.main() }.verifyLines( "1, 3, 4, 5" ) } @Test - fun testExampleDelay03() { - test("ExampleDelay03") { kotlinx.coroutines.examples.exampleDelay03.main() }.verifyLines( + fun testExampleDelayDuration03() { + test("ExampleDelayDuration03") { kotlinx.coroutines.examples.exampleDelayDuration03.main() }.verifyLines( "1, 3, 5, 7, 9" ) } @Test - fun testExampleDelayDuration03() { - test("ExampleDelayDuration03") { kotlinx.coroutines.examples.exampleDelayDuration03.main() }.verifyLines( + fun testExampleDelay03() { + test("ExampleDelay03") { kotlinx.coroutines.examples.exampleDelay03.main() }.verifyLines( "1, 3, 5, 7, 9" ) } diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt index 0d160e6a70..7ccbcdd494 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt @@ -185,7 +185,7 @@ class SharingStressTest : TestBase() { var count = 0L } - private fun log(msg: String) = println("${testStarted.elapsedNow().inWholeMilliseconds} ms: $msg") + private fun log(msg: String) = println("${testStarted.elapsedNow()}: $msg") private fun MutableStateFlow.increment(delta: Int) { update { it + delta } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/CancellationGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/CancellationGuideTest.kt index f09b762288..796cbb9669 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/CancellationGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/CancellationGuideTest.kt @@ -71,7 +71,7 @@ class CancellationGuideTest { "I'm sleeping 0 ...", "I'm sleeping 1 ...", "I'm sleeping 2 ...", - "Exception in thread \"main\" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms" + "Exception in thread \"main\" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1.3s" ) } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt index dbc6609101..b933f628d2 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerInternalApiStressTest.kt @@ -13,6 +13,7 @@ import kotlin.random.* import kotlin.random.Random import kotlin.test.* import kotlin.time.* +import kotlin.time.Duration.Companion.nanoseconds class CoroutineSchedulerInternalApiStressTest : TestBase() { @@ -71,7 +72,7 @@ class CoroutineSchedulerInternalApiStressTest : TestBase() { ++timesHelped continue } else if (result >= 0L) { - Thread.sleep(result.toDuration(DurationUnit.NANOSECONDS).toDelayMillis()) + Thread.sleep(result.nanoseconds.toDelayMillis()) } else { Thread.sleep(10) } @@ -83,4 +84,3 @@ class CoroutineSchedulerInternalApiStressTest : TestBase() { } } } - diff --git a/kotlinx-coroutines-core/native/src/CoroutineContext.kt b/kotlinx-coroutines-core/native/src/CoroutineContext.kt index 3f4c8d9a01..d84c45396f 100644 --- a/kotlinx-coroutines-core/native/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/native/src/CoroutineContext.kt @@ -2,6 +2,7 @@ package kotlinx.coroutines import kotlinx.coroutines.internal.* import kotlin.coroutines.* +import kotlin.time.Duration internal actual object DefaultExecutor : CoroutineDispatcher(), Delay { @@ -11,12 +12,12 @@ internal actual object DefaultExecutor : CoroutineDispatcher(), Delay { delegate.dispatch(context, block) } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - delegate.scheduleResumeAfterDelay(timeMillis, continuation) + override fun scheduleResumeAfterDelay(timeout: Duration, continuation: CancellableContinuation) { + delegate.scheduleResumeAfterDelay(timeout, continuation) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - return delegate.invokeOnTimeout(timeMillis, block, context) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + return delegate.invokeOnTimeout(timeout, block, context) } actual fun enqueue(task: Runnable): Unit { diff --git a/kotlinx-coroutines-core/native/src/EventLoop.kt b/kotlinx-coroutines-core/native/src/EventLoop.kt index 58128d52fd..f7996d0c2b 100644 --- a/kotlinx-coroutines-core/native/src/EventLoop.kt +++ b/kotlinx-coroutines-core/native/src/EventLoop.kt @@ -4,7 +4,9 @@ package kotlinx.coroutines import kotlin.coroutines.* import kotlin.native.concurrent.* -import kotlin.time.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.nanoseconds +import kotlin.time.TimeSource internal actual abstract class EventLoopImplPlatform : EventLoop() { @@ -15,14 +17,14 @@ internal actual abstract class EventLoopImplPlatform : EventLoop() { } protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { - val delayTimeMillis = delayNanosToMillis(delayedTask.nanoTime - now) - DefaultExecutor.invokeOnTimeout(delayTimeMillis, delayedTask, EmptyCoroutineContext) + // TODO: protect against overflow + DefaultExecutor.invokeOnTimeout((delayedTask.nanoTime - now).nanoseconds, delayedTask, EmptyCoroutineContext) } } internal class EventLoopImpl: EventLoopImplBase() { - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - DefaultDelay.invokeOnTimeout(timeMillis, block, context) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + DefaultDelay.invokeOnTimeout(timeout, block, context) } internal actual fun createEventLoop(): EventLoop = EventLoopImpl() diff --git a/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt b/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt index 8b217285a5..1b6da4b51d 100644 --- a/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt +++ b/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt @@ -24,17 +24,17 @@ internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), worker.executeAfter(0L) { block.run() } } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val handle = schedule(timeMillis, Runnable { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + val handle = schedule(time, Runnable { with(continuation) { resumeUndispatched(Unit) } }) continuation.disposeOnCancellation(handle) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - schedule(timeMillis, block) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + schedule(timeout, block) - private fun schedule(timeMillis: Long, block: Runnable): DisposableHandle { + private fun schedule(time: Duration, block: Runnable): DisposableHandle { // Workers don't have an API to cancel sent "executeAfter" block, but we are trying // to control the damage and reduce reachable objects by nulling out `block` // that may retain a lot of references, and leaving only an empty shell after a timely disposal @@ -65,7 +65,7 @@ internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), } val disposableBlock = DisposableBlock(block) - val targetMoment = TimeSource.Monotonic.markNow() + timeMillis.milliseconds + val targetMoment = TimeSource.Monotonic.markNow() + time worker.runAfterDelay(disposableBlock, targetMoment) return disposableBlock } diff --git a/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt b/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt index 786f0f215d..9d0f5244ad 100644 --- a/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt @@ -8,6 +8,8 @@ import platform.darwin.* import kotlin.coroutines.* import kotlin.concurrent.* import kotlin.native.internal.NativePtr +import kotlin.time.Duration +import kotlin.time.DurationUnit internal fun isMainThread(): Boolean = CFRunLoopGetCurrent() == CFRunLoopGetMain() @@ -42,23 +44,23 @@ private class DarwinMainDispatcher( } } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(timeout: Duration, continuation: CancellableContinuation) { val timer = Timer() val timerBlock: TimerBlock = { timer.dispose() continuation.resume(Unit) } - timer.start(timeMillis, timerBlock) + timer.start(timeout, timerBlock) continuation.disposeOnCancellation(timer) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { val timer = Timer() val timerBlock: TimerBlock = { timer.dispose() block.run() } - timer.start(timeMillis, timerBlock) + timer.start(timeout, timerBlock) return timer } @@ -74,8 +76,8 @@ private val TIMER_DISPOSED = NativePtr.NULL.plus(1) private class Timer : DisposableHandle { private val ref = AtomicNativePtr(TIMER_NEW) - fun start(timeMillis: Long, timerBlock: TimerBlock) { - val fireDate = CFAbsoluteTimeGetCurrent() + timeMillis / 1000.0 + fun start(timeout: Duration, timerBlock: TimerBlock) { + val fireDate = CFAbsoluteTimeGetCurrent() + timeout.toDouble(DurationUnit.SECONDS) val timer = CFRunLoopTimerCreateWithHandler(null, fireDate, 0.0, 0u, 0, timerBlock) CFRunLoopAddTimer(CFRunLoopGetMain(), timer, kCFRunLoopCommonModes) if (!ref.compareAndSet(TIMER_NEW, timer.rawValue)) { diff --git a/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt b/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt index a0f392e5b0..a132f1863f 100644 --- a/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt +++ b/kotlinx-coroutines-core/wasmWasi/src/EventLoop.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines import kotlin.coroutines.CoroutineContext import kotlin.wasm.* import kotlin.wasm.unsafe.* +import kotlin.time.Duration @WasmImport("wasi_snapshot_preview1", "poll_oneoff") private external fun wasiPollOneOff(ptrToSubscription: Int, eventPtr: Int, nsubscriptions: Int, resultPtr: Int): Int @@ -50,8 +51,8 @@ internal actual object DefaultExecutor : EventLoopImplBase() { // don't do anything: on WASI, the event loop is the default executor, we can't shut it down } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - scheduleInvokeOnTimeout(timeMillis, block) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduleInvokeOnTimeout(timeout, block) } internal actual abstract class EventLoopImplPlatform : EventLoop() { diff --git a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api index 77cc854cd6..8a127fbffb 100644 --- a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api +++ b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api @@ -70,11 +70,10 @@ public final class kotlinx/coroutines/test/TestCoroutineScopeKt { public static final fun runCurrent (Lkotlinx/coroutines/test/TestCoroutineScope;)V } -public abstract class kotlinx/coroutines/test/TestDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay, kotlinx/coroutines/DelayWithTimeoutDiagnostics { - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; +public abstract class kotlinx/coroutines/test/TestDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay { public abstract fun getScheduler ()Lkotlinx/coroutines/test/TestCoroutineScheduler; - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V public synthetic fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; } diff --git a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.klib.api b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.klib.api index 38dfad99c6..ff63c2dd2b 100644 --- a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.klib.api +++ b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.klib.api @@ -14,24 +14,23 @@ sealed interface kotlinx.coroutines.test/TestScope : kotlinx.coroutines/Coroutin abstract fun (): kotlinx.coroutines.test/TestCoroutineScheduler // kotlinx.coroutines.test/TestScope.testScheduler.|(){}[0] } -abstract class kotlinx.coroutines.test/TestDispatcher : kotlinx.coroutines/CoroutineDispatcher, kotlinx.coroutines/Delay, kotlinx.coroutines/DelayWithTimeoutDiagnostics { // kotlinx.coroutines.test/TestDispatcher|null[0] +abstract class kotlinx.coroutines.test/TestDispatcher : kotlinx.coroutines/CoroutineDispatcher, kotlinx.coroutines/Delay { // kotlinx.coroutines.test/TestDispatcher|null[0] abstract val scheduler // kotlinx.coroutines.test/TestDispatcher.scheduler|{}scheduler[0] abstract fun (): kotlinx.coroutines.test/TestCoroutineScheduler // kotlinx.coroutines.test/TestDispatcher.scheduler.|(){}[0] - open fun invokeOnTimeout(kotlin/Long, kotlinx.coroutines/Runnable, kotlin.coroutines/CoroutineContext): kotlinx.coroutines/DisposableHandle // kotlinx.coroutines.test/TestDispatcher.invokeOnTimeout|invokeOnTimeout(kotlin.Long;kotlinx.coroutines.Runnable;kotlin.coroutines.CoroutineContext){}[0] - open fun scheduleResumeAfterDelay(kotlin/Long, kotlinx.coroutines/CancellableContinuation) // kotlinx.coroutines.test/TestDispatcher.scheduleResumeAfterDelay|scheduleResumeAfterDelay(kotlin.Long;kotlinx.coroutines.CancellableContinuation){}[0] + open fun invokeOnTimeout(kotlin.time/Duration, kotlinx.coroutines/Runnable, kotlin.coroutines/CoroutineContext): kotlinx.coroutines/DisposableHandle // kotlinx.coroutines.test/TestDispatcher.invokeOnTimeout|invokeOnTimeout(kotlin.time.Duration;kotlinx.coroutines.Runnable;kotlin.coroutines.CoroutineContext){}[0] + open fun scheduleResumeAfterDelay(kotlin.time/Duration, kotlinx.coroutines/CancellableContinuation) // kotlinx.coroutines.test/TestDispatcher.scheduleResumeAfterDelay|scheduleResumeAfterDelay(kotlin.time.Duration;kotlinx.coroutines.CancellableContinuation){}[0] open fun timeoutMessage(kotlin.time/Duration): kotlin/String // kotlinx.coroutines.test/TestDispatcher.timeoutMessage|timeoutMessage(kotlin.time.Duration){}[0] } final class kotlinx.coroutines.test/TestCoroutineScheduler : kotlin.coroutines/AbstractCoroutineContextElement, kotlin.coroutines/CoroutineContext.Element { // kotlinx.coroutines.test/TestCoroutineScheduler|null[0] constructor () // kotlinx.coroutines.test/TestCoroutineScheduler.|(){}[0] + final val currentTime // kotlinx.coroutines.test/TestCoroutineScheduler.currentTime|{}currentTime[0] + final fun (): kotlin/Long // kotlinx.coroutines.test/TestCoroutineScheduler.currentTime.|(){}[0] final val timeSource // kotlinx.coroutines.test/TestCoroutineScheduler.timeSource|{}timeSource[0] final fun (): kotlin.time/TimeSource.WithComparableMarks // kotlinx.coroutines.test/TestCoroutineScheduler.timeSource.|(){}[0] - final var currentTime // kotlinx.coroutines.test/TestCoroutineScheduler.currentTime|{}currentTime[0] - final fun (): kotlin/Long // kotlinx.coroutines.test/TestCoroutineScheduler.currentTime.|(){}[0] - final fun advanceTimeBy(kotlin.time/Duration) // kotlinx.coroutines.test/TestCoroutineScheduler.advanceTimeBy|advanceTimeBy(kotlin.time.Duration){}[0] final fun advanceTimeBy(kotlin/Long) // kotlinx.coroutines.test/TestCoroutineScheduler.advanceTimeBy|advanceTimeBy(kotlin.Long){}[0] final fun advanceUntilIdle() // kotlinx.coroutines.test/TestCoroutineScheduler.advanceUntilIdle|advanceUntilIdle(){}[0] diff --git a/kotlinx-coroutines-test/common/src/TestCoroutineDispatchers.kt b/kotlinx-coroutines-test/common/src/TestCoroutineDispatchers.kt index bf1b62a171..071c8a5d2f 100644 --- a/kotlinx-coroutines-test/common/src/TestCoroutineDispatchers.kt +++ b/kotlinx-coroutines-test/common/src/TestCoroutineDispatchers.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.test.internal.TestMainDispatcher import kotlin.coroutines.* +import kotlin.time.Duration /** * Creates an instance of an unconfined [TestDispatcher]. @@ -147,7 +148,7 @@ private class StandardTestDispatcherImpl( ) : TestDispatcher() { override fun dispatch(context: CoroutineContext, block: Runnable) { - scheduler.registerEvent(this, 0, block, context) { false } + scheduler.registerEvent(this, Duration.ZERO, block, context) { false } } override fun toString(): String = "${name ?: "StandardTestDispatcher"}[scheduler=$scheduler]" diff --git a/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt b/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt index 8c70fa8e05..fc216b677b 100644 --- a/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt +++ b/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt @@ -39,11 +39,29 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout /** This counter establishes some order on the events that happen at the same virtual time. */ private val count = atomic(0L) + private val _timeSource = TestTimeSource() + private val startMark = _timeSource.markNow() + + // may only be called under the lock + private fun setVirtualTime(time: ComparableTimeMark) { + val toWait = time - _timeSource.markNow() + when { + toWait < Duration.ZERO -> currentTimeAheadOfEvents() + toWait > Duration.ZERO -> try { + _timeSource += toWait + } catch (_: IllegalStateException) { + throw IllegalStateException( + "The test scheduler encountered too large a value: at ${timeSource.markNow()}, " + + "we tried to advance by $toWait, but the maximum value is ${Long.MAX_VALUE / 2} ms." + ) + } + } + } + /** The current virtual time in milliseconds. */ @ExperimentalCoroutinesApi - public var currentTime: Long = 0 - get() = synchronized(lock) { field } - private set + public val currentTime: Long + get() = synchronized(lock) { startMark.elapsedNow().inWholeMilliseconds } /** A channel for notifying about the fact that a foreground work dispatch recently happened. */ private val dispatchEventsForeground: Channel = Channel(CONFLATED) @@ -52,25 +70,28 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout private val dispatchEvents: Channel = Channel(CONFLATED) /** - * Registers a request for the scheduler to notify [dispatcher] at a virtual moment [timeDeltaMillis] milliseconds + * Registers a request for the scheduler to notify [dispatcher] at a virtual moment [timeDelta] milliseconds * later via [TestDispatcher.processEvent], which will be called with the provided [marker] object. * * Returns the handler which can be used to cancel the registration. */ internal fun registerEvent( dispatcher: TestDispatcher, - timeDeltaMillis: Long, + timeDelta: Duration, marker: T, context: CoroutineContext, isCancelled: (T) -> Boolean ): DisposableHandle { - require(timeDeltaMillis >= 0) { "Attempted scheduling an event earlier in time (with the time delta $timeDeltaMillis)" } + require(timeDelta >= Duration.ZERO) { + "Attempted scheduling an event earlier in time (with the time delta $timeDelta)" + } checkSchedulerInContext(this, context) val count = count.getAndIncrement() val isForeground = context[BackgroundWork] === null return synchronized(lock) { - val time = addClamping(currentTime, timeDeltaMillis) - val event = TestDispatchEvent(dispatcher, count, time, marker as Any, isForeground) { isCancelled(marker) } + val event = TestDispatchEvent( + dispatcher, count, _timeSource.markNow() + timeDelta, marker as Any, isForeground + ) { isCancelled(marker) } events.addLast(event) /** can't be moved above: otherwise, [onDispatchEventForeground] or [onDispatchEvent] could consume the * token sent here before there's actually anything in the event queue. */ @@ -91,9 +112,7 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout val event = synchronized(lock) { if (condition()) return false val event = events.removeFirstOrNull() ?: return false - if (currentTime > event.time) - currentTimeAheadOfEvents() - currentTime = event.time + setVirtualTime(event.time) event } event.dispatcher.processEvent(event.marker) @@ -124,7 +143,7 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout * Runs the tasks that are scheduled to execute at this moment of virtual time. */ public fun runCurrent() { - val timeMark = synchronized(lock) { currentTime } + val timeMark = synchronized(lock) { _timeSource.markNow() } while (true) { val event = synchronized(lock) { events.removeFirstIf { it.time <= timeMark } ?: return @@ -160,20 +179,19 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout */ public fun advanceTimeBy(delayTime: Duration) { require(!delayTime.isNegative()) { "Can not advance time by a negative delay: $delayTime" } - val startingTime = currentTime - val targetTime = addClamping(startingTime, delayTime.inWholeMilliseconds) + val targetTime = synchronized(lock) { _timeSource.markNow() } + delayTime while (true) { val event = synchronized(lock) { - val timeMark = currentTime + val timeMark = _timeSource.markNow() val event = events.removeFirstIf { targetTime > it.time } when { event == null -> { - currentTime = targetTime + setVirtualTime(targetTime) return } timeMark > event.time -> currentTimeAheadOfEvents() else -> { - currentTime = event.time + setVirtualTime(event.time) event } } @@ -219,6 +237,7 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout /** * Returns the [TimeSource] representation of the virtual time of this scheduler. */ + // this is a wrapper around `_timeSource` to prevent external modifications public val timeSource: TimeSource.WithComparableMarks = object : AbstractLongTimeSource(DurationUnit.MILLISECONDS) { override fun read(): Long = currentTime } @@ -234,7 +253,7 @@ private fun invalidSchedulerState(): Nothing = private class TestDispatchEvent( @JvmField val dispatcher: TestDispatcher, private val count: Long, - @JvmField val time: Long, + @JvmField val time: ComparableTimeMark, @JvmField val marker: T, @JvmField val isForeground: Boolean, // TODO: remove once the deprecated API is gone @@ -249,9 +268,6 @@ private class TestDispatchEvent( override fun toString() = "TestDispatchEvent(time=$time, dispatcher=$dispatcher${if (isForeground) "" else ", background"})" } -// works with positive `a`, `b` -private fun addClamping(a: Long, b: Long): Long = (a + b).let { if (it >= 0) it else Long.MAX_VALUE } - internal fun checkSchedulerInContext(scheduler: TestCoroutineScheduler, context: CoroutineContext) { context[TestCoroutineScheduler]?.let { check(it === scheduler) { diff --git a/kotlinx-coroutines-test/common/src/TestDispatcher.kt b/kotlinx-coroutines-test/common/src/TestDispatcher.kt index a4427a1a6f..a30176e101 100644 --- a/kotlinx-coroutines-test/common/src/TestDispatcher.kt +++ b/kotlinx-coroutines-test/common/src/TestDispatcher.kt @@ -14,7 +14,7 @@ import kotlin.time.* * the virtual time. */ @Suppress("INVISIBLE_REFERENCE") -public abstract class TestDispatcher internal constructor() : CoroutineDispatcher(), Delay, DelayWithTimeoutDiagnostics { +public abstract class TestDispatcher internal constructor() : CoroutineDispatcher(), Delay { /** The scheduler that this dispatcher is linked to. */ public abstract val scheduler: TestCoroutineScheduler @@ -25,11 +25,11 @@ public abstract class TestDispatcher internal constructor() : CoroutineDispatche } /** @suppress */ - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { val timedRunnable = CancellableContinuationRunnable(continuation, this) val handle = scheduler.registerEvent( this, - timeMillis, + time, timedRunnable, continuation.context, ::cancellableRunnableIsCancelled @@ -38,8 +38,8 @@ public abstract class TestDispatcher internal constructor() : CoroutineDispatche } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - scheduler.registerEvent(this, timeMillis, block, context) { false } + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduler.registerEvent(this, timeout, block, context) { false } /** @suppress */ @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER") diff --git a/kotlinx-coroutines-test/common/src/internal/TestMainDispatcher.kt b/kotlinx-coroutines-test/common/src/internal/TestMainDispatcher.kt index f9ab265c8d..0cda6a858a 100644 --- a/kotlinx-coroutines-test/common/src/internal/TestMainDispatcher.kt +++ b/kotlinx-coroutines-test/common/src/internal/TestMainDispatcher.kt @@ -4,6 +4,7 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.test.* import kotlin.coroutines.* +import kotlin.time.Duration /** * The testable main dispatcher used by kotlinx-coroutines-test. @@ -41,11 +42,11 @@ internal class TestMainDispatcher(createInnerMain: () -> CoroutineDispatcher): delegate.value = null } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) = - delay.scheduleResumeAfterDelay(timeMillis, continuation) + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) = + delay.scheduleResumeAfterDelay(time, continuation) - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - delay.invokeOnTimeout(timeMillis, block, context) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + delay.invokeOnTimeout(timeout, block, context) companion object { internal val currentTestDispatcher diff --git a/kotlinx-coroutines-test/jvm/src/migration/TestCoroutineDispatcher.kt b/kotlinx-coroutines-test/jvm/src/migration/TestCoroutineDispatcher.kt index 585f77b7b9..0de3040794 100644 --- a/kotlinx-coroutines-test/jvm/src/migration/TestCoroutineDispatcher.kt +++ b/kotlinx-coroutines-test/jvm/src/migration/TestCoroutineDispatcher.kt @@ -2,6 +2,7 @@ package kotlinx.coroutines.test import kotlinx.coroutines.* import kotlin.coroutines.* +import kotlin.time.Duration /** * @suppress @@ -43,7 +44,7 @@ public class TestCoroutineDispatcher(public override val scheduler: TestCoroutin override fun toString(): String = "TestCoroutineDispatcher[scheduler=$scheduler]" private fun post(block: Runnable, context: CoroutineContext) = - scheduler.registerEvent(this, 0, block, context) { false } + scheduler.registerEvent(this, Duration.ZERO, block, context) { false } public val currentTime: Long get() = scheduler.currentTime diff --git a/kotlinx-coroutines-test/jvm/test/migration/TestRunBlockingOrderTest.kt b/kotlinx-coroutines-test/jvm/test/migration/TestRunBlockingOrderTest.kt index 5ba1216909..271d29af14 100644 --- a/kotlinx-coroutines-test/jvm/test/migration/TestRunBlockingOrderTest.kt +++ b/kotlinx-coroutines-test/jvm/test/migration/TestRunBlockingOrderTest.kt @@ -56,7 +56,7 @@ class TestRunBlockingOrderTest: OrderedExecutionTestBase() { expect(1) delay(100) // move time forward a bit some that naive time + delay gives an overflow launch { - delay(Long.MAX_VALUE / 2) // very long delay + delay(Long.MAX_VALUE / 3_000_000) // very long delay finish(4) } launch { diff --git a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api index 5a881a128e..3f67331e63 100644 --- a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api +++ b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api @@ -54,13 +54,13 @@ public final class kotlinx/coroutines/reactor/ReactorFlowKt { public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay { public fun (Lreactor/core/scheduler/Scheduler;)V - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lreactor/core/scheduler/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V + public fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt b/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt index 5371ff39d7..89cf633e8a 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt @@ -5,6 +5,7 @@ import reactor.core.Disposable import reactor.core.scheduler.Scheduler import java.util.concurrent.TimeUnit import kotlin.coroutines.CoroutineContext +import kotlin.time.Duration /** * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]. @@ -27,16 +28,16 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { val disposable = scheduler.schedule({ with(continuation) { resumeUndispatched(Unit) } - }, timeMillis, TimeUnit.MILLISECONDS) + }, time.inWholeNanoseconds, TimeUnit.NANOSECONDS) continuation.disposeOnCancellation(disposable.asDisposableHandle()) } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = - scheduler.schedule(block, timeMillis, TimeUnit.MILLISECONDS).asDisposableHandle() + override fun invokeOnTimeout(time: Duration, block: Runnable, context: CoroutineContext): DisposableHandle = + scheduler.schedule(block, time.inWholeNanoseconds, TimeUnit.NANOSECONDS).asDisposableHandle() /** @suppress */ override fun toString(): String = scheduler.toString() diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api index 803ac90564..e93b40c07a 100644 --- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api +++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api @@ -83,13 +83,13 @@ public final class kotlinx/coroutines/rx2/RxSingleKt { public final class kotlinx/coroutines/rx2/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay { public fun (Lio/reactivex/Scheduler;)V - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lio/reactivex/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V + public fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt index bac20210f6..20b537779d 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt @@ -8,6 +8,8 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import java.util.concurrent.* import kotlin.coroutines.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.nanoseconds /** * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher] @@ -52,7 +54,7 @@ private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) private val workerCounter = atomic(1L) override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable = - scope.scheduleTask(block, unit.toMillis(delay)) { task -> + scope.scheduleTask(block, delay, unit) { task -> Runnable { scope.launch { task() } } } @@ -81,7 +83,7 @@ private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) } override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable = - workerScope.scheduleTask(block, unit.toMillis(delay)) { task -> + workerScope.scheduleTask(block, delay, unit) { task -> Runnable { blockChannel.trySend(task) } } @@ -106,7 +108,8 @@ private typealias Task = suspend () -> Unit */ private fun CoroutineScope.scheduleTask( block: Runnable, - delayMillis: Long, + delay: Long, + unit: TimeUnit, adaptForScheduling: (Task) -> Runnable ): Disposable { val ctx = coroutineContext @@ -129,11 +132,11 @@ private fun CoroutineScope.scheduleTask( val toSchedule = adaptForScheduling(::task) if (!isActive) return Disposables.disposed() - if (delayMillis <= 0) { + if (delay <= 0) { toSchedule.run() } else { @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2 - ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it } + ctx.delay.invokeOnTimeout(unit.toNanos(delay).nanoseconds, toSchedule, ctx).let { handle = it } } return disposable } @@ -153,16 +156,16 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { val disposable = scheduler.scheduleDirect({ with(continuation) { resumeUndispatched(Unit) } - }, timeMillis, TimeUnit.MILLISECONDS) + }, time.inWholeNanoseconds, TimeUnit.NANOSECONDS) continuation.disposeOnCancellation(disposable) } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val disposable = scheduler.scheduleDirect(block, timeout.inWholeNanoseconds, TimeUnit.NANOSECONDS) return DisposableHandle { disposable.dispose() } } diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api index f86276e195..61694f4f5b 100644 --- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api +++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api @@ -70,13 +70,13 @@ public final class kotlinx/coroutines/rx3/RxSingleKt { public final class kotlinx/coroutines/rx3/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay { public fun (Lio/reactivex/rxjava3/core/Scheduler;)V - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lio/reactivex/rxjava3/core/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V + public fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt index 8f77d4c867..8916f435e4 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt @@ -8,6 +8,8 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import java.util.concurrent.* import kotlin.coroutines.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.nanoseconds /** * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher] @@ -52,7 +54,7 @@ private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) private val workerCounter = atomic(1L) override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable = - scope.scheduleTask(block, unit.toMillis(delay)) { task -> + scope.scheduleTask(block, delay, unit) { task -> Runnable { scope.launch { task() } } } @@ -81,7 +83,7 @@ private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) } override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable = - workerScope.scheduleTask(block, unit.toMillis(delay)) { task -> + workerScope.scheduleTask(block, delay, unit) { task -> Runnable { blockChannel.trySend(task) } } @@ -106,7 +108,8 @@ private typealias Task = suspend () -> Unit */ private fun CoroutineScope.scheduleTask( block: Runnable, - delayMillis: Long, + delay: Long, + unit: TimeUnit, adaptForScheduling: (Task) -> Runnable ): Disposable { val ctx = coroutineContext @@ -129,11 +132,11 @@ private fun CoroutineScope.scheduleTask( val toSchedule = adaptForScheduling(::task) if (!isActive) return Disposable.disposed() - if (delayMillis <= 0) { + if (delay <= 0) { toSchedule.run() } else { @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2 - ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it } + ctx.delay.invokeOnTimeout(unit.toNanos(delay).nanoseconds, toSchedule, ctx).let { handle = it } } return disposable } @@ -153,16 +156,16 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { val disposable = scheduler.scheduleDirect({ with(continuation) { resumeUndispatched(Unit) } - }, timeMillis, TimeUnit.MILLISECONDS) + }, time.inWholeNanoseconds, TimeUnit.NANOSECONDS) continuation.disposeOnCancellation(disposable) } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS) + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val disposable = scheduler.scheduleDirect(block, timeout.inWholeNanoseconds, TimeUnit.NANOSECONDS) return DisposableHandle { disposable.dispose() } } diff --git a/ui/kotlinx-coroutines-android/android-unit-tests/test/ordered/tests/TestComponent.kt b/ui/kotlinx-coroutines-android/android-unit-tests/test/ordered/tests/TestComponent.kt index fbd6d8135a..8ca9bdfe44 100644 --- a/ui/kotlinx-coroutines-android/android-unit-tests/test/ordered/tests/TestComponent.kt +++ b/ui/kotlinx-coroutines-android/android-unit-tests/test/ordered/tests/TestComponent.kt @@ -17,7 +17,7 @@ public class TestComponent { fun launchDelayed() { scope.launch { - delay(Long.MAX_VALUE / 2) + delay(Long.MAX_VALUE / 2 - 1) delayedLaunchCompleted = true } } diff --git a/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api b/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api index 090c14e09c..d07d985bd0 100644 --- a/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api +++ b/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api @@ -1,7 +1,7 @@ public abstract class kotlinx/coroutines/android/HandlerDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun getImmediate ()Lkotlinx/coroutines/android/HandlerDispatcher; - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; } public final class kotlinx/coroutines/android/HandlerDispatcherKt { diff --git a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt index 9261b2e1c7..069f422c9c 100644 --- a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt +++ b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import java.lang.reflect.* import kotlin.coroutines.* +import kotlin.time.Duration /** * Dispatches execution onto Android [Handler]. @@ -136,25 +137,29 @@ internal class HandlerContext private constructor( } } - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { val block = Runnable { with(continuation) { resumeUndispatched(Unit) } } - if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) { + if (schedule(time, block)) { continuation.invokeOnCancellation { handler.removeCallbacks(block) } } else { cancelOnRejection(continuation.context, block) } } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) { + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + if (schedule(timeout, block)) { return DisposableHandle { handler.removeCallbacks(block) } } cancelOnRejection(context, block) return NonDisposableHandle } + @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") // do not remove the INVISIBLE_REFERENCE suppression: required in K2 + private fun schedule(time: Duration, block: Runnable): Boolean = + handler.postDelayed(block, time.toDelayMillis().coerceAtMost(MAX_DELAY)) + private fun cancelOnRejection(context: CoroutineContext, block: Runnable) { context.cancel(CancellationException("The task was rejected, the handler underlying the dispatcher '${toString()}' was closed")) Dispatchers.IO.dispatch(context, block) diff --git a/ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt b/ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt index e5a877e305..28a52304ed 100644 --- a/ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt +++ b/ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt @@ -51,7 +51,7 @@ class DisabledHandlerTest : TestBase() { withContext(disabledDispatcher) { expect(1) delegateToSuper = false - delay(Long.MAX_VALUE - 1) + delay(Long.MAX_VALUE / 2 - 1) expectUnreached() } expectUnreached() diff --git a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api index e2c3b8f326..6f6d2c2ec4 100644 --- a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api +++ b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api @@ -3,10 +3,10 @@ public final class kotlinx/coroutines/javafx/JavaFxConvertKt { } public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V + public fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; } public final class kotlinx/coroutines/javafx/JavaFxDispatcherKt { diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt index 941f458282..54e80aa2c1 100644 --- a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt +++ b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt @@ -3,13 +3,15 @@ package kotlinx.coroutines.javafx import javafx.animation.* import javafx.application.* import javafx.event.* -import javafx.util.* +import javafx.util.Duration as jfxDuration +import kotlin.time.Duration import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import java.lang.UnsupportedOperationException import java.lang.reflect.* import java.util.concurrent.* import kotlin.coroutines.* +import kotlin.time.DurationUnit /** * Dispatches execution onto JavaFx application thread and provides native [delay] support. @@ -29,23 +31,23 @@ public sealed class JavaFxDispatcher : MainCoroutineDispatcher(), Delay { override fun dispatch(context: CoroutineContext, block: Runnable): Unit = Platform.runLater(block) /** @suppress */ - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val timeline = schedule(timeMillis) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + val timeline = schedule(time) { with(continuation) { resumeUndispatched(Unit) } } continuation.invokeOnCancellation { timeline.stop() } } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val timeline = schedule(timeMillis) { + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val timeline = schedule(timeout) { block.run() } return DisposableHandle { timeline.stop() } } - private fun schedule(timeMillis: Long, handler: EventHandler): Timeline = - Timeline(KeyFrame(Duration.millis(timeMillis.toDouble()), handler)).apply { play() } + private fun schedule(time: Duration, handler: EventHandler): Timeline = + Timeline(KeyFrame(jfxDuration.millis(time.toDouble(DurationUnit.MILLISECONDS)), handler)).apply { play() } } internal class JavaFxDispatcherFactory : MainDispatcherFactory { diff --git a/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api b/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api index d33191fd96..873e515ffb 100644 --- a/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api +++ b/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api @@ -1,8 +1,8 @@ public abstract class kotlinx/coroutines/swing/SwingDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { - public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V - public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; - public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V + public fun invokeOnTimeout-KLykuaI (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; + public fun scheduleResumeAfterDelay-VtjQ1oo (JLkotlinx/coroutines/CancellableContinuation;)V + public fun timeoutMessage-LRDsOJo (J)Ljava/lang/String; } public final class kotlinx/coroutines/swing/SwingDispatcherKt { diff --git a/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt b/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt index aa378be12d..de5328530a 100644 --- a/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt +++ b/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt @@ -5,6 +5,8 @@ import kotlinx.coroutines.internal.* import java.awt.event.* import javax.swing.* import kotlin.coroutines.* +import kotlin.time.Duration +import kotlin.time.DurationUnit /** * Dispatches execution onto Swing event dispatching thread and provides native [delay] support. @@ -23,23 +25,23 @@ public sealed class SwingDispatcher : MainCoroutineDispatcher(), Delay { override fun dispatch(context: CoroutineContext, block: Runnable): Unit = SwingUtilities.invokeLater(block) /** @suppress */ - override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val timer = schedule(timeMillis) { + override fun scheduleResumeAfterDelay(time: Duration, continuation: CancellableContinuation) { + val timer = schedule(time) { with(continuation) { resumeUndispatched(Unit) } } continuation.invokeOnCancellation { timer.stop() } } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val timer = schedule(timeMillis) { + override fun invokeOnTimeout(timeout: Duration, block: Runnable, context: CoroutineContext): DisposableHandle { + val timer = schedule(timeout) { block.run() } return DisposableHandle { timer.stop() } } - private fun schedule(timeMillis: Long, action: ActionListener): Timer = - Timer(timeMillis.coerceAtMost(Int.MAX_VALUE.toLong()).toInt(), action).apply { + private fun schedule(time: Duration, action: ActionListener): Timer = + Timer(time.toInt(DurationUnit.MILLISECONDS), action).apply { isRepeats = false start() }