Closing newFixedThreadPoolContext on K/N may lead to an application crash #3578
Both the K/N team and I feel reluctant about adding any new API to Workers (which we may even mark as obsolete, see https://youtrack.jetbrains.com/issue/KT-54702), and it doesn't seem like we can fix this issue in a reasonable manner without diving into internals. Yet the bug is really unpleasant, and eventually it will start hurting people.
**Option 1**

I experimented a bit, but have no actual understanding. Here's the test that I was using to arrive at my conclusions:

```kotlin
fun testRunBlocking() {
    val worker = Worker.start()
    val latch1 = Channel<Unit>()
    val latch2 = Channel<Unit>()
    val finished = atomic(false)
    println("Before execution")
    worker.executeAfter(0L) {
        try {
            runBlocking {
                println("Inside ${Worker.current}: initial")
                latch1.send(Unit)
                println("Inside ${Worker.current}: sent the notification, sleeping...")
                latch2.receive()
                println("Unreachable ${Worker.current}: the worker can't wake up any more")
            }
        } catch (e: IllegalStateException) {
            println("Got the expected exception $e")
        } finally {
            println("The `finally` block ran successfully")
            finished.value = true
        }
    }
    while (true) {
        if (latch1.tryReceive().isSuccess) {
            break
        }
    }
    worker.requestTermination()
    runBlocking {
        delay(1000)
        println("Trying to wake the worker up")
        try {
            latch2.send(Unit)
        } catch (_: Throwable) {
            // ignore
        }
    }
    println("Awoke the worker")
    while (true) {
        if (finished.value) {
            break
        }
    }
}
```

Even after the worker is terminated, someone will still begrudgingly execute the tasks. In particular, someone does run the finalizer. Trying to query the terminated worker, though, throws an `IllegalStateException`.

So, for some reason, forcing the worker to do more tasks is not incorrect, and therefore trying to do this shouldn't throw an exception. Therefore, we should catch the `IllegalStateException`.

**Option 2**

If the fact that the finalizer runs in the example above is not guaranteed, it's an error in itself. In that case, we must keep the worker alive with additional scheduled tasks until the coroutine is completed. We could do this by scheduling a no-op every 10 ms or so.
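Option 1 would amount to something like the following at each point where the dispatcher hands work to a worker. This is only a sketch, under the assumption (suggested by the test above) that scheduling on a terminated worker throws `IllegalStateException`; `trySchedule` is a hypothetical helper, not existing API:

```kotlin
// Hypothetical helper: treat "the worker is already terminated" as a benign,
// expected outcome instead of letting IllegalStateException propagate.
fun trySchedule(schedule: () -> Unit): Boolean =
    try {
        schedule()
        true
    } catch (_: IllegalStateException) {
        false // the worker refused the task; per Option 1, this is fine
    }

fun main() {
    // Stand-in for Worker.executeAfter on a terminated worker:
    val accepted = trySchedule { throw IllegalStateException("Worker is terminated") }
    println("accepted=$accepted")
}
```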
After an internal discussion, I ran some more experiments.

One finding: workers can survive more than one suspension after they are terminated:

```kotlin
fun testRunBlockingWithFinallyInside() {
    val worker = Worker.start()
    val latch1 = Channel<Unit>()
    val latch2 = Channel<Unit>()
    val latch3 = Channel<Unit>()
    val finished = atomic(false)
    println("Before execution")
    worker.executeAfter(0L) {
        try {
            runBlocking {
                println("Inside ${Worker.current}: initial")
                latch1.send(Unit)
                println("Inside ${Worker.current}: sent the notification, sleeping...")
                try {
                    latch2.receive()
                    println("Unreachable ${Worker.current}: the worker can't wake up any more")
                } finally {
                    println("The inner `finally` block ran successfully. Trying to provoke it again...")
                    latch3.receive()
                }
            }
        } catch (e: IllegalStateException) {
            println("Got the expected exception $e")
        } finally {
            println("The `finally` block ran successfully")
            finished.value = true
        }
    }
    runBlocking {
        latch1.receive()
        worker.requestTermination()
        delay(1000)
        println("Trying to wake the worker up")
        try {
            latch2.send(Unit)
        } catch (_: Throwable) {
            // ignore
        }
        try {
            latch3.send(Unit)
        } catch (_: Throwable) {
            // ignore
        }
    }
    println("Awoke the worker")
    while (true) {
        if (finished.value) {
            break
        }
    }
}
```

Trying to suspend after an exception was caught will lead to the continuation being lost, and the test will hang.

Another finding: option 2 does work!

```kotlin
fun testWorkerKeepAlive() {
    val worker = Worker.start()
    val latch1 = Channel<Unit>()
    worker.executeAfter(0) {
        var i = 0
        fun keepAlive() {
            worker.executeAfter(1000) {
                keepAlive()
            }
        }
        keepAlive()
        runBlocking {
            latch1.send(Unit)
            while (true) {
                println("Inside ${Worker.current}: $i")
                delay(100)
                i++
            }
        }
    }
    runBlocking {
        latch1.receive()
        worker.requestTermination()
        var i = 0
        while (true) {
            println("Outside: $i")
            delay(100)
            i++
        }
    }
}
```

Despite being terminated, the worker keeps on chugging for minutes.
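The keep-alive trick from `testWorkerKeepAlive` can be extracted into a small scheduler-agnostic sketch. The signatures here are hypothetical: on K/N, the `schedule` parameter would be `Worker.executeAfter`, and `done` would report that the blocked coroutine has completed:

```kotlin
// Option 2 as a reusable sketch: keep rescheduling a no-op until `done`
// reports that the coroutine has completed. `schedule(delayMicros, task)`
// stands in for Worker.executeAfter (hypothetical abstraction).
fun keepAliveUntil(done: () -> Boolean, schedule: (Long, () -> Unit) -> Unit) {
    fun tick() {
        if (!done()) schedule(10_000L) { tick() } // ~10 ms between no-ops
    }
    tick()
}

fun main() {
    // Simulate a scheduler with a task queue; report "done" after 3 reschedules.
    var scheduled = 0
    val tasks = ArrayDeque<() -> Unit>()
    keepAliveUntil({ scheduled >= 3 }, { _, task -> scheduled++; tasks.addLast(task) })
    while (tasks.isNotEmpty()) tasks.removeFirst()()
    println("no-ops scheduled: $scheduled")
}
```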
Fixed by #3585
We have a non-trivial bug on the edge of the `MultiWorkerDispatcher` shutdown sequence and our channels' linearizability:

1) `newFixedThreadPoolContext` is using the channel as a substitute for a blocking queue
2) Its shutdown sequence invokes `Worker.requestTermination` for each existing worker in the pool
3) `Channels.close` is linearizable via a helping technique: when any channel operation detects a `close` operation in progress, it starts helping it -- it processes the list of existing enqueued waiters and `resume`s them with the close notification.

3.1) Helping is "non-atomic", meaning that the first helper removes the enqueued waiter from the internal list and only then resumes it.

Now consider the following scenario:

1. [T1] `close` starts closing the channel
2. [T2] one of the workers starts helping, removes a single waiter from the queue, gets preempted
3. [T1] `close` finishes -- there are no more waiters, the channel is in its final state
4. [T1] all the workers are closed via `requestTermination`. There is no work for the workers, so they are terminated
5. [T2] finally gets scheduled back and invokes `resume` on the waiter, leading to `Worker.executeAfter` on an already destroyed worker

Relevant stacktrace: