Skip to content

Commit 96b41b4

Browse files
authored
Fix DefaultExecutor not being able to exit (#1876)
* Fix DefaultExecutor not being able to exit. * Also adds the performance optimization. See the discussion on the PR. * Add a stress test for the DefaultExecutor worker shutting down. * Make `testDelayChannelBackpressure2` not fail: This test could in theory already fail on the second `checkNotEmpty`: after the first `checkNotEmpty` has passed, first, the ticker channel awakens to produce a new element, and then the main thread awakens to check if it's there. However, if the ticker channel is sufficiently slowed down, it may not produce the element in time for the main thread to find it. After introducing the change that allows the worker thread in `DefaultExecutor` to shut down, the initial delay of 2500 ms is sufficient for the worker to shut down (which, by default, happens after 1000 ms of inactivity), and then the aforementioned race condition worsens: additional time is required to launch a new worker thread and it's much easier to miss the deadline. Now, the delays are much shorter, meaning that the worker thread is never inactive long enough to shut down. Fixes #856
1 parent 9cbad7d commit 96b41b4

File tree

4 files changed

+50
-18
lines changed

4 files changed

+50
-18
lines changed

kotlinx-coroutines-core/common/src/EventLoop.common.kt

+7-3
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ internal abstract class EventLoop : CoroutineDispatcher() {
5252
*/
5353
public open fun processNextEvent(): Long {
5454
if (!processUnconfinedEvent()) return Long.MAX_VALUE
55-
return nextTime
55+
return 0
5656
}
5757

5858
protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty
@@ -251,7 +251,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
251251

252252
override fun processNextEvent(): Long {
253253
// unconfined events take priority
254-
if (processUnconfinedEvent()) return nextTime
254+
if (processUnconfinedEvent()) return 0
255255
// queue all delayed tasks that are due to be executed
256256
val delayed = _delayed.value
257257
if (delayed != null && !delayed.isEmpty) {
@@ -269,7 +269,11 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
269269
}
270270
}
271271
// then process one event from queue
272-
dequeue()?.run()
272+
val task = dequeue()
273+
if (task != null) {
274+
task.run()
275+
return 0
276+
}
273277
return nextTime
274278
}
275279

kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt

+10-9
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,13 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
6868
var parkNanos = processNextEvent()
6969
if (parkNanos == Long.MAX_VALUE) {
7070
// nothing to do, initialize shutdown timeout
71-
if (shutdownNanos == Long.MAX_VALUE) {
72-
val now = nanoTime()
73-
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
74-
val tillShutdown = shutdownNanos - now
75-
if (tillShutdown <= 0) return // shut thread down
76-
parkNanos = parkNanos.coerceAtMost(tillShutdown)
77-
} else
78-
parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway
79-
}
71+
val now = nanoTime()
72+
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
73+
val tillShutdown = shutdownNanos - now
74+
if (tillShutdown <= 0) return // shut thread down
75+
parkNanos = parkNanos.coerceAtMost(tillShutdown)
76+
} else
77+
shutdownNanos = Long.MAX_VALUE
8078
if (parkNanos > 0) {
8179
// check if shutdown was requested and bail out in this case
8280
if (isShutdownRequested) return
@@ -142,4 +140,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
142140
resetAll() // clear queues
143141
(this as Object).notifyAll()
144142
}
143+
144+
internal val isThreadPresent
145+
get() = _thread != null
145146
}

kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt

+28-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
package kotlinx.coroutines
66

7-
import org.junit.*
7+
import org.junit.Test
8+
import kotlin.test.*
89

910
class DefaultExecutorStressTest : TestBase() {
1011
@Test
@@ -35,4 +36,30 @@ class DefaultExecutorStressTest : TestBase() {
3536
}
3637
finish(2 + iterations * 4)
3738
}
39+
40+
@Test
41+
fun testWorkerShutdown() = withVirtualTimeSource {
42+
val iterations = 1_000 * stressTestMultiplier
43+
// wait for the worker to shut down
44+
suspend fun awaitWorkerShutdown() {
45+
val executorTimeoutMs = 1000L
46+
delay(executorTimeoutMs)
47+
while (DefaultExecutor.isThreadPresent) { delay(10) } // hangs if the thread refuses to stop
48+
assertFalse(DefaultExecutor.isThreadPresent) // just to make sure
49+
}
50+
runTest {
51+
awaitWorkerShutdown() // so that the worker shuts down after the initial launch
52+
repeat (iterations) {
53+
val job = launch(Dispatchers.Unconfined) {
54+
// this line runs in the main thread
55+
delay(1)
56+
// this line runs in the DefaultExecutor worker
57+
}
58+
delay(100) // yield the execution, allow the worker to spawn
59+
assertTrue(DefaultExecutor.isThreadPresent) // the worker spawned
60+
job.join()
61+
awaitWorkerShutdown()
62+
}
63+
}
64+
}
3865
}

kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,17 @@ class TickerChannelTest : TestBase() {
4747
@Test
4848
fun testDelayChannelBackpressure2() = withVirtualTimeSource {
4949
runTest {
50-
val delayChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)
50+
val delayChannel = ticker(delayMillis = 200, initialDelayMillis = 0)
5151
delayChannel.checkNotEmpty()
5252
delayChannel.checkEmpty()
5353

54-
delay(2500)
54+
delay(500)
5555
delayChannel.checkNotEmpty()
56-
delay(510)
56+
delay(110)
5757
delayChannel.checkNotEmpty()
58-
delay(510)
58+
delay(110)
5959
delayChannel.checkEmpty()
60-
delay(510)
60+
delay(110)
6161
delayChannel.checkNotEmpty()
6262
delayChannel.cancel()
6363
}

0 commit comments

Comments
 (0)