Skip to content

Prevent runBlocking failure when the Worker it runs on terminates #3585

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions kotlinx-coroutines-core/native/src/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public actual fun <T> runBlocking(context: CoroutineContext, block: suspend Coro
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, eventLoop)
coroutine.keepAlive()
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
Expand All @@ -62,6 +63,18 @@ private class BlockingCoroutine<T>(
) : AbstractCoroutine<T>(parentContext, true, true) {
private val joinWorker = Worker.current

/**
* Send a ping to the worker to prevent it from terminating while this coroutine is running,
* ensuring that continuations don't get dropped and forgotten.
*/
fun keepAlive() {
Worker.current.executeAfter(afterMicroseconds = 100_000) {
if (!isCompleted) {
keepAlive()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really fond of this change.
It has really weird semantics "let's invoke something on a regular basis to workaround the issue that only reproduces during termination", it interferes with nested runBlockings (producing probably unnecessary tasks) and opens a can of worms for "what happens if 'start' or 'joinBlocking' throw system exception (e.g. OOM)". For now it seems it will create a task that is just stuck forever

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are all good points, I fixed the code accordingly.

Regarding the weird semantics, I don't see anything weird about it. You could say, "to workaround the issue that only reproduces during termination" about any mechanism that recognizes something canceling or closing, yet such code is common. Here, we're just explicitly making sure the worker knows further work will be expected from it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, "keepalive" is a common term denoting exactly this approach, though from a different IT area: https://en.wikipedia.org/wiki/Keepalive

}
}
}

override val isScopedCoroutine: Boolean get() = true

override fun afterCompletion(state: Any?) {
Expand Down
28 changes: 28 additions & 0 deletions kotlinx-coroutines-core/native/test/WorkerTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines

import kotlinx.coroutines.channels.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*
import kotlin.test.*
Expand Down Expand Up @@ -34,4 +35,31 @@ class WorkerTest : TestBase() {
}.result
worker.requestTermination()
}

/**
* Test that [runBlocking] does not crash after [Worker.requestTermination] is called on the worker that runs it.
*/
@Test
fun testRunBlockingInTerminatedWorker() {
val workerInRunBlocking = Channel<Unit>()
val workerTerminated = Channel<Unit>()
val checkResumption = Channel<Unit>()
val finished = Channel<Unit>()
val worker = Worker.start()
worker.executeAfter(0) {
runBlocking {
workerInRunBlocking.send(Unit)
workerTerminated.receive()
checkResumption.receive()
finished.send(Unit)
}
}
runBlocking {
workerInRunBlocking.receive()
worker.requestTermination()
workerTerminated.send(Unit)
checkResumption.send(Unit)
finished.receive()
}
}
}