Skip to content

Support completing the test coroutine from outside the test thread. #1206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions kotlinx-coroutines-test/src/DelayController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kotlinx.coroutines.test

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ConflatedBroadcastChannel

/**
* Control the virtual clock time of a [CoroutineDispatcher].
Expand Down Expand Up @@ -93,6 +94,9 @@ public interface DelayController {
*
* This is useful when testing functions that start a coroutine. By pausing the dispatcher assertions or
* setup may be done between the time the coroutine is created and started.
*
* While in the paused block, the dispatcher will queue all dispatched coroutines and they will be resumed on
* whatever thread calls [advanceUntilIdle], [advanceTimeBy], or [runCurrent].
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public suspend fun pauseDispatcher(block: suspend () -> Unit)
Expand All @@ -102,6 +106,9 @@ public interface DelayController {
*
* When paused, the dispatcher will not execute any coroutines automatically, and you must call [runCurrent] or
* [advanceTimeBy], or [advanceUntilIdle] to execute coroutines.
*
* While paused, the dispatcher will queue all dispatched coroutines and they will be resumed on whatever thread
* calls [advanceUntilIdle], [advanceTimeBy], or [runCurrent].
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun pauseDispatcher()
Expand All @@ -112,6 +119,28 @@ public interface DelayController {
* Resumed dispatchers will automatically progress through all coroutines scheduled at the current time. To advance
* time and execute coroutines scheduled in the future use, one of [advanceTimeBy],
* or [advanceUntilIdle].
*
* When the dispatcher is resumed, all execution be immediate in the thread that triggered it similar to
* [Dispatchers.Unconfined]. This means that the following code will not switch back from Dispatchers.IO after
* `withContext`
*
* ```
* runBlockingTest {
* withContext(Dispatchers.IO) { doIo() }
* // runBlockingTest is still on Dispatchers.IO here
* }
* ```
*
* For tests that need accurate threading behavior, [pauseDispatcher] will ensure that the following test dispatches
* on a controlled thread.
*
* ```
* runBlockingTest {
* pauseDispatcher()
* withContext(Dispatchers.IO) { doIo() }
* // runBlockingTest has returned to it's starting thread here
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit counter-intuitive as well, pauseDispatcher does not pause the dispatcher, but changes its behaviour

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea - this one I'm stuck on. It seems required to support the pauseDispatcher { } block in runBlockingTest but it has this side effect in the presence of multi-threading.

* }
* ```
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun resumeDispatcher()
Expand Down
84 changes: 71 additions & 13 deletions kotlinx-coroutines-test/src/TestBuilders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
package kotlinx.coroutines.test

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.selects.select
import kotlin.coroutines.*

private const val DEFAULT_WAIT_FOR_OTHER_DISPATCHERS = 30_000L

/**
* Executes a [testBody] inside an immediate execution dispatcher.
*
Expand Down Expand Up @@ -38,26 +42,79 @@ import kotlin.coroutines.*
* (including coroutines suspended on join/await).
*
* @param context additional context elements. If [context] contains [CoroutineDispatcher] or [CoroutineExceptionHandler],
* then they must implement [DelayController] and [TestCoroutineExceptionHandler] respectively.
* then they must implement [DelayController] and [TestCoroutineExceptionHandler] respectively.
* @param waitForOtherDispatchers how long to wait for other dispatchers to execute tasks asynchronously, default 30
* seconds
* @param testBody The code of the unit-test.
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun runBlockingTest(context: CoroutineContext = EmptyCoroutineContext, testBody: suspend TestCoroutineScope.() -> Unit) {
public fun runBlockingTest(
context: CoroutineContext = EmptyCoroutineContext,
waitForOtherDispatchers: Long = DEFAULT_WAIT_FOR_OTHER_DISPATCHERS,
testBody: suspend TestCoroutineScope.() -> Unit
) {
val (safeContext, dispatcher) = context.checkArguments()
val startingJobs = safeContext.activeJobs()
val scope = TestCoroutineScope(safeContext)
val deferred = scope.async {
scope.testBody()

var testScope: TestCoroutineScope? = null

val deferred = CoroutineScope(safeContext).async {
val localTestScope = TestCoroutineScope(coroutineContext)
testScope = localTestScope
localTestScope.testBody()
}
dispatcher.advanceUntilIdle()
deferred.getCompletionExceptionOrNull()?.let {
throw it

val didTimeout = deferred.waitForCompletion(waitForOtherDispatchers, dispatcher, dispatcher as IdleWaiter)

if (deferred.isCompleted) {
deferred.getCompletionExceptionOrNull()?.let {
throw it
}
}
scope.cleanupTestCoroutines()

testScope!!.cleanupTestCoroutines()
val endingJobs = safeContext.activeJobs()
if ((endingJobs - startingJobs).isNotEmpty()) {
throw UncompletedCoroutinesError("Test finished with active jobs: $endingJobs")

// TODO: should these be separate exceptions to allow for tests to detect difference?
if (didTimeout) {
val message = """
runBlockingTest timed out after waiting ${waitForOtherDispatchers}ms for coroutines to complete.
Active jobs after test (may be empty): $endingJobs
""".trimIndent()
throw UncompletedCoroutinesError(message)
} else if ((endingJobs - startingJobs).isNotEmpty()) {
throw UncompletedCoroutinesError("Test finished with active jobs: $endingJobs ")
}
}

private fun Deferred<Unit>.waitForCompletion(wait: Long, delayController: DelayController, park: IdleWaiter): Boolean {
var didTimeout = false

runBlocking {
val unparkChannel = Channel<Unit>(1)
val job = launch {
while(true) {
park.suspendUntilNextDispatch()
unparkChannel.send(Unit)
}
}

try {
withTimeout(wait) {
while(!isCompleted) {
delayController.advanceUntilIdle()
select<Unit> {
onAwait { Unit }
unparkChannel.onReceive { Unit }
}
}
}
} catch (exception: TimeoutCancellationException) {
didTimeout = true
}
job.cancel()
}
return didTimeout
}

private fun CoroutineContext.activeJobs(): Set<Job> {
Expand All @@ -69,18 +126,19 @@ private fun CoroutineContext.activeJobs(): Set<Job> {
*/
// todo: need documentation on how this extension is supposed to be used
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun TestCoroutineScope.runBlockingTest(block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(coroutineContext, block)
public fun TestCoroutineScope.runBlockingTest(waitForOtherDispatchers: Long = DEFAULT_WAIT_FOR_OTHER_DISPATCHERS, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(coroutineContext, waitForOtherDispatchers, block)

/**
* Convenience method for calling [runBlockingTest] on an existing [TestCoroutineDispatcher].
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun TestCoroutineDispatcher.runBlockingTest(block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(this, block)
public fun TestCoroutineDispatcher.runBlockingTest(waitForOtherDispatchers: Long = DEFAULT_WAIT_FOR_OTHER_DISPATCHERS, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(this, waitForOtherDispatchers, block)

private fun CoroutineContext.checkArguments(): Pair<CoroutineContext, DelayController> {
// TODO optimize it
val dispatcher = get(ContinuationInterceptor).run {
this?.let { require(this is DelayController) { "Dispatcher must implement DelayController: $this" } }
this?.let { require(this is IdleWaiter) { "Dispatcher must implement IdleWaiter" } }
this ?: TestCoroutineDispatcher()
}

Expand Down
69 changes: 56 additions & 13 deletions kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@

package kotlinx.coroutines.test

import kotlinx.atomicfu.*
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.math.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.internal.ThreadSafeHeap
import kotlinx.coroutines.internal.ThreadSafeHeapNode
import kotlin.coroutines.CoroutineContext
import kotlin.math.max

/**
* [CoroutineDispatcher] that performs both immediate and lazy execution of coroutines in tests
Expand All @@ -22,10 +25,14 @@ import kotlin.math.*
* not execute until a call to [DelayController.runCurrent] or the virtual clock-time has been advanced via one of the
* methods on [DelayController].
*
* While in immediate mode [TestCoroutineDispatcher] behaves similar to [Dispatchers.Unconfined]. When resuming from
* another thread it will *not* switch threads. When in lazy mode, [TestCoroutineDispatcher] will enqueue all
* dispatches and whatever thread calls an [advanceUntilIdle], [advanceTimeBy], or [runCurrent] will continue execution.
*
* @see DelayController
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayController {
public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayController, IdleWaiter {
private var dispatchImmediately = true
set(value) {
field = value
Expand All @@ -44,10 +51,13 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl
// Storing time in nanoseconds internally.
private val _time = atomic(0L)

private val waitLock = Channel<Unit>(capacity = 1)

/** @suppress */
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (dispatchImmediately) {
block.run()
unpark()
} else {
post(block)
}
Expand Down Expand Up @@ -79,14 +89,18 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl
return "TestCoroutineDispatcher[currentTime=${currentTime}ms, queued=${queue.size}]"
}

private fun post(block: Runnable) =
private fun post(block: Runnable) {
queue.addLast(TimedRunnable(block, _counter.getAndIncrement()))
unpark()
}

private fun postDelayed(block: Runnable, delayTime: Long) =
TimedRunnable(block, _counter.getAndIncrement(), safePlus(currentTime, delayTime))
.also {
queue.addLast(it)
}
private fun postDelayed(block: Runnable, delayTime: Long): TimedRunnable {
return TimedRunnable(block, _counter.getAndIncrement(), safePlus(currentTime, delayTime))
.also {
queue.addLast(it)
unpark()
}
}

private fun safePlus(currentTime: Long, delayTime: Long): Long {
check(delayTime >= 0)
Expand Down Expand Up @@ -132,11 +146,14 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl
val next = queue.peek() ?: break
advanceUntilTime(next.time)
}

return currentTime - oldTime
}

