-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathTestCoroutineScheduler.kt
262 lines (233 loc) · 11 KB
/
TestCoroutineScheduler.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.test
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlin.time.*
/**
 * This is a scheduler for coroutines used in tests, providing the delay-skipping behavior.
 *
 * [Test dispatchers][TestDispatcher] are parameterized with a scheduler. Several dispatchers can share the
 * same scheduler, in which case their knowledge about the virtual time will be synchronized. When the dispatchers
 * require scheduling an event at a later point in time, they notify the scheduler, which will establish the order of
 * the tasks.
 *
 * The scheduler can be queried to advance the time (via [advanceTimeBy]), run all the scheduled tasks advancing the
 * virtual time as needed (via [advanceUntilIdle]), or run the tasks that are scheduled to run as soon as possible but
 * haven't yet been dispatched (via [runCurrent]).
 */
@ExperimentalCoroutinesApi
public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCoroutineScheduler),
    CoroutineContext.Element {

    /** @suppress */
    public companion object Key : CoroutineContext.Key<TestCoroutineScheduler>

    /** This heap stores the knowledge about which dispatchers are interested in which moments of virtual time. */
    // TODO: all the synchronization is done via a separate lock, so a non-thread-safe priority queue can be used.
    private val events = ThreadSafeHeap<TestDispatchEvent<Any>>()

    /** Establishes that [currentTime] can't exceed the time of the earliest event in [events]. */
    private val lock = SynchronizedObject()

    /** This counter establishes some order on the events that happen at the same virtual time. */
    private val count = atomic(0L)

    /** The current virtual time in milliseconds. Reads are performed under [lock]. */
    @ExperimentalCoroutinesApi
    public var currentTime: Long = 0
        get() = synchronized(lock) { field }
        private set

    /** A channel for notifying about the fact that a dispatch recently happened. */
    private val dispatchEvents: Channel<Unit> = Channel(CONFLATED)

    /**
     * Registers a request for the scheduler to notify [dispatcher] at a virtual moment [timeDeltaMillis] milliseconds
     * later via [TestDispatcher.processEvent], which will be called with the provided [marker] object.
     *
     * The event counts as a foreground one unless [context] contains [BackgroundWork].
     *
     * Returns the handler which can be used to cancel the registration.
     *
     * @throws IllegalArgumentException if [timeDeltaMillis] is negative.
     * @throws IllegalStateException if [context] contains a [TestCoroutineScheduler] other than this one.
     */
    internal fun <T : Any> registerEvent(
        dispatcher: TestDispatcher,
        timeDeltaMillis: Long,
        marker: T,
        context: CoroutineContext,
        isCancelled: (T) -> Boolean
    ): DisposableHandle {
        require(timeDeltaMillis >= 0) { "Attempted scheduling an event earlier in time (with the time delta $timeDeltaMillis)" }
        checkSchedulerInContext(this, context)
        // Tie-breaker used to order events that are scheduled for the same virtual time.
        val sequenceNumber = count.getAndIncrement()
        val isForeground = context[BackgroundWork] === null
        return synchronized(lock) {
            // The target time saturates at Long.MAX_VALUE instead of overflowing.
            val time = addClamping(currentTime, timeDeltaMillis)
            val event = TestDispatchEvent(dispatcher, sequenceNumber, time, marker as Any, isForeground) { isCancelled(marker) }
            events.addLast(event)
            // Can't be moved above: otherwise, `onDispatchEvent` could consume the token sent here before there's
            // actually anything in the event queue.
            sendDispatchEvent(context)
            DisposableHandle {
                synchronized(lock) {
                    events.remove(event)
                }
            }
        }
    }

    /**
     * Runs the next enqueued task, advancing the virtual time to the time of its scheduled awakening,
     * unless [condition] holds.
     *
     * [condition] is evaluated under the lock; the task itself is processed after the lock is released.
     *
     * Returns `true` if a task was run, and `false` if [condition] held or there were no enqueued tasks.
     */
    internal fun tryRunNextTaskUnless(condition: () -> Boolean): Boolean {
        val event = synchronized(lock) {
            if (condition()) return false
            val next = events.removeFirstOrNull() ?: return false
            if (currentTime > next.time)
                currentTimeAheadOfEvents()
            currentTime = next.time
            next
        }
        event.dispatcher.processEvent(event.marker)
        return true
    }

    /**
     * Runs the enqueued tasks in the specified order, advancing the virtual time as needed until there are no more
     * tasks associated with the dispatchers linked to this scheduler.
     *
     * Only foreground events keep this function running: it returns as soon as the queue contains no event that
     * was registered without [BackgroundWork] in its context.
     *
     * A breaking change from [TestCoroutineDispatcher.advanceTimeBy] is that it no longer returns the total number of
     * milliseconds by which the execution of this method has advanced the virtual time. If you want to recreate that
     * functionality, query [currentTime] before and after the execution to achieve the same result.
     */
    @ExperimentalCoroutinesApi
    public fun advanceUntilIdle(): Unit = advanceUntilIdleOr { events.none(TestDispatchEvent<*>::isForeground) }

    /**
     * Keeps running the enqueued tasks one by one until either [condition] holds or no tasks are left.
     *
     * [condition]: guaranteed to be invoked under the lock.
     */
    internal fun advanceUntilIdleOr(condition: () -> Boolean) {
        while (tryRunNextTaskUnless(condition)) {
            // Each successful call has already run exactly one task; nothing more to do per iteration.
        }
    }

    /**
     * Runs the tasks that are scheduled to execute at this moment of virtual time.
     *
     * The moment is captured once on entry; every event whose time does not exceed it is run, including the events
     * that get registered for that moment while this function is executing.
     */
    @ExperimentalCoroutinesApi
    public fun runCurrent() {
        // The getter of `currentTime` already takes the lock, so no extra synchronization is needed here.
        val timeMark = currentTime
        while (true) {
            val event = synchronized(lock) {
                events.removeFirstIf { it.time <= timeMark } ?: return
            }
            event.dispatcher.processEvent(event.marker)
        }
    }

    /**
     * Moves the virtual clock of this dispatcher forward by [the specified amount][delayTimeMillis], running the
     * scheduled tasks in the meantime.
     *
     * Breaking changes from [TestCoroutineDispatcher.advanceTimeBy]:
     * * Intentionally doesn't return a `Long` value, as its use cases are unclear. We may restore it in the future;
     *   please describe your use cases at [the issue tracker](https://github.com/Kotlin/kotlinx.coroutines/issues/).
     *   For now, it's possible to query [currentTime] before and after execution of this method, to the same effect.
     * * It doesn't run the tasks that are scheduled at exactly [currentTime] + [delayTimeMillis]. For example,
     *   advancing the time by one millisecond used to run the tasks at the current millisecond *and* the next
     *   millisecond, but now will stop just before executing any task starting at the next millisecond.
     * * Overflowing the target time used to lead to nothing being done, but will now run the tasks scheduled at up to
     *   (but not including) [Long.MAX_VALUE].
     *
     * @throws IllegalArgumentException if passed a negative [delay][delayTimeMillis].
     */
    @ExperimentalCoroutinesApi
    public fun advanceTimeBy(delayTimeMillis: Long) {
        require(delayTimeMillis >= 0) { "Can not advance time by a negative delay: $delayTimeMillis" }
        // The target saturates at Long.MAX_VALUE if the sum would overflow.
        val targetTime = addClamping(currentTime, delayTimeMillis)
        while (true) {
            val event = synchronized(lock) {
                val timeMark = currentTime
                // Only events strictly before the target time are taken.
                val next = events.removeFirstIf { targetTime > it.time }
                when {
                    next == null -> {
                        // Nothing left before the target: jump the clock there and finish.
                        currentTime = targetTime
                        return
                    }
                    timeMark > next.time -> currentTimeAheadOfEvents()
                    else -> {
                        currentTime = next.time
                        next
                    }
                }
            }
            // Processed outside the lock.
            event.dispatcher.processEvent(event.marker)
        }
    }

    /**
     * Checks whether the scheduler has no work left.
     *
     * With [strict] set to `true`, this means that the event queue is empty. Otherwise, it is enough that the only
     * tasks remaining in the scheduler are cancelled.
     */
    internal fun isIdle(strict: Boolean = true): Boolean =
        synchronized(lock) {
            if (strict) events.isEmpty else events.none { !it.isCancelled() }
        }

    /**
     * Notifies this scheduler about a dispatch event.
     *
     * [context] is the context in which the task will be dispatched. No notification is sent if it contains
     * [BackgroundWork].
     */
    internal fun sendDispatchEvent(context: CoroutineContext) {
        if (context[BackgroundWork] !== BackgroundWork)
            dispatchEvents.trySend(Unit)
    }

    /**
     * Consumes the knowledge that a dispatch event happened recently.
     */
    internal val onDispatchEvent: SelectClause1<Unit> get() = dispatchEvents.onReceive

    /**
     * Returns the [TimeSource] representation of the virtual time of this scheduler.
     *
     * Its readings are the values of [currentTime], interpreted as milliseconds.
     */
    @ExperimentalCoroutinesApi
    @ExperimentalTime
    public val timeSource: TimeSource.WithComparableMarks = object : AbstractLongTimeSource(DurationUnit.MILLISECONDS) {
        override fun read(): Long = currentTime
    }
}
// Some error-throwing functions for pretty stack traces

