Skip to content

Commit b1751db

Browse files
committed
Fix DefaultExecutor not being able to exit
Solves #856.
1 parent 3fdd3fe commit b1751db

File tree

2 files changed

+32
-16
lines changed

2 files changed

+32
-16
lines changed

kotlinx-coroutines-core/common/src/EventLoop.common.kt

+21-7
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import kotlin.native.concurrent.*
2323
internal abstract class EventLoop : CoroutineDispatcher() {
2424
/**
2525
* Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop.
26+
* There are two 32-bit counters encoded in this 64-bit value, allowing to count [Dispatchers.Unconfined]
27+
* separately from `runBlocking`; see [delta] and its uses.
2628
*/
2729
private var useCount = 0L
2830

@@ -51,7 +53,8 @@ internal abstract class EventLoop : CoroutineDispatcher() {
5153
* (no check for performance reasons, may be added in the future).
5254
*/
5355
public open fun processNextEvent(): Long {
54-
if (!processUnconfinedEvent()) return Long.MAX_VALUE
56+
val task = popNextTask() ?: return Long.MAX_VALUE
57+
task.run()
5558
return nextTime
5659
}
5760

@@ -64,11 +67,17 @@ internal abstract class EventLoop : CoroutineDispatcher() {
6467
}
6568

6669
public fun processUnconfinedEvent(): Boolean {
67-
val queue = unconfinedQueue ?: return false
68-
val task = queue.removeFirstOrNull() ?: return false
70+
val task = popUnconfinedTask() ?: return false
6971
task.run()
7072
return true
7173
}
74+
75+
public fun popUnconfinedTask(): DispatchedTask<*>? =
76+
unconfinedQueue?.removeFirstOrNull()
77+
78+
protected open fun popNextTask(onlyUnconfined: Boolean = false): Runnable? =
79+
popUnconfinedTask()
80+
7281
/**
7382
* Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
7483
* parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
@@ -249,9 +258,15 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
249258
}
250259
}
251260

252-
override fun processNextEvent(): Long {
261+
override fun popNextTask(onlyUnconfined: Boolean): Runnable? {
262+
val unconfined = popUnconfinedTask()
263+
if (onlyUnconfined) {
264+
return unconfined
265+
}
253266
// unconfined events take priority
254-
if (processUnconfinedEvent()) return nextTime
267+
if (unconfined != null) {
268+
return unconfined
269+
}
255270
// queue all delayed tasks that are due to be executed
256271
val delayed = _delayed.value
257272
if (delayed != null && !delayed.isEmpty) {
@@ -269,8 +284,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
269284
}
270285
}
271286
// then process one event from queue
272-
dequeue()?.run()
273-
return nextTime
287+
return dequeue()
274288
}
275289

276290
public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt

+11-9
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,19 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
6565
if (!notifyStartup()) return
6666
while (true) {
6767
Thread.interrupted() // just reset interruption flag
68-
var parkNanos = processNextEvent()
68+
val nextTask = popNextTask()
69+
if (nextTask != null) {
70+
shutdownNanos = Long.MAX_VALUE
71+
nextTask.run()
72+
}
73+
var parkNanos = nextTime
6974
if (parkNanos == Long.MAX_VALUE) {
7075
// nothing to do, initialize shutdown timeout
71-
if (shutdownNanos == Long.MAX_VALUE) {
72-
val now = nanoTime()
73-
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
74-
val tillShutdown = shutdownNanos - now
75-
if (tillShutdown <= 0) return // shut thread down
76-
parkNanos = parkNanos.coerceAtMost(tillShutdown)
77-
} else
78-
parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway
76+
val now = nanoTime()
77+
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
78+
val tillShutdown = shutdownNanos - now
79+
if (tillShutdown <= 0) return // shut thread down
80+
parkNanos = parkNanos.coerceAtMost(tillShutdown)
7981
}
8082
if (parkNanos > 0) {
8183
// check if shutdown was requested and bail out in this case

0 commit comments

Comments
 (0)