Skip to content

Better handle the exceptions from child coroutines in runTest #2995

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 45 additions & 12 deletions kotlinx-coroutines-test/common/src/TestBuilders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import kotlin.coroutines.*
*/
@Deprecated("Use `runTest` instead to support completing from other dispatchers.", level = DeprecationLevel.WARNING)
public fun runBlockingTest(context: CoroutineContext = EmptyCoroutineContext, testBody: suspend TestCoroutineScope.() -> Unit) {
val scope = TestCoroutineScope(context)
val scope = TestCoroutineScope(TestCoroutineDispatcher() + SupervisorJob() + context)
val scheduler = scope.testScheduler
val deferred = scope.async {
scope.testBody()
Expand Down Expand Up @@ -141,6 +141,18 @@ public expect class TestResult
*
* ### Failures
*
* #### Test body failures
*
* If the test body finishes with an exception, then this exception will be thrown at the end of the test.
*
* #### Reported exceptions
*
* Exceptions reported to the test coroutine scope via [TestCoroutineScope.reportException] will be thrown at the end.
* By default, unless an explicit [TestExceptionHandler] is passed, this includes all unhandled exceptions. If the test
* body also fails, the reported exceptions are suppressed by it.
*
* #### Uncompleted coroutines
*
* This method requires that all coroutines launched inside [testBody] complete, or are cancelled. Otherwise, the test
* will be failed (which, on JVM and Native, means that [runTest] itself will throw [AssertionError],
* whereas on JS, the `Promise` will fail with it).
Expand All @@ -151,8 +163,6 @@ public expect class TestResult
* idle before throwing [AssertionError]. If some dispatcher linked to [TestCoroutineScheduler] receives a
* task during that time, the timer gets reset.
*
* Unhandled exceptions thrown by coroutines in the test will be rethrown at the end of the test.
*
* ### Configuration
*
* [context] can be used to affect the environment of the code under test. Beside just being passed to the coroutine
Expand All @@ -170,16 +180,18 @@ public fun runTest(
): TestResult {
if (context[RunningInRunTest] != null)
throw IllegalStateException("Calls to `runTest` can't be nested. Please read the docs on `TestResult` for details.")
val testScope = TestCoroutineScope(context + RunningInRunTest())
val testScope = TestBodyCoroutine<Unit>(TestCoroutineScope(context + RunningInRunTest))
val scheduler = testScope.testScheduler
return createTestResult {
val deferred = testScope.async {
testScope.testBody()
/** TODO: moving this [AbstractCoroutine.start] call outside [createTestResult] fails on Native with
* [TestCoroutineDispatcher], because the event loop is not started. */
testScope.start(CoroutineStart.DEFAULT, testScope) {
testBody()
}
var completed = false
while (!completed) {
scheduler.advanceUntilIdle()
if (deferred.isCompleted) {
if (testScope.isCompleted) {
/* don't even enter `withTimeout`; this allows to use a timeout of zero to check that there are no
non-trivial dispatches. */
completed = true
Expand All @@ -188,7 +200,7 @@ public fun runTest(
try {
withTimeout(dispatchTimeoutMs) {
select<Unit> {
deferred.onAwait {
testScope.onJoin {
completed = true
}
scheduler.onDispatchEvent {
Expand All @@ -205,7 +217,14 @@ public fun runTest(
throw UncompletedCoroutinesError("The test coroutine was not completed after waiting for $dispatchTimeoutMs ms")
}
}
deferred.getCompletionExceptionOrNull()?.let {
testScope.getCompletionExceptionOrNull()?.let {
try {
testScope.cleanupTestCoroutines()
} catch (e: UncompletedCoroutinesError) {
// it's normal that some jobs are not completed if the test body has failed, won't clutter the output
} catch (e: Throwable) {
it.addSuppressed(e)
}
throw it
}
testScope.cleanupTestCoroutines()
Expand All @@ -222,7 +241,7 @@ internal expect fun createTestResult(testProcedure: suspend () -> Unit): TestRes
* Runs a test in a [TestCoroutineScope] based on this one.
*
* Calls [runTest] using a coroutine context from this [TestCoroutineScope]. The [TestCoroutineScope] used to run
* [block] will be different from this one, but will reuse its [Job]; therefore, even if calling
* [block] will be different from this one, but will use its [Job] as a parent; therefore, even if calling
* [TestCoroutineScope.cleanupTestCoroutines] on this scope were to complete its job, [runTest] won't complete it at the
* end of the test.
*
Expand Down Expand Up @@ -252,10 +271,24 @@ public fun TestDispatcher.runTest(
runTest(this, dispatchTimeoutMs, block)

/** A coroutine context element indicating that the coroutine is running inside `runTest`. */
private class RunningInRunTest: AbstractCoroutineContextElement(RunningInRunTest), CoroutineContext.Element {
companion object Key : CoroutineContext.Key<RunningInRunTest>
private object RunningInRunTest: CoroutineContext.Key<RunningInRunTest>, CoroutineContext.Element {
override val key: CoroutineContext.Key<*>
get() = this

override fun toString(): String = "RunningInRunTest"
}

/** The default timeout to use when waiting for asynchronous completions of the coroutines managed by
* a [TestCoroutineScheduler]. */
private const val DEFAULT_DISPATCH_TIMEOUT_MS = 10_000L

private class TestBodyCoroutine<T>(
private val testScope: TestCoroutineScope,
) : AbstractCoroutine<T>(testScope.coroutineContext, initParentJob = true, active = true), TestCoroutineScope,
UncaughtExceptionCaptor by testScope.coroutineContext.uncaughtExceptionCaptor
{
override val testScheduler get() = testScope.testScheduler

override fun cleanupTestCoroutines() = testScope.cleanupTestCoroutines()

}
26 changes: 17 additions & 9 deletions kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout
}
}

/**
* Runs the next enqueued task, advancing the virtual time to the time of its scheduled awakening.
*/
private fun tryRunNextTask(): Boolean {
val event = synchronized(lock) {
val event = events.removeFirstOrNull() ?: return false
if (currentTime > event.time)
currentTimeAheadOfEvents()
currentTime = event.time
event
}
event.dispatcher.processEvent(event.time, event.marker)
return true
}

/**
* Runs the enqueued tasks in the specified order, advancing the virtual time as needed until there are no more
* tasks associated with the dispatchers linked to this scheduler.
Expand All @@ -91,15 +106,8 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout
*/
@ExperimentalCoroutinesApi
public fun advanceUntilIdle() {
while (!events.isEmpty) {
val event = synchronized(lock) {
val event = events.removeFirstOrNull() ?: return
if (currentTime > event.time)
currentTimeAheadOfEvents()
currentTime = event.time
event
}
event.dispatcher.processEvent(event.time, event.marker)
while (!synchronized(lock) { events.isEmpty }) {
tryRunNextTask()
}
}

Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-test/common/src/TestCoroutineScope.kt
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public fun TestCoroutineScope(context: CoroutineContext = EmptyCoroutineContext)
val job: Job
val ownJob: CompletableJob?
if (context[Job] == null) {
ownJob = SupervisorJob()
ownJob = Job()
job = ownJob
} else {
ownJob = null
Expand All @@ -124,7 +124,7 @@ public fun TestCoroutineScope(context: CoroutineContext = EmptyCoroutineContext)
return TestCoroutineScopeImpl(context + scheduler + dispatcher + exceptionHandler + job, ownJob)
}

private inline val CoroutineContext.uncaughtExceptionCaptor: UncaughtExceptionCaptor
internal inline val CoroutineContext.uncaughtExceptionCaptor: UncaughtExceptionCaptor
get() {
val handler = this[CoroutineExceptionHandler]
return handler as? UncaughtExceptionCaptor ?: throw IllegalArgumentException(
Expand Down
4 changes: 3 additions & 1 deletion kotlinx-coroutines-test/common/test/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ inline fun <T> assertRunsFast(block: () -> T): T = assertRunsFast(Duration.secon
/**
* Passes [test] as an argument to [block], but as a function returning not a [TestResult] but [Unit].
*/
expect fun testResultMap(block: (() -> Unit) -> Unit, test: () -> TestResult): TestResult
expect fun testResultMap(block: (() -> Unit) -> Unit, test: () -> TestResult): TestResult

class TestException(message: String? = null): Exception(message)
64 changes: 64 additions & 0 deletions kotlinx-coroutines-test/common/test/RunTestTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.test

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*
import kotlin.test.*

Expand Down Expand Up @@ -153,4 +154,67 @@ class RunTestTest {
}
}

@Test
fun reproducer2405() = runTest {
val dispatcher = TestCoroutineDispatcher(testScheduler)
var collectedError = false
withContext(dispatcher) {
flow { emit(1) }
.combine(
flow<String> { throw IllegalArgumentException() }
) { int, string -> int.toString() + string }
.catch { emit("error") }
.collect {
assertEquals("error", it)
collectedError = true
}
}
assertTrue(collectedError)
}

/** Tests that, once the test body has thrown, the child coroutines are cancelled. */
@Test
fun testChildrenCancellationOnTestBodyFailure() {
var job: Job? = null
testResultMap({
assertFailsWith<AssertionError> { it() }
assertTrue(job!!.isCancelled)
}, {
runTest {
job = launch {
while (true) {
delay(1000)
}
}
throw AssertionError()
}
})
}

/** Tests that [runTest] reports [TimeoutCancellationException]. */
@Test
fun testTimeout() = testResultMap({
assertFailsWith<TimeoutCancellationException> { it() }
}, {
runTest {
withTimeout(50) {
launch {
delay(1000)
}
}
}
})

/** Checks that [runTest] throws the root cause and not [JobCancellationException] when a child coroutine throws. */
@Test
fun testRunTestThrowsRootCause() = testResultMap({
assertFailsWith<TestException> { it() }
}, {
runTest {
launch {
throw TestException()
}
}
})

}
29 changes: 29 additions & 0 deletions kotlinx-coroutines-test/common/test/TestCoroutineScopeTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,35 @@ class TestCoroutineScopeTest {
assertFalse(handlerCalled)
}

/** Tests that uncaught exceptions are thrown at the cleanup. */
@Test
fun testThrowsUncaughtExceptionsOnCleanup() {
val scope = TestCoroutineScope()
val exception = TestException("test")
scope.launch {
throw exception
}
assertFailsWith<TestException> {
scope.cleanupTestCoroutines()
}
}

/** Tests that uncaught exceptions take priority over uncompleted jobs when throwing on cleanup. */
@Test
fun testUncaughtExceptionsPrioritizedOnCleanup() {
val scope = TestCoroutineScope()
val exception = TestException("test")
scope.launch {
throw exception
}
scope.launch {
delay(1000)
}
assertFailsWith<TestException> {
scope.cleanupTestCoroutines()
}
}

companion object {
internal val invalidContexts = listOf(
Dispatchers.Default, // not a [TestDispatcher]
Expand Down
4 changes: 1 addition & 3 deletions kotlinx-coroutines-test/common/test/TestRunBlockingTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,4 @@ class TestRunBlockingTest {
}
}
}
}

private class TestException(message: String? = null): Exception(message)
}