collectLatest job is not cancelled when blocked #3679

Open
dovchinnikov opened this issue Mar 13, 2023 · 3 comments

dovchinnikov commented Mar 13, 2023

In IJ we have a blocking API that reacts to its own cancellation handles. I attach an invokeOnCompletion(onCancelling = true) handler to cancel IJ's own cancellation handle, but the handler is never invoked with a cancellation cause while the block is running, even though both the emitter and the collector run on Dispatchers.Default and the collector launches a standalone coroutine for each collected element.
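
For context, here is a minimal sketch of the pattern described above, outside of any flow. BlockingHandle is a hypothetical stand-in for a blocking API with its own cancellation handle (it is not an IJ class), and runDemo is an illustrative name. With a plain launch that is cancelled from another coroutine, the onCancelling handler should fire while the thread is still blocked, which is the behaviour I expected from collectLatest as well.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a blocking API that has its own cancellation handle.
class BlockingHandle {
  private val cancelled = CountDownLatch(1)

  fun cancel() = cancelled.countDown()

  // Blocks the calling thread until cancel() is called or the timeout elapses.
  // Returns true if it was released by cancel().
  fun blockUntilCancelled(timeoutMs: Long): Boolean =
    cancelled.await(timeoutMs, TimeUnit.MILLISECONDS)
}

@OptIn(InternalCoroutinesApi::class)
fun runDemo(): Boolean {
  val handle = BlockingHandle()
  val releasedByHandler = AtomicBoolean(false)
  runBlocking(Dispatchers.Default) {
    val registered = CompletableDeferred<Unit>()
    val worker = launch {
      // Bridge coroutine cancellation to the handle of the blocking API.
      coroutineContext.job.invokeOnCompletion(onCancelling = true) { cause ->
        if (cause != null) handle.cancel()
      }
      registered.complete(Unit)
      // The thread is blocked here; only handle.cancel() (or the timeout) releases it.
      releasedByHandler.set(handle.blockUntilCancelled(timeoutMs = 5_000))
    }
    registered.await() // make sure the handler is attached before cancelling
    worker.cancel()
  }
  return releasedByHandler.get()
}

fun main() {
  println("blocking call released by the cancellation handler: ${runDemo()}")
}
```

The handler is registered before the coroutine blocks, and cancellation comes from a different coroutine, so nothing in this sketch depends on the blocked thread making progress.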

https://pl.kotl.in/hnYvONrvy

Provide a Reproducer

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.first

fun main() {
  runBlocking(Dispatchers.Default) {
    val flow = MutableSharedFlow<Int>()
    val collector = launch {
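      flow.collectLatest { value ->
        // Report whether the job running this block completes normally or gets cancelled.
        @OptIn(InternalCoroutinesApi::class)
        currentCoroutineContext().job.invokeOnCompletion(onCancelling = true, handler = {
          if (it == null) {
            println("$value completed")
          }
          else {
            println("$value canceled; t: $it")
          }
        })
        // Stands in for the blocking (non-suspending) API call.
        Thread.sleep(100)
        println("$value processed")
      }
    }
    // Wait for the collector to subscribe; without subscribers the emissions would be dropped.
    flow.subscriptionCount.first { it == 1 }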
    repeat(3) {
      flow.emit(it)
    }
    collector.cancel()
  }
}

Expected:

0 canceled; t: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@xxxxxx
0 processed
1 canceled; t: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@xxxxxx
1 processed
2 canceled; t: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@xxxxxx
2 processed

Actual:

0 processed
0 completed
1 processed
1 completed
2 canceled; t: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@5c79719e
2 processed

qwwdfsad commented

A small note: this is reproducible on kotlinx.coroutines 1.6.4.

qwwdfsad commented Mar 14, 2023

Expected

This scenario is highly concurrent, so the { value -> ... } block may be cancelled before it even has a chance to execute. Any of the following traces should therefore be expected:

Traces

0 canceled; t: kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
0 processed
1 canceled; t: kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
1 processed

0 canceled; t: kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
0 processed
2 canceled; t: kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
2 processed

Or any other combination. The only requirement is that every "processed" line must be paired with a "canceled" line for the same value.
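
To make the pairing requirement concrete, here is a rough sketch of a check over the printed lines. The function name everyProcessedIsPaired is only illustrative, and the exception text in the sample lines is shortened to "...".

```kotlin
// Returns true when every "<value> processed" line has a "<value> canceled..." line
// for the same value somewhere in the output. Order is deliberately not checked.
fun everyProcessedIsPaired(lines: List<String>): Boolean {
  val processed = mutableSetOf<String>()
  val canceled = mutableSetOf<String>()
  for (line in lines) {
    val value = line.substringBefore(' ')
    val rest = line.substringAfter(' ', "")
    when {
      rest == "processed" -> processed += value
      rest.startsWith("canceled") -> canceled += value
    }
  }
  return canceled.containsAll(processed)
}

fun main() {
  // The "Actual" output from the report: 0 and 1 are processed but never canceled.
  val actual = listOf(
    "0 processed", "0 completed",
    "1 processed", "1 completed",
    "2 canceled; t: ...", "2 processed",
  )
  // One of the acceptable traces: value 1 never started, 0 and 2 are both paired.
  val acceptable = listOf(
    "0 canceled; t: ...", "0 processed",
    "2 canceled; t: ...", "2 processed",
  )
  println(everyProcessedIsPaired(actual))     // false
  println(everyProcessedIsPaired(acceptable)) // true
}
```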

Regarding the bug: it has the very same cause as #3109, and it is the reason why #3168 is not yet stable.
This is one more reason to change the current behaviour. Thanks for the report.

qwwdfsad added the flow label Mar 14, 2023

dovchinnikov commented

> block is cancelled even before it has a chance to execute

Correct, sorry, I omitted this. The point is that the Job is not even cancelled.
