Skip to content

Experimenta kotlin.time.Duration support #1811

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions kotlinx-coroutines-core/common/src/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package kotlinx.coroutines

import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.time.Duration
import kotlin.time.ExperimentalTime

/**
* This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support
Expand Down Expand Up @@ -75,5 +77,31 @@ public suspend fun delay(timeMillis: Long) {
}
}

/**
* Delays coroutine for a given [duration] without blocking a thread and resumes it after the specified time.
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context.
*/
@ExperimentalTime
public suspend fun delay(duration: Duration) = delay(duration.toDelayMillis())

/** Returns [Delay] implementation of the given context */
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay

/**
* Convert this duration to its millisecond value.
* Positive durations are coerced at least `1`.
*/
@ExperimentalTime
internal fun Duration.toDelayMillis(): Long =
when {
this > Duration.ZERO -> toLongMilliseconds().coerceAtLeast(1)
this < Duration.ZERO -> -1
// 0, -0, NaN
else -> 0
}
34 changes: 34 additions & 0 deletions kotlinx-coroutines-core/common/src/Timeout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
import kotlin.time.Duration
import kotlin.time.ExperimentalTime

/**
* Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws
Expand All @@ -32,6 +34,22 @@ public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineSco
}
}

/**
* Runs a given suspending [block] of code inside a coroutine with the specified [timeout] and throws
* a [TimeoutCancellationException] if the timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* the cancellable suspending function inside the block throws a [TimeoutCancellationException].
*
* The sibling function that does not throw an exception on timeout is [withTimeoutOrNull].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*/
@ExperimentalTime
public suspend fun <T> withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T =
withTimeout(timeout.toDelayMillis(), block)

/**
* Runs a given suspending block of code inside a coroutine with a specified [timeout][timeMillis] and returns
* `null` if this timeout was exceeded.
Expand Down Expand Up @@ -65,6 +83,22 @@ public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend Corout
}
}

/**
* Runs a given suspending block of code inside a coroutine with the specified [timeout] and returns
* `null` if this timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* cancellable suspending function inside the block throws a [TimeoutCancellationException].
*
* The sibling function that throws an exception on timeout is [withTimeout].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*/
@ExperimentalTime
public suspend fun <T> withTimeoutOrNull(timeout: Duration, block: suspend CoroutineScope.() -> T): T? =
withTimeoutOrNull(timeout.toDelayMillis(), block)

private fun <U, T: U> setupTimeout(
coroutine: TimeoutCoroutine<U, T>,
block: suspend CoroutineScope.() -> T
Expand Down
50 changes: 50 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
import kotlin.time.Duration
import kotlin.time.ExperimentalTime
import kotlinx.coroutines.flow.internal.unsafeFlow as flow

/**
Expand Down Expand Up @@ -71,6 +73,34 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
}
}

/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout].
* The latest value is always emitted.
*
* Example:
* ```
* flow {
* emit(1)
* delay(90.0.milliseconds)
* emit(2)
* delay(90.0.milliseconds)
* emit(3)
* delay(1010.0.milliseconds)
* emit(4)
* delay(1010.0.milliseconds)
* emit(5)
* }.debounce(1000.0.milliseconds)
* ```
* produces `3, 4, 5`.
*
* Note that the resulting flow does not emit anything as long as the original flow emits
* items faster than every [timeout] milliseconds.
*/
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.toDelayMillis())

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
*
Expand Down Expand Up @@ -133,3 +163,23 @@ internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMil
}
}
}

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period].
*
* Example:
* ```
* flow {
* repeat(10) {
* emit(it)
* delay(50.0.milliseconds)
* }
* }.sample(100.0.milliseconds)
* ```
* produces `1, 3, 5, 7, 9`.
*
* Note that the latest element is not emitted if it does not fit into the sampling window.
*/
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.sample(period: Duration): Flow<T> = sample(period.toDelayMillis())
14 changes: 14 additions & 0 deletions kotlinx-coroutines-core/common/src/selects/Select.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
import kotlin.native.concurrent.*
import kotlin.time.Duration
import kotlin.time.ExperimentalTime

/**
* Scope for [select] invocation.
Expand Down Expand Up @@ -52,6 +54,18 @@ public interface SelectBuilder<in R> {
public fun onTimeout(timeMillis: Long, block: suspend () -> R)
}


/**
* Clause that selects the given [block] after the specified [timeout] passes.
* If timeout is negative or zero, [block] is selected immediately.
*
* **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future.
*/
@ExperimentalCoroutinesApi
@ExperimentalTime
public fun <R> SelectBuilder<R>.onTimeout(timeout: Duration, block: suspend () -> R) =
onTimeout(timeout.toDelayMillis(), block)

/**
* Clause for [select] expression without additional parameters that does not select any value.
*/
Expand Down
72 changes: 72 additions & 0 deletions kotlinx-coroutines-core/common/test/DelayDurationTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED", "DEPRECATION")

// KT-21913

package kotlinx.coroutines

import kotlin.test.Test
import kotlin.time.*

@ExperimentalTime
class DelayDurationTest : TestBase() {

@Test
fun testCancellation() = runTest(expected = { it is CancellationException }) {
runAndCancel(1.seconds)
}

@Test
fun testInfinite() = runTest(expected = { it is CancellationException }) {
runAndCancel(Duration.INFINITE)
}

@Test
fun testRegularDelay() = runTest {
val deferred = async {
expect(2)
delay(1.seconds)
expect(4)
}

expect(1)
yield()
expect(3)
deferred.await()
finish(5)
}

@Test
fun testNanoDelay() = runTest {
val deferred = async {
expect(2)
delay(1.nanoseconds)
expect(4)
}

expect(1)
yield()
expect(3)
deferred.await()
finish(5)
}

private suspend fun runAndCancel(time: Duration) = coroutineScope {
expect(1)
val deferred = async {
expect(2)
delay(time)
expectUnreached()
}

yield()
expect(3)
require(deferred.isActive)
deferred.cancel()
finish(4)
deferred.await()
}
}
Loading