Skip to content

Commit e653175

Browse files
committed
WIP
1 parent 80aa1b6 commit e653175

File tree

14 files changed

+127
-54
lines changed

14 files changed

+127
-54
lines changed

Diff for: docs/topics/coroutine-context-and-dispatchers.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ Produces the output:
115115
```text
116116
Unconfined : I'm working in thread main
117117
main runBlocking: I'm working in thread main
118-
Unconfined : After delay in thread kotlinx.coroutines.DefaultDelay
118+
Unconfined : After delay in thread DefaultDispatcher oroutine#2
119119
main runBlocking: After delay in thread main
120120
```
121121

Diff for: kotlinx-coroutines-core/common/src/EventLoop.common.kt

+73-23
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,48 @@ import kotlin.concurrent.Volatile
66
import kotlin.coroutines.*
77
import kotlin.jvm.*
88

9+
internal interface UnconfinedEventLoop {
10+
/**
11+
* Returns `true` if calling [yield] in a coroutine in this event loop can avoid yielding and continue executing
12+
* due to there being no other tasks in the queue.
13+
*
14+
* This can only be called from the thread that owns this event loop.
15+
*/
16+
val thisLoopsTaskCanAvoidYielding: Boolean
17+
18+
/**
19+
* Returns `true` if someone (typically a call to [runUnconfinedEventLoop]) is currently processing the tasks,
20+
* so calling [dispatchUnconfined] is guaranteed to be processed eventually.
21+
*
22+
* This can only be called from the thread that owns this event loop.
23+
*/
24+
val isUnconfinedLoopActive: Boolean
25+
26+
/**
27+
* Executes [initialBlock] and then processes unconfined tasks until there are no more, blocking the current thread.
28+
*
29+
* This can only be called when no other [runUnconfinedEventLoop] is currently active on this event loop.
30+
*
31+
* This can only be called from the thread that owns this event loop.
32+
*/
33+
fun runUnconfinedEventLoop(initialBlock: () -> Unit)
34+
35+
/**
36+
* Sends the [task] to this event loop for execution.
37+
*
38+
* This method should only be called while [isUnconfinedLoopActive] is `true`.
39+
* Otherwise, the task may be left unprocessed.
40+
*
41+
* This can only be called from the thread that owns this event loop.
42+
*/
43+
fun dispatchUnconfined(task: DispatchedTask<*>)
44+
45+
/**
46+
* Tries to interpret this event loop for unconfined tasks as a proper event loop and returns it if successful.
47+
*/
48+
fun tryUseAsEventLoop(): EventLoop?
49+
}
50+
951
/**
1052
* Extended by [CoroutineDispatcher] implementations that have event loop inside and can
1153
* be asked to process next event from their event queue.
@@ -16,7 +58,7 @@ import kotlin.jvm.*
1658
*
1759
* @suppress **This an internal API and should not be used from general code.**
1860
*/
19-
internal abstract class EventLoop : CoroutineDispatcher() {
61+
internal abstract class EventLoop : CoroutineDispatcher(), UnconfinedEventLoop {
2062
/**
2163
* Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop.
2264
*/
@@ -51,8 +93,6 @@ internal abstract class EventLoop : CoroutineDispatcher() {
5193
return 0
5294
}
5395

54-
protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
55-
5696
protected open val nextTime: Long
5797
get() {
5898
val queue = unconfinedQueue ?: return Long.MAX_VALUE
@@ -65,6 +105,7 @@ internal abstract class EventLoop : CoroutineDispatcher() {
65105
task.run()
66106
return true
67107
}
108+
68109
/**
69110
* Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
70111
* parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
@@ -77,28 +118,26 @@ internal abstract class EventLoop : CoroutineDispatcher() {
77118
* Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
78119
* into the current event loop.
79120
*/
80-
fun dispatchUnconfined(task: DispatchedTask<*>) {
81-
val queue = unconfinedQueue ?:
82-
ArrayDeque<DispatchedTask<*>>().also { unconfinedQueue = it }
121+
override fun dispatchUnconfined(task: DispatchedTask<*>) {
122+
val queue = unconfinedQueue ?: ArrayDeque<DispatchedTask<*>>().also { unconfinedQueue = it }
83123
queue.addLast(task)
84124
}
85125

86126
val isActive: Boolean
87127
get() = useCount > 0
88128

89-
val isUnconfinedLoopActive: Boolean
129+
override val isUnconfinedLoopActive: Boolean
90130
get() = useCount >= delta(unconfined = true)
91131

92-
// May only be used from the event loop's thread
93-
val isUnconfinedQueueEmpty: Boolean
94-
get() = unconfinedQueue?.isEmpty() ?: true
132+
override val thisLoopsTaskCanAvoidYielding: Boolean
133+
get() = unconfinedQueue?.isEmpty() != false
95134

96135
private fun delta(unconfined: Boolean) =
97136
if (unconfined) (1L shl 32) else 1L
98137

99138
fun incrementUseCount(unconfined: Boolean = false) {
100139
useCount += delta(unconfined)
101-
if (!unconfined) shared = true
140+
if (!unconfined) shared = true
102141
}
103142

104143
fun decrementUseCount(unconfined: Boolean = false) {
@@ -117,22 +156,37 @@ internal abstract class EventLoop : CoroutineDispatcher() {
117156
}
118157

119158
open fun shutdown() {}
159+
160+
override fun runUnconfinedEventLoop(initialBlock: () -> Unit) {
161+
incrementUseCount(unconfined = true)
162+
try {
163+
initialBlock()
164+
while (true) {
165+
// break when all unconfined continuations where executed
166+
if (!processUnconfinedEvent()) break
167+
}
168+
} finally {
169+
decrementUseCount(unconfined = true)
170+
}
171+
}
172+
173+
override fun tryUseAsEventLoop(): EventLoop? = this
120174
}
121175

122176
internal object ThreadLocalEventLoop {
123-
private val ref = commonThreadLocal<EventLoop?>(Symbol("ThreadLocalEventLoop"))
177+
private val ref = commonThreadLocal<UnconfinedEventLoop?>(Symbol("ThreadLocalEventLoop"))
124178

125-
internal val eventLoop: EventLoop
179+
internal val unconfinedEventLoop: UnconfinedEventLoop
126180
get() = ref.get() ?: createEventLoop().also { ref.set(it) }
127181

128-
internal fun currentOrNull(): EventLoop? =
182+
internal fun currentOrNull(): UnconfinedEventLoop? =
129183
ref.get()
130184

131185
internal fun resetEventLoop() {
132186
ref.set(null)
133187
}
134188

135-
internal fun setEventLoop(eventLoop: EventLoop) {
189+
internal fun setEventLoop(eventLoop: UnconfinedEventLoop) {
136190
ref.set(eventLoop)
137191
}
138192
}
@@ -183,8 +237,10 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
183237
get() = _isCompleted.value
184238
set(value) { _isCompleted.value = value }
185239

186-
override val isEmpty: Boolean get() {
187-
if (!isUnconfinedQueueEmpty) return false
240+
/**
241+
* Checks that at the moment this method is called, there are no tasks in the delayed tasks queue.
242+
*/
243+
protected val delayedQueueIsEmpty: Boolean get() {
188244
val delayed = _delayed.value
189245
if (delayed != null && !delayed.isEmpty) return false
190246
return when (val queue = _queue.value) {
@@ -383,12 +439,6 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
383439
return delayedTask.scheduleTask(now, delayedQueue, this)
384440
}
385441

386-
// It performs "hard" shutdown for test cleanup purposes
387-
protected fun resetAll() {
388-
_queue.value = null
389-
_delayed.value = null
390-
}
391-
392442
// This is a "soft" (normal) shutdown
393443
private fun rescheduleAllDelayed() {
394444
val now = nanoTime()

Diff for: kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -292,12 +292,12 @@ internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
292292
*/
293293
private inline fun DispatchedContinuation<*>.executeUnconfined(
294294
contState: Any?, mode: Int, doYield: Boolean = false,
295-
block: () -> Unit
295+
noinline block: () -> Unit
296296
): Boolean {
297297
assert { mode != MODE_UNINITIALIZED } // invalid execution mode
298-
val eventLoop = ThreadLocalEventLoop.eventLoop
298+
val eventLoop = ThreadLocalEventLoop.unconfinedEventLoop
299299
// If we are yielding and unconfined queue is empty, we can bail out as part of fast path
300-
if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
300+
if (doYield && eventLoop.thisLoopsTaskCanAvoidYielding) return false
301301
return if (eventLoop.isUnconfinedLoopActive) {
302302
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
303303
_state = contState

Diff for: kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt

+4-11
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, undispatche
165165
}
166166

167167
private fun DispatchedTask<*>.resumeUnconfined() {
168-
val eventLoop = ThreadLocalEventLoop.eventLoop
168+
val eventLoop = ThreadLocalEventLoop.unconfinedEventLoop
169169
if (eventLoop.isUnconfinedLoopActive) {
170170
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
171171
eventLoop.dispatchUnconfined(this)
@@ -177,25 +177,18 @@ private fun DispatchedTask<*>.resumeUnconfined() {
177177
}
178178
}
179179

180-
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
181-
eventLoop: EventLoop,
180+
internal fun DispatchedTask<*>.runUnconfinedEventLoop(
181+
eventLoop: UnconfinedEventLoop,
182182
block: () -> Unit
183183
) {
184-
eventLoop.incrementUseCount(unconfined = true)
185184
try {
186-
block()
187-
while (true) {
188-
// break when all unconfined continuations where executed
189-
if (!eventLoop.processUnconfinedEvent()) break
190-
}
185+
eventLoop.runUnconfinedEventLoop(block)
191186
} catch (e: Throwable) {
192187
/*
193188
* This exception doesn't happen normally, only if we have a bug in implementation.
194189
* Report it as a fatal exception.
195190
*/
196191
handleFatalException(e)
197-
} finally {
198-
eventLoop.decrementUseCount(unconfined = true)
199192
}
200193
}
201194

Diff for: kotlinx-coroutines-core/common/test/flow/VirtualTime.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ internal class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : Coroutine
1919
*/
2020
enclosingScope.launch {
2121
while (true) {
22-
val delayNanos = ThreadLocalEventLoop.currentOrNull()?.processNextEvent()
22+
val delayNanos = ThreadLocalEventLoop.currentOrNull()?.tryUseAsEventLoop()?.processNextEvent()
2323
?: error("Event loop is missing, virtual time source works only as part of event loop")
2424
if (delayNanos <= 0) continue
2525
if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) {

Diff for: kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt

+3
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,6 @@ import kotlin.coroutines.*
2525
* they are resubmitted to [Dispatchers.IO].
2626
*/
2727
public expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
28+
29+
internal fun UnconfinedEventLoop.useAsEventLoopForRunBlockingOrFail(): EventLoop =
30+
tryUseAsEventLoop() ?: throw IllegalStateException("runBlocking can not be run in direct dispatchers")

Diff for: kotlinx-coroutines-core/jsAndWasmJsShared/src/EventLoop.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package kotlinx.coroutines
22

33
import kotlin.coroutines.*
44

5-
internal actual fun createEventLoop(): EventLoop = UnconfinedEventLoop()
5+
internal actual fun createEventLoop(): EventLoop = UnconfinedEventLoopImpl()
66

77
internal actual fun nanoTime(): Long = unsupported()
88

9-
internal class UnconfinedEventLoop : EventLoop() {
9+
private class UnconfinedEventLoopImpl : EventLoop() {
1010
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = unsupported()
1111
}
1212

Diff for: kotlinx-coroutines-core/jvm/src/Builders.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,13 @@ public actual fun <T> runBlocking(context: CoroutineContext, block: suspend Coro
5858
val newContext: CoroutineContext
5959
if (contextInterceptor == null) {
6060
// create or use private event loop if no dispatcher is specified
61-
eventLoop = ThreadLocalEventLoop.eventLoop
61+
eventLoop = ThreadLocalEventLoop.unconfinedEventLoop.useAsEventLoopForRunBlockingOrFail()
6262
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
6363
} else {
6464
// See if context's interceptor is an event loop that we shall use (to support TestContext)
6565
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
6666
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
67-
?: ThreadLocalEventLoop.currentOrNull()
67+
?: ThreadLocalEventLoop.currentOrNull()?.useAsEventLoopForRunBlockingOrFail()
6868
newContext = GlobalScope.newCoroutineContext(context)
6969
}
7070
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)

Diff for: kotlinx-coroutines-core/jvm/src/DefaultDelay.kt

+31-4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
@file:JvmName("DefaultExecutorKt")
12
package kotlinx.coroutines
23

34
import kotlinx.atomicfu.*
@@ -70,20 +71,21 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable {
7071
val oldName = currentThread.name
7172
currentThread.name = THREAD_NAME
7273
try {
73-
ThreadLocalEventLoop.setEventLoop(DefaultDelayImpl)
74+
ThreadLocalEventLoop.setEventLoop(DelegatingUnconfinedEventLoop)
7475
registerTimeLoopThread()
7576
try {
7677
while (true) {
7778
Thread.interrupted() // just reset interruption flag
7879
val parkNanos = processNextEvent()
7980
if (parkNanos == Long.MAX_VALUE) break // no more events
80-
parkNanos(DefaultDelayImpl, parkNanos)
81+
parkNanos(this@DefaultDelayImpl, parkNanos)
8182
}
8283
} finally {
8384
_thread.value = null
8485
unregisterTimeLoopThread()
86+
ThreadLocalEventLoop.resetEventLoop()
8587
// recheck if queues are empty after _thread reference was set to null (!!!)
86-
if (isEmpty) {
88+
if (delayedQueueIsEmpty) {
8789
notifyAboutThreadExiting()
8890
} else {
8991
/* recreate the thread, as there is still work to do,
@@ -111,7 +113,7 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable {
111113
if (_thread.value != null) {
112114
val end = System.currentTimeMillis() + timeout.inWholeMilliseconds
113115
while (true) {
114-
check(isEmpty) { "There are tasks in the DefaultExecutor" }
116+
check(delayedQueueIsEmpty) { "There are tasks in the DefaultExecutor" }
115117
synchronized(this) {
116118
unpark(_thread.value ?: return)
117119
val toWait = end - System.currentTimeMillis()
@@ -129,6 +131,31 @@ private object DefaultDelayImpl : EventLoopImplBase(), Runnable {
129131
override fun toString(): String = "DefaultDelay"
130132
}
131133

134+
private object DelegatingUnconfinedEventLoop: UnconfinedEventLoop {
135+
override val thisLoopsTaskCanAvoidYielding: Boolean
136+
get() = defaultDelayRunningUnconfinedLoop()
137+
138+
override val isUnconfinedLoopActive: Boolean get() = false
139+
140+
override fun runUnconfinedEventLoop(initialBlock: () -> Unit) {
141+
ioView.dispatch(ioView, Runnable {
142+
ThreadLocalEventLoop.unconfinedEventLoop.runUnconfinedEventLoop(initialBlock)
143+
})
144+
}
145+
146+
override fun dispatchUnconfined(task: DispatchedTask<*>) =
147+
defaultDelayRunningUnconfinedLoop()
148+
149+
override fun tryUseAsEventLoop(): EventLoop? = null
150+
}
151+
152+
private fun defaultDelayRunningUnconfinedLoop(): Nothing {
153+
throw UnsupportedOperationException(
154+
"This method can only be called from the thread where an unconfined event loop is running, " +
155+
"but no tasks can run on this thread."
156+
)
157+
}
158+
132159
/** A view separate from [Dispatchers.IO].
133160
* [Int.MAX_VALUE] instead of `1` to avoid needlessly using the [LimitedDispatcher] machinery. */
134161
private val ioView = Dispatchers.IO.limitedParallelism(Int.MAX_VALUE)

Diff for: kotlinx-coroutines-core/jvm/src/Dispatchers.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ public actual object Dispatchers {
1919
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
2020

2121
@JvmStatic
22-
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
22+
public actual val Unconfined: CoroutineDispatcher get() =
23+
kotlinx.coroutines.Unconfined
2324

2425
/**
2526
* The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.

Diff for: kotlinx-coroutines-core/jvm/src/EventLoop.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.curr
4646
public fun processNextEventInCurrentThread(): Long =
4747
// This API is used in Ktor for serverless integration where a single thread awaits a blocking call
4848
// (and, to avoid actual blocking, does something via this call), see #850
49-
ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: Long.MAX_VALUE
49+
ThreadLocalEventLoop.currentOrNull()?.tryUseAsEventLoop()?.processNextEvent() ?: Long.MAX_VALUE
5050

5151
internal actual inline fun platformAutoreleasePool(crossinline block: () -> Unit) = block()
5252

Diff for: kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import org.junit.runners.*
99
import kotlin.test.*
1010

1111
@RunWith(Parameterized::class)
12-
class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() {
12+
class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase(disableOutCheck = true) {
1313
companion object {
1414
@Parameterized.Parameters(name = "{0}")
1515
@JvmStatic

Diff for: kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class DispatcherGuideTest {
2020
test("ExampleContext02") { kotlinx.coroutines.guide.exampleContext02.main() }.verifyLinesStart(
2121
"Unconfined : I'm working in thread main",
2222
"main runBlocking: I'm working in thread main",
23-
"Unconfined : After delay in thread kotlinx.coroutines.DefaultDelay",
23+
"Unconfined : After delay in thread DefaultDispatcher oroutine#2",
2424
"main runBlocking: After delay in thread main"
2525
)
2626
}

0 commit comments

Comments
 (0)