Skip to content

Commit 35d2c34

Browse files
committed
Default scheduler for delay is rewritten with support for virtual time
Guide tests use virtual test time for faster & more predictable operation
1 parent 69d9c85 commit 35d2c34

File tree

21 files changed

+578
-217
lines changed

21 files changed

+578
-217
lines changed

coroutines-guide.md

+8-8
Original file line numberDiff line numberDiff line change
@@ -354,13 +354,13 @@ example shows:
354354
```kotlin
355355
fun main(args: Array<String>) = runBlocking<Unit> {
356356
val job = launch(CommonPool) {
357-
var nextPrintTime = 0L
357+
var nextPrintTime = System.currentTimeMillis()
358358
var i = 0
359359
while (i < 10) { // computation loop
360360
val currentTime = System.currentTimeMillis()
361361
if (currentTime >= nextPrintTime) {
362362
println("I'm sleeping ${i++} ...")
363-
nextPrintTime = currentTime + 500L
363+
nextPrintTime += 500L
364364
}
365365
}
366366
}
@@ -613,7 +613,7 @@ The answer is 42
613613
Completed in 2017 ms
614614
```
615615

616-
<!--- TEST FLEXIBLE_TIME -->
616+
<!--- TEST ARBITRARY_TIME -->
617617

618618
### Concurrent using async
619619

@@ -646,7 +646,7 @@ The answer is 42
646646
Completed in 1017 ms
647647
```
648648

649-
<!--- TEST FLEXIBLE_TIME -->
649+
<!--- TEST ARBITRARY_TIME -->
650650

651651
This is twice as fast, because we have concurrent execution of two coroutines.
652652
Note, that concurrency with coroutines is always explicit.
@@ -678,7 +678,7 @@ The answer is 42
678678
Completed in 2017 ms
679679
```
680680

681-
<!--- TEST FLEXIBLE_TIME -->
681+
<!--- TEST ARBITRARY_TIME -->
682682

