EventLoop.common.kt
package kotlinx.coroutines
import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlin.concurrent.Volatile
import kotlin.coroutines.*
import kotlin.jvm.*
/**
* Extended by [CoroutineDispatcher] implementations that have an event loop inside and can
* be asked to process the next event from their event queue.
*
* It may optionally implement [Delay] interface and support time-scheduled tasks.
* It is created or piggy-backed onto (see [ThreadLocalEventLoop])
* by `runBlocking` and by [Dispatchers.Unconfined].
*
* @suppress **This is an internal API and should not be used from general code.**
*/
internal abstract class EventLoop : CoroutineDispatcher() {
/**
* Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop.
*/
private var useCount = 0L
/**
* Set to true on any use by `runBlocking`, because it potentially leaks this loop to other threads, so
* this instance must be properly shut down. We don't need to shut down an event loop that was used solely
* by [Dispatchers.Unconfined] -- it can be left as [ThreadLocalEventLoop] and reused next time.
*/
private var shared = false
/**
* Queue used by [Dispatchers.Unconfined] tasks.
* These tasks are thread-local for performance and take precedence over the rest of the queue.
*/
private var unconfinedQueue: ArrayDeque<DispatchedTask<*>>? = null
/**
* Processes next event in this event loop.
*
* The result of this function is to be interpreted like this:
* - `<= 0` -- there are potentially more events for immediate processing;
* - `> 0` -- a number of nanoseconds to wait for next scheduled event;
* - [Long.MAX_VALUE] -- no more events.
*
* **NOTE**: Must be invoked only from the event loop's thread
* (no check for performance reasons, may be added in the future).
*/
open fun processNextEvent(): Long {
if (!processUnconfinedEvent()) return Long.MAX_VALUE
return 0
}
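// Editor's illustration (not part of the original sources): a minimal sketch of how a platform
// integration might drive the contract documented above. `parkNanos` stands in for a
// platform-specific parking primitive and is an assumption, not an API of this file.
/*
while (true) {
    val waitNanos = eventLoop.processNextEvent()
    if (waitNanos == Long.MAX_VALUE) break     // no more events -- stop driving the loop
    if (waitNanos > 0) parkNanos(waitNanos)    // next scheduled event is waitNanos away
    // waitNanos <= 0: more events may be available immediately, call processNextEvent() again
}
*/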
protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
protected open val nextTime: Long
get() {
val queue = unconfinedQueue ?: return Long.MAX_VALUE
return if (queue.isEmpty()) Long.MAX_VALUE else 0L
}
fun processUnconfinedEvent(): Boolean {
val queue = unconfinedQueue ?: return false
val task = queue.removeFirstOrNull() ?: return false
task.run()
return true
}
/**
* Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
* parameter should call [processNextEvent] for this event loop (otherwise, it will process the thread-local one).
* By default, the event loop implementation is thread-local and should not be processed from the context
* (the current thread's event loop should be processed instead).
*/
open fun shouldBeProcessedFromContext(): Boolean = false
/**
* Dispatches a task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
* into the current event loop.
*/
fun dispatchUnconfined(task: DispatchedTask<*>) {
val queue = unconfinedQueue ?:
ArrayDeque<DispatchedTask<*>>().also { unconfinedQueue = it }
queue.addLast(task)
}
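// Editor's illustration (not part of the original sources): the unconfined queue acts as a
// trampoline. Nested undispatched resumptions are queued via dispatchUnconfined instead of
// running recursively, and the outermost frame drains them, keeping the stack depth bounded.
// A simplified sketch of that calling pattern (the real dispatch logic lives outside this
// file and additionally handles exceptions):
/*
fun runUnconfined(loop: EventLoop, task: DispatchedTask<*>) {
    if (loop.isUnconfinedLoopActive) {
        // A frame higher up the stack is already draining the queue -- just enqueue.
        loop.dispatchUnconfined(task)
        return
    }
    loop.incrementUseCount(unconfined = true)
    try {
        task.run()                                 // run the outermost task directly
        while (loop.processUnconfinedEvent()) {}   // then drain whatever it enqueued
    } finally {
        loop.decrementUseCount(unconfined = true)
    }
}
*/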
val isActive: Boolean
get() = useCount > 0
val isUnconfinedLoopActive: Boolean
get() = useCount >= delta(unconfined = true)
// May only be used from the event loop's thread
val isUnconfinedQueueEmpty: Boolean
get() = unconfinedQueue?.isEmpty() ?: true
private fun delta(unconfined: Boolean) =
if (unconfined) (1L shl 32) else 1L
fun incrementUseCount(unconfined: Boolean = false) {
useCount += delta(unconfined)
if (!unconfined) shared = true
}
fun decrementUseCount(unconfined: Boolean = false) {
useCount -= delta(unconfined)
if (useCount > 0) return
assert { useCount == 0L } // "Extra decrementUseCount"
if (shared) {
// shut it down and remove from ThreadLocalEventLoop
shutdown()
}
}
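// Editor's note (illustrative): delta() packs both kinds of uses into a single counter --
// unconfined uses occupy the upper 32 bits and `runBlocking` uses the lower 32 bits. For example:
//   incrementUseCount(unconfined = true)   // useCount == 0x0000_0001_0000_0000
//   incrementUseCount(unconfined = false)  // useCount == 0x0000_0001_0000_0001
// `isActive` (useCount > 0) is true for either kind of use, while `isUnconfinedLoopActive`
// (useCount >= 1L shl 32) is true only while at least one unconfined use is in progress.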
final override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
return namedOrThis(name) // Single-threaded, short-circuit
}
open fun shutdown() {}
}
internal object ThreadLocalEventLoop {
private val ref = commonThreadLocal<EventLoop?>(Symbol("ThreadLocalEventLoop"))
internal val eventLoop: EventLoop
get() = ref.get() ?: createEventLoop().also { ref.set(it) }
internal fun currentOrNull(): EventLoop? =
ref.get()
internal fun resetEventLoop() {
ref.set(null)
}
internal fun setEventLoop(eventLoop: EventLoop) {
ref.set(eventLoop)
}
}
private val DISPOSED_TASK = Symbol("REMOVED_TASK")
// results for scheduleImpl
private const val SCHEDULE_OK = 0
private const val SCHEDULE_COMPLETED = 1
private const val SCHEDULE_DISPOSED = 2
private const val MS_TO_NS = 1_000_000L
private const val MAX_MS = Long.MAX_VALUE / MS_TO_NS
/**
* First-line overflow protection -- limit maximal delay.
* Delays longer than this one (~146 years) are considered to be delayed "forever".
*/
private const val MAX_DELAY_NS = Long.MAX_VALUE / 2
internal fun delayToNanos(timeMillis: Long): Long = when {
timeMillis <= 0 -> 0L
timeMillis >= MAX_MS -> Long.MAX_VALUE
else -> timeMillis * MS_TO_NS
}
internal fun delayNanosToMillis(timeNanos: Long): Long =
timeNanos / MS_TO_NS
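// Editor's note (illustrative): worked numbers for the overflow protection above.
//   MAX_MS       = Long.MAX_VALUE / 1_000_000 ms ~= 9.2e12 ms ~= 292 years; any larger
//                  millisecond delay converts to Long.MAX_VALUE nanoseconds.
//   MAX_DELAY_NS = Long.MAX_VALUE / 2 ns ~= 4.6e18 ns ~= 146 years; delays at or beyond this
//                  are treated as "forever" and never scheduled (see scheduleResumeAfterDelay
//                  and scheduleInvokeOnTimeout below).
//   delayToNanos(500)                   == 500_000_000
//   delayToNanos(Long.MAX_VALUE / 1000) == Long.MAX_VALUE   // clamped instead of overflowing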
private val CLOSED_EMPTY = Symbol("CLOSED_EMPTY")
private typealias Queue<T> = LockFreeTaskQueueCore<T>
internal expect abstract class EventLoopImplPlatform() : EventLoop {
// Called to unpark this event loop's thread
protected fun unpark()
// Called to reschedule to DefaultExecutor when this event loop is complete
protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
}
internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// null | CLOSED_EMPTY | task | Queue<Runnable>
private val _queue = atomic<Any?>(null)
// Allocated only once
private val _delayed = atomic<DelayedTaskQueue?>(null)
private val _isCompleted = atomic(false)
private var isCompleted
get() = _isCompleted.value
set(value) { _isCompleted.value = value }
override val isEmpty: Boolean get() {
if (!isUnconfinedQueueEmpty) return false
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) return false
return when (val queue = _queue.value) {
null -> true
is Queue<*> -> queue.isEmpty
else -> queue === CLOSED_EMPTY
}
}
override val nextTime: Long
get() {
if (super.nextTime == 0L) return 0L
val queue = _queue.value
when {
queue === null -> {} // empty queue -- proceed
queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
else -> return 0 // non-empty queue
}
val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
}
override fun shutdown() {
// Clean up thread-local reference here -- this event loop is shutting down
ThreadLocalEventLoop.resetEventLoop()
// We should signal that this event loop should not accept any more tasks
// and process queued events (that could have been added after last processNextEvent)
isCompleted = true
closeQueue()
// complete processing of all queued tasks
while (processNextEvent() <= 0) { /* spin */ }
// reschedule the rest of delayed tasks
rescheduleAllDelayed()
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val timeNanos = delayToNanos(timeMillis)
if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedResumeTask(now + timeNanos, continuation).also { task ->
/*
* Order is important here: first we schedule the heap and only then
* publish it to continuation. Otherwise, `DelayedResumeTask` would
* have to know how to be disposed of even when it wasn't scheduled yet.
*/
schedule(now, task)
continuation.disposeOnCancellation(task)
}
}
}
protected fun scheduleInvokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
val timeNanos = delayToNanos(timeMillis)
return if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedRunnableTask(now + timeNanos, block).also { task ->
schedule(now, task)
}
} else {
NonDisposableHandle
}
}
override fun processNextEvent(): Long {
// unconfined events take priority
if (processUnconfinedEvent()) return 0
// queue all delayed tasks that are due to be executed
enqueueDelayedTasks()
// then process one event from queue
val task = dequeue()
if (task != null) {
platformAutoreleasePool { task.run() }
return 0
}
return nextTime
}
final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
open fun enqueue(task: Runnable) {
// are there some delayed tasks that should execute before this one? If so, move them to the queue first.
enqueueDelayedTasks()
if (enqueueImpl(task)) {
// todo: we should unpark only when this delayed task became first in the queue
unpark()
} else {
DefaultExecutor.enqueue(task)
}
}
@Suppress("UNCHECKED_CAST")
private fun enqueueImpl(task: Runnable): Boolean {
_queue.loop { queue ->
if (isCompleted) return false // fail fast if already completed, may still add, but queues will close
when (queue) {
null -> if (_queue.compareAndSet(null, task)) return true
is Queue<*> -> {
when ((queue as Queue<Runnable>).addLast(task)) {
Queue.ADD_SUCCESS -> return true
Queue.ADD_CLOSED -> return false
Queue.ADD_FROZEN -> _queue.compareAndSet(queue, queue.next())
}
}
else -> when {
queue === CLOSED_EMPTY -> return false
else -> {
// update to full-blown queue to add one more
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
newQueue.addLast(queue as Runnable)
newQueue.addLast(task)
if (_queue.compareAndSet(queue, newQueue)) return true
}
}
}
}
}
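// Editor's note (illustrative): the `_queue` slot is a small lock-free state machine. Its
// transitions, as implemented by enqueueImpl above and dequeue/closeQueue below, are:
//   null --enqueue--> single Runnable --enqueue--> Queue<Runnable> (both tasks copied into it)
//   null --close--> CLOSED_EMPTY (no further tasks are accepted)
//   single Runnable --close--> Queue<Runnable>, which is then closed
//   Queue<Runnable> --frozen--> queue.next() (capacity grows; the operation is retried)
// The single-Runnable state avoids allocating a queue in the common case of one pending task.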
@Suppress("UNCHECKED_CAST")
private fun dequeue(): Runnable? {
_queue.loop { queue ->
when (queue) {
null -> return null
is Queue<*> -> {
val result = (queue as Queue<Runnable>).removeFirstOrNull()
if (result !== Queue.REMOVE_FROZEN) return result as Runnable?
_queue.compareAndSet(queue, queue.next())
}
else -> when {
queue === CLOSED_EMPTY -> return null
else -> if (_queue.compareAndSet(queue, null)) return queue as Runnable
}
}
}
}
/** Moves all delayed tasks that are due to be executed into the main queue. */
private fun enqueueDelayedTasks() {
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) {
val now = nanoTime()
while (true) {
// Remove a task from the delayed queue only after it has been added to the main queue,
// so that `isEmpty` and `nextTime`, which check both queues, do not transiently
// report that both the delayed queue and the main queue are empty during the move.
delayed.removeFirstIf {
if (it.timeToExecute(now)) {
enqueueImpl(it)
} else
false
} ?: break // quit the loop when there is nothing more to remove or enqueueImpl returns false on `isCompleted`
}
}
}
private fun closeQueue() {
assert { isCompleted }
_queue.loop { queue ->
when (queue) {
null -> if (_queue.compareAndSet(null, CLOSED_EMPTY)) return
is Queue<*> -> {
queue.close()
return
}
else -> when {
queue === CLOSED_EMPTY -> return
else -> {
// update to full-blown queue to close
val newQueue = Queue<Runnable>(Queue.INITIAL_CAPACITY, singleConsumer = true)
newQueue.addLast(queue as Runnable)
if (_queue.compareAndSet(queue, newQueue)) return
}
}
}
}
}
fun schedule(now: Long, delayedTask: DelayedTask) {
when (scheduleImpl(now, delayedTask)) {
SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
else -> error("unexpected result")
}
}
private fun shouldUnpark(task: DelayedTask): Boolean = _delayed.value?.peek() === task
private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
if (isCompleted) return SCHEDULE_COMPLETED
val delayedQueue = _delayed.value ?: run {
_delayed.compareAndSet(null, DelayedTaskQueue(now))
_delayed.value!!
}
return delayedTask.scheduleTask(now, delayedQueue, this)
}
// Performs a "hard" shutdown for test cleanup purposes
protected fun resetAll() {
_queue.value = null
_delayed.value = null
}
// This is a "soft" (normal) shutdown
private fun rescheduleAllDelayed() {
val now = nanoTime()
while (true) {
/*
* `removeFirstOrNull` below is the only operation on DelayedTask & ThreadSafeHeap that is not
* synchronized on DelayedTask itself. All other operations are synchronized both on
* DelayedTask & ThreadSafeHeap instances (in this order). It is still safe, because `dispose`
* first removes the DelayedTask from the heap (under synchronization) and only then
* assigns `_heap = DISPOSED_TASK`, so there can never be a race on the `_heap` reference update.
*/
val delayedTask = _delayed.value?.removeFirstOrNull() ?: break
reschedule(now, delayedTask)
}
}
internal abstract class DelayedTask(
/**
* This field can only be modified in [scheduleTask] before putting this DelayedTask
* into the heap, to avoid overflow and corruption of the heap data structure.
*/
@JvmField var nanoTime: Long
) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode, SynchronizedObject() {
@Volatile
private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK
override var heap: ThreadSafeHeap<*>?
get() = _heap as? ThreadSafeHeap<*>
set(value) {
require(_heap !== DISPOSED_TASK) // this can never happen, it is always checked before adding/removing
_heap = value
}
override var index: Int = -1
override fun compareTo(other: DelayedTask): Int {
val dTime = nanoTime - other.nanoTime
return when {
dTime > 0 -> 1
dTime < 0 -> -1
else -> 0
}
}
fun timeToExecute(now: Long): Boolean = now - nanoTime >= 0L
fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int = synchronized<Int>(this) {
if (_heap === DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
delayed.addLastIf(this) { firstTask ->
if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
/**
* We are about to add a new task, and we have to make sure that the [DelayedTaskQueue]
* invariant is maintained. The code in this lambda is additionally executed under
* the lock of the [DelayedTaskQueue], so working with [DelayedTaskQueue.timeNow] here is thread-safe.
*/
if (firstTask == null) {
/**
* When adding the first delayed task we simply update the queue's [DelayedTaskQueue.timeNow] to
* the current time, even if that means "going backwards in time". This makes the structure
* self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
* are removed from the delayed queue for execution.
*/
delayed.timeNow = now
} else {
/**
* Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past the first task's time
* and only goes forward in time. We cannot let it go backwards in time, or the invariant can be
* violated for tasks that were already scheduled.
*/
val firstTime = firstTask.nanoTime
// compute min(now, firstTime) using a wrap-safe check
val minTime = if (firstTime - now >= 0) now else firstTime
// update timeNow only when going forward in time
if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
}
/**
* Here [DelayedTaskQueue.timeNow] was already modified, and we have to double-check that the newly added
* task does not violate the [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
* function can be called to reschedule a task from one queue to another, and this is another reason
* why the new task's time might violate the invariant.
* We correct the invariant violation (if any) by simply changing this task's time to the queue's `timeNow`.
*/
if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
true
}
return SCHEDULE_OK
}
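// Editor's note (illustrative): the `firstTime - now >= 0` and `minTime - delayed.timeNow > 0`
// checks above are wrap-safe signed comparisons. For example, with
//   now       = Long.MAX_VALUE - 10
//   firstTime = Long.MIN_VALUE + 5   // i.e. 16 ns later, after nanoTime() has wrapped around
// the difference `firstTime - now` overflows to 16 >= 0, so minTime = now -- the correct
// choice -- whereas a plain `firstTime >= now` comparison would wrongly pick firstTime.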
final override fun dispose(): Unit = synchronized(this) {
val heap = _heap
if (heap === DISPOSED_TASK) return // already disposed
(heap as? DelayedTaskQueue)?.remove(this) // remove if it is in heap (first)
_heap = DISPOSED_TASK // never add again to any heap
}
override fun toString(): String = "Delayed[nanos=$nanoTime]"
}
private inner class DelayedResumeTask(
nanoTime: Long,
private val cont: CancellableContinuation<Unit>
) : DelayedTask(nanoTime) {
override fun run() { with(cont) { resumeUndispatched(Unit) } }
override fun toString(): String = super.toString() + cont.toString()
}
private class DelayedRunnableTask(
nanoTime: Long,
private val block: Runnable
) : DelayedTask(nanoTime) {
override fun run() { block.run() }
override fun toString(): String = super.toString() + block.toString()
}
/**
* The delayed task queue maintains a stable time-comparison invariant despite potential wraparounds in
* long nano-time measurements by maintaining the last observed [timeNow]. It protects the integrity of the
* heap data structure in spite of potential non-monotonicity of the `nanoTime()` source.
* The invariant is that for every scheduled [DelayedTask]:
*
* ```
* delayedTask.nanoTime - timeNow >= 0
* ```
*
* So the comparison of scheduled tasks via [DelayedTask.compareTo] is always stable, as
* scheduled [DelayedTask.nanoTime] values can be at most [Long.MAX_VALUE] apart. This invariant is maintained when
* new tasks are added by the [DelayedTask.scheduleTask] function, and it cannot be violated when tasks are removed
* (so there is nothing special to do there).
*/
internal class DelayedTaskQueue(
@JvmField var timeNow: Long
) : ThreadSafeHeap<DelayedTask>()
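// Editor's note (illustrative): the invariant above is what keeps [DelayedTask.compareTo]
// wrap-safe. Every scheduled task satisfies `nanoTime - timeNow >= 0`, so each nanoTime lies
// in the window [timeNow, timeNow + Long.MAX_VALUE]. For any two tasks a and b in that window,
// `a.nanoTime - b.nanoTime` falls in [-Long.MAX_VALUE, Long.MAX_VALUE] and cannot overflow,
// which is why the sign of the difference used in compareTo is always meaningful.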
}
internal expect fun createEventLoop(): EventLoop
internal expect fun nanoTime(): Long
internal expect object DefaultExecutor {
fun enqueue(task: Runnable)
}
/**
* Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
* non-Darwin native targets.
*
* Coroutines on Darwin targets can call into the Objective-C world, where a callee may push a to-be-returned object to
* the Autorelease Pool, so as to avoid a premature ARC release before it reaches the caller. This means the pool must
* be eventually drained to avoid leaks. Since Kotlin Coroutines does not use [NSRunLoop], which provides automatic
* pool management, it must manage the pool creation and pool drainage manually.
*/
internal expect inline fun platformAutoreleasePool(crossinline block: () -> Unit)
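// Editor's illustration (not part of the original sources): a plausible shape for the `actual`
// implementations of the declaration above, assuming kotlinx.cinterop's `autoreleasepool`
// helper on Darwin targets; the exact per-target sources are not shown in this file.
/*
// Darwin source set (assumption):
internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) =
    autoreleasepool { block() }

// JVM / JS / non-Darwin native source sets (assumption):
internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()
*/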