-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Uninterruptible version of runBlocking #4384
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
Is ignoring a thread interruption a good practice? |
What about something like this? fun <T> runBlockingUninterruptible(context: CoroutineContext, block: CoroutineScope.() -> T): T {
// spawn on a separate thread backed by the Dispatchers.IO/Default thread pool
val deferred = GlobalScope.async(Dispatchers.IO.limitedParallelism(1)) {
runBlocking(context, block)
}
while (!deferred.isCompleted) {
try {
return runBlocking {
deferred.await()
}
} catch (_: InterruptedException) {
// either we got interrupted or `await()` finished with an `InterruptedException`
}
}
// we know the `deferred` has completed (maybe with an `InterruptedException`). Return the result.
return deferred.getCompleted()
} |
Ignoring thread interruption is not a good practice, which is why my suggestion here is to have an alternative / overload and definitely not making it the default. However there are valid use-cases where interruption is intended to be ignored.
Thanks for the suggestion @dkhalanskyjb! Similar to my snippet above the biggest downsize to this approach is that we end up using a new thread to do the work where as Here is an example test regarding the thread locals using your suggestion snippet: @Test
fun threadLocalTest() {
val tLocal = ThreadLocal<String>().apply { set("Test") }
runBlocking {
assertThat(tLocal.get()).isEqualTo("Test") // OK
}
runBlockingUninterruptible {
assertThat(tLocal.get()).isEqualTo("Test") // FAIL
}
tLocal.remove()
} A hacky idea I was thinking is to capture and sort of reuse the event loop dispatcher and use it for a global coroutine, like this: fun <T> runBlockingUninterruptible(block: suspend CoroutineScope.() -> T): T {
// flag to only launch once
val launched = AtomicBoolean(false)
// create a deferred for the result
val deferred = CompletableDeferred<Result<T>>()
while (!deferred.isCompleted) {
try {
return runBlocking {
// if we haven't yet, launch a global coroutine to run the block
if (launched.compareAndSet(false, true)) {
// get `runBlocking`'s event loop dispatcher (impl detail) so that
// the global coroutine uses exactly the same thread as the one being blocked
val dispatcher = coroutineContext[ContinuationInterceptor]!!
GlobalScope.launch(dispatcher) {
val result = runCatching { block() }
deferred.complete(result)
}
}
deferred.await().getOrThrow()
}
} catch (_: InterruptedException) {
// ignored
}
}
return deferred.getCompleted().getOrThrow()
} What do you think? |
Nope, this won't work, as the fun <T> runBlockingUninterruptible(block: suspend CoroutineScope.() -> T): T = runBlocking {
// the outer runBlocking will never suspend, its body will always be running, so it can not be interrupted
val dispatcher = coroutineContext[CoroutineDispatcher]!!
// create a deferred for the result
val deferred = CompletableDeferred<Result<T>>()
// the global coroutine uses exactly the same thread as the one being blocked
GlobalScope.launch(dispatcher) {
val result = runCatching { block() }
deferred.complete(result)
}
// blockingly process the task until it completes
while (!deferred.isCompleted) {
try {
return@runBlocking runBlocking(dispatcher) {
deferred.await().getOrThrow()
}
} catch (_: InterruptedException) {
// either `runBlocking` was interrupted or the `deferred` finished with this exception
}
}
deferred.getCompleted().getOrThrow()
} I haven't tested this, though! |
Thanks again for the suggestion Dmitry! This seems to be working based on our tests. I think I will be using this implementation to solve our immediate issue. In terms of the library offering the requested API ( |
If we introduce this in the form you're suggesting, I'm 100% certain someone will think In principle, I'm not against exposing some parts of |
Thanks again for the consideration and for the various suggestions. I do agree that on the long term the API as proposed is not great. I am happy we have a workaround, but I am also concern that we are relying in part on the internal event loop so my proposal for the API was to have guarantees. If the API guarantees can be meet in a different way that would be amazing, as you suggested by evolving some of the event loop to be public APIs and having it be part of an advance construct, probably opt-in in the beginning. I'll let you decided what to do with this issue specifically (rename it or close it), on our side we'll use the created |
The desire for stronger guarantees is clear. I personally am certain that nothing we do in the library will introduce incompatible changes to the pieces used in the final version of I propose the following: we add How does that sound? |
I wonder what the expected behavior of the following code would be. fun main() {
val t = thread {
runBlocking(NonCancellable) {
try {
awaitCancellation()
} finally {
println("Cancelled")
}
}
}
t.interrupt()
t.join()
println("Done")
} Maybe something isn't working as it should.
|
Yeah, I believe this should hang instead. |
Maybe the fix could easily solve @danysantiago's problem too.
|
The usage of As for adding tests to avoid breaking the version of |
There is no parent of |
The issue raised by @fvasco is worth looking into. Here's a more reasonable example that still misbehaves: https://pl.kotl.in/wPtqN2w7v
Thanks, would be nice, though I'd like to take a closer look at the |
@danysantiago, hi! With #4399, the implementation of |
Thanks @dkhalanskyjb! It looks good to me, I'm glad the fix also simplifies an uninterruptible version. |
Hi - Would it be possible to offer an overload / alternative version of
runBlocking
that is 'uninterruptible' for JVM, which means that upon the caller thread ofrunBlocking
being interrupted, thatrunBlocking
will not throw anInterruptedException
(relevant source code).Our reason for this request is that we migrated the internals of our libraries to Coroutines, and to keep things simple we have a 'pool of resources' that is suspending (acquiring the resources is suspending) but our APIs have both blocking and suspending versions, the blocking one being kept for compatibility reasons. We opted to use
runBlocking
to bridge between existing blocking functionality with Coroutines internals but it has introduced the behavior that if the caller thread is interrupted then aInterruptedException
is thrown where as before, interruption was ignored by our library. I can acknowledge that the behavior change is 'better' in the sense that once the thread is interrupted the Coroutine is cancelled and work is 'stopped' more quickly, reducing cycles wasted, but the at the same time I have to sympathise with our users who for a long time had never expected our library to throw this exception but now it does.It would greatly help us if there was a version of
runBlocking
that took an extra parameter namedinterruptible
, by default beingtrue
to keep current behavior, but that if set tofalse
it would simply reset the interrupt flag and not cancel the coroutine, therefore making it uninterruptible. Example:runBlocking(interruptible = false) { ... }
.I am also open to suggestions with how we could deal with this from our library, considerations so far have been:
InterruptedException
in a loop and simply retrying again:This is not great because the Coroutine could have executed enough such that repeating the operation would cause a failure. A more real world example is an insertion of a row in a database table with a unique key, repeating the operation would be a unique key constraint violation.
runBlockingUninterruptible
:This is also not great because there is a thread hop that didn't use to exist, and there is a lot of complexities with the thread pool utilized for the coroutine (might be limited). Even though
runBlocking
's event loop is an implementation detail, we would like to take advantage of it and enable re-using it.This is not possible since from a library standpoint we have no knowledge of what would be a 'good' default value. Our bridge also uses a type argument for its return type which further prevents determining the correct value to return. Ultimately it can also be misleading as further processing would be using an incorrect value.
Thanks!
The text was updated successfully, but these errors were encountered: