
Uninterruptible version of runBlocking #4384

Closed
danysantiago opened this issue Mar 13, 2025 · 16 comments

Comments

@danysantiago

danysantiago commented Mar 13, 2025

Hi! Would it be possible to offer an overload or alternative version of runBlocking that is 'uninterruptible' on the JVM, meaning that if the thread calling runBlocking is interrupted, runBlocking does not throw an InterruptedException (relevant source code)?

Our reason for this request is that we migrated the internals of our libraries to coroutines. To keep things simple, we have a 'pool of resources' whose acquisition is suspending, but our APIs have both blocking and suspending versions, the blocking one being kept for compatibility reasons. We opted to use runBlocking to bridge the existing blocking APIs to the coroutine-based internals, but this introduced a behavior change: if the caller thread is interrupted, an InterruptedException is thrown, whereas before, interruption was ignored by our library. I acknowledge that the new behavior is 'better' in the sense that once the thread is interrupted, the coroutine is cancelled and work stops more quickly, reducing wasted cycles. At the same time, I sympathise with our users, who for a long time never expected our library to throw this exception, and now it does.

It would greatly help us if there were a version of runBlocking that took an extra parameter named interruptible, defaulting to true to keep the current behavior. If set to false, it would simply reset the interrupt flag and not cancel the coroutine, making it uninterruptible. Example:
`runBlocking(interruptible = false) { ... }`
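For comparison, there is a common JVM convention for 'uninterruptible' blocking, followed for example by Guava's `Uninterruptibles` helpers. The waiter notes that an interrupt arrived, keeps waiting, and re-asserts the interrupt flag once the wait ends, so callers can still observe the interrupt. The sketch below applies that policy to a plain `CountDownLatch`. It is modelled on that convention only: `awaitUninterruptibly` here is a hypothetical helper, not kotlinx.coroutines code.

```kotlin
import java.util.concurrent.CountDownLatch

// Hypothetical helper: waits for `latch` to reach zero, ignoring interrupts
// during the wait but restoring the thread's interrupt flag afterwards.
fun awaitUninterruptibly(latch: CountDownLatch) {
    var interrupted = false
    try {
        while (true) {
            try {
                latch.await()
                return
            } catch (e: InterruptedException) {
                // `await()` clears the flag before throwing; remember the interrupt and keep waiting
                interrupted = true
            }
        }
    } finally {
        if (interrupted) {
            // re-assert the interrupt so code further up the stack can still react to it
            Thread.currentThread().interrupt()
        }
    }
}
```

The key design choice is that the interrupt is deferred rather than swallowed: the wait is not cut short, but the caller still sees `isInterrupted == true` afterwards.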

I am also open to suggestions on how we could deal with this in our library. Options considered so far:

  1. Catching the InterruptedException in a loop and simply retrying:
```kotlin
while (true) {
  try {
    return runBlocking {
      // ...
    }
  } catch (ex: InterruptedException) {
    // ignored: the whole block is re-run from the start
  }
}
```

This is not great because the coroutine may already have executed far enough that repeating the operation causes a failure. A real-world example is inserting a row into a database table with a unique key: repeating the operation would violate the unique key constraint.
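The double-execution hazard of the retry loop in option 1 can be shown without any coroutine machinery. The sketch below uses a hypothetical in-memory `UniqueTable` as a stand-in for a database table with a unique key, and a hypothetical `retryOnInterrupt` that mirrors the retry strategy. The test simulates an interrupt that is only reported after the insert already went through, so the blind retry hits the constraint.

```kotlin
// Hypothetical stand-in for a table with a unique key column.
class UniqueTable {
    private val keys = mutableSetOf<String>()

    fun insert(key: String) {
        // mimics a unique-key constraint violation on duplicate inserts
        require(keys.add(key)) { "unique key constraint violated: $key" }
    }
}

// Hypothetical helper: runs `op`, re-running it from scratch whenever it
// throws InterruptedException (the same strategy as the retry loop above).
fun <T> retryOnInterrupt(op: () -> T): T {
    while (true) {
        try {
            return op()
        } catch (e: InterruptedException) {
            // ignored: the whole operation is repeated, including side effects already applied
        }
    }
}
```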

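  2. Creating our own runBlockingUninterruptible:
```kotlin
import java.util.concurrent.locks.LockSupport
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch

@OptIn(DelicateCoroutinesApi::class)
internal fun <T> runBlockingUninterruptible(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
    val currentThread = Thread.currentThread()
    var result: Result<T> = Result.failure(IllegalStateException("Coroutine has not completed yet."))
    // run the block on a thread from `context`'s dispatcher (Dispatchers.Default if none is given)
    val coroutine = GlobalScope.launch(context) {
        result = runCatching { block() }
    }
    // wake up the blocked caller once the coroutine finishes
    coroutine.invokeOnCompletion {
        LockSupport.unpark(currentThread)
    }
    var wasInterrupted = false
    while (!coroutine.isCompleted) {
        LockSupport.parkNanos(Long.MAX_VALUE)
        // `Thread.interrupted()` clears the flag so the next park blocks again; remember any interrupt
        if (Thread.interrupted()) wasInterrupted = true
    }
    if (wasInterrupted) {
        // restore the interrupt flag for the caller instead of throwing
        Thread.currentThread().interrupt()
    }
    return result.getOrThrow()
}
```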

This is also not great because it introduces a thread hop that didn't exist before, and the thread pool used for the coroutine brings its own complexities (it might be limited). Even though runBlocking's event loop is an implementation detail, we would like to take advantage of it and reuse it.

  3. Catching the exception and returning some default value:

This is not possible since, from a library standpoint, we have no knowledge of what a 'good' default value would be. Our bridge also uses a type argument for its return type, which further prevents determining the correct value to return. Ultimately, it can also be misleading, as further processing would use an incorrect value.

Thanks!

@danysantiago danysantiago changed the title 'Uninterruptible version runBlocking' to 'Uninterruptible version of runBlocking' Mar 13, 2025
@fvasco
Contributor

fvasco commented Mar 13, 2025

Is ignoring a thread interruption a good practice?
So, should it be promoted in this library?

@dkhalanskyjb
Collaborator

What about something like this?

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun <T> runBlockingUninterruptible(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
    // spawn on a separate thread backed by the Dispatchers.IO/Default thread pool
    val deferred = GlobalScope.async(Dispatchers.IO.limitedParallelism(1)) {
        runBlocking(context, block)
    }
    while (!deferred.isCompleted) {
        try {
            return runBlocking {
                deferred.await()
            }
        } catch (e: InterruptedException) {
            // either we got interrupted or `await()` finished with an `InterruptedException`
        }
    }
    // we know the `deferred` has completed (maybe with an `InterruptedException`). Return the result.
    return deferred.getCompleted()
}
```

@danysantiago
Author

danysantiago commented Mar 13, 2025

> Is ignoring a thread interruption a good practice?

Ignoring thread interruption is not a good practice, which is why I am suggesting an alternative or overload, and definitely not making it the default. However, there are valid use cases where interruption is intended to be ignored.

> What about something like this?
> `fun <T> runBlockingUninterruptible(context: CoroutineContext, block: CoroutineScope.() -> T): T {`

Thanks for the suggestion @dkhalanskyjb! As with my snippet above, the biggest downside of this approach is that we end up using a new thread to do the work, whereas runBlocking installs an event loop and attempts to use the blocked thread to dispatch the coroutine. This has the benefit that installed thread locals stay the same, and it is as close as possible to executing a coroutine in a blocking manner.

Here is an example test of the thread-local behavior using your suggested snippet:

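```kotlin
import com.google.common.truth.Truth.assertThat // assumption: Google Truth (AssertJ reads the same)
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.runBlocking
import org.junit.Test

@Test
fun threadLocalTest() {
    val tLocal = ThreadLocal<String>().apply { set("Test") }
    runBlocking {
        assertThat(tLocal.get()).isEqualTo("Test") // OK: the body runs on the calling thread
    }
    runBlockingUninterruptible(EmptyCoroutineContext) {
        assertThat(tLocal.get()).isEqualTo("Test") // FAIL: the body runs on a Dispatchers.IO thread
    }
    tLocal.remove()
}
```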
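The `// FAIL` line in the test is what any thread-hopping implementation produces: a `ThreadLocal` value is only visible on the thread that set it. Here is a stdlib-only sketch of that effect, with no coroutines involved. `readOnOtherThread` is a hypothetical helper.

```kotlin
import kotlin.concurrent.thread

// Hypothetical helper: reads `local` on a freshly started thread and returns what that thread sees.
fun <T> readOnOtherThread(local: ThreadLocal<T>): T? {
    var seen: T? = null
    // join() makes the other thread's write visible to the caller
    thread { seen = local.get() }.join()
    return seen
}
```

Setting `"Test"` on the calling thread and then calling `readOnOtherThread` yields `null`. The same thing happens to the block passed to a version that dispatches onto `Dispatchers.IO`.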

A hacky idea I had is to capture runBlocking's event-loop dispatcher and reuse it for a global coroutine, like this:

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.ContinuationInterceptor
import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun <T> runBlockingUninterruptible(block: suspend CoroutineScope.() -> T): T {
    // flag to only launch once
    val launched = AtomicBoolean(false)
    // create a deferred for the result
    val deferred = CompletableDeferred<Result<T>>()
    while (!deferred.isCompleted) {
        try {
            return runBlocking {
                // if we haven't yet, launch a global coroutine to run the block
                if (launched.compareAndSet(false, true)) {
                    // get `runBlocking`'s event loop dispatcher (impl detail) so that
                    // the global coroutine uses exactly the same thread as the one being blocked
                    val dispatcher = coroutineContext[ContinuationInterceptor]!!
                    GlobalScope.launch(dispatcher) {
                        val result = runCatching { block() }
                        deferred.complete(result)
                    }
                }
                deferred.await().getOrThrow()
            }
        } catch (e: InterruptedException) {
            // ignored
        }
    }
    return deferred.getCompleted().getOrThrow()
}
```

What do you think?

@dkhalanskyjb
Collaborator

dkhalanskyjb commented Mar 17, 2025

Nope, this won't work: the dispatcher you're acquiring becomes invalid once the first runBlocking call finishes. After that, a cleanup thread will be used to continue execution. Though this does give me an idea:

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun <T> runBlockingUninterruptible(block: suspend CoroutineScope.() -> T): T = runBlocking {
    // the outer runBlocking will never suspend, its body will always be running, so it cannot be interrupted
    // (the outer runBlocking's interceptor is its event-loop dispatcher)
    val dispatcher = coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
    // create a deferred for the result
    val deferred = CompletableDeferred<Result<T>>()
    // the global coroutine uses exactly the same thread as the one being blocked
    GlobalScope.launch(dispatcher) {
        val result = runCatching { block() }
        deferred.complete(result)
    }
    // blockingly process the task until it completes
    while (!deferred.isCompleted) {
        try {
            return@runBlocking runBlocking(dispatcher) {
                deferred.await().getOrThrow()
            }
        } catch (e: InterruptedException) {
            // either `runBlocking` was interrupted or the `deferred` finished with this exception
        }
    }
    deferred.getCompleted().getOrThrow()
}
```

I haven't tested this, though!

@danysantiago
Author

Thanks again for the suggestion, Dmitry! This seems to work based on our tests. I think I will use this implementation to solve our immediate issue.

In terms of the library offering the requested API (runBlocking(interruptible = false) { ... }), is it still up for consideration?

@dkhalanskyjb
Collaborator

If we introduce this in the form you're suggesting, I'm 100% certain someone will think interruptible = false makes their code inside runBlocking non-interruptible. I've seen people assume that even ReentrantLock.lock() ensures the code won't be interrupted until unlock() (example: https://stackoverflow.com/questions/72844502/java-code-calling-lock-lock-not-lock-interruptibly-still-gets-interrupted), and there have been many similar questions about the basic principles of interruption in our issue tracker and Slack channels. Apparently, interruptions cause a lot of confusion.
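To illustrate that misconception: `ReentrantLock.lock()` only means that waiting to *acquire* the lock ignores interrupts. The interrupt flag is left set, so once the lock is held, any interruptible call inside the critical section (here `Thread.sleep`) still throws `InterruptedException`. Below is a stdlib-only sketch; `sleepWhileHoldingLock` is a hypothetical helper.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

// Hypothetical helper: holds `lock` and sleeps inside the critical section.
// Returns true if the sleep was interrupted, i.e. holding the lock did not shield the code.
fun sleepWhileHoldingLock(lock: ReentrantLock, millis: Long): Boolean =
    lock.withLock {
        try {
            Thread.sleep(millis)
            false
        } catch (e: InterruptedException) {
            // `lock()` succeeded despite the pending interrupt, but `sleep` still observed it
            true
        }
    }
```

If the calling thread is already interrupted, `lock()` acquires the uncontended lock normally. The `sleep` call inside then throws immediately and clears the flag.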

In principle, I'm not against exposing some parts of runBlocking (for example, allowing an event loop to be started and controlled manually), though this would need to be a scary-looking, technical API to avoid confusing people into thinking they need it. So, to answer your question: no, I don't think we'll be adding `runBlocking(interruptible = false)`.

@danysantiago
Author

Thanks again for the consideration and for the various suggestions.

I do agree that, in the long term, the API as proposed is not great. I am happy we have a workaround, but I am also concerned that we are partly relying on the internal event loop, which is why my API proposal aimed for guarantees. If those guarantees can be met in a different way, that would be amazing, for example, as you suggested, by turning parts of the event loop into public APIs as part of an advanced construct, probably opt-in at first.

I'll let you decide what to do with this issue specifically (rename it or close it). On our side, we'll use the runBlockingUninterruptible we created for now and monitor how it performs.

@dkhalanskyjb
Collaborator

The desire for stronger guarantees is clear. I personally am certain that nothing we do in the library will introduce incompatible changes to the pieces used in the final version of runBlockingUninterruptible, but of course, we don't know what we don't know.

I propose the following: we add runBlockingUninterruptible to our own test suite; we will avoid checking in any code that breaks the test; and if breaking runBlockingUninterruptible is somehow unavoidable, we'll notify you well in advance (what's the best way to contact your team?) and look for migration options.

How does that sound?

@fvasco
Contributor

fvasco commented Mar 21, 2025

I wonder what the expected behavior of the following code would be.

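```kotlin
import kotlin.concurrent.thread
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.runBlocking

fun main() {
    val t = thread {
        runBlocking(NonCancellable) {
            try {
                awaitCancellation()
            } finally {
                println("Cancelled")
            }
        }
    }
    t.interrupt()
    t.join()
    println("Done")
}
```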

Maybe something isn't working as it should.

```
Exception in thread "Thread-0" java.lang.InterruptedException
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:94)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:69)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at MainKt.main$lambda$0(Main.kt:8)
	at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:30)
Done
```

@dkhalanskyjb
Collaborator

Yeah, I believe this should hang instead. runBlocking's documentation says: "Runs a new coroutine and blocks the current thread until its completion." In your example, we violate this contract.

@fvasco
Contributor

fvasco commented Mar 21, 2025

Maybe the fix could easily solve @danysantiago's problem too.

`runBlockingUninterruptible = runBlocking(NonCancellable)`

@danysantiago
Author

Using NonCancellable with runBlocking is interesting, but I'm not sure it violates the contract: the warning in its documentation is clear ('... is not designed to be used with launch, async, and other coroutine builders.'). It causes the coroutine block to ignore cancellation (cancellation has to be cooperative), which can also happen with other constructs such as suspendCoroutine.

As for adding tests to avoid breaking the version of runBlockingUninterruptible discussed, that sounds pretty good. I am happy to make the tests contributions if you let me.

@fvasco
Contributor

fvasco commented Mar 21, 2025

runBlocking's Job has no parent.
However, fixing the above issue (it is present even without NonCancellable) would force the thread to keep working until the coroutine terminates, and only then should an InterruptedException be thrown.
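The semantics described here can be sketched with plain JDK primitives: keep blocking until the work has terminated, and only then surface the interrupt. This is a hypothetical helper, `awaitThenReportInterrupt`, that illustrates the waiting policy only. It is not the code of the actual fix, and it leaves out the part where runBlocking also cancels its coroutine on interrupt.

```kotlin
import java.util.concurrent.CountDownLatch

// Hypothetical helper: blocks until `done` reaches zero even if interrupted,
// then throws InterruptedException if an interrupt arrived during the wait.
fun awaitThenReportInterrupt(done: CountDownLatch) {
    var interrupted = false
    while (true) {
        try {
            done.await()
            break
        } catch (e: InterruptedException) {
            // do not abandon the work; remember the interrupt and keep waiting
            interrupted = true
        }
    }
    // the work has terminated; only now report the interrupt to the caller
    if (interrupted) throw InterruptedException()
}
```

Unlike the flag-restoring convention, this policy still surfaces the interrupt as an exception. It just never does so while the work is still running.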

@dkhalanskyjb
Copy link
Collaborator

The issue raised by @fvasco is worth looking into. Here's a more reasonable example that still misbehaves: https://pl.kotl.in/wPtqN2w7v

> I am happy to make the tests contributions if you let me.

Thanks, that would be nice, though I'd like to take a closer look at the NonCancellable issue first.

@dkhalanskyjb
Collaborator

@danysantiago, hi! With #4399, the implementation of runBlockingUninterruptible simplifies to this: da419c4, built from reasonable, reliable APIs. The test suite seems comprehensive to me, but if you'd like to suggest some other tests, please feel free to leave a pull request review.

@danysantiago
Author

Thanks @dkhalanskyjb! It looks good to me, I'm glad the fix also simplifies an uninterruptible version.
