
collectLatest does not cancel runInterruptible if dispatchers are the same #3109


Open
aasitnikov opened this issue Dec 23, 2021 · 8 comments

@aasitnikov

aasitnikov commented Dec 23, 2021

val flow = flowOf(1, 2)
launch(Dispatchers.IO) {
    flow.collectLatest {
        println("start $it")
        runInterruptible(Dispatchers.IO) {
            Thread.sleep(5000)
            println("finish interruptible")
        }
        if (it == 1) throw Exception("unexpected")
    }
}

I expect this code to print
start 1
start 2
finish interruptible

But instead it prints
start 1
finish interruptible
exception java.lang.Exception: unexpected

@qwwdfsad qwwdfsad self-assigned this Jan 10, 2022
@qwwdfsad
Collaborator

Thanks for the report!

The problem is straightforward -- runInterruptible uses withContext(...) under the hood, which short-circuits the execution directly, skipping a dispatch if the specified dispatcher is equal to the current one, and effectively blocking the current thread which emits values to the collector.

It is a perfectly sound behaviour for the withContext itself, but I'm not sure about runInterruptible.
On the one hand -- it creates potentially puzzling behaviour for cases like this.
On the other hand, the change may significantly worsen the performance of the interruption-friendly code written in an idiomatic manner.
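
The short-circuit described above can be observed directly. The following is a minimal sketch, assuming kotlinx.coroutines on the JVM; the helper name `nestedWithContextStaysOnThread` is made up for illustration. It compares the thread seen just before a nested `withContext(Dispatchers.IO)` with the thread seen inside it.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: reports whether a nested withContext on the same
// dispatcher ran its block on the caller's thread, i.e. without a dispatch.
fun nestedWithContextStaysOnThread(): Boolean = runBlocking {
    withContext(Dispatchers.IO) {
        val outer = Thread.currentThread()
        // Same dispatcher as the enclosing context: withContext skips the
        // dispatch and runs the block in place on the current thread.
        val inner = withContext(Dispatchers.IO) { Thread.currentThread() }
        outer === inner
    }
}

fun main() {
    println("same thread: ${nestedWithContextStaysOnThread()}")
}
```

Since runInterruptible goes through withContext, a blocking call inside it should occupy the calling thread in the same way, which is the thread that would otherwise deliver the next value to collectLatest.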

For example, it's easy to have the following in the project:

suspend fun readFile() = runInterruptible { ... }
suspend fun writeFile() = runInterruptible { ... }
suspend fun otherIoStuff() = runInterruptible { ... }

// Use-site, the caller knows that these files are in cache and access is fast
withContext(Dispatchers.IO) {
    repeat(N) { 
        readFile()
        writeFile()
    }
}

With a change that fixes the current behaviour, this code would perform up to 2 * N context switches, which may dominate the execution time.

Taking this into account, I tend to think the behaviour is "as designed", but I'm open to other opinions.

Could you please elaborate on how serious this issue is for your use case? Was it hard to find and/or surprising?

@aasitnikov
Author

In my app I have a feature that downloads files from the network. It is implemented with a StateFlow<List<DownloadFile>> and a coroutine that collects this flow and calls suspend fun downloadFile() for list.first() of each emitted list. The flow is collected with collectLatest so that downloadFile() is cancelled if the user cancels the download. suspend fun downloadFile() uses runInterruptible(IO) { ... }, because the actual file loading is implemented as interruptible blocking IO.

It worked perfectly fine because the collecting coroutine was launched in a shared single-threaded dispatcher. But after some refactoring, this dispatcher was changed to Dispatchers.IO. This change led to downloads not being cancelled, and it looked strange: in the UI, files had a spinning progress bar; after pressing the cancel button, the progress bars for all files disappeared as expected, but a few seconds later one file suddenly changed its status to downloaded. After some debugging I figured out that the collectLatest lambda wasn't being cancelled, and my first suspicion fell on the dispatcher change from single-threaded to IO, which turned out to be right. This behaviour was surprising, but reading withContext's sources helped me understand why it was happening.

I tried adding an extra dispatch by calling coroutineScope { launch { } } before downloadFile(), but then decided it would be better to use the new limitedParallelism() API. In that case withContext() performs the dispatch itself, because the dispatchers of runInterruptible and collectLatest become different.

I think this behaviour is at least worth mentioning somewhere in the docs: if the dispatchers are the same, the block is started undispatched.
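
The limitedParallelism() workaround mentioned above might look roughly like this. It is a sketch based on the reproduction from the first comment, not the reporter's actual code: the names `collectWithSeparateDispatcherView` and `blockingIo` are invented, the sleep is shortened, and the opt-in annotation is only needed on library versions where `limitedParallelism` is still experimental.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: runs the reproduction with a separate dispatcher view
// for the blocking part and returns the recorded events in order.
@OptIn(ExperimentalCoroutinesApi::class)
fun collectWithSeparateDispatcherView(): List<String> {
    val events = CopyOnWriteArrayList<String>()
    // A view of Dispatchers.IO; it is a different dispatcher object, so
    // withContext inside runInterruptible has to dispatch and suspend.
    val blockingIo = Dispatchers.IO.limitedParallelism(1)
    runBlocking {
        launch(Dispatchers.IO) {
            flowOf(1, 2).collectLatest { value ->
                events.add("start $value")
                runInterruptible(blockingIo) {
                    Thread.sleep(500) // stands in for interruptible blocking IO
                    events.add("finish $value")
                }
            }
        }.join()
    }
    return events
}

fun main() {
    println(collectWithSeparateDispatcherView())
}
```

Because the collector now suspends instead of blocking the emitting thread, the second value should arrive while the first block is still sleeping, and the first block is interrupted before it can record "finish 1".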

@dkhalanskyjb
Collaborator

Looks like runInterruptible works fine here, but the behavior of collectLatest is surprising. Given a multithreaded dispatcher, I'd intuitively expect it to attempt to produce new elements concurrently with running action.

@qwwdfsad
Collaborator

It actually attempts to do so, except for the very first emit; see:
https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/flow/internal/Merge.kt#L33

Fixing this may lead to potentially (?) unexpected behaviour in single-threaded dispatchers (namely, runBlocking and Dispatchers.Main):
flowOf(1, 2, 3).collectLatest { ... } will now receive only the last element instead of all in the absence of suspension points.

@dkhalanskyjb
Collaborator

Would this really be unexpected? If we want the latest things (and we are collecting the latest)—why waste our time with things that aren't the latest?

An argument can be made though that a hypothetical collectLatest2 that always dispatches can be emulated with collectLatest { yield(); action(it) } (and possibly conflate().collectLatest { action(it) }?), but not vice versa. So, if we did change the behavior, we would be reducing the set of available behaviors.
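
The difference between the two behaviors discussed here can be sketched on a single-threaded dispatcher. This is an illustration under the assumption that everything runs inside runBlocking; the function names `collectedWithoutYield` and `collectedWithYield` are made up.

```kotlin
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Current behavior: the action has no suspension point, so each value is
// fully processed before the next one is emitted.
fun collectedWithoutYield(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    flowOf(1, 2, 3).collectLatest { seen += it }
    seen
}

// Emulated "always dispatch": yield() suspends the action first, so the
// emitter can move on and cancel the action for every value but the last.
fun collectedWithYield(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    flowOf(1, 2, 3).collectLatest {
        yield()
        seen += it
    }
    seen
}

fun main() {
    println(collectedWithoutYield())
    println(collectedWithYield())
}
```

Under these assumptions the first call should return all three values and the second only the last one, which matches the trade-off described in the comments above.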

yorickhenning pushed a commit to yorickhenning/kotlinx.coroutines that referenced this issue Jan 28, 2022
dee-tree pushed a commit to dee-tree/kotlinx.coroutines that referenced this issue Jul 21, 2022
pablobaxter pushed a commit to pablobaxter/kotlinx.coroutines that referenced this issue Sep 14, 2022
@syt0r

syt0r commented Feb 14, 2023

Hey, I'm facing this issue. In my use case I'm dynamically searching for what the user types into a text input. I'm processing searches with a Channel and a consumeAsFlow pipeline that contains a runInterruptible invocation, and since I only care about the latest search, I wanted to discard unfinished searches. But it seems that interruption doesn't work inside xxxLatest.

To me, the name collectLatest implies that everything except the latest available value should be interrupted, but it does not work like this with runInterruptible. I tried buffered channels and rendezvous + drop_latest, to no avail. Any ideas how to

@syt0r

syt0r commented Feb 14, 2023

There are also two things I noticed from my example below (Kotlin 1.7.10):

  1. All values are collected, even though some of them get interrupted immediately after the collecting lambda starts to execute. I think they should be skipped. This happens with both buffered and rendezvous + drop_oldest channels.
  2. Although the coroutines are interrupted with a CancellationException, it takes 6 seconds to do it. Considering point 1, it probably happens right before or right after the runInterruptible invocation.

Snippet:

// Initialization of channel
private val searchQueriesChannel = Channel<String>(Channel.BUFFERED)

// Handling of updates
viewModelScope.launch {
    searchQueriesChannel.consumeAsFlow()
        .collectLatest { input ->
            Logger.d("start searching for $input")
            state.value = state.value.copy(isLoading = true)
            kotlin.runCatching {
                val result = runInterruptible(coroutineContext + Dispatchers.IO) {
                    Logger.d("processing input $input in background")
                    processInputUseCase.process(input)
                }
                Logger.d("finished searching for $input")
                state.value = result
            }.onFailure { Logger.d("search for $input was interrupted") }
        }
}

// Sending new values
Logger.d("sending input $input")
searchQueriesChannel.trySend(input)

Here is the output:

00:39:48.592  D  main SearchViewModel:search sending input か
00:39:48.592  D  main SearchViewModel$handleSearchQueries$1$1:invokeSuspend start searching for か
00:39:48.593  D  DefaultDispatcher-worker-1 SearchViewModel$handleSearchQueries$1$1$1$result$1:invoke processing input か in background
00:39:49.312  D  main SearchViewModel:search sending input かた
00:39:50.024  D  main SearchViewModel:search sending input かたか
00:39:50.808  D  main SearchViewModel:search sending input かたかな
00:39:56.018  D  main SearchViewModel$handleSearchQueries$1$1:invokeSuspend search for か was interrupted
00:39:56.019  D  main SearchViewModel$handleSearchQueries$1$1:invokeSuspend start searching for かた
00:39:56.019  D  DefaultDispatcher-worker-1 SearchViewModel$handleSearchQueries$1$1$1$result$1:invoke processing input かた in background
00:39:56.289  D  main SearchViewModel$handleSearchQueries$1$1:invokeSuspend search for かた was interrupted
00:39:56.289  D  main SearchViewModel$handleSearchQueries$1$1:invokeSuspend start searching for かたか
00:39:56.290  D  DefaultDispatcher-worker-1 SearchViewModel$handleSearchQueries$1$1$1$result$1:invoke processing input かたか in background
00:39:56.306  D  main SearchViewModel$handleSearchQueries$1$1:invokeSuspend search for かたか was interrupted
00:39:56.307  D  main SearchViewModel$handleSearchQueries$1$1:invokeSuspend start searching for かたかな
00:39:56.307  D  DefaultDispatcher-worker-1 SearchViewModel$handleSearchQueries$1$1$1$result$1:invoke processing input かたかな in background
00:39:56.320  D  main SearchViewModel$handleSearchQueries$1$1:invokeSuspend finished searching for かたかな

@danielesegato

It seems to me the issue here is with the runInterruptible rather than collectLatest.

The thread blocking behavior is what causes this when the same dispatcher is used. Couldn't you just add a flag to the invocation to allow / disallow undispatched behavior so that the programmer can pick the right one for the use case?


5 participants