Skip to content

Commit d3f40a9

Browse files
committed
Implement an eventLoop for runBlockingTest.
As a result, runBlockingTest is now correct in the presence of repeated time-delayed dispatch from other dispatchers. Changes: - runBlockingTest will now allow a 30 second timeout for other dispatchers to complete coroutines - Introduced WaitConfig, SingleDispatcherWaitConfig, and MultiDispatcherWaitConfig to configure runBlockingTest timeout behavior - Added DelayController.queueState as a ConflatedBroadcastChannel to observe or poll the queue status of a TestCoroutineDispatcher - Added queue status of Idle, HasCurrentTask and HasDelayedTask as public APIs for quering the status of a TestCoroutineDispatcher - Added dependency on runBlocking from runBlockingTest - Improved documentation for threading concerns around resumeDispatcher
1 parent baade87 commit d3f40a9

File tree

5 files changed

+486
-112
lines changed

5 files changed

+486
-112
lines changed

kotlinx-coroutines-test/src/DelayController.kt

+67
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package kotlinx.coroutines.test
22

33
import kotlinx.coroutines.CoroutineDispatcher
44
import kotlinx.coroutines.ExperimentalCoroutinesApi
5+
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
56

67
/**
78
* Control the virtual clock time of a [CoroutineDispatcher].
@@ -112,9 +113,75 @@ public interface DelayController {
112113
* Resumed dispatchers will automatically progress through all coroutines scheduled at the current time. To advance
113114
* time and execute coroutines scheduled in the future use, one of [advanceTimeBy],
114115
* or [advanceUntilIdle].
116+
*
117+
* When the dispatcher is resumed, all execution be immediate in the thread that triggered it. This means
118+
* that the following code will not switch back from Dispatchers.IO after `withContext`
119+
*
120+
* ```
121+
* runBlockingTest {
122+
* withContext(Dispatchers.IO) { doIo() }
123+
* // runBlockingTest is still on Dispatchers.IO here
124+
* }
125+
* ```
126+
*
127+
* For tests that need accurate threading behavior, [pauseDispatcher] will ensure that the following test dispatches
128+
* on the correct thread.
129+
*
130+
* ```
131+
* runBlockingTest {
132+
* pauseDispatcher()
133+
* withContext(Dispatchers.IO) { doIo() }
134+
* // runBlockingTest has returned to it's starting thread here
135+
* }
136+
* ```
115137
*/
116138
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
117139
public fun resumeDispatcher()
140+
141+
/**
142+
* Represents the queue state of a DelayController.
143+
*
144+
* Tests do not normally need to use this API. It is exposed for advanced situations like integrating multiple
145+
* [TestCoroutineDispatcher] instances or creating alternatives to [runBlockingtest].
146+
*/
147+
public sealed class QueueState {
148+
/**
149+
* A [DelayController] that is idle does not currently have any tasks to perform.
150+
*
151+
* This may happen if all coroutines in this [DelayController] have completed, or if they are all suspended
152+
* waiting on other dispatchers.
153+
*/
154+
public object Idle: QueueState() {
155+
override fun toString() = "Idle"
156+
}
157+
158+
/**
159+
* A [DelayController] that has a task that will execute in response to a call to [runCurrent].
160+
*
161+
* There may also be delayed tasks scheduled, in which case [HasCurrentTask] takes priority since current tasks
162+
* will execute at an earlier virtual time.
163+
*/
164+
public object HasCurrentTask: QueueState() {
165+
override fun toString() = "HasCurrentTask"
166+
}
167+
168+
/**
169+
* A [DelayController] that has delayed tasks has a task scheduled for sometime in the future.
170+
*
171+
* If there are also tasks at the current time, [HasCurrentTask] will take priority.
172+
*/
173+
public object HasDelayedTask: QueueState(){
174+
override fun toString() = "HasDelayedTask"
175+
}
176+
}
177+
178+
/**
179+
* A ConflatedBroadcastChannel that is up to date with the current [QueueState].
180+
*
181+
* Tests do not normally need to use this API. It is exposed for advanced situations like integrating multiple
182+
* [TestCoroutineDispatcher] instances or creating alternatives to [runBlockingtest].
183+
*/
184+
public val queueState: ConflatedBroadcastChannel<QueueState>
118185
}
119186

