Skip to content

Calling Flow.stateIn with a cancelled scope suspends forever #4322

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
francescotescari opened this issue Jan 6, 2025 · 4 comments
Closed

Comments

@francescotescari
Copy link
Contributor

Describe the bug

Calling Flow.stateIn with a cancelled scope suspends forever. The expected behavior IMHO is for stateIn to rethrow the cancellation exception of the scope, similar to how scope.async { }.await() and CompleteableDeferred(scope.job) behave on a cancelled scope. This behavior happens not only if the scope is already cancelled, but also if it gets cancelled concurrently with stateIn.

The cause of the issue is that stateIn awaits a CompleteableDeferred that is completed exclusively by a coroutine launched (non-atomically) in the collecting (possibly cancelled) scope.

A possible fix is to bind the CompletableDeferred here to the job of the collecting scope (with CompleteableDeferred(scope.coroutineContext[Job])).

Provide a Reproducer

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

suspend fun main() {
    val flow = flowOf(1, 2, 3)
    val cancelledScope = CoroutineScope(EmptyCoroutineContext).apply { cancel() }
    println("Awaiting stateIn...")
    val stateFlow = flow.stateIn(cancelledScope) // Suspends forever
    println("Done!") // Never printed
}

// prints "Awaiting stateIn..." and hangs forever
@francescotescari francescotescari changed the title Calling Flow.stateIn with a cancelled scope suspends forever. Calling Flow.stateIn with a cancelled scope suspends forever Jan 6, 2025
@dkhalanskyjb
Copy link
Collaborator

I agree both that this is a bug and that your fix is the way to go. Would you like to make a PR? If so, could you also make stateIn fail with a NoSuchElementException("Expected at least one element") if the flow collection completes without emitting a single element?

This was a surprisingly difficult decision. Below are my thoughts.


There's one more bug/feature/behavior of the same form:

flow { }.stateIn(CoroutineScope(EmptyCoroutineContext)) // hangs

We can think of stateIn as shareIn that also keeps track of the "current" emission (the same correspondence as StateFlow itself has with SharedFlow). Let's see how shareIn deals with these issues.

If the scope is cancelled when we attempt to wait for the initial value, it hangs:

val scope = CoroutineScope(EmptyCoroutineScope).apply { cancel() }
val flow = flow {
  emit(1)
  yield()
  emit(2)
}
val result = MutableStateFlow<Int>()
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)
result.value = sharedFlow.first() // hangs

If the original flow is empty, attempting to wait for the first value hangs:

val scope = CoroutineScope(EmptyCoroutineScope)
val flow = flow<Int> { }
val result = MutableStateFlow<Int>()
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)
result.value = sharedFlow.first() // hangs

From this point of view, it makes sense that stateIn in a cancelled scope or on an empty flow hangs: the sharing has already started before the function even exits, and obtaining the initial value is the first thing that happens during the sharing. If obtaining the shared value hangs, the whole stateIn also hangs.

This is certainly unintuitive, but can be explained in terms of the provided abstractions.

However, there is a crucial detail: stateIn already exhibits behavior that can't be expressed through shareIn:

val scope = CoroutineScope(EmptyCoroutineScope)
val flow = flow<Int> { error("this flow can't be collected") }
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)
sharedFlow.first() // cancels `scope` and hangs!
val scope = CoroutineScope(EmptyCoroutineScope)
val flow = flow<Int> { error("this flow can't be collected") }
flow.stateIn(scope) // throws an exception

This behavior was introduced in #2329 without much fanfare, but it already breaks the analogy between shareIn and stateIn in favor of propagating issues with the acquisition of the first element. Not detecting an empty flow or a cancelled scope is inconsistent with that decision.

Then the fix you're proposing does seem reasonable:

  • Launching the procedure of obtaining the value with CoroutineStart.ATOMIC is dangerous: the flow may contain some operations that can not be run after we've witnessed the corresponding scope being cancelled (for example, UI updates can't be performed after the corresponding UI component is destroyed).
  • Without collecting the flow, there is nowhere to obtain the StateFlow value from.
  • Throwing a CancellationException is consistent with what happens if cancellation happens after launch but before the first element is emitted.

@francescotescari
Copy link
Contributor Author

Good point, I completely overlooked the empty case, but completely agree with what you wrote.
I'll open a PR soon with the fix for both cases ⚙️

@francescotescari
Copy link
Contributor Author

Fix has been merged in #4327

@dkhalanskyjb
Copy link
Collaborator

We prefer the bot to close issues manually when the fix enters master (that is, a published release).

