Skip to content

Explore paused mode only for TestCoroutineDispatcher. #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: coroutines-test
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 36 additions & 48 deletions kotlinx-coroutines-test/src/DelayController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ public interface DelayController {
/**
* Moves the Dispatcher's virtual clock forward by a specified amount of time.
*
* The amount the clock is progressed may be larger than the requested `delayTimeMillis` if the code under test uses
* blocking coroutines.
*
* The virtual clock time will advance once for each delay resumed until the next delay exceeds the requested
* `delayTimeMills`. In the following test, the virtual time will progress by 2_000 then 1 to resume three different
* calls to delay.
Expand Down Expand Up @@ -60,7 +57,7 @@ public interface DelayController {
/**
* Immediately execute all pending tasks and advance the virtual clock-time to the last delay.
*
* If new tasks are scheduled due to advancing virtual time, they will be executed before `advanceUntilIdle`
* If new tasks are scheduled while advancing virtual time, they will be executed before `advanceUntilIdle`
* returns.
*
* @return the amount of delay-time that this Dispatcher's clock has been forwarded in milliseconds.
Expand All @@ -87,63 +84,54 @@ public interface DelayController {
public fun cleanupTestCoroutines()

/**
* Run a block of code in a paused dispatcher.
* Suspends until at least one task may be executed by calling [runCurrent].
*
* By pausing the dispatcher any new coroutines will not execute immediately. After block executes, the dispatcher
* will resume auto-advancing.
* If this method returns normally, there must be at least one task that can be executed by calling [runCurrent]. It
* will return immediately if there is already a task in the queue for the current time.
*
* This is useful when testing functions that start a coroutine. By pausing the dispatcher assertions or
* setup may be done between the time the coroutine is created and started.
* This is useful when another dispatcher is currently processing a coroutine and is expected to dispatch the result
* to this dispatcher. This most commonly happens due to calls to [withContext] or [Deferred.await].
*
* While in the paused block, the dispatcher will queue all dispatched coroutines and they will be resumed on
* whatever thread calls [advanceUntilIdle], [advanceTimeBy], or [runCurrent].
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public suspend fun pauseDispatcher(block: suspend () -> Unit)

/**
* Pause the dispatcher.
* Note: You should not need to call this function from inside [runBlockingTest] as it calls it implicitly, but it
* is required for thread coordination in in tests that don't use `runBlockingTest` and interact with multiple
* threads.
*
* When paused, the dispatcher will not execute any coroutines automatically, and you must call [runCurrent] or
* [advanceTimeBy], or [advanceUntilIdle] to execute coroutines.
* ```
* val otherDispatcher = // single threaded dispatcher
*
* While paused, the dispatcher will queue all dispatched coroutines and they will be resumed on whatever thread
* calls [advanceUntilIdle], [advanceTimeBy], or [runCurrent].
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun pauseDispatcher()

/**
* Resume the dispatcher from a paused state.
* suspend fun switchingThreads() {
* withContext(otherDispatcher) {
* // this is not executing on TestCoroutineDispatcher
* database.query()
* }
* println("run me after waiting for withContext")
* }
*
* Resumed dispatchers will automatically progress through all coroutines scheduled at the current time. To advance
* time and execute coroutines scheduled in the future use, one of [advanceTimeBy],
* or [advanceUntilIdle].
* @Test whenSwitchingThreads_resultsBecomeAvailable() {
* val scope = TestCoroutineScope()
*
* When the dispatcher is resumed, all execution be immediate in the thread that triggered it similar to
* [Dispatchers.Unconfined]. This means that the following code will not switch back from Dispatchers.IO after
* `withContext`
* val job = scope.launch {
* switchingThreads()
* }
*
* ```
* runBlockingTest {
* withContext(Dispatchers.IO) { doIo() }
* // runBlockingTest is still on Dispatchers.IO here
* scope.runCurrent() // run to withContext, which will dispatch on otherDispatcher
* runBlocking {
* // wait for otherDispatcher to return control of the coroutine to TestCoroutineDispatcher
* scope.waitForDispatcherBusy(2_000) // throws timeout exception if withContext doesn't finish in 2_000ms
* }
* scope.runCurrent() // run the dispatched task (everything after withContext)
* // job.isCompleted == true
* }
* ```
*
* For tests that need accurate threading behavior, [pauseDispatcher] will ensure that the following test dispatches
* on a controlled thread.
* Whenever possible, it is preferred to inject [TestCoroutineDispatcher] to the [withContext] call instead of
* calling `waitForDispatcherBusy` as it creates a single threaded test and avoids thread synchronization issues.
*
* ```
* runBlockingTest {
* pauseDispatcher()
* withContext(Dispatchers.IO) { doIo() }
* // runBlockingTest has returned to it's starting thread here
* }
* ```
* Calling this method will never change the virtual-time.
*
* @param timeoutMills how long to wait for a task to be dispatched to this dispatcher.
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun resumeDispatcher()
suspend fun waitForDispatcherBusy(timeoutMills: Long)
}

/**
Expand Down
49 changes: 26 additions & 23 deletions kotlinx-coroutines-test/src/TestBuilders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import kotlin.coroutines.*
private const val DEFAULT_WAIT_FOR_OTHER_DISPATCHERS = 30_000L

/**
* Executes a [testBody] inside an immediate execution dispatcher.
* Executes a [testBody] inside an dispatcher that gurantees controlled, repeatable execution.
*
* This is similar to [runBlocking] but it will immediately progress past delays and into [launch] and [async] blocks.
* You can use this to write tests that execute in the presence of calls to [delay] without causing your test to take
* extra time.
* This is similar to [runBlocking] but it uses [TestCoroutineScope] to allow explict control over execution using the
* [DelayController] interface. When used for single-threaded testing, the ordering of execution is guranteed to be
* determistic (that means it always executes in the same order).
*
* When using for multi-threaded testing (e.g. calls to [withContext]), [runBlockingTest] will wait for the other
* dispatcher to return control then resume execution.
*
* ```
* @Test
Expand All @@ -27,7 +30,7 @@ private const val DEFAULT_WAIT_FOR_OTHER_DISPATCHERS = 30_000L
* delay(1_000)
* }.await()
* }
*
* advanceTimeBy(2_000)
* deferred.await() // result available immediately
* }
*
Expand All @@ -36,7 +39,8 @@ private const val DEFAULT_WAIT_FOR_OTHER_DISPATCHERS = 30_000L
* This method requires that all coroutines launched inside [testBody] complete, or are cancelled, as part of the test
* conditions.
*
* Unhandled exceptions thrown by coroutines in the test will be re-thrown at the end of the test.
* Unhandled exceptions thrown by coroutines started in the [TestCoroutineScope] will be re-thrown at the end of the
* test.
*
* @throws UncompletedCoroutinesError If the [testBody] does not complete (or cancel) all coroutines that it launches
* (including coroutines suspended on join/await).
Expand Down Expand Up @@ -64,7 +68,7 @@ public fun runBlockingTest(
localTestScope.testBody()
}

val didTimeout = deferred.waitForCompletion(waitForOtherDispatchers, dispatcher, dispatcher as IdleWaiter)
val didTimeout = deferred.waitForCompletion(waitForOtherDispatchers, dispatcher)

if (deferred.isCompleted) {
deferred.getCompletionExceptionOrNull()?.let {
Expand All @@ -87,14 +91,27 @@ public fun runBlockingTest(
}
}

private fun Deferred<Unit>.waitForCompletion(wait: Long, delayController: DelayController, park: IdleWaiter): Boolean {
/**
* Convenience method for calling [runBlockingTest] on an existing [TestCoroutineScope].
*/
// todo: need documentation on how this extension is supposed to be used
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun TestCoroutineScope.runBlockingTest(waitForOtherDispatchers: Long = DEFAULT_WAIT_FOR_OTHER_DISPATCHERS, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(coroutineContext, waitForOtherDispatchers, block)

/**
* Convenience method for calling [runBlockingTest] on an existing [TestCoroutineDispatcher].
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun TestCoroutineDispatcher.runBlockingTest(waitForOtherDispatchers: Long = DEFAULT_WAIT_FOR_OTHER_DISPATCHERS, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(this, waitForOtherDispatchers, block)

private fun Deferred<Unit>.waitForCompletion(wait: Long, delayController: DelayController): Boolean {
var didTimeout = false

runBlocking {
val unparkChannel = Channel<Unit>(1)
val job = launch {
while(true) {
park.suspendUntilNextDispatch()
delayController.waitForDispatcherBusy(wait)
unparkChannel.send(Unit)
}
}
Expand All @@ -121,24 +138,10 @@ private fun CoroutineContext.activeJobs(): Set<Job> {
return checkNotNull(this[Job]).children.filter { it.isActive }.toSet()
}

/**
* Convenience method for calling [runBlockingTest] on an existing [TestCoroutineScope].
*/
// todo: need documentation on how this extension is supposed to be used
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun TestCoroutineScope.runBlockingTest(waitForOtherDispatchers: Long = DEFAULT_WAIT_FOR_OTHER_DISPATCHERS, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(coroutineContext, waitForOtherDispatchers, block)

/**
* Convenience method for calling [runBlockingTest] on an existing [TestCoroutineDispatcher].
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun TestCoroutineDispatcher.runBlockingTest(waitForOtherDispatchers: Long = DEFAULT_WAIT_FOR_OTHER_DISPATCHERS, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(this, waitForOtherDispatchers, block)

private fun CoroutineContext.checkArguments(): Pair<CoroutineContext, DelayController> {
// TODO optimize it
val dispatcher = get(ContinuationInterceptor).run {
this?.let { require(this is DelayController) { "Dispatcher must implement DelayController: $this" } }
this?.let { require(this is IdleWaiter) { "Dispatcher must implement IdleWaiter" } }
this ?: TestCoroutineDispatcher()
}

Expand Down
86 changes: 20 additions & 66 deletions kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,22 @@ import kotlin.coroutines.CoroutineContext
import kotlin.math.max

/**
* [CoroutineDispatcher] that performs both immediate and lazy execution of coroutines in tests
* [CoroutineDispatcher] that offers lazy, determistic, execution of coroutines in tests
* and implements [DelayController] to control its virtual clock.
*
* By default, [TestCoroutineDispatcher] is immediate. That means any tasks scheduled to be run without delay are
* immediately executed. If they were scheduled with a delay, the virtual clock-time must be advanced via one of the
* methods on [DelayController].
* Any coroutines started via [launch] or [async] will not execute until a call to [DelayController.runCurrent] or the
* virtual clock-time has been advanced via one of the methods on [DelayController].
*
* When switched to lazy execution using [pauseDispatcher] any coroutines started via [launch] or [async] will
* not execute until a call to [DelayController.runCurrent] or the virtual clock-time has been advanced via one of the
* methods on [DelayController].
*
* While in immediate mode [TestCoroutineDispatcher] behaves similar to [Dispatchers.Unconfined]. When resuming from
* another thread it will *not* switch threads. When in lazy mode, [TestCoroutineDispatcher] will enqueue all
* dispatches and whatever thread calls an [advanceUntilIdle], [advanceTimeBy], or [runCurrent] will continue execution.
* [TestCoroutineDispatcher] does not hold a thread, so if a coroutine switches to another thread via [withContext] or
* similar, this dispatcher will not automatically wait for the other thread to pass control back. Tests can wait for
* the other dispatcher by calling [DelayController.waitForDispatcherBusy], then call [DelayController.runCurrent] to
* run the dispatched task from the test thread. Tests that use [runBlockingTest] do not need to call
* [DelayController.waitForDispatcherBusy].
*
* @see DelayController
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayController, IdleWaiter {
private var dispatchImmediately = true
set(value) {
field = value
if (value) {
// there may already be tasks from setup code we need to run
advanceUntilIdle()
}
}
public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayController {

// The ordered queue for the runnable tasks.
private val queue = ThreadSafeHeap<TimedRunnable>()
Expand All @@ -55,12 +44,7 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl

/** @suppress */
override fun dispatch(context: CoroutineContext, block: Runnable) {
if (dispatchImmediately) {
block.run()
unpark()
} else {
post(block)
}
post(block)
}

/** @suppress */
Expand Down Expand Up @@ -155,27 +139,6 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl
doActionsUntil(currentTime)
}

/** @suppress */
override suspend fun pauseDispatcher(block: suspend () -> Unit) {
val previous = dispatchImmediately
dispatchImmediately = false
try {
block()
} finally {
dispatchImmediately = previous
}
}

/** @suppress */
override fun pauseDispatcher() {
dispatchImmediately = false
}

/** @suppress */
override fun resumeDispatcher() {
dispatchImmediately = true
}

/** @suppress */
override fun cleanupTestCoroutines() {
unpark()
Expand All @@ -200,8 +163,16 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl
}
}

override suspend fun suspendUntilNextDispatch() {
waitLock.receive()
override suspend fun waitForDispatcherBusy(timeoutMills: Long) {
withTimeout(timeoutMills) {
while (true) {
val nextTime = queue.peek()?.time
if (nextTime != null && nextTime <= currentTime) {
break
}
waitLock.receive()
}
}
}

private fun unpark() {
Expand Down Expand Up @@ -239,20 +210,3 @@ private class TimedRunnable(

override fun toString() = "TimedRunnable(time=$time, run=$runnable)"
}

/**
* Alternative implementations of [TestCoroutineDispatcher] must implement this interface in order to be supported by
* [runBlockingTest].
*
* This interface allows external code to suspend itself until the next dispatch is received. This is similar to park in
* a normal event loop, but doesn't require that [TestCoroutineDispatcher] block a thread while parked.
*/
interface IdleWaiter {
/**
* Attempt to suspend until the next dispatch is received.
*
* This method may resume immediately if any dispatch was received since the last time it was called. This ensures
* that dispatches won't be dropped if they happen just before calling [suspendUntilNextDispatch].
*/
public suspend fun suspendUntilNextDispatch()
}
Loading