Skip to content

Commit 4da43bf

Browse files
committed
Support completing the test coroutine from outside the test thread.
This requires a call to invokeOnCompletion to get the completion state from the other thread. Changes: - runBlockingTest now supports completing the test coroutine from another thread - success and failure path tests While fixing this, a subtle non-determinism was discovered that would lead to flakey tests. If another thread completed the test coroutine *during* the cleanup checks it was possible to modify the state of the test during the cleanup checks in a way that could expose false-positive or false-negative results randomly. To resolve this, a non-completed coroutine immediately after advanceUntilIdle is now considered a failure, even if it completes before the more aggressive cleanup checks. There is still a very brief window (between advanceTimeBy and getResultIfKnown) that non-determinism may be introduced, but it will fail with a descriptive error message at on random executions directing the developer to resolve the non-determinstic behavior. Note: testWithOddlyCompletingJob_fails may fail when the implementation of runBlockingTest changes (it's very tightly coupled).
1 parent 218dc97 commit 4da43bf

File tree

2 files changed

+133
-3
lines changed

2 files changed

+133
-3
lines changed

kotlinx-coroutines-test/src/TestBuilders.kt

+41-3
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,56 @@ public fun runBlockingTest(context: CoroutineContext = EmptyCoroutineContext, te
4646
val (safeContext, dispatcher) = context.checkArguments()
4747
val startingJobs = safeContext.activeJobs()
4848
val scope = TestCoroutineScope(safeContext)
49+
4950
val deferred = scope.async {
5051
scope.testBody()
5152
}
53+
54+
// run any outstanding coroutines that can be completed by advancing virtual-time
5255
dispatcher.advanceUntilIdle()
53-
deferred.getCompletionExceptionOrNull()?.let {
54-
throw it
55-
}
56+
57+
// fetch results from the coroutine - this may require a thread hop if some child coroutine was *completed* on
58+
// another thread during this test so we must use an invokeOnCompletion handler to retrieve the result.
59+
60+
// There are two code paths for fetching the error:
61+
//
62+
// 1. The job was already completed (happy path, normal test)
63+
// - invokeOnCompletion was executed immediately and errorThrownByTestOrNull is already at it's final value so
64+
// we can throw it
65+
// 2. The job has not already completed (always fail the test due to error or time-based non-determinism)
66+
// - invokeOnCompletion will not be triggered right away. To avoid introducing wall non-deterministic behavior
67+
// (the deferred may complete between here and the call to activeJobs below) this will always be considered a
68+
// test failure.
69+
// - this will not happen if all coroutines are only waiting to complete due to thread hops, but may happen
70+
// if another thread triggers completion concurrently with this cleanup code.
71+
//
72+
// give test code errors a priority in the happy path, throw here if the error is already known.
73+
val (wasCompletedAfterTest, errorThrownByTestOrNull) = deferred.getResultIfKnown()
74+
errorThrownByTestOrNull?.let { throw it }
75+
5676
scope.cleanupTestCoroutines()
5777
val endingJobs = safeContext.activeJobs()
5878
if ((endingJobs - startingJobs).isNotEmpty()) {
5979
throw UncompletedCoroutinesError("Test finished with active jobs: $endingJobs")
6080
}
81+
82+
if (!wasCompletedAfterTest) {
83+
// Handle path #2, we are going to fail the test in an opinionated way at this point so let the developer know
84+
// how to fix it.
85+
throw UncompletedCoroutinesError("Test completed all jobs after cleanup code started. This is " +
86+
"thrown to avoid non-deterministic behavior in tests (the next execution may fail randomly). Ensure " +
87+
"all threads started by the test are completed before returning from runBlockingTest.")
88+
}
89+
}
90+
91+
private fun Deferred<Unit>.getResultIfKnown(): Pair<Boolean, Throwable?> {
92+
var testError: Throwable? = null
93+
var wasExecuted = false
94+
invokeOnCompletion { errorFromTestOrNull ->
95+
testError = errorFromTestOrNull
96+
wasExecuted = true
97+
}.dispose()
98+
return Pair(wasExecuted, testError)
6199
}
62100

63101
private fun CoroutineContext.activeJobs(): Set<Job> {

kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt

+92
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ package kotlinx.coroutines.test
66

77
import kotlinx.coroutines.*
88
import org.junit.*
9+
import kotlin.concurrent.thread
910
import kotlin.coroutines.*
11+
import kotlin.test.assertEquals
1012

1113
class TestRunBlockingOrderTest : TestBase() {
1214
@Test
@@ -67,4 +69,94 @@ class TestRunBlockingOrderTest : TestBase() {
6769
}
6870
expect(2)
6971
}
72+
73+
@Test
74+
fun testNewThread_inSuspendCancellableCoroutine() = runBlockingTest {
75+
expect(1)
76+
suspendCancellableCoroutine<Unit> { cont ->
77+
expect(2)
78+
thread {
79+
expect(3)
80+
cont.resume(Unit)
81+
}
82+
}
83+
finish(4)
84+
}
85+
86+
@Test(expected = UncompletedCoroutinesError::class)
87+
fun testWithOddlyCompletingJob_fails() {
88+
// this test is suspect since it relies upon the exact ordering of code in runBlockingTest
89+
// however, it needs to ensure the job finishes *after* advanceUntilIdle is called in order
90+
// to ensure that runBlockingTest errors when presented with threading non-determinism.
91+
92+
// this test is stable and will always pass unless the implementation changes.
93+
94+
// If this starts failing because the call to cleanupTestCoroutines changes it will need a similarly
95+
// implementation driven test.
96+
97+
class FakeDispatcher(val delegate: TestCoroutineDispatcher):
98+
CoroutineDispatcher(),
99+
Delay by delegate,
100+
DelayController by delegate {
101+
private var cleanupCallback: (() -> Unit)? = null
102+
103+
override fun dispatch(context: CoroutineContext, block: Runnable) {
104+
delegate.dispatch(context, block)
105+
}
106+
107+
fun onCleanup(block: () -> Unit) {
108+
cleanupCallback = block
109+
}
110+
111+
override fun cleanupTestCoroutines() {
112+
delegate.cleanupTestCoroutines()
113+
cleanupCallback?.invoke()
114+
}
115+
}
116+
117+
val dispatcher = FakeDispatcher(TestCoroutineDispatcher())
118+
val scope = TestCoroutineScope(dispatcher)
119+
val resumeAfterTest = CompletableDeferred<Unit>()
120+
121+
scope.runBlockingTest {
122+
expect(1)
123+
dispatcher.onCleanup {
124+
// after advanceTimeUntilIdle, complete the launched coroutine
125+
expect(3)
126+
resumeAfterTest.complete(Unit)
127+
finish(5)
128+
}
129+
expect(2)
130+
resumeAfterTest.await() // this will resume just before child jobs are checked
131+
expect(4)
132+
}
133+
}
134+
135+
@Test
136+
fun testThrows_throws() {
137+
val expected = IllegalStateException("expected")
138+
val result = runCatching {
139+
expect(1)
140+
runBlockingTest {
141+
expect(2)
142+
throw expected
143+
}
144+
}
145+
finish(3)
146+
assertEquals(expected, result.exceptionOrNull())
147+
}
148+
149+
@Test
150+
fun testSuspendForever_fails() {
151+
val uncompleted = CompletableDeferred<Unit>()
152+
val result = runCatching {
153+
expect(1)
154+
runBlockingTest {
155+
expect(2)
156+
uncompleted.await()
157+
}
158+
}
159+
finish(3)
160+
assertEquals(true, result.isFailure)
161+
}
70162
}

0 commit comments

Comments
 (0)