-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathTestCoroutineScope.kt
215 lines (194 loc) · 8.87 KB
/
TestCoroutineScope.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.test
import kotlinx.coroutines.*
import kotlin.coroutines.*
/**
* A scope which provides detailed control over the execution of coroutines for tests.
*
* In addition to being a [CoroutineScope], every instance is an [UncaughtExceptionCaptor] and exposes the
* [testScheduler] used by the test dispatchers of the scope. Instances are obtained from the
* `TestCoroutineScope(context)` factory function declared in this file.
*/
@ExperimentalCoroutinesApi
public sealed interface TestCoroutineScope: CoroutineScope, UncaughtExceptionCaptor {
/**
* Called after the test completes.
*
* Calls [UncaughtExceptionCaptor.cleanupTestCoroutinesCaptor] and [DelayController.cleanupTestCoroutines].
* If a new job was created for this scope, the job is completed.
* NOTE(review): the implementation in this file does not visibly complete the job; confirm that the sentence
* above still describes the actual behaviour.
*
* @throws Throwable the first uncaught exception, if there are any uncaught exceptions.
* @throws AssertionError if any pending tasks are active, or if child jobs of the scope's [Job] that were not
* already active when the scope was created are still active.
*/
@ExperimentalCoroutinesApi
public fun cleanupTestCoroutines()
/**
* The delay-skipping scheduler used by the test dispatchers running the code in this scope.
*/
@ExperimentalCoroutinesApi
public val testScheduler: TestCoroutineScheduler
}
/**
 * The [TestCoroutineScope] implementation returned by the `TestCoroutineScope(context)` factory function.
 *
 * [UncaughtExceptionCaptor] is implemented by delegation to the captor installed as the
 * [CoroutineExceptionHandler] of [coroutineContext]. Construction throws [IllegalArgumentException] if that
 * handler is missing or is not an [UncaughtExceptionCaptor], and [IllegalStateException] if the context has
 * no [Job] (the initial set of active jobs cannot be computed).
 */
private class TestCoroutineScopeImpl(
    override val coroutineContext: CoroutineContext
) : TestCoroutineScope,
    UncaughtExceptionCaptor by coroutineContext.uncaughtExceptionCaptor {

    // The factory function in this file always adds a scheduler to the context before constructing the scope,
    // so the element is expected to be present.
    override val testScheduler: TestCoroutineScheduler
        get() = coroutineContext[TestCoroutineScheduler]!!

    /** These jobs existed before the coroutine scope was used, so it's alright if they don't get cancelled. */
    private val initialJobs = coroutineContext.activeJobs()

    /**
     * Cleans up the exception captor and, if the dispatcher is a [DelayController], the dispatcher as well;
     * afterwards verifies that no child job started during the lifetime of the scope is still active.
     *
     * @throws UncompletedCoroutinesError if active child jobs exist that were not in [initialJobs]. Only those
     * jobs are listed in the message; jobs that were already active at construction time are permitted to
     * stay active and would only obscure the report.
     */
    override fun cleanupTestCoroutines() {
        coroutineContext.uncaughtExceptionCaptor.cleanupTestCoroutinesCaptor()
        coroutineContext.delayController?.cleanupTestCoroutines()
        // Compute the offending set once and use it both for the check and for the diagnostic.
        val leakedJobs = coroutineContext.activeJobs() - initialJobs
        if (leakedJobs.isNotEmpty())
            throw UncompletedCoroutinesError("Test finished with active jobs: $leakedJobs")
    }
}
/**
 * Collects the direct children of this context's [Job] that are currently active.
 *
 * @throws IllegalStateException if the context has no [Job].
 */
private fun CoroutineContext.activeJobs(): Set<Job> {
    val parentJob = checkNotNull(this[Job])
    return parentJob.children.filterTo(LinkedHashSet()) { child -> child.isActive }
}
/**
 * Creates a [TestCoroutineScope] whose context is [context] completed with the pieces the test module needs.
 *
 * Elements missing from [context] are filled in as follows:
 * * Scheduler: if the [ContinuationInterceptor] of [context] is a [TestDispatcher], its
 *   [TestDispatcher.scheduler] is used; otherwise the [TestCoroutineScheduler] found in [context] is used, or
 *   a new one is created when there is none.
 * * Dispatcher: if [context] has no [ContinuationInterceptor], a [TestCoroutineDispatcher] on that scheduler
 *   is created.
 * * Exception handler: if [context] has no [CoroutineExceptionHandler], a [TestCoroutineExceptionHandler] is
 *   created.
 * * Job: if [context] has no [Job], a [SupervisorJob] is created; otherwise the provided job is kept.
 *
 * @throws IllegalArgumentException if [context] has both [TestCoroutineScheduler] and a [TestDispatcher] linked to a
 * different scheduler.
 * @throws IllegalArgumentException if [context] has a [ContinuationInterceptor] that is not a [TestDispatcher].
 * @throws IllegalArgumentException if [context] has an [CoroutineExceptionHandler] that is not an
 * [UncaughtExceptionCaptor].
 */
