Skip to content

Commit b017cca

Browse files
committed
Port the JVM implementation of DefaultDelay to Native
Tests segfault on my machine with this stacktrace: 0 kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism(kotlin.Int;kotlin.String?){}kotlinx.coroutines.CoroutineDispatcher-trampoline () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:176 1 0x00000000003bed59 in kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism$default(kotlin.Int;kotlin.String?;kotlin.Int){}kotlinx.coroutines.CoroutineDispatcher (_this=0x0, parallelism=2147483647, name=0x0, $mask0=2) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:176 2 0x00000000004bfa97 in kfun:kotlinx.coroutines.$init_global#internal.18 () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/DefaultDelay.kt:101 3 0x0000000000cac194 in CallInitGlobalPossiblyLock () 4 0x00000000004bfb60 in kfun:kotlinx.coroutines#<get-DefaultDelay>(){}kotlinx.coroutines.Delay () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/DefaultDelay.kt:1 5 0x0000000000496840 in kfun:kotlinx.coroutines.internal.LimitedDispatcher#<init>(kotlinx.coroutines.CoroutineDispatcher;kotlin.Int;kotlin.String?){} ($this=0x7ffff64a0668, dispatcher=0x7ffff64a0620, parallelism=64, name=0x0) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt:26 6 0x00000000003bec1d in kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism(kotlin.Int;kotlin.String?){}kotlinx.coroutines.CoroutineDispatcher (_this=0x7ffff64a0620, parallelism=64, name=0x0) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:178 7 0x00000000004c53ad in kfun:kotlinx.coroutines.MultiWorkerDispatcher.limitedParallelism#internal (_this=0x7ffff64a0620, parallelism=64, name=0x0) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt:151 8 0x0000000000b3de7a in kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism(kotlin.Int;kotlin.String?){}kotlinx.coroutines.CoroutineDispatcher-trampoline () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:176 9 0x00000000003bed59 in kfun:kotlinx.coroutines.CoroutineDispatcher#limitedParallelism$default(kotlin.Int;kotlin.String?;kotlin.Int){}kotlinx.coroutines.CoroutineDispatcher (_this=0x7ffff64a0620, parallelism=64, name=0x0, $mask0=2) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:176 10 0x00000000004c13c0 in kfun:kotlinx.coroutines.DefaultIoScheduler.<init>#internal ($this=0x7ffff6630700) at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/Dispatchers.kt:27 11 0x00000000004c127f in kfun:kotlinx.coroutines.DefaultIoScheduler.$init_global#internal () at /home/dkhalansky/IdeaProjects/kotlinx.coroutines/kotlinx-coroutines-core/native/src/Dispatchers.kt:1
1 parent d2492e3 commit b017cca

File tree

5 files changed

+118
-13
lines changed

5 files changed

+118
-13
lines changed

Diff for: kotlinx-coroutines-core/jvm/src/DefaultDelay.kt

-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package kotlinx.coroutines
33

44
import kotlinx.atomicfu.*
55
import kotlinx.coroutines.internal.*
6-
import kotlinx.coroutines.scheduling.*
76
import kotlinx.coroutines.scheduling.scheduleBackgroundIoTask
87
import kotlin.coroutines.*
98
import kotlin.time.Duration

Diff for: kotlinx-coroutines-core/native/src/CoroutineContext.kt

-3
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@ package kotlinx.coroutines
33
import kotlinx.coroutines.internal.*
44
import kotlin.coroutines.*
55

6-
@PublishedApi
7-
internal actual val DefaultDelay: Delay = WorkerDispatcher(name = "DefaultDelay")
8-
96
internal expect fun createDefaultDispatcher(): CoroutineDispatcher
107

118
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {

Diff for: kotlinx-coroutines-core/native/src/DefaultDelay.kt

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package kotlinx.coroutines
2+
3+
import kotlinx.atomicfu.*
4+
import kotlinx.coroutines.internal.*
5+
import kotlin.coroutines.*
6+
import kotlin.native.concurrent.ObsoleteWorkersApi
7+
import kotlin.native.concurrent.Worker
8+
9+
@PublishedApi
10+
internal actual val DefaultDelay: Delay get() = DefaultDelayImpl
11+
12+
@OptIn(ObsoleteWorkersApi::class)
13+
private object DefaultDelayImpl : EventLoopImplBase(), Runnable {
14+
init {
15+
incrementUseCount() // this event loop is never completed
16+
}
17+
18+
private val _thread = atomic<Worker?>(null)
19+
20+
/** Can only happen when tests close the default executor */
21+
override fun reschedule(now: Long, delayedTask: DelayedTask) {
22+
throw IllegalStateException("Attempted to schedule $delayedTask at $now after shutdown")
23+
}
24+
25+
/**
26+
* All event loops are using DefaultDelay#invokeOnTimeout to avoid livelock on
27+
* ```
28+
* runBlocking(eventLoop) { withTimeout { while(isActive) { ... } } }
29+
* ```
30+
*
31+
* Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]),
32+
* but it's not exposed as public API.
33+
*/
34+
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
35+
scheduleInvokeOnTimeout(timeMillis, block)
36+
37+
override fun run() {
38+
val currentThread = Worker.current
39+
// Identity comparisons do not work for value classes, but comparing `null` with non-null should still work
40+
if (!_thread.compareAndSet(null, currentThread)) return // some other thread won the race to start the thread
41+
ThreadLocalEventLoop.setEventLoop(DelegatingUnconfinedEventLoop)
42+
try {
43+
while (true) {
44+
val parkNanos = processNextEvent()
45+
if (parkNanos == Long.MAX_VALUE) break // no more events
46+
if (parkNanos > 0) currentThread.park(parkNanos / 1000L, true)
47+
}
48+
} finally {
49+
_thread.value = null
50+
ThreadLocalEventLoop.resetEventLoop()
51+
// recheck if queues are empty after _thread reference was set to null (!!!)
52+
if (!delayedQueueIsEmpty) {
53+
/* recreate the thread, as there is still work to do,
54+
and `unpark` could have awoken the thread we're currently running on */
55+
startThreadOrObtainSleepingThread()
56+
}
57+
}
58+
}
59+
60+
override fun startThreadOrObtainSleepingThread(): Worker? {
61+
// Check if the thread is already running
62+
_thread.value?.let { return it }
63+
/* Now we know that at the moment of this call the thread was not initially running.
64+
This means that whatever thread is going to be running by the end of this function,
65+
it's going to notice the tasks it's supposed to run.
66+
We can return `null` unconditionally. */
67+
scheduleBackgroundIoTask(this)
68+
return null
69+
}
70+
71+
override fun toString(): String = "DefaultDelay"
72+
}
73+
74+
private object DelegatingUnconfinedEventLoop: UnconfinedEventLoop {
75+
override val thisLoopsTaskCanAvoidYielding: Boolean
76+
get() = defaultDelayRunningUnconfinedLoop()
77+
78+
override val isUnconfinedLoopActive: Boolean get() = false
79+
80+
override fun runUnconfinedEventLoop(initialBlock: () -> Unit) {
81+
ioView.dispatch(ioView, Runnable {
82+
ThreadLocalEventLoop.unconfinedEventLoop.runUnconfinedEventLoop(initialBlock)
83+
})
84+
}
85+
86+
override fun dispatchUnconfined(task: DispatchedTask<*>) =
87+
defaultDelayRunningUnconfinedLoop()
88+
89+
override fun tryUseAsEventLoop(): EventLoop? = null
90+
}
91+
92+
private fun defaultDelayRunningUnconfinedLoop(): Nothing {
93+
throw UnsupportedOperationException(
94+
"This method can only be called from the thread where an unconfined event loop is running, " +
95+
"but no tasks can run on this thread."
96+
)
97+
}
98+
99+
/** A view separate from [Dispatchers.IO].
100+
* [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */
101+
private val ioView = Dispatchers.IO.limitedParallelism(Int.MAX_VALUE)

Diff for: kotlinx-coroutines-core/native/src/Dispatchers.kt

+5
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,14 @@ internal object DefaultIoScheduler : CoroutineDispatcher() {
4040
io.dispatchYield(context, block)
4141
}
4242

43+
internal fun dispatchToUnlimitedPool(block: Runnable) {
44+
unlimitedPool.dispatch(EmptyCoroutineContext, block)
45+
}
46+
4347
override fun toString(): String = "Dispatchers.IO"
4448
}
4549

50+
internal inline fun scheduleBackgroundIoTask(block: Runnable) = DefaultIoScheduler.dispatchToUnlimitedPool(block)
4651

4752
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
4853
public actual val Dispatchers.IO: CoroutineDispatcher get() = IO

Diff for: kotlinx-coroutines-core/native/src/EventLoop.kt

+12-9
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,29 @@
22

33
package kotlinx.coroutines
44

5-
import kotlin.coroutines.*
65
import kotlin.native.concurrent.*
76
import kotlin.time.*
87

98
internal actual abstract class EventLoopImplPlatform : EventLoop() {
10-
11-
private val current = Worker.current // not `get()`! We're interested in the worker at the moment of creation.
9+
/** Returns `null` if a thread was created and doesn't need to be awoken.
10+
* Returns a thread to awaken if the thread already existed when this method was called. */
11+
protected abstract fun startThreadOrObtainSleepingThread(): Worker?
1212

1313
protected actual fun unpark() {
14-
current.executeAfter(0L, {})// send an empty task to unpark the waiting event loop
14+
startThreadOrObtainSleepingThread()?.let {
15+
it.executeAfter(0L, {})
16+
}
1517
}
16-
1718
}
1819

19-
internal class EventLoopImpl: EventLoopImplBase() {
20-
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
21-
DefaultDelay.invokeOnTimeout(timeMillis, block, context)
20+
internal class BlockingEventLoop(
21+
private val worker: Worker
22+
) : EventLoopImplBase() {
23+
override fun startThreadOrObtainSleepingThread(): Worker? =
24+
if (Worker.current.id != worker.id) worker else null
2225
}
2326

24-
internal actual fun createEventLoop(): EventLoop = EventLoopImpl()
27+
internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Worker.current)
2528

2629
private val startingPoint = TimeSource.Monotonic.markNow()
2730

0 commit comments

Comments
 (0)