-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathTimeout.kt
158 lines (145 loc) · 6.65 KB
/
Timeout.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:OptIn(ExperimentalContracts::class)
package kotlinx.coroutines
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
import kotlin.time.*
/**
* Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws
* a [TimeoutCancellationException] if the timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* the cancellable suspending function inside the block throws a [TimeoutCancellationException].
*
* The sibling function that does not throw an exception on timeout is [withTimeoutOrNull].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*
* @param timeMillis timeout time in milliseconds.
*/
public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
if (timeMillis <= 0L) throw TimeoutCancellationException("Timed out immediately")
return suspendCoroutineUninterceptedOrReturn { uCont ->
setupTimeout(TimeoutCoroutine(timeMillis, uCont), block)
}
}
/**
* Runs a given suspending [block] of code inside a coroutine with the specified [timeout] and throws
* a [TimeoutCancellationException] if the timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* the cancellable suspending function inside the block throws a [TimeoutCancellationException].
*
* The sibling function that does not throw an exception on timeout is [withTimeoutOrNull].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*/
@ExperimentalTime
public suspend fun <T> withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return withTimeout(timeout.toDelayMillis(), block)
}
/**
* Runs a given suspending block of code inside a coroutine with a specified [timeout][timeMillis] and returns
* `null` if this timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* cancellable suspending function inside the block throws a [TimeoutCancellationException].
*
* The sibling function that throws an exception on timeout is [withTimeout].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*
* @param timeMillis timeout time in milliseconds.
*/
public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T? {
if (timeMillis <= 0L) return null
var coroutine: TimeoutCoroutine<T?, T?>? = null
try {
return suspendCoroutineUninterceptedOrReturn { uCont ->
val timeoutCoroutine = TimeoutCoroutine(timeMillis, uCont)
coroutine = timeoutCoroutine
setupTimeout<T?, T?>(timeoutCoroutine, block)
}
} catch (e: TimeoutCancellationException) {
// Return null if it's our exception, otherwise propagate it upstream (e.g. in case of nested withTimeouts)
if (e.coroutine === coroutine) {
return null
}
throw e
}
}
/**
* Runs a given suspending block of code inside a coroutine with the specified [timeout] and returns
* `null` if this timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* cancellable suspending function inside the block throws a [TimeoutCancellationException].
*
* The sibling function that throws an exception on timeout is [withTimeout].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*/
@ExperimentalTime
public suspend fun <T> withTimeoutOrNull(timeout: Duration, block: suspend CoroutineScope.() -> T): T? =
withTimeoutOrNull(timeout.toDelayMillis(), block)
private fun <U, T: U> setupTimeout(
coroutine: TimeoutCoroutine<U, T>,
block: suspend CoroutineScope.() -> T
): Any? {
// schedule cancellation of this coroutine on time
val cont = coroutine.uCont
val context = cont.context
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine))
// restart the block using a new coroutine with a new job,
// however, start it undispatched, because we already are in the proper context
return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block)
}
private class TimeoutCoroutine<U, in T: U>(
@JvmField val time: Long,
uCont: Continuation<U> // unintercepted continuation
) : ScopeCoroutine<T>(uCont.context, uCont), Runnable {
override fun run() {
cancelCoroutine(TimeoutCancellationException(time, this))
}
override fun nameString(): String =
"${super.nameString()}(timeMillis=$time)"
}
/**
* This exception is thrown by [withTimeout] to indicate timeout.
*/
public class TimeoutCancellationException internal constructor(
message: String,
@JvmField internal val coroutine: Job?
) : CancellationException(message), CopyableThrowable<TimeoutCancellationException> {
/**
* Creates a timeout exception with the given message.
* This constructor is needed for exception stack-traces recovery.
*/
@Suppress("UNUSED")
internal constructor(message: String) : this(message, null)
// message is never null in fact
override fun createCopy(): TimeoutCancellationException? =
TimeoutCancellationException(message ?: "", coroutine).also { it.initCause(this) }
}
@Suppress("FunctionName")
internal fun TimeoutCancellationException(
time: Long,
coroutine: Job
) : TimeoutCancellationException = TimeoutCancellationException("Timed out waiting for $time ms", coroutine)