/** @suppress */
override fun runCurrent() = doActionsUntil(currentTime)
override fun runCurrent() {
doActionsUntil(currentTime)
}

/** @suppress */
override suspend fun pauseDispatcher(block: suspend () -> Unit) {
Expand All @@ -161,6 +178,7 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl

/** @suppress */
override fun cleanupTestCoroutines() {
unpark()
// process any pending cancellations or completions, but don't advance time
doActionsUntil(currentTime)

Expand All @@ -181,6 +199,14 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl
)
}
}

override suspend fun suspendUntilNextDispatch() {
waitLock.receive()
}

private fun unpark() {
waitLock.offer(Unit)
}
}

/**
Expand Down Expand Up @@ -212,4 +238,21 @@ private class TimedRunnable(
}

override fun toString() = "TimedRunnable(time=$time, run=$runnable)"
}
}

/**
* Alternative implementations of [TestCoroutineDispatcher] must implement this interface in order to be supported by
* [runBlockingTest].
*
* This interface allows external code to suspend itself until the next dispatch is received. This is similar to park in
* a normal event loop, but doesn't require that [TestCoroutineDispatcher] block a thread while parked.
*/
interface IdleWaiter {
/**
* Attempt to suspend until the next dispatch is received.
*
* This method may resume immediately if any dispatch was received since the last time it was called. This ensures
* that dispatches won't be dropped if they happen just before calling [suspendUntilNextDispatch].
*/
public suspend fun suspendUntilNextDispatch()
}
Loading