Skip to content

Commit 4b936b5

Browse files
committed
Improvements to EventLoops implementation:
* Unused ArrayQueue is dropped * executeUnconfined and resumeUnconfined are defined as extension funs * EventLoop.increment renamed to delta * Test for EventLoop added
1 parent 2c1ae01 commit 4b936b5

File tree

4 files changed

+79
-66
lines changed

4 files changed

+79
-66
lines changed

common/kotlinx-coroutines-core-common/src/Dispatched.kt

+17-17
Original file line numberDiff line numberDiff line change
@@ -13,40 +13,40 @@ import kotlin.jvm.*
1313
private val UNDEFINED = Symbol("UNDEFINED")
1414

1515
/**
16-
* Executes given [block] as part of current event loop, updating related to block [continuation]
16+
* Executes given [block] as part of current event loop, updating current continuation
1717
* mode and state if continuation is not resumed immediately.
1818
* [doYield] indicates whether current continuation is yielding (to provide fast-path if event-loop is empty).
1919
* Returns `true` if execution of continuation was queued (trampolined) or `false` otherwise.
2020
*/
21-
private inline fun executeUnconfined(
22-
continuation: DispatchedContinuation<*>, contState: Any?, mode: Int,
23-
doYield: Boolean = false, block: () -> Unit
21+
private inline fun DispatchedContinuation<*>.executeUnconfined(
22+
contState: Any?, mode: Int, doYield: Boolean = false,
23+
block: () -> Unit
2424
) : Boolean {
2525
val eventLoop = ThreadLocalEventLoop.eventLoop
2626
// If we are yielding and unconfined queue is empty, we can bail out as part of fast path
2727
if (doYield && eventLoop.isEmptyUnconfinedQueue) return false
2828
return if (eventLoop.isUnconfinedLoopActive) {
2929
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
30-
continuation._state = contState
31-
continuation.resumeMode = mode
32-
eventLoop.dispatchUnconfined(continuation)
30+
_state = contState
31+
resumeMode = mode
32+
eventLoop.dispatchUnconfined(this)
3333
true // queued into the active loop
3434
} else {
35-
// Was not active -- run event loop until unconfined tasks are executed
35+
// Was not active -- run event loop until all unconfined tasks are executed
3636
runUnconfinedEventLoop(eventLoop, block = block)
3737
false
3838
}
3939
}
4040

41-
private fun resumeUnconfined(task: DispatchedTask<*>) {
41+
private fun DispatchedTask<*>.resumeUnconfined() {
4242
val eventLoop = ThreadLocalEventLoop.eventLoop
4343
if (eventLoop.isUnconfinedLoopActive) {
4444
// When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
45-
eventLoop.dispatchUnconfined(task)
45+
eventLoop.dispatchUnconfined(this)
4646
} else {
47-
// Was not active -- run event loop until unconfined tasks are executed
47+
// Was not active -- run event loop until all unconfined tasks are executed
4848
runUnconfinedEventLoop(eventLoop) {
49-
task.resume(task.delegate, MODE_UNDISPATCHED)
49+
resume(delegate, MODE_UNDISPATCHED)
5050
}
5151
}
5252
}
@@ -103,7 +103,7 @@ internal class DispatchedContinuation<in T>(
103103
resumeMode = MODE_ATOMIC_DEFAULT
104104
dispatcher.dispatch(context, this)
105105
} else {
106-
executeUnconfined(this, state, MODE_ATOMIC_DEFAULT) {
106+
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
107107
withCoroutineContext(this.context, countOrElement) {
108108
continuation.resumeWith(result)
109109
}
@@ -118,7 +118,7 @@ internal class DispatchedContinuation<in T>(
118118
resumeMode = MODE_CANCELLABLE
119119
dispatcher.dispatch(context, this)
120120
} else {
121-
executeUnconfined(this, value, MODE_CANCELLABLE) {
121+
executeUnconfined(value, MODE_CANCELLABLE) {
122122
if (!resumeCancelled()) {
123123
resumeUndispatched(value)
124124
}
@@ -135,7 +135,7 @@ internal class DispatchedContinuation<in T>(
135135
resumeMode = MODE_CANCELLABLE
136136
dispatcher.dispatch(context, this)
137137
} else {
138-
executeUnconfined(this, state, MODE_CANCELLABLE) {
138+
executeUnconfined(state, MODE_CANCELLABLE) {
139139
if (!resumeCancelled()) {
140140
resumeUndispatchedWithException(exception)
141141
}
@@ -259,7 +259,7 @@ internal abstract class DispatchedTask<in T>(
259259
}
260260

261261
internal fun DispatchedContinuation<Unit>.yieldUndispatched(): Boolean =
262-
executeUnconfined(this, Unit, MODE_CANCELLABLE, doYield = true) {
262+
executeUnconfined(Unit, MODE_CANCELLABLE, doYield = true) {
263263
run()
264264
}
265265

@@ -272,7 +272,7 @@ internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
272272
if (dispatcher.isDispatchNeeded(context)) {
273273
dispatcher.dispatch(context, this)
274274
} else {
275-
resumeUnconfined(this)
275+
resumeUnconfined()
276276
}
277277
} else {
278278
resume(delegate, mode)

common/kotlinx-coroutines-core-common/src/EventLoop.common.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -79,21 +79,21 @@ internal abstract class EventLoop : CoroutineDispatcher() {
7979
get() = useCount > 0
8080

8181
public val isUnconfinedLoopActive: Boolean
82-
get() = useCount >= increment(unconfined = true)
82+
get() = useCount >= delta(unconfined = true)
8383

8484
public val isEmptyUnconfinedQueue: Boolean
8585
get() = queuedUnconfinedTasks == 0
8686

87-
private fun increment(unconfined: Boolean) =
87+
private fun delta(unconfined: Boolean) =
8888
if (unconfined) (1L shl 32) else 1L
8989

9090
fun incrementUseCount(unconfined: Boolean = false) {
91-
useCount += increment(unconfined)
91+
useCount += delta(unconfined)
9292
if (!unconfined) shared = true
9393
}
9494

9595
fun decrementUseCount(unconfined: Boolean = false) {
96-
useCount -= increment(unconfined)
96+
useCount -= delta(unconfined)
9797
if (useCount > 0) return
9898
check(useCount == 0L) { "Extra decrementUseCount" }
9999
if (shared) {

common/kotlinx-coroutines-core-common/src/internal/ArrayQueue.kt

-45
This file was deleted.

core/kotlinx-coroutines-core/test/EventLoopsTest.kt

+58
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
package kotlinx.coroutines
66

7+
import kotlinx.atomicfu.*
78
import kotlinx.coroutines.channels.*
89
import org.junit.*
910
import org.junit.Test
11+
import java.util.concurrent.locks.*
1012
import kotlin.test.*
1113

1214
/**
@@ -68,4 +70,60 @@ class EventLoopsTest : TestBase() {
6870
}
6971
finish(6)
7072
}
73+
74+
/**
75+
* Simple test for [processNextEventInCurrentThread] API use-case.
76+
*/
77+
@Test
78+
fun testProcessNextEventInCurrentThreadSimple() = runTest {
79+
expect(1)
80+
val event = CustomBlockingEvent()
81+
// this coroutine fires event
82+
launch {
83+
expect(3)
84+
event.fireEvent()
85+
}
86+
// main coroutine waits for event (same thread!)
87+
expect(2)
88+
event.blockingAwait()
89+
finish(4)
90+
}
91+
92+
/**
93+
* Test for [processNextEventInCurrentThread] API use-case with delay.
94+
*/
95+
@Test
96+
fun testProcessNextEventInCurrentThreadDelay() = runTest {
97+
expect(1)
98+
val event = CustomBlockingEvent()
99+
// this coroutine fires event
100+
launch {
101+
expect(3)
102+
delay(100)
103+
event.fireEvent()
104+
}
105+
// main coroutine waits for event (same thread!)
106+
expect(2)
107+
event.blockingAwait()
108+
finish(4)
109+
}
110+
111+
class CustomBlockingEvent {
112+
private val waitingThread = atomic<Thread?>(null)
113+
private val fired = atomic(false)
114+
115+
fun fireEvent() {
116+
fired.value = true
117+
waitingThread.value?.let { LockSupport.unpark(it) }
118+
}
119+
120+
fun blockingAwait() {
121+
check(waitingThread.getAndSet(Thread.currentThread()) == null)
122+
while (!fired.getAndSet(false)) {
123+
val time = processNextEventInCurrentThreadprocessNextEventInCurrentThread()
124+
LockSupport.parkNanos(time)
125+
}
126+
waitingThread.value = null
127+
}
128+
}
71129
}

0 commit comments

Comments
 (0)