683683
So, we are back to sequential execution, because we _first_ start and await for `one`, _and then_ start and await
684684
for `two`. It is not the intended use-case for laziness. It is designed as a replacement for
@@ -728,7 +728,7 @@ fun main(args: Array<String>) {
728728

729729
> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-compose-04.kt)
730730
731-
<!--- TEST FLEXIBLE_TIME
731+
<!--- TEST ARBITRARY_TIME
732732
The answer is 42
733733
Completed in 1085 ms
734734
-->
@@ -1371,13 +1371,13 @@ fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool)
13711371
}
13721372
```
13731373

1374-
Now let us launch five processors and let them work for a second. See what happens:
1374+
Now let us launch five processors and let them work for almost a second. See what happens:
13751375

13761376
```kotlin
13771377
fun main(args: Array<String>) = runBlocking<Unit> {
13781378
val producer = produceNumbers()
13791379
repeat(5) { launchProcessor(it, producer) }
1380-
delay(1000)
1380+
delay(950)
13811381
producer.cancel() // cancel producer coroutine and thus kill them all
13821382
}
13831383
```

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ private class BlockingCoroutine<T>(
206206
private val blockedThread: Thread,
207207
private val privateEventLoop: Boolean
208208
) : AbstractCoroutine<T>(parentContext, true) {
209-
val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
209+
private val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop
210210

211211
init {
212212
if (privateEventLoop) require(eventLoop is EventLoopImpl)
@@ -219,15 +219,17 @@ private class BlockingCoroutine<T>(
219219

220220
@Suppress("UNCHECKED_CAST")
221221
fun joinBlocking(): T {
222+
timeSource.registerTimeLoopThread()
222223
while (true) {
223224
if (Thread.interrupted()) throw InterruptedException().also { cancel(it) }
224225
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
225-
// note: process next even may look unpark flag, so check !isActive before parking
226+
// note: process next even may loose unpark flag, so check if completed before parking
226227
if (isCompleted) break
227-
LockSupport.parkNanos(this, parkNanos)
228+
timeSource.parkNanos(this, parkNanos)
228229
}
229230
// process queued events (that could have been added after last processNextEvent and before cancel
230231
if (privateEventLoop) (eventLoop as EventLoopImpl).shutdown()
232+
timeSource.unregisterTimeLoopThread()
231233
// now return result
232234
val state = this.state
233235
(state as? CompletedExceptionally)?.let { throw it.exception }

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt

+2-10
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,6 @@ public interface CancellableContinuation<in T> : Continuation<T>, Job {
9494
* [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
9595
* This function is designed to be used only by the [CoroutineDispatcher] implementations themselves.
9696
* **It should not be used in general code**.
97-
*
98-
* The receiver [CoroutineDispatcher] of this function be equal to the context dispatcher or
99-
* [IllegalArgumentException] if thrown.
10097
*/
10198
public fun CoroutineDispatcher.resumeUndispatched(value: T)
10299

@@ -105,9 +102,6 @@ public interface CancellableContinuation<in T> : Continuation<T>, Job {
105102
* [dispatch][CoroutineDispatcher.dispatch] function of the [CoroutineDispatcher] in the [context].
106103
* This function is designed to be used only by the [CoroutineDispatcher] implementations themselves.
107104
* **It should not be used in general code**.
108-
*
109-
* The receiver [CoroutineDispatcher] of this function be equal to the context dispatcher or
110-
* [IllegalArgumentException] if thrown.
111105
*/
112106
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
113107
}
@@ -244,14 +238,12 @@ internal class CancellableContinuationImpl<in T>(
244238

245239
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
246240
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
247-
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
248-
resumeImpl(value, MODE_UNDISPATCHED)
241+
resumeImpl(value, if (dc.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
249242
}
250243

251244
override fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable) {
252245
val dc = delegate as? DispatchedContinuation ?: throw IllegalArgumentException("Must be used with DispatchedContinuation")
253-
check(dc.dispatcher === this) { "Must be invoked from the context CoroutineDispatcher"}
254-
resumeWithExceptionImpl(exception, MODE_UNDISPATCHED)
246+
resumeWithExceptionImpl(exception, if (dc.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
255247
}
256248

257249
override fun toString(): String = super.toString() + "[${delegate.toDebugString()}]"

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt

+6-3
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,11 @@ object CommonPool : CoroutineDispatcher() {
6060
_pool ?: createPool().also { _pool = it }
6161

6262
override fun dispatch(context: CoroutineContext, block: Runnable) =
63-
try { (_pool ?: getOrCreatePoolSync()).execute(block) }
64-
catch (e: RejectedExecutionException) { defaultExecutor.execute(block) }
63+
try { (_pool ?: getOrCreatePoolSync()).execute(timeSource.trackTask(block)) }
64+
catch (e: RejectedExecutionException) {
65+
timeSource.unTrackTask()
66+
DefaultExecutor.execute(block)
67+
}
6568

6669
// used for tests
6770
@Synchronized
@@ -78,7 +81,7 @@ object CommonPool : CoroutineDispatcher() {
7881
shutdown()
7982
if (timeout > 0)
8083
awaitTermination(timeout, TimeUnit.MILLISECONDS)
81-
shutdownNow().forEach { defaultExecutor.execute(it) }
84+
shutdownNow().forEach { DefaultExecutor.execute(it) }
8285
}
8386
_pool = Executor { throw RejectedExecutionException("CommonPool was shutdown") }
8487
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/DefaultExecutor.kt

+111-32
Original file line numberDiff line numberDiff line change
@@ -16,46 +16,125 @@
1616

1717
package kotlinx.coroutines.experimental
1818

19-
import java.util.concurrent.ScheduledExecutorService
20-
import java.util.concurrent.ScheduledThreadPoolExecutor
2119
import java.util.concurrent.TimeUnit
2220

23-
private const val DEFAULT_KEEP_ALIVE = 1000L
21+
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
22+
internal object DefaultExecutor : EventLoopBase(), Runnable {
2423

25-
private val KEEP_ALIVE =
26-
try { java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE) }
27-
catch (e: SecurityException) { DEFAULT_KEEP_ALIVE }
24+
override val canComplete: Boolean get() = false
25+
override val isCompleted: Boolean get() = false
2826

29-
@Volatile
30-
private var _executor: ScheduledExecutorService? = null
27+
private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds
3128

32-
internal val defaultExecutor: ScheduledExecutorService
33-
get() = _executor ?: getOrCreateExecutorSync()
29+
private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
30+
try {
31+
java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE)
32+
} catch (e: SecurityException) {
33+
DEFAULT_KEEP_ALIVE
34+
})
35+
36+
@Volatile
37+
private var _thread: Thread? = null
38+
39+
private const val FRESH = 0
40+
private const val ACTIVE = 1
41+
private const val SHUTDOWN_REQ = 2
42+
private const val SHUTDOWN_ACK = 3
43+
44+
@Volatile
45+
private var debugStatus: Int = FRESH
3446

35-
@Synchronized
36-
private fun getOrCreateExecutorSync(): ScheduledExecutorService =
37-
_executor ?: ScheduledThreadPoolExecutor(1) { r ->
38-
Thread(r, "kotlinx.coroutines.DefaultExecutor").apply { isDaemon = true }
39-
}.apply {
40-
setKeepAliveTime(KEEP_ALIVE, TimeUnit.MILLISECONDS)
41-
allowCoreThreadTimeOut(true)
42-
executeExistingDelayedTasksAfterShutdownPolicy = false
43-
// "setRemoveOnCancelPolicy" is available only since JDK7, so try it via reflection
47+
override fun run() {
48+
var shutdownNanos = Long.MAX_VALUE
49+
timeSource.registerTimeLoopThread()
50+
notifyStartup()
4451
try {
45-
val m = this::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.javaPrimitiveType)
46-
m.invoke(this, true)
47-
} catch (ex: Throwable) { /* ignore */ }
48-
_executor = this
52+
runLoop@ while (true) {
53+
Thread.interrupted() // just reset interruption flag
54+
var parkNanos = processNextEvent()
55+
if (parkNanos == Long.MAX_VALUE) {
56+
// nothing to do, initialize shutdown timeout
57+
if (shutdownNanos == Long.MAX_VALUE) {
58+
val now = timeSource.nanoTime()
59+
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
60+
val tillShutdown = shutdownNanos - now
61+
if (tillShutdown <= 0) break@runLoop // shut thread down
62+
parkNanos = parkNanos.coerceAtMost(tillShutdown)
63+
} else
64+
parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway
65+
}
66+
if (parkNanos > 0) {
67+
// check if shutdown was requested and bail out in this case
68+
if (debugStatus == SHUTDOWN_REQ) {
69+
acknowledgeShutdown()
70+
break@runLoop
71+
} else {
72+
timeSource.parkNanos(this, parkNanos)
73+
}
74+
}
75+
}
76+
} finally {
77+
_thread = null // this thread is dead
78+
timeSource.unregisterTimeLoopThread()
79+
// recheck if queues are empty after _thread reference was set to null (!!!)
80+
if (!isEmpty) thread() // recreate thread if it is needed
81+
}
4982
}
5083

51-
// used for tests
52-
@Synchronized
53-
internal fun shutdownDefaultExecutor(timeout: Long) {
54-
_executor?.apply {
55-
shutdown()
56-
awaitTermination(timeout, TimeUnit.MILLISECONDS)
57-
shutdownNow() // ignore all remaining
58-
_executor = null
84+
// ensure that thread is there
85+
private fun thread(): Thread = _thread ?: createThreadSync()
86+
87+
@Synchronized
88+
private fun createThreadSync() = _thread ?:
89+
Thread(this, "kotlinx.coroutines.DefaultExecutor").apply {
90+
_thread = this
91+
isDaemon = true
92+
start()
93+
}
94+
95+
override fun unpark() {
96+
timeSource.unpark(thread()) // as a side effect creates thread if it is not there
5997
}
60-
}
6198

99+
override fun isCorrectThread(): Boolean = true
100+
101+
// used for tests
102+
@Synchronized
103+
internal fun ensureStarted() {
104+
assert(_thread == null) // ensure we are at a clean state
105+
debugStatus = FRESH
106+
createThreadSync() // create fresh thread
107+
while (debugStatus == FRESH) (this as Object).wait()
108+
}
109+
110+
@Synchronized
111+
private fun notifyStartup() {
112+
debugStatus = ACTIVE
113+
(this as Object).notifyAll()
114+
}
115+
116+
// used for tests
117+
@Synchronized
118+
internal fun shutdown(timeout: Long) {
119+
if (_thread != null) {
120+
val deadline = System.currentTimeMillis() + timeout
121+
if (debugStatus == ACTIVE) debugStatus = SHUTDOWN_REQ
122+
unpark()
123+
// loop while there is anything to do immediately or deadline passes
124+
while (debugStatus != SHUTDOWN_ACK && _thread != null) {
125+
val remaining = deadline - System.currentTimeMillis()
126+
if (remaining <= 0) break
127+
(this as Object).wait(timeout)
128+
}
129+
}
130+
// restore fresh status
131+
debugStatus = FRESH
132+
}
133+
134+
@Synchronized
135+
private fun acknowledgeShutdown() {
136+
debugStatus = SHUTDOWN_ACK
137+
clearAll() // clear queues
138+
(this as Object).notifyAll()
139+
}
140+
}

kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt

+6-5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import kotlinx.coroutines.experimental.selects.select
2121
import java.util.concurrent.Future
2222
import java.util.concurrent.TimeUnit
2323
import kotlin.coroutines.experimental.ContinuationInterceptor
24+
import kotlin.coroutines.experimental.CoroutineContext
2425

2526
/**
2627
* This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support
@@ -67,7 +68,7 @@ public interface Delay {
6768
* This implementation uses a built-in single-threaded scheduled executor service.
6869
*/
6970
fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
70-
DisposableFutureHandle(defaultExecutor.schedule(block, time, unit))
71+
DefaultExecutor.invokeOnTimeout(time, unit, block)
7172
}
7273

7374
/**
@@ -85,10 +86,7 @@ suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
8586
require(time >= 0) { "Delay time $time cannot be negative" }
8687
if (time <= 0) return // don't delay
8788
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
88-
val delay = cont.context[ContinuationInterceptor] as? Delay
89-
if (delay != null)
90-
delay.scheduleResumeAfterDelay(time, unit, cont) else
91-
cont.cancelFutureOnCompletion(defaultExecutor.schedule(ResumeRunnable(cont), time, unit))
89+
cont.context.delay.scheduleResumeAfterDelay(time, unit, cont)
9290
}
9391
}
9492

@@ -101,3 +99,6 @@ public class DisposableFutureHandle(private val future: Future<*>) : DisposableH
10199
}
102100
override fun toString(): String = "DisposableFutureHandle[$future]"
103101
}
102+
103+
/** Returns [Delay] implementation of the given context */
104+
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultExecutor

0 commit comments

Comments
 (0)