Closing newFixedThreadPoolContext on K/N may lead to an application crash #3578

Closed

qwwdfsad opened this issue Jan 5, 2023 · 4 comments

@qwwdfsad (Member) commented Jan 5, 2023

We have a non-trivial bug at the intersection of the MultiWorkerDispatcher shutdown sequence and our channels' linearizability.

  1. All tasks dispatched via newFixedThreadPoolContext go through a channel that serves as a substitute for a blocking queue.
  2. The dispatcher's shutdown sequence first closes that channel and then invokes Worker.requestTermination for each worker in the pool.
  3. Channel.close is linearizable via a helping technique: when any channel operation detects a close in progress, it starts helping -- it walks the list of enqueued waiters, notifies them about the close, and resumes them.
    3.1) Helping is "non-atomic": a helper first removes an enqueued waiter from the internal list and only then resumes it.

Now consider the following scenario:

[T1] close starts closing the channel
[T2] one of the workers starts helping, removes a single waiter from the queue, and gets preempted
[T1] close finishes -- there are no more waiters, and the channel is in its final state
[T1] all the workers are shut down via requestTermination; there is no work left for them, so they are terminated
[T2] finally gets scheduled back and invokes resume on the removed waiter, leading to a Worker.executeAfter call on an already terminated worker
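
For context, here is a minimal sketch of the dispatcher shape described in steps 1 and 2 above; the class and property names are illustrative, not the actual MultiWorkerDispatcher code. The channel plays the role of a blocking task queue, each worker drains it inside runBlocking, and close() closes the channel before terminating the workers:

import kotlin.native.concurrent.Worker
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

class PoolSketch(nWorkers: Int) {
    // The channel substitutes for a blocking task queue (step 1).
    private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
    private val workers = Array(nWorkers) { Worker.start() }

    init {
        workers.forEach { worker ->
            worker.executeAfter(0L) {
                // Each worker drains the queue; the iteration completes once
                // the channel is closed and empty.
                runBlocking { for (task in tasksQueue) task.run() }
            }
        }
    }

    fun dispatch(task: Runnable) { tasksQueue.trySend(task) }

    fun close() {
        tasksQueue.close() // step 2: close the queue...
        // ...then terminate the workers. A helper that was preempted between
        // removing a waiter and resuming it (step 3.1) may resume it only
        // after this point, hitting Worker.executeAfter on a terminated worker.
        workers.forEach { it.requestTermination(processScheduledJobs = true) }
    }
}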

Relevant stacktrace:

Uncaught Kotlin exception: kotlin.IllegalStateException: Worker is already terminated
Invalid connection: com.apple.coresymbolicationd
    at 0   workerWithNewMM.kexe                0x1040bae7d        ThrowWorkerAlreadyTerminated + 157 (/opt/buildAgent/work/5f69639f351c4725/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/native/concurrent/Internal.kt:68:15)
    at 1   workerWithNewMM.kexe                0x1040bdd4f        kfun:kotlin.native.concurrent.Worker#executeAfter(kotlin.Long;kotlin.Function0<kotlin.Unit>){} + 639 (/opt/buildAgent/work/5f69639f351c4725/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/native/concurrent/Worker.kt:125:9)
    at 2   workerWithNewMM.kexe                0x10416097c        kfun:kotlinx.coroutines.EventLoopImplBase#enqueue(kotlinx.coroutines.Runnable){} + 140 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/EventLoop.common.kt:291:13)
Child process terminated with signal 6: Abort trap
    at 3   workerWithNewMM.kexe                0x104156c10        kfun:kotlinx.coroutines.CancellableContinuationImpl.dispatchResume#internal + 688 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt:<unknown>)
    at 4   workerWithNewMM.kexe                0x104157474        kfun:kotlinx.coroutines.CancellableContinuationImpl.resumeImpl#internal + 564 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt:451:21)
    at 5   workerWithNewMM.kexe                0x10415758d        kfun:kotlinx.coroutines.CancellableContinuationImpl#resumeImpl$default(kotlin.Any?;kotlin.Int;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?;kotlin.Int){} + 141 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt:440:13)
    at 6   workerWithNewMM.kexe                0x1041561e7        kfun:kotlinx.coroutines.CancellableContinuationImpl#resumeWith(kotlin.Result<1:0>){} + 471 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt:348:9)
    at 7   workerWithNewMM.kexe                0x1041871b1        kfun:kotlinx.coroutines.channels.BufferedChannel.closeWaiter#internal + 1441 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt:1753:37)
    at 8   workerWithNewMM.kexe                0x104185fb4        kfun:kotlinx.coroutines.channels.BufferedChannel.completeClose#internal + 2500 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt:1739:42)
    at 9   workerWithNewMM.kexe                0x104187802        kfun:kotlinx.coroutines.channels.BufferedChannel.isClosed#internal + 146 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt:<unknown>)
    at 10  workerWithNewMM.kexe                0x104183b92        kfun:kotlinx.coroutines.channels.BufferedChannel.BufferedChannelIterator.$hasNextCOROUTINE$2784#invokeSuspend(kotlin.Result<kotlin.Any?>){}kotlin.Any? + 418 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt:<unknown>)
    at 11  workerWithNewMM.kexe                0x104184846        kfun:kotlinx.coroutines.channels.BufferedChannel.BufferedChannelIterator#hasNext(){}kotlin.Boolean + 198 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt:1395:26)
qwwdfsad self-assigned this Jan 5, 2023
qwwdfsad changed the title from "Closing newFixedThreadPoolContext on K/N may lead to an application" to "Closing newFixedThreadPoolContext on K/N may lead to an application crash" Jan 5, 2023
@qwwdfsad (Member, Author) commented Jan 6, 2023

Both the K/N team and I are reluctant to add any new API to Workers (which we may even mark as obsolete, see https://youtrack.jetbrains.com/issue/KT-54702), and it doesn't seem like we can fix this issue in a reasonable manner without diving into the internals.

Yet the bug is really unpleasant, and eventually it will start hurting people.
The easiest solution I know of, and I'm really sorry about that, is to catch the IllegalStateException and ignore it in this particular piece of code. @dkhalanskyjb WDYT?

@dkhalanskyjb (Collaborator)

Option 1

I experimented a bit, but don't have a full understanding yet. Here's the test I was using to arrive at my conclusions:

import kotlin.native.concurrent.Worker
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun testRunBlocking() {
    val worker = Worker.start()
    val latch1 = Channel<Unit>()
    val latch2 = Channel<Unit>()
    val finished = atomic(false)
    println("Before execution")
    worker.executeAfter(0L) {
        try {
            runBlocking {
                println("Inside ${Worker.current}: initial")
                latch1.send(Unit)
                println("Inside ${Worker.current}: sent the notification, sleeping...")
                latch2.receive()
                println("Unreachable ${Worker.current}: the worker can't wake up any more")
            }
        } catch (e: IllegalStateException) {
            println("Got the expected exception $e")
        } finally {
            println("The `finally` block ran successfully")
            finished.value = true
        }
    }
    while(true) {
        if (latch1.tryReceive().isSuccess) {
            break
        }
    }
    worker.requestTermination()
    runBlocking {
        delay(1000)
        println("Trying to wake the worker up")
        try {
            latch2.send(Unit)
        } catch (_: Throwable) {
            // ignore
        }
    }
    println("Awoke the worker")
    while(true) {
        if (finished.value) {
            break
        }
    }
}

Even after the worker is terminated, someone will still begrudgingly execute the tasks. In particular, someone does run the `finally` block. Trying to query Worker.current().toString() in that block throws the "worker was already terminated" exception, so, likely, it's the worker itself that still executes the code.

So, for some reason, forcing the worker to do more tasks is not incorrect. Therefore, trying to do so shouldn't throw an exception, and we should catch the IllegalStateException in kotlinx.coroutines.EventLoopImplPlatform#unpark. Was this what you meant?
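
For the record, the shape of that workaround would be roughly the following; this is only a hedged sketch with a hypothetical helper name, not the actual EventLoopImplPlatform code nor the eventual fix:

import kotlin.native.concurrent.Worker

// Hypothetical helper illustrating the "catch and ignore" idea; the real
// unpark implementation may differ.
internal fun Worker.executeAfterIgnoringTermination(block: () -> Unit) {
    try {
        executeAfter(0L, block)
    } catch (_: IllegalStateException) {
        // The worker has already been terminated. Per the experiment above,
        // a terminated worker still runs its remaining tasks, so swallowing
        // the failure here is assumed to be safe.
    }
}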

Option 2

If the fact that the `finally` block runs in the example above is not guaranteed, that is an error in itself. In that case, we must keep the worker alive with additional scheduled tasks until the coroutine has completed. We could do this by scheduling a no-op every 10 ms or so.

@dkhalanskyjb (Collaborator) commented Jan 10, 2023

After an internal discussion, I ran some more experiments.

One finding: Workers can survive more than one suspension after they are terminated:

    fun testRunBlockingWithFinallyInside() {
        val worker = Worker.start()
        val latch1 = Channel<Unit>()
        val latch2 = Channel<Unit>()
        val latch3 = Channel<Unit>()
        val finished = atomic(false)
        println("Before execution")
        worker.executeAfter(0L) {
            try {
                runBlocking {
                    println("Inside ${Worker.current}: initial")
                    latch1.send(Unit)
                    println("Inside ${Worker.current}: sent the notification, sleeping...")
                    try {
                        latch2.receive()
                        println("Unreachable ${Worker.current}: the worker can't wake up any more")
                    } finally {
                        println("The inner `finally` block ran successfully. Trying to provoke it again...")
                        latch3.receive()
                    }
                }
            } catch (e: IllegalStateException) {
                println("Got the expected exception $e")
            } finally {
                println("The `finally` block ran successfully")
                finished.value = true
            }
        }
        runBlocking {
            latch1.receive()
            worker.requestTermination()
            delay(1000)
            println("Trying to wake the worker up")
            try {
                latch2.send(Unit)
            } catch (_: Throwable) {
                // ignore
            }
            try {
                latch3.send(Unit)
            } catch (_: Throwable) {
                // ignore
            }
        }
        println("Awoke the worker")
        while(true) {
            if (finished.value) {
                break
            }
        }
    }

Trying to suspend after an exception was caught will lead to the continuation being lost, and the test will hang.

Another finding: option 2 does work!

fun testWorkerKeepAlive() {
    val worker = Worker.start()
    val latch1 = Channel<Unit>()
    worker.executeAfter(0) {
        var i = 0
        fun keepAlive() {
            worker.executeAfter(1000) {
                keepAlive()
            }
        }
        keepAlive()
        runBlocking {
            latch1.send(Unit)
            while (true) {
                println("Inside ${Worker.current}: $i")
                delay(100)
                i++
            }
        }
    }
    runBlocking {
        latch1.receive()
        worker.requestTermination()
        var i = 0
        while (true) {
            println("Outside: $i")
            delay(100)
            i++
        }
    }
}

Despite being terminated, the worker keeps on chugging for minutes.
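
If we do go the keep-alive route, wiring it into a worker's run loop might look roughly like this; an illustrative sketch only, with made-up names and a made-up 10 ms period, not a proposed patch:

import kotlin.native.concurrent.Worker
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Intended to be invoked via worker.executeAfter(0L) { runWorkerLoop(worker, queue) }.
fun runWorkerLoop(worker: Worker, tasksQueue: Channel<Runnable>) {
    val done = atomic(false)
    fun keepAlive() {
        if (done.value) return
        // Keep the worker's queue non-empty so that, even after
        // requestTermination, it keeps processing scheduled jobs until the
        // run loop below has fully completed.
        worker.executeAfter(10_000L) { keepAlive() } // 10 ms, in microseconds
    }
    keepAlive()
    runBlocking {
        for (task in tasksQueue) task.run()
    }
    done.value = true // stop the keep-alive chain so termination can complete
}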

@qwwdfsad (Member, Author) commented Mar 9, 2023

Fixed by #3585

qwwdfsad closed this as completed Mar 9, 2023