@Suppress("FunctionName")
@ExperimentalCoroutinesApi
public fun TestCoroutineScope(context: CoroutineContext = EmptyCoroutineContext): TestCoroutineScope {
    val interceptor = context[ContinuationInterceptor]
    val scheduler: TestCoroutineScheduler
    val dispatcher = if (interceptor == null) {
        // No dispatcher supplied: reuse the scheduler from the context (or make one) and build a dispatcher on it.
        scheduler = context[TestCoroutineScheduler] ?: TestCoroutineScheduler()
        TestCoroutineDispatcher(scheduler)
    } else if (interceptor is TestDispatcher) {
        scheduler = interceptor.scheduler
        // A scheduler passed alongside the dispatcher must be the very same instance the dispatcher uses.
        val providedScheduler = context[TestCoroutineScheduler]
        require(providedScheduler == null || interceptor.scheduler === providedScheduler) {
            "Both a TestCoroutineScheduler $providedScheduler and TestDispatcher $interceptor linked to " +
                "another scheduler were passed."
        }
        interceptor
    } else {
        throw IllegalArgumentException("Dispatcher must implement TestDispatcher: $interceptor")
    }

    val providedHandler = context[CoroutineExceptionHandler]
    if (providedHandler != null) {
        require(providedHandler is UncaughtExceptionCaptor) {
            "coroutineExceptionHandler must implement UncaughtExceptionCaptor: $context"
        }
    }
    val exceptionHandler = providedHandler ?: TestCoroutineExceptionHandler()

    val job: Job = context[Job] ?: SupervisorJob()
    return TestCoroutineScopeImpl(context + scheduler + dispatcher + exceptionHandler + job)
}
/**
 * The [UncaughtExceptionCaptor] installed as the [CoroutineExceptionHandler] of this context.
 *
 * @throws IllegalArgumentException if the context has no [CoroutineExceptionHandler], or the handler is not an
 * [UncaughtExceptionCaptor].
 */
private inline val CoroutineContext.uncaughtExceptionCaptor: UncaughtExceptionCaptor
    get() = when (val captor = this[CoroutineExceptionHandler]) {
        is UncaughtExceptionCaptor -> captor
        else -> throw IllegalArgumentException(
            "TestCoroutineScope requires a UncaughtExceptionCaptor such as " +
                "TestCoroutineExceptionHandler as the CoroutineExceptionHandler"
        )
    }
/**
 * The [ContinuationInterceptor] of this context viewed as a [DelayController], or `null` when the context has
 * no interceptor or the interceptor is not a [DelayController].
 */
private inline val CoroutineContext.delayController: DelayController?
    get() = this[ContinuationInterceptor] as? DelayController
/**
 * The current virtual time of this scope.
 *
 * When the dispatcher of the scope is a [DelayController], its `currentTime` is reported; otherwise the value
 * comes from [testScheduler][TestCoroutineScope.testScheduler].
 * @see TestCoroutineScheduler.currentTime
 */
@ExperimentalCoroutinesApi
public val TestCoroutineScope.currentTime: Long
    get() {
        val controller = coroutineContext.delayController
        return if (controller != null) controller.currentTime else testScheduler.currentTime
    }
/**
 * Moves the virtual time forward by [delayTimeMillis] and executes the tasks scheduled up to and including
 * the resulting moment.
 *
 * When the dispatcher of the scope is a [DelayController], the call is forwarded to it. Otherwise
 * [testScheduler][TestCoroutineScope.testScheduler] is advanced and the tasks due at the new current time are
 * then run explicitly.
 *
 * @see TestCoroutineScheduler.advanceTimeBy
 */
@ExperimentalCoroutinesApi
@Deprecated("The name of this function is misleading: it not only advances the time, but also runs the tasks " +
    "scheduled *at* the ending moment.",
    ReplaceWith("this.testScheduler.apply { advanceTimeBy(delayTimeMillis); runCurrent() }"),
    DeprecationLevel.WARNING)