svc-squareup-copybara pushed a commit to cashapp/misk that referenced this issue Apr 8, 2025
| Package | Type | Package file | Manager | Update | Change |
|---|---|---|---|---|---|
|
[org.jetbrains.kotlinx:kotlinx-coroutines-test](https://github.com/Kotlin/kotlinx.coroutines)
| dependencies | misk/gradle/libs.versions.toml | gradle | patch |
`1.10.1` -> `1.10.2` |
|
[org.jetbrains.kotlinx:kotlinx-coroutines-slf4j](https://github.com/Kotlin/kotlinx.coroutines)
| dependencies | misk/gradle/libs.versions.toml | gradle | patch |
`1.10.1` -> `1.10.2` |
|
[org.jetbrains.kotlinx:kotlinx-coroutines-core](https://github.com/Kotlin/kotlinx.coroutines)
| dependencies | misk/gradle/libs.versions.toml | gradle | patch |
`1.10.1` -> `1.10.2` |
| [software.amazon.awssdk:sdk-core](https://aws.amazon.com/sdkforjava) |
dependencies | misk/gradle/libs.versions.toml | gradle | patch |
`2.31.16` -> `2.31.17` |
| [software.amazon.awssdk:sqs](https://aws.amazon.com/sdkforjava) |
dependencies | misk/gradle/libs.versions.toml | gradle | patch |
`2.31.16` -> `2.31.17` |
|
[software.amazon.awssdk:dynamodb-enhanced](https://aws.amazon.com/sdkforjava)
| dependencies | misk/gradle/libs.versions.toml | gradle | patch |
`2.31.16` -> `2.31.17` |
| [software.amazon.awssdk:dynamodb](https://aws.amazon.com/sdkforjava) |
dependencies | misk/gradle/libs.versions.toml | gradle | patch |
`2.31.16` -> `2.31.17` |
| [software.amazon.awssdk:aws-core](https://aws.amazon.com/sdkforjava) |
dependencies | misk/gradle/libs.versions.toml | gradle | patch |
`2.31.16` -> `2.31.17` |
| [software.amazon.awssdk:bom](https://aws.amazon.com/sdkforjava) |
dependencies | misk/gradle/libs.versions.toml | gradle | patch |
`2.31.16` -> `2.31.17` |
| [software.amazon.awssdk:auth](https://aws.amazon.com/sdkforjava) |
dependencies | misk/gradle/libs.versions.toml | gradle | patch |
`2.31.16` -> `2.31.17` |

---

### Release Notes

<details>
<summary>Kotlin/kotlinx.coroutines
(org.jetbrains.kotlinx:kotlinx-coroutines-test)</summary>

###
[`v1.10.2`](https://github.com/Kotlin/kotlinx.coroutines/blob/HEAD/CHANGES.md#Version-1102)

[Compare
Source](Kotlin/kotlinx.coroutines@1.10.1...1.10.2)

- Fixed the `kotlinx-coroutines-debug` JAR file including the
`module-info.class` file twice, resulting in failures in various tooling
([#&#8203;4314](Kotlin/kotlinx.coroutines#4314)).
Thanks, [@&#8203;RyuNen344](https://github.com/RyuNen344)!
- Fixed `Flow.stateIn` hanging when the scope is cancelled in advance or
the flow is empty
([#&#8203;4322](Kotlin/kotlinx.coroutines#4322)).
Thanks, [@&#8203;francescotescari](https://github.com/francescotescari)!
- Improved handling of dispatcher failures in `.limitedParallelism`
([#&#8203;4330](Kotlin/kotlinx.coroutines#4330))
and during flow collection
([#&#8203;4272](Kotlin/kotlinx.coroutines#4272)).
- Fixed `runBlocking` failing to run its coroutine to completion in some
cases if its JVM thread got interrupted
([#&#8203;4399](Kotlin/kotlinx.coroutines#4399)).
-   Small tweaks, fixes, and documentation improvements.

</details>

---

### Configuration

📅 **Schedule**: Branch creation - "after 6pm every weekday,before 2am
every weekday" in timezone Australia/Melbourne, Automerge - At any time
(no schedule defined).

🚦 **Automerge**: Disabled by config. Please merge this manually once you
are satisfied.

♻ **Rebasing**: Never, or you tick the rebase/retry checkbox.

👻 **Immortal**: This PR will be recreated if closed unmerged. Get
[config help](https://github.com/renovatebot/renovate/discussions) if
that's undesired.

---

- [ ] <!-- rebase-check -->If you want to rebase/retry this PR, check
this box

---

This PR has been generated by [Renovate
Bot](https://github.com/renovatebot/renovate).

GitOrigin-RevId: 3449fc325ac14573dff5b670e407e99eab23757b
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants