Skip to content

Commit 9e27cb3

Browse files
committed
(WIP) EventLoop integration
Fixes #860
1 parent b733f7d commit 9e27cb3

File tree

11 files changed

+133
-109
lines changed

11 files changed

+133
-109
lines changed

common/kotlinx-coroutines-core-common/src/Dispatched.kt

+9-5
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@ internal object UndispatchedEventLoop {
2020
)
2121

2222
@JvmField
23-
internal val threadLocalEventLoop = CommonThreadLocal { EventLoop() }
23+
internal val threadLocalEventLoop = CommonThreadLocal<EventLoop?>()
24+
25+
private val eventLoop: EventLoop
26+
get() = threadLocalEventLoop.get() ?: EventLoop().also {
27+
threadLocalEventLoop.set(it)
28+
}
2429

2530
/**
2631
* Executes given [block] as part of current event loop, updating related to block [continuation]
@@ -30,7 +35,7 @@ internal object UndispatchedEventLoop {
3035
*/
3136
inline fun execute(continuation: DispatchedContinuation<*>, contState: Any?, mode: Int,
3237
doYield: Boolean = false, block: () -> Unit) : Boolean {
33-
val eventLoop = threadLocalEventLoop.get()
38+
val eventLoop = eventLoop
3439
if (eventLoop.isActive) {
3540
// If we are yielding and queue is empty, we can bail out as part of fast path
3641
if (doYield && eventLoop.queue.isEmpty) {
@@ -48,13 +53,12 @@ internal object UndispatchedEventLoop {
4853
}
4954

5055
fun resumeUndispatched(task: DispatchedTask<*>): Boolean {
51-
val eventLoop = threadLocalEventLoop.get()
56+
val eventLoop = eventLoop
5257
if (eventLoop.isActive) {
5358
eventLoop.queue.addLast(task)
5459
return true
5560
}
56-
57-
runEventLoop(eventLoop, { task.resume(task.delegate, MODE_UNDISPATCHED) })
61+
runEventLoop(eventLoop) { task.resume(task.delegate, MODE_UNDISPATCHED) }
5862
return false
5963
}
6064

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.internal.*
8+
import kotlin.coroutines.*
9+
import kotlin.jvm.*
10+
11+
/**
12+
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
13+
* be asked to process next event from their event queue.
14+
*
15+
* It may optionally implement [Delay] interface and support time-scheduled tasks. It is used by [runBlocking] to
16+
* continue processing events when invoked from the event dispatch thread.
17+
*
18+
* @suppress **This an internal API and should not be used from general code.**
19+
*/
20+
internal interface EventLoop: ContinuationInterceptor {
21+
/**
22+
* Processes next event in this event loop.
23+
*
24+
* The result of this function is to be interpreted like this:
25+
* * `<= 0` -- there are potentially more events for immediate processing;
26+
* * `> 0` -- a number of nanoseconds to wait for next scheduled event;
27+
* * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread.
28+
*/
29+
public fun processNextEvent(): Long
30+
}
31+
32+
@NativeThreadLocal
33+
internal object ThreadEventLoop {
34+
@JvmField
35+
internal val ref = CommonThreadLocal<EventLoop?>()
36+
}
37+
38+

common/kotlinx-coroutines-core-common/src/internal/ThreadLocal.common.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package kotlinx.coroutines.internal
88
@UseExperimental(ExperimentalMultiplatform::class)
99
internal expect annotation class NativeThreadLocal()
1010

11-
internal expect class CommonThreadLocal<T>(supplier: () -> T) {
11+
internal expect class CommonThreadLocal<T>() {
1212
fun get(): T
13+
fun set(value: T)
1314
}

core/kotlinx-coroutines-core/src/Builders.kt

+33-22
Original file line numberDiff line numberDiff line change
@@ -35,35 +35,42 @@ import kotlin.coroutines.*
3535
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
3636
val currentThread = Thread.currentThread()
3737
val contextInterceptor = context[ContinuationInterceptor]
38-
val privateEventLoop = contextInterceptor == null // create private event loop if no dispatcher is specified
39-
val eventLoop = if (privateEventLoop) BlockingEventLoop(currentThread) else contextInterceptor as? EventLoop
40-
val newContext = GlobalScope.newCoroutineContext(
41-
if (privateEventLoop) context + (eventLoop as ContinuationInterceptor) else context
42-
)
43-
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop, privateEventLoop)
38+
val eventLoop: EventLoop?
39+
val topLevelEventLoop: Boolean
40+
val newContext: CoroutineContext
41+
if (contextInterceptor == null) {
42+
// create/use private event loop if no dispatcher is specified
43+
val currentEventLoop: EventLoop? = ThreadEventLoop.ref.get()
44+
val usedEventEventLoop = currentEventLoop ?: BlockingEventLoop(currentThread).also {
45+
ThreadEventLoop.ref.set(it)
46+
}
47+
eventLoop = usedEventEventLoop
48+
topLevelEventLoop = currentEventLoop == null
49+
newContext = GlobalScope.newCoroutineContext(context + usedEventEventLoop)
50+
} else {
51+
eventLoop = contextInterceptor as? EventLoop
52+
topLevelEventLoop = false
53+
newContext = GlobalScope.newCoroutineContext(context)
54+
}
55+
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
4456
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
45-
return coroutine.joinBlocking()
57+
return coroutine.joinBlocking(topLevelEventLoop)
4658
}
4759

4860
private class BlockingCoroutine<T>(
4961
parentContext: CoroutineContext,
5062
private val blockedThread: Thread,
51-
private val eventLoop: EventLoop?,
52-
private val privateEventLoop: Boolean
63+
private val eventLoop: EventLoop?
5364
) : AbstractCoroutine<T>(parentContext, true) {
54-
init {
55-
if (privateEventLoop) require(eventLoop is BlockingEventLoop)
56-
}
57-
5865
override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
5966
// wake up blocked thread
6067
if (Thread.currentThread() != blockedThread)
6168
LockSupport.unpark(blockedThread)
6269
}
6370

6471
@Suppress("UNCHECKED_CAST")
65-
fun joinBlocking(): T {
66-
timeSource.registerTimeLoopThread()
72+
fun joinBlocking(topLevelEventLoop: Boolean): T {
73+
if (topLevelEventLoop) timeSource.registerTimeLoopThread()
6774
while (true) {
6875
@Suppress("DEPRECATION")
6976
if (Thread.interrupted()) throw InterruptedException().also { cancel(it) }
@@ -72,14 +79,18 @@ private class BlockingCoroutine<T>(
7279
if (isCompleted) break
7380
timeSource.parkNanos(this, parkNanos)
7481
}
75-
// process queued events (that could have been added after last processNextEvent and before cancel
76-
if (privateEventLoop) (eventLoop as BlockingEventLoop).apply {
77-
// We exit the "while" loop above when this coroutine's state "isCompleted",
78-
// Here we should signal that BlockingEventLoop should not accept any more tasks
79-
isCompleted = true
80-
shutdown()
82+
if (topLevelEventLoop) {
83+
// Clean up reference here -- this event loop is shutting down
84+
ThreadEventLoop.ref.set(null)
85+
// process queued events (that could have been added after last processNextEvent and before cancel
86+
(eventLoop as BlockingEventLoop).apply {
87+
// We exit the "while" loop above when this coroutine's state "isCompleted",
88+
// Here we should signal that BlockingEventLoop should not accept any more tasks
89+
isCompleted = true
90+
shutdown()
91+
}
92+
timeSource.unregisterTimeLoopThread()
8193
}
82-
timeSource.unregisterTimeLoopThread()
8394
// now return result
8495
val state = this.state.unboxState()
8596
(state as? CompletedExceptionally)?.let { throw it.cause }

core/kotlinx-coroutines-core/src/DefaultExecutor.kt

+4-11
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ internal actual val DefaultDelay: Delay = DefaultExecutor
1010

1111
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
1212
internal object DefaultExecutor : EventLoopBase(), Runnable {
13-
1413
override val isCompleted: Boolean get() = false
1514

1615
private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds
@@ -26,6 +25,9 @@ internal object DefaultExecutor : EventLoopBase(), Runnable {
2625
@Volatile
2726
private var _thread: Thread? = null
2827

28+
override val thread: Thread
29+
get() = _thread ?: createThreadSync()
30+
2931
private const val FRESH = 0
3032
private const val ACTIVE = 1
3133
private const val SHUTDOWN_REQ = 2
@@ -81,13 +83,10 @@ internal object DefaultExecutor : EventLoopBase(), Runnable {
8183
acknowledgeShutdownIfNeeded()
8284
timeSource.unregisterTimeLoopThread()
8385
// recheck if queues are empty after _thread reference was set to null (!!!)
84-
if (!isEmpty) thread() // recreate thread if it is needed
86+
if (!isEmpty) thread // recreate thread if it is needed
8587
}
8688
}
8789

88-
// ensure that thread is there
89-
private fun thread(): Thread = _thread ?: createThreadSync()
90-
9190
@Synchronized
9291
private fun createThreadSync() = _thread ?:
9392
Thread(this, "kotlinx.coroutines.DefaultExecutor").apply {
@@ -96,12 +95,6 @@ internal object DefaultExecutor : EventLoopBase(), Runnable {
9695
start()
9796
}
9897

99-
override fun unpark() {
100-
timeSource.unpark(thread()) // as a side effect creates thread if it is not there
101-
}
102-
103-
override fun isCorrectThread(): Boolean = true
104-
10598
// used for tests
10699
@Synchronized
107100
internal fun ensureStarted() {

core/kotlinx-coroutines-core/src/EventLoop.kt

+14-39
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,6 @@ import kotlinx.atomicfu.*
88
import kotlinx.coroutines.internal.*
99
import kotlin.coroutines.*
1010

11-
/**
12-
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
13-
* be asked to process next event from their event queue.
14-
*
15-
* It may optionally implement [Delay] interface and support time-scheduled tasks. It is used by [runBlocking] to
16-
* continue processing events when invoked from the event dispatch thread.
17-
*
18-
* @suppress **This an internal API and should not be used from general code.**
19-
*/
20-
internal interface EventLoop: ContinuationInterceptor {
21-
/**
22-
* Processes next event in this event loop.
23-
*
24-
* The result of this function is to be interpreted like this:
25-
* * `<= 0` -- there are potentially more events for immediate processing;
26-
* * `> 0` -- a number of nanoseconds to wait for next scheduled event;
27-
* * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread.
28-
*/
29-
public fun processNextEvent(): Long
30-
}
31-
3211
private val DISPOSED_TASK = Symbol("REMOVED_TASK")
3312

3413
// results for scheduleImpl
@@ -61,8 +40,8 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
6140
private val _delayed = atomic<ThreadSafeHeap<DelayedTask>?>(null)
6241

6342
protected abstract val isCompleted: Boolean
64-
protected abstract fun unpark()
65-
protected abstract fun isCorrectThread(): Boolean
43+
44+
protected abstract val thread: Thread
6645

6746
protected val isEmpty: Boolean
6847
get() = isQueueEmpty && isDelayedEmpty
@@ -95,14 +74,20 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
9574
return (nextDelayedTask.nanoTime - timeSource.nanoTime()).coerceAtLeast(0)
9675
}
9776

77+
private fun unpark() {
78+
val thread = thread
79+
if (Thread.currentThread() !== thread)
80+
timeSource.unpark(thread)
81+
}
82+
9883
override fun dispatch(context: CoroutineContext, block: Runnable) =
9984
execute(block)
10085

10186
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) =
10287
schedule(DelayedResumeTask(timeMillis, continuation))
10388

10489
override fun processNextEvent(): Long {
105-
if (!isCorrectThread()) return Long.MAX_VALUE
90+
if (Thread.currentThread() !== thread) return Long.MAX_VALUE
10691
// queue all delayed tasks that are due to be executed
10792
val delayed = _delayed.value
10893
if (delayed != null && !delayed.isEmpty) {
@@ -318,27 +303,17 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
318303
}
319304
}
320305

321-
internal abstract class ThreadEventLoop(
322-
private val thread: Thread
306+
internal class BlockingEventLoop(
307+
override val thread: Thread
323308
) : EventLoopBase() {
324-
override fun isCorrectThread(): Boolean = Thread.currentThread() === thread
325-
326-
override fun unpark() {
327-
if (Thread.currentThread() !== thread)
328-
timeSource.unpark(thread)
329-
}
309+
@Volatile
310+
public override var isCompleted: Boolean = false
330311

331312
fun shutdown() {
332313
closeQueue()
333-
assert(isCorrectThread())
334314
// complete processing of all queued tasks
335315
while (processNextEvent() <= 0) { /* spin */ }
336316
// reschedule the rest of delayed tasks
337317
rescheduleAllDelayed()
338318
}
339-
}
340-
341-
internal class BlockingEventLoop(thread: Thread) : ThreadEventLoop(thread) {
342-
@Volatile
343-
public override var isCompleted: Boolean = false
344-
}
319+
}

core/kotlinx-coroutines-core/src/internal/ThreadLocal.kt

+2-5
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,5 @@ package kotlinx.coroutines.internal
66

77
import java.lang.ThreadLocal
88

9-
internal actual typealias CommonThreadLocal<T> = ThreadLocalWithInitialValue<T>
10-
11-
internal class ThreadLocalWithInitialValue<T>(private val supplier: () -> T) : ThreadLocal<T>() {
12-
override fun initialValue(): T = supplier()
13-
}
9+
@Suppress("ACTUAL_WITHOUT_EXPECT") // internal visibility
10+
internal actual typealias CommonThreadLocal<T> = ThreadLocal<T>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlinx.coroutines.channels.*
8+
import org.junit.*
9+
10+
class EventLoopsTest {
11+
@Test
12+
fun testNestedRunBlocking() {
13+
runBlocking { // outer event loop
14+
// Produce string "OK"
15+
val ch = produce { send("OK") }
16+
// try receive this string in a blocking way:
17+
println(runBlocking { ch.receive() }) // it hangs here !!!
18+
}
19+
}
20+
}

js/kotlinx-coroutines-core-js/src/internal/ThreadLocal.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
package kotlinx.coroutines.internal
66

7-
internal actual class CommonThreadLocal<T> actual constructor(supplier: () -> T) {
8-
private val value = supplier()
9-
actual fun get(): T = value
7+
internal actual class CommonThreadLocal<T> actual constructor() {
8+
private var value: T? = null
9+
@Suppress("UNCHECKED_CAST")
10+
actual fun get(): T = value as T
11+
actual fun set(value: T) { this.value = value }
1012
}

native/kotlinx-coroutines-core-native/src/EventLoop.kt

-19
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,6 @@ import platform.posix.*
1111
import kotlin.coroutines.*
1212
import kotlin.system.*
1313

14-
/**
15-
* Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
16-
* be asked to process next event from their event queue.
17-
*
18-
* It may optionally implement [Delay] interface and support time-scheduled tasks. It is used by [runBlocking] to
19-
* continue processing events when invoked from the event dispatch thread.
20-
*/
21-
internal interface EventLoop {
22-
/**
23-
* Processes next event in this event loop.
24-
*
25-
* The result of this function is to be interpreted like this:
26-
* * `<= 0` -- there are potentially more events for immediate processing;
27-
* * `> 0` -- a number of nanoseconds to wait for next scheduled event;
28-
* * [Long.MAX_VALUE] -- no more events, or was invoked from the wrong thread.
29-
*/
30-
public fun processNextEvent(): Long
31-
}
32-
3314
/**
3415
* Creates a new event loop.
3516
*/

native/kotlinx-coroutines-core-native/src/internal/ThreadLocal.kt

+6-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import kotlin.native.concurrent.*
88
@Suppress("ACTUAL_WITHOUT_EXPECT")
99
internal actual typealias NativeThreadLocal = kotlin.native.ThreadLocal
1010

11-
internal actual class CommonThreadLocal<T> actual constructor(supplier: () -> T) {
12-
private val value = supplier()
13-
actual fun get(): T = value
14-
}
11+
internal actual class CommonThreadLocal<T> actual constructor() {
12+
private var value: T? = null
13+
@Suppress("UNCHECKED_CAST")
14+
actual fun get(): T = value as T
15+
actual fun set(value: T) { this.value = value }
16+
}

0 commit comments

Comments
 (0)