public fun TestCoroutineScope.advanceTimeBy(delayTimeMillis: Long): Unit {
    val controller = coroutineContext.delayController
    if (controller != null) {
        // Whatever the controller returns is discarded; this function always yields Unit.
        controller.advanceTimeBy(delayTimeMillis)
        return
    }
    testScheduler.advanceTimeBy(delayTimeMillis)
    testScheduler.runCurrent()
}
/**
 * Keeps advancing the virtual time until there are no tasks remaining.
 *
 * The call goes to the dispatcher of the scope when it is a [DelayController], and to
 * [testScheduler][TestCoroutineScope.testScheduler] otherwise.
 * @see TestCoroutineScheduler.advanceUntilIdle
 */
@ExperimentalCoroutinesApi
public fun TestCoroutineScope.advanceUntilIdle(): Unit {
    val controller = coroutineContext.delayController
    if (controller == null) {
        testScheduler.advanceUntilIdle()
    } else {
        controller.advanceUntilIdle()
    }
}
/**
 * Executes the tasks that are pending at the current virtual time.
 *
 * The call goes to the dispatcher of the scope when it is a [DelayController], and to
 * [testScheduler][TestCoroutineScope.testScheduler] otherwise.
 *
 * @see TestCoroutineScheduler.runCurrent
 */
@ExperimentalCoroutinesApi
public fun TestCoroutineScope.runCurrent(): Unit {
    val controller = coroutineContext.delayController
    if (controller == null) {
        testScheduler.runCurrent()
    } else {
        controller.runCurrent()
    }
}
/**
 * Forwards [block] to [DelayController.pauseDispatcher] of the dispatcher in this scope's context.
 *
 * @throws IllegalStateException if the context of this scope has no [ContinuationInterceptor] that is a
 * [DelayController].
 */
@ExperimentalCoroutinesApi
@Deprecated("The test coroutine scope isn't able to pause its dispatchers in the general case. " +
    "Only `TestCoroutineDispatcher` supports pausing; pause it directly.",
    ReplaceWith("(this.coroutineContext[ContinuationInterceptor]!! as DelayController).pauseDispatcher(block)",
        "kotlin.coroutines.ContinuationInterceptor"),
    DeprecationLevel.WARNING)
public suspend fun TestCoroutineScope.pauseDispatcher(block: suspend () -> Unit) {
    val controller = delayControllerForPausing
    controller.pauseDispatcher(block)
}
/**
 * Forwards to the no-argument [DelayController.pauseDispatcher] of the dispatcher in this scope's context.
 *
 * @throws IllegalStateException if the context of this scope has no [ContinuationInterceptor] that is a
 * [DelayController].
 */
@ExperimentalCoroutinesApi
@Deprecated("The test coroutine scope isn't able to pause its dispatchers in the general case. " +
    "Only `TestCoroutineDispatcher` supports pausing; pause it directly.",
    ReplaceWith("(this.coroutineContext[ContinuationInterceptor]!! as DelayController).pauseDispatcher()",
        "kotlin.coroutines.ContinuationInterceptor"),
    level = DeprecationLevel.WARNING)
public fun TestCoroutineScope.pauseDispatcher() {
    val controller = delayControllerForPausing
    controller.pauseDispatcher()
}
/**
 * Forwards to [DelayController.resumeDispatcher] of the dispatcher in this scope's context.
 *
 * @throws IllegalStateException if the context of this scope has no [ContinuationInterceptor] that is a
 * [DelayController].
 */
@ExperimentalCoroutinesApi
@Deprecated("The test coroutine scope isn't able to pause its dispatchers in the general case. " +
    "Only `TestCoroutineDispatcher` supports pausing; pause it directly.",
    ReplaceWith("(this.coroutineContext[ContinuationInterceptor]!! as DelayController).resumeDispatcher()",
        "kotlin.coroutines.ContinuationInterceptor"),
    level = DeprecationLevel.WARNING)
public fun TestCoroutineScope.resumeDispatcher() {
    val controller = delayControllerForPausing
    controller.resumeDispatcher()
}
/**
 * The [DelayController] used by the deprecated pause/resume helpers of the scope.
 *
 * @throws IllegalStateException if the context of the scope has no [ContinuationInterceptor] that is a
 * [DelayController].
 */
private val TestCoroutineScope.delayControllerForPausing: DelayController
    get() = checkNotNull(coroutineContext.delayController) {
        "This scope isn't able to pause its dispatchers"
    }
/**
* Thrown when a test has completed and there are tasks that are not completed or cancelled.
*
* In this file it is raised by the scope's `cleanupTestCoroutines` when child jobs that were not already active
* at the time the scope was created are still active after cleanup. It is a subclass of [AssertionError] and
* carries only the given message.
*/
@ExperimentalCoroutinesApi
internal class UncompletedCoroutinesError(message: String): AssertionError(message)