-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathTestBuilders.kt
95 lines (88 loc) · 3.75 KB
/
TestBuilders.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.test
import kotlinx.coroutines.*
import kotlin.coroutines.*
/**
* Executes a [testBody] inside an immediate execution dispatcher.
*
* This is similar to [runBlocking] but it will immediately progress past delays and into [launch] and [async] blocks.
* You can use this to write tests that execute in the presence of calls to [delay] without causing your test to take
* extra time.
*
* ```
* @Test
* fun exampleTest() = runBlockingTest {
* val deferred = async {
* delay(1_000)
* async {
* delay(1_000)
* }.await()
* }
*
* deferred.await() // result available immediately
* }
*
* ```
*
* This method requires that all coroutines launched inside [testBody] complete, or are cancelled, as part of the test
* conditions.
*
* Unhandled exceptions thrown by coroutines in the test will be re-thrown at the end of the test.
*
* @throws UncompletedCoroutinesError If the [testBody] does not complete (or cancel) all coroutines that it launches
* (including coroutines suspended on join/await).
*
* @param context additional context elements. If [context] contains [CoroutineDispatcher] or [CoroutineExceptionHandler],
* then they must implement [DelayController] and [TestCoroutineExceptionHandler] respectively.
* @param testBody The code of the unit-test.
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun runBlockingTest(context: CoroutineContext = EmptyCoroutineContext, testBody: suspend TestCoroutineScope.() -> Unit) {
val (safeContext, dispatcher) = context.checkArguments()
val startingJobs = safeContext.activeJobs()
val scope = TestCoroutineScope(safeContext)
val deferred = scope.async {
scope.testBody()
}
dispatcher.advanceUntilIdle()
deferred.getCompletionExceptionOrNull()?.let {
throw it
}
scope.cleanupTestCoroutines()
val endingJobs = safeContext.activeJobs()
if ((endingJobs - startingJobs).isNotEmpty()) {
throw UncompletedCoroutinesError("Test finished with active jobs: $endingJobs")
}
}
private fun CoroutineContext.activeJobs(): Set<Job> {
return checkNotNull(this[Job]).children.filter { it.isActive }.toSet()
}
/**
* Convenience method for calling [runBlockingTest] on an existing [TestCoroutineScope].
*/
// todo: need documentation on how this extension is supposed to be used
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun TestCoroutineScope.runBlockingTest(block: suspend TestCoroutineScope.() -> Unit): Unit =
runBlockingTest(coroutineContext, block)
/**
* Convenience method for calling [runBlockingTest] on an existing [TestCoroutineDispatcher].
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun TestCoroutineDispatcher.runBlockingTest(block: suspend TestCoroutineScope.() -> Unit): Unit =
runBlockingTest(this, block)
private fun CoroutineContext.checkArguments(): Pair<CoroutineContext, DelayController> {
val dispatcher = when (val dispatcher = get(ContinuationInterceptor)) {
is DelayController -> dispatcher
null -> TestCoroutineDispatcher()
else -> throw IllegalArgumentException("Dispatcher must implement DelayController: $dispatcher")
}
val exceptionHandler = when (val handler = get(CoroutineExceptionHandler)) {
is UncaughtExceptionCaptor -> handler
null -> TestCoroutineExceptionHandler()
else -> throw IllegalArgumentException("coroutineExceptionHandler must implement UncaughtExceptionCaptor: $handler")
}
val job = get(Job) ?: SupervisorJob()
return Pair(this + dispatcher + exceptionHandler + job, dispatcher)
}