120187
/**

kotlinx-coroutines-test/src/TestBuilders.kt

+167-47
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,88 @@
55
package kotlinx.coroutines.test
66

77
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.selects.select
9+
import java.lang.StringBuilder
810
import kotlin.coroutines.*
911

12+
private const val DEFAULT_TEST_TIMEOUT = 30_000L
13+
14+
/**
15+
* A strategy for waiting on coroutines executed on other dispatchers inside a [runBlockingTest].
16+
*
17+
* Most tests should use [MultiDispatcherWaitConfig]. As an optimization, a test that executes coroutines only on a
18+
* [TestCoroutineDispatcher] and never interacts with other dispatchers may use [SingleDispatcherWaitConfig].
19+
*
20+
* A test may subclass this to customize the wait in advanced cases.
21+
*/
22+
interface WaitConfig {
23+
/**
24+
* How long (in wall-clock time) to wait for other Dispatchers to complete coroutines during a [runBlockingTest].
25+
*
26+
* This delay is not related to the virtual time of a [TestCoroutineDispatcher], but is how long a test should allow
27+
* another dispatcher, like Dispatchers.IO, to perform a time consuming activity such as reading from a database.
28+
*/
29+
val wait: Long
30+
}
31+
32+
/**
33+
* Do not wait for coroutines executing on another [Dispatcher] in [runBlockingTest].
34+
35+
* Always fails with an uncompleted coroutine when any coroutine in the test executes on any other dispatcher (including
36+
* calls to [withContext]). It should not be used for most tests, instead use the default value of
37+
* [MultiDispatcherWaitConfig].
38+
*
39+
* This configuration should only be used as an optimization for tests that intentionally create an uncompleted
40+
* coroutine and execute all coroutines on the [TestCoroutineDispatcher] used by [runBlockingTest].
41+
*
42+
* If in doubt, prefer [MultiDispatcherWaitConfig].
43+
*/
44+
object SingleDispatcherWaitConfig : WaitConfig {
45+
/**
46+
* This value is ignored by [runBlockingTest] on [SingleDispatcherWaitConfig]
47+
*/
48+
override val wait = 0L
49+
50+
override fun toString() = "SingleDispatcherWaitConfig"
51+
}
52+
53+
/**
54+
* Wait up to 30 seconds for any coroutines running on another [Dispatcher] to complete in [runBlockingTest].
55+
*
56+
* This is the default value for [runBlockingTest] and the recommendation for most tests. This configuration will allow
57+
* for coroutines to be launched on another dispatcher inside the test (e.g. calls to `withContext(Dispatchers.IO)`).
58+
*
59+
* This allows for code like the following to be tested correctly using [runBlockingTest]:
60+
*
61+
* ```
62+
* suspend fun delayOnDefault() = withContext(Dispatchers.Default) {
63+
* // this delay does not use the virtual-time of runBlockingTest since it's executing on Dispatchers.Default
64+
* delay(50)
65+
* }
66+
*
67+
* runBlockingTest {
68+
* // Note: This test takes at least 50ms (real time) to run
69+
*
70+
* // delayOnDefault will suspend the runBlockingTest coroutine for 50ms [real-time: 0; virtual-time: 0]
71+
* delayOnDefault()
72+
* // runBlockingTest resumes 50ms later (real time) [real-time: 50; virtual-time: 0]
73+
*
74+
* delay(10)
75+
* //this delay will auto-progress since it's in runBlockingTest [real-time: 50; virtual-time: 10]
76+
* }
77+
* ```
78+
*/
79+
object MultiDispatcherWaitConfig: WaitConfig {
80+
/**
81+
* Default wait is 30 seconds.
82+
*
83+
* {@inheritDoc}
84+
*/
85+
override val wait = DEFAULT_TEST_TIMEOUT
86+
87+
override fun toString() = "MultiDispatcherWaitConfig[wait = 30s]"
88+
}
89+
1090
/**
1191
* Executes a [testBody] inside an immediate execution dispatcher.
1292
*
@@ -38,64 +118,104 @@ import kotlin.coroutines.*
38118
* (including coroutines suspended on join/await).
39119
*
40120
* @param context additional context elements. If [context] contains [CoroutineDispatcher] or [CoroutineExceptionHandler],
41-
* then they must implement [DelayController] and [TestCoroutineExceptionHandler] respectively.
121+
* then they must implement [DelayController] and [TestCoroutineExceptionHandler] respectively.
122+
* @param waitConfig strategy for waiting on other dispatchers to complete during the test. [SingleDispatcherWaitConfig]
123+
* will never wait, other values will wait for [WaitConfig.wait]ms.
42124
* @param testBody The code of the unit-test.
43125
*/
44126
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
45-
public fun runBlockingTest(context: CoroutineContext = EmptyCoroutineContext, testBody: suspend TestCoroutineScope.() -> Unit) {
127+
public fun runBlockingTest(
128+
context: CoroutineContext = EmptyCoroutineContext,
129+
waitConfig: WaitConfig = MultiDispatcherWaitConfig,
130+
testBody: suspend TestCoroutineScope.() -> Unit
131+
) {
46132
val (safeContext, dispatcher) = context.checkArguments()
47133
val startingJobs = safeContext.activeJobs()
48-
val scope = TestCoroutineScope(safeContext)
49134

50-
val deferred = scope.async {
51-
scope.testBody()
135+
var testScope: TestCoroutineScope? = null
136+
137+
val deferred = CoroutineScope(safeContext).async {
138+
val localTestScope = TestCoroutineScope(coroutineContext)
139+
testScope = localTestScope
140+
localTestScope.testBody()
52141
}
53142

54-
// run any outstanding coroutines that can be completed by advancing virtual-time
55-
dispatcher.advanceUntilIdle()
56-
57-
// fetch results from the coroutine - this may require a thread hop if some child coroutine was *completed* on
58-
// another thread during this test so we must use an invokeOnCompletion handler to retrieve the result.
59-
60-
// There are two code paths for fetching the error:
61-
//
62-
// 1. The job was already completed (happy path, normal test)
63-
// - invokeOnCompletion was executed immediately and errorThrownByTestOrNull is already at it's final value so
64-
// we can throw it
65-
// 2. The job has not already completed (always fail the test due to error or time-based non-determinism)
66-
// - invokeOnCompletion will not be triggered right away. To avoid introducing wall non-deterministic behavior
67-
// (the deferred may complete between here and the call to activeJobs below) this will always be considered a
68-
// test failure.
69-
// - this will not happen if all coroutines are only waiting to complete due to thread hops, but may happen
70-
// if another thread triggers completion concurrently with this cleanup code.
71-
//
72-
// give test code errors a priority in the happy path, throw here if the error is already known.
73-
val (wasCompletedAfterTest, errorThrownByTestOrNull) = deferred.getResultIfKnown()
74-
errorThrownByTestOrNull?.let { throw it }
75-
76-
scope.cleanupTestCoroutines()
77-
val endingJobs = safeContext.activeJobs()
78-
if ((endingJobs - startingJobs).isNotEmpty()) {
79-
throw UncompletedCoroutinesError("Test finished with active jobs: $endingJobs")
143+
val didTimeout = deferred.waitForCompletion(waitConfig, dispatcher)
144+
145+
if (deferred.isCompleted) {
146+
deferred.getCompletionExceptionOrNull()?.let {
147+
throw it
148+
}
80149
}
81150

82-
if (!wasCompletedAfterTest) {
83-
// Handle path #2, we are going to fail the test in an opinionated way at this point so let the developer know
84-
// how to fix it.
85-
throw UncompletedCoroutinesError("Test completed all jobs after cleanup code started. This is " +
86-
"thrown to avoid non-deterministic behavior in tests (the next execution may fail randomly). Ensure " +
87-
"all threads started by the test are completed before returning from runBlockingTest.")
151+
testScope!!.cleanupTestCoroutines()
152+
val endingJobs = safeContext.activeJobs()
153+
154+
// TODO: should these be separate exceptions to allow for tests to detect difference?
155+
if (didTimeout) {
156+
val message = """
157+
runBlockingTest timed out after waiting ${waitConfig.wait}ms for coroutines to complete due waitConfig = $waitConfig.
158+
Active jobs after test (may be empty): $endingJobs
159+
""".trimIndent()
160+
throw UncompletedCoroutinesError(message)
161+
} else if ((endingJobs - startingJobs).isNotEmpty()) {
162+
val message = StringBuilder("Test finished with active jobs: ")
163+
message.append(endingJobs)
164+
if (waitConfig == SingleDispatcherWaitConfig) {
165+
message.append("""
166+
167+
Note: runBlockingTest did not wait for other dispatchers due to argument waitConfig = $waitConfig
168+
169+
Tip: If this runBlockingTest starts any code on another dispatcher (such as Dispatchers.Default,
170+
Dispatchers.IO, etc) in any of the functions it calls it will never pass when configured with
171+
SingleDispatcherWaitConfig. Please update your test to use the default value of MultiDispatcherWaitConfig
172+
like:
173+
174+
runBlockingTest { }
175+
176+
""".trimIndent())
177+
}
178+
throw UncompletedCoroutinesError(message.toString())
88179
}
89180
}
90181

91-
private fun Deferred<Unit>.getResultIfKnown(): Pair<Boolean, Throwable?> {
92-
var testError: Throwable? = null
93-
var wasExecuted = false
94-
invokeOnCompletion { errorFromTestOrNull ->
95-
testError = errorFromTestOrNull
96-
wasExecuted = true
97-
}.dispose()
98-
return Pair(wasExecuted, testError)
182+
private fun Deferred<Unit>.waitForCompletion(waitConfig: WaitConfig, dispatcher: DelayController): Boolean {
183+
var didTimeout = false
184+
when (waitConfig) {
185+
SingleDispatcherWaitConfig -> dispatcher.advanceUntilIdle()
186+
else -> {
187+
runBlocking {
188+
val subscription = dispatcher.queueState.openSubscription()
189+
dispatcher.advanceUntilIdle()
190+
191+
var finished = false
192+
try {
193+
while (!finished) {
194+
finished = select {
195+
this@waitForCompletion.onAwait {
196+
true
197+
}
198+
onTimeout(waitConfig.wait) {
199+
didTimeout = true
200+
true
201+
}
202+
subscription.onReceive { queueState ->
203+
when (queueState) {
204+
DelayController.QueueState.Idle -> Unit
205+
else -> dispatcher.advanceUntilIdle()
206+
}
207+
false
208+
}
209+
}
210+
}
211+
} finally {
212+
subscription.cancel()
213+
}
214+
}
215+
216+
}
217+
}
218+
return didTimeout
99219
}
100220

101221
private fun CoroutineContext.activeJobs(): Set<Job> {
@@ -107,13 +227,13 @@ private fun CoroutineContext.activeJobs(): Set<Job> {
107227
*/
108228
// todo: need documentation on how this extension is supposed to be used
109229
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
110-
public fun TestCoroutineScope.runBlockingTest(block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(coroutineContext, block)
230+
public fun TestCoroutineScope.runBlockingTest(configuration: WaitConfig = MultiDispatcherWaitConfig, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(coroutineContext, configuration, block)
111231

112232
/**
113233
* Convenience method for calling [runBlockingTest] on an existing [TestCoroutineDispatcher].
114234
*/
115235
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
116-
public fun TestCoroutineDispatcher.runBlockingTest(block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(this, block)
236+
public fun TestCoroutineDispatcher.runBlockingTest(configuration: WaitConfig = MultiDispatcherWaitConfig, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(this, configuration, block)
117237

118238
private fun CoroutineContext.checkArguments(): Pair<CoroutineContext, DelayController> {
119239
// TODO optimize it

0 commit comments

Comments
 (0)