From b1751db895d6ac3ef44f64a600067d2f44bf8298 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Mon, 23 Mar 2020 12:17:08 +0300 Subject: [PATCH 1/7] Fix DefaultExecutor not being able to exit Solves https://github.com/Kotlin/kotlinx.coroutines/issues/856. --- .../common/src/EventLoop.common.kt | 28 ++++++++++++++----- .../jvm/src/DefaultExecutor.kt | 20 +++++++------ 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index ba331e20df..e625577a1c 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -23,6 +23,8 @@ import kotlin.native.concurrent.* internal abstract class EventLoop : CoroutineDispatcher() { /** * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop. + * There are two 32-bit counters encoded in this 64-bit value, allowing to count [Dispatchers.Unconfined] + * separately from `runBlocking`; see [delta] and its uses. */ private var useCount = 0L @@ -51,7 +53,8 @@ internal abstract class EventLoop : CoroutineDispatcher() { * (no check for performance reasons, may be added in the future). */ public open fun processNextEvent(): Long { - if (!processUnconfinedEvent()) return Long.MAX_VALUE + val task = popNextTask() ?: return Long.MAX_VALUE + task.run() return nextTime } @@ -64,11 +67,17 @@ internal abstract class EventLoop : CoroutineDispatcher() { } public fun processUnconfinedEvent(): Boolean { - val queue = unconfinedQueue ?: return false - val task = queue.removeFirstOrNull() ?: return false + val task = popUnconfinedTask() ?: return false task.run() return true } + + public fun popUnconfinedTask(): DispatchedTask<*>? = + unconfinedQueue?.removeFirstOrNull() + + protected open fun popNextTask(onlyUnconfined: Boolean = false): Runnable? = + popUnconfinedTask() + /** * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one). @@ -249,9 +258,15 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { } } - override fun processNextEvent(): Long { + override fun popNextTask(onlyUnconfined: Boolean): Runnable? { + val unconfined = popUnconfinedTask() + if (onlyUnconfined) { + return unconfined + } // unconfined events take priority - if (processUnconfinedEvent()) return nextTime + if (unconfined != null) { + return unconfined + } // queue all delayed tasks that are due to be executed val delayed = _delayed.value if (delayed != null && !delayed.isEmpty) { @@ -269,8 +284,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { } } // then process one event from queue - dequeue()?.run() - return nextTime + return dequeue() } public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block) diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index 4e107a7b1d..4f095f536f 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -65,17 +65,19 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { if (!notifyStartup()) return while (true) { Thread.interrupted() // just reset interruption flag - var parkNanos = processNextEvent() + val nextTask = popNextTask() + if (nextTask != null) { + shutdownNanos = Long.MAX_VALUE + nextTask.run() + } + var parkNanos = nextTime if (parkNanos == Long.MAX_VALUE) { // nothing to do, initialize shutdown timeout - if (shutdownNanos == Long.MAX_VALUE) { - val now = nanoTime() - if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS - val tillShutdown = shutdownNanos - now - if (tillShutdown <= 0) return // shut thread down - parkNanos = parkNanos.coerceAtMost(tillShutdown) - } else - parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway + val now = nanoTime() + if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS + val tillShutdown = shutdownNanos - now + if (tillShutdown <= 0) return // shut thread down + parkNanos = parkNanos.coerceAtMost(tillShutdown) } if (parkNanos > 0) { // check if shutdown was requested and bail out in this case From a67305e6910a1106643c623e7d8e7ac1d30a1f8f Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 24 Mar 2020 16:55:45 +0300 Subject: [PATCH 2/7] `poll` -> `dequeue`, remove `onlyUnconfined` parameter --- .../common/src/EventLoop.common.kt | 20 +++++++++---------- .../jvm/src/DefaultExecutor.kt | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index e625577a1c..07f5098bbb 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -53,7 +53,7 @@ internal abstract class EventLoop : CoroutineDispatcher() { * (no check for performance reasons, may be added in the future). */ public open fun processNextEvent(): Long { - val task = popNextTask() ?: return Long.MAX_VALUE + val task = dequeueNextTask() ?: return Long.MAX_VALUE task.run() return nextTime } @@ -67,16 +67,19 @@ internal abstract class EventLoop : CoroutineDispatcher() { } public fun processUnconfinedEvent(): Boolean { - val task = popUnconfinedTask() ?: return false + val task = dequeueUnconfinedTask() ?: return false task.run() return true } - public fun popUnconfinedTask(): DispatchedTask<*>? = + protected fun dequeueUnconfinedTask(): DispatchedTask<*>? = unconfinedQueue?.removeFirstOrNull() - protected open fun popNextTask(onlyUnconfined: Boolean = false): Runnable? = - popUnconfinedTask() + /** + * Get the next event in this event loop, if there is one. + */ + protected open fun dequeueNextTask(): Runnable? = + dequeueUnconfinedTask() /** * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context @@ -258,11 +261,8 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { } } - override fun popNextTask(onlyUnconfined: Boolean): Runnable? { - val unconfined = popUnconfinedTask() - if (onlyUnconfined) { - return unconfined - } + protected override fun dequeueNextTask(): Runnable? { + val unconfined = dequeueUnconfinedTask() // unconfined events take priority if (unconfined != null) { return unconfined diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index 4f095f536f..62113fdd9a 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -65,7 +65,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { if (!notifyStartup()) return while (true) { Thread.interrupted() // just reset interruption flag - val nextTask = popNextTask() + val nextTask = dequeueNextTask() if (nextTask != null) { shutdownNanos = Long.MAX_VALUE nextTask.run() From dff1531389184eafeeb3dd01c92f3453cc720556 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 25 Mar 2020 00:10:28 +0300 Subject: [PATCH 3/7] Fix wrong refactoring in EventLoopCommon --- kotlinx-coroutines-core/common/src/EventLoop.common.kt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 07f5098bbb..da87977bf2 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -287,6 +287,11 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { return dequeue() } + override fun processNextEvent(): Long { + dequeueNextTask()?.run() + return nextTime + } + public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block) public fun enqueue(task: Runnable) { From 021cca3fe2bdb5c1ef9bd460683626b9b5722da1 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 26 Mar 2020 11:35:25 +0300 Subject: [PATCH 4/7] Make `testDelayChannelBackpressure2` not fail This test could in theory already fail on the second `checkNotEmpty`: after the first `checkNotEmpty` has passed, first, the ticker channel awakens to produce a new element, and then the main thread awakens to check if it's there. However, if the ticker channel is sufficiently slowed down, it may not produce the element in time for the main thread to find it. After introducing the change that allows the worker thread in `DefaultExecutor` to shut down, the initial delay of 2500 ms is sufficient for the worker to shut down (which, by default, happens after 1000 ms of inactivity), and then the aforementioned race condition worsens: additional time is required to launch a new worker thread and it's much easier to miss the deadline. Now, the delays are much shorter, meaning that the worker thread is never inactive long enough to shut down. --- .../jvm/test/channels/TickerChannelTest.kt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt index c421bd334a..fcdc6bb4ad 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelTest.kt @@ -47,17 +47,17 @@ class TickerChannelTest : TestBase() { @Test fun testDelayChannelBackpressure2() = withVirtualTimeSource { runTest { - val delayChannel = ticker(delayMillis = 1000, initialDelayMillis = 0) + val delayChannel = ticker(delayMillis = 200, initialDelayMillis = 0) delayChannel.checkNotEmpty() delayChannel.checkEmpty() - delay(2500) + delay(500) delayChannel.checkNotEmpty() - delay(510) + delay(110) delayChannel.checkNotEmpty() - delay(510) + delay(110) delayChannel.checkEmpty() - delay(510) + delay(110) delayChannel.checkNotEmpty() delayChannel.cancel() } From 5f2b0c75bdb85db0c8f47237aeee4be776e0a4c6 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 26 Mar 2020 11:44:12 +0300 Subject: [PATCH 5/7] Revert decomposition of `process` to `dequeue` and `task.run` Also adds the performance optimization. See the discussion on the PR. --- .../common/src/EventLoop.common.kt | 37 ++++++------------- .../jvm/src/DefaultExecutor.kt | 10 ++--- 2 files changed, 14 insertions(+), 33 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index da87977bf2..69ea9fe312 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -23,8 +23,6 @@ import kotlin.native.concurrent.* internal abstract class EventLoop : CoroutineDispatcher() { /** * Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop. - * There are two 32-bit counters encoded in this 64-bit value, allowing to count [Dispatchers.Unconfined] - * separately from `runBlocking`; see [delta] and its uses. */ private var useCount = 0L @@ -53,9 +51,8 @@ internal abstract class EventLoop : CoroutineDispatcher() { * (no check for performance reasons, may be added in the future). */ public open fun processNextEvent(): Long { - val task = dequeueNextTask() ?: return Long.MAX_VALUE - task.run() - return nextTime + if (!processUnconfinedEvent()) return Long.MAX_VALUE + return 0 } protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty @@ -67,20 +64,11 @@ internal abstract class EventLoop : CoroutineDispatcher() { } public fun processUnconfinedEvent(): Boolean { - val task = dequeueUnconfinedTask() ?: return false + val queue = unconfinedQueue ?: return false + val task = queue.removeFirstOrNull() ?: return false task.run() return true } - - protected fun dequeueUnconfinedTask(): DispatchedTask<*>? = - unconfinedQueue?.removeFirstOrNull() - - /** - * Get the next event in this event loop, if there is one. - */ - protected open fun dequeueNextTask(): Runnable? = - dequeueUnconfinedTask() - /** * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one). @@ -261,12 +249,9 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { } } - protected override fun dequeueNextTask(): Runnable? { - val unconfined = dequeueUnconfinedTask() + override fun processNextEvent(): Long { // unconfined events take priority - if (unconfined != null) { - return unconfined - } + if (processUnconfinedEvent()) return 0 // queue all delayed tasks that are due to be executed val delayed = _delayed.value if (delayed != null && !delayed.isEmpty) { @@ -284,11 +269,11 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { } } // then process one event from queue - return dequeue() - } - - override fun processNextEvent(): Long { - dequeueNextTask()?.run() + val task = dequeue() + if (task != null) { + task.run() + return 0 + } return nextTime } diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index 62113fdd9a..359bd101e0 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -65,12 +65,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { if (!notifyStartup()) return while (true) { Thread.interrupted() // just reset interruption flag - val nextTask = dequeueNextTask() - if (nextTask != null) { - shutdownNanos = Long.MAX_VALUE - nextTask.run() - } - var parkNanos = nextTime + var parkNanos = processNextEvent() if (parkNanos == Long.MAX_VALUE) { // nothing to do, initialize shutdown timeout val now = nanoTime() @@ -78,7 +73,8 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { val tillShutdown = shutdownNanos - now if (tillShutdown <= 0) return // shut thread down parkNanos = parkNanos.coerceAtMost(tillShutdown) - } + } else + shutdownNanos = Long.MAX_VALUE if (parkNanos > 0) { // check if shutdown was requested and bail out in this case if (isShutdownRequested) return From 3c9621e6f6a82bc62f4acd89af4cc137ca505e4d Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Thu, 26 Mar 2020 12:48:48 +0300 Subject: [PATCH 6/7] Add a stress test for the DefaultExecutor worker shutting down --- .../jvm/src/DefaultExecutor.kt | 3 ++ .../jvm/test/DefaultExecutorStressTest.kt | 29 ++++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index 359bd101e0..ed84f55e74 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -140,4 +140,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { resetAll() // clear queues (this as Object).notifyAll() } + + internal val isThreadPresent + get() = _thread != null } diff --git a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt index b4c6aaed4d..3e1d1f14d4 100644 --- a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt @@ -4,7 +4,8 @@ package kotlinx.coroutines -import org.junit.* +import org.junit.Test +import kotlin.test.* class DefaultExecutorStressTest : TestBase() { @Test @@ -35,4 +36,30 @@ class DefaultExecutorStressTest : TestBase() { } finish(2 + iterations * 4) } + + @Test + fun workerShutsDown() = withVirtualTimeSource { + val iterations = 1_000 * stressTestMultiplier + // wait for the worker to shut down + suspend fun awaitWorkerShutdown() { + val executorTimeoutMs = 1000L + delay(executorTimeoutMs) + while (DefaultExecutor.isThreadPresent) { delay(10) } // hangs if the thread refuses to stop + assertFalse(DefaultExecutor.isThreadPresent) // just to make sure + } + runTest { + awaitWorkerShutdown() // so that the worker shuts down after the initial launch + repeat (iterations) { + val job = launch(Dispatchers.Unconfined) { + // this line runs in the main thread + delay(1) + // this line runs in the DefaultExecutor worker + } + delay(100) // yield the execution, allow the worker to spawn + assertTrue(DefaultExecutor.isThreadPresent) // the worker spawned + job.join() + awaitWorkerShutdown() + } + } + } } From 16e7b74aa64c4c217d65fb16b5b3b3e5444fb875 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Fri, 27 Mar 2020 09:59:25 +0300 Subject: [PATCH 7/7] Fix --- kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt index 3e1d1f14d4..bc2de8c998 100644 --- a/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DefaultExecutorStressTest.kt @@ -38,7 +38,7 @@ class DefaultExecutorStressTest : TestBase() { } @Test - fun workerShutsDown() = withVirtualTimeSource { + fun testWorkerShutdown() = withVirtualTimeSource { val iterations = 1_000 * stressTestMultiplier // wait for the worker to shut down suspend fun awaitWorkerShutdown() {