/**
 * Signals that the current virtual time was found to be later than the time of an event taken from the queue.
 * Never returns normally: always fails via [invalidSchedulerState].
 */
private fun currentTimeAheadOfEvents(): Nothing {
    invalidSchedulerState()
}
/**
 * Fails with an [IllegalStateException] asking the user to report the problem to the issue tracker.
 * Never returns normally.
 */
private fun invalidSchedulerState(): Nothing =
    // The link must point at the real GitHub issue tracker, not at a proxy/mirror host.
    throw IllegalStateException("The test scheduler entered an invalid state. Please report this at https://github.com/Kotlin/kotlinx.coroutines/issues.")
/**
 * [ThreadSafeHeap] node representing a scheduled task, ordered by the planned execution time.
 *
 * Events with equal [time] are ordered by the `count` value they were created with.
 */
private class TestDispatchEvent<T>(
    /** The dispatcher this event belongs to. */
    @JvmField val dispatcher: TestDispatcher,
    /** Tie-breaker for events that share the same [time]. */
    private val count: Long,
    /** The planned execution time of the event. */
    @JvmField val time: Long,
    /** An arbitrary object carried along with the event. */
    @JvmField val marker: T,
    /** Whether the event is a foreground one; background events are marked as such in [toString]. */
    @JvmField val isForeground: Boolean,
    // TODO: remove once the deprecated API is gone
    @JvmField val isCancelled: () -> Boolean
) : Comparable<TestDispatchEvent<*>>, ThreadSafeHeapNode {
    override var heap: ThreadSafeHeap<*>? = null
    override var index: Int = 0

    /** Compares by [time] first, falling back to the creation counter when the times are equal. */
    override fun compareTo(other: TestDispatchEvent<*>): Int {
        val byTime = time.compareTo(other.time)
        return if (byTime != 0) byTime else count.compareTo(other.count)
    }

    override fun toString(): String {
        val backgroundSuffix = if (isForeground) "" else ", background"
        return "TestDispatchEvent(time=$time, dispatcher=$dispatcher$backgroundSuffix)"
    }
}
/**
 * Adds [a] and [b], returning [Long.MAX_VALUE] whenever the sum comes out negative.
 *
 * Works with positive `a`, `b`: for such operands, a negative sum can only be the result of an overflow.
 */
private fun addClamping(a: Long, b: Long): Long {
    val sum = a + b
    return if (sum < 0) Long.MAX_VALUE else sum
}
/**
 * Verifies that [context] does not carry a [TestCoroutineScheduler] other than [scheduler].
 *
 * A context without any scheduler is accepted.
 *
 * @throws IllegalStateException if [context] contains a scheduler that is not the same instance as [scheduler].
 */
internal fun checkSchedulerInContext(scheduler: TestCoroutineScheduler, context: CoroutineContext) {
    // Nothing to verify when the context has no scheduler at all.
    val schedulerInContext = context[TestCoroutineScheduler] ?: return
    check(schedulerInContext === scheduler) {
        "Detected use of different schedulers. If you need to use several test coroutine dispatchers, " +
            "create one `TestCoroutineScheduler` and pass it to each of them."
    }
}
/**
 * A coroutine context key denoting that the work is to be executed in the background.
 *
 * This object serves both as the context element and as its own key.
 * @see [TestScope.backgroundScope]
 */
internal object BackgroundWork : CoroutineContext.Key<BackgroundWork>, CoroutineContext.Element {
    /** The key of this element is the object itself. */
    override val key: CoroutineContext.Key<*>
        get() {
            return this
        }

    override fun toString(): String {
        return "BackgroundWork"
    }
}
/**
 * Returns `true` if `find` locates no element of this heap matching [predicate].
 */
private fun <T> ThreadSafeHeap<T>.none(predicate: (T) -> Boolean): Boolean
    where T : ThreadSafeHeapNode, T : Comparable<T> {
    val firstMatch = find(predicate)
    return firstMatch == null
}