Flow.catch skips upstream exceptions #4159
Comments
Could you provide any reproducer for the scenario where an exception is thrown upstream but is completely ignored? |
In this scenario, it is possible to observe "Catch block" upstream exception without "Catch operator: " being invoked though |
suspend fun main() {
    val flow =
        flow<String> {
            emit("yo")
            throw UnsupportedOperationException()
        }
            .catch { println(".catch caught $it") }
    println(flow.first())
}
outputs
whilst:
suspend fun main() {
    val flow =
        flow<String> {
            emit("yo")
            throw UnsupportedOperationException()
        }
            .catch { println(".catch caught $it") }
            .flowOn(Dispatchers.Default)
    println(flow.first())
}
outputs
whilst:
suspend fun main() {
    val flow =
        flow<String> {
            emit("yo")
            throw UnsupportedOperationException()
        }
            .flowOn(Dispatchers.Default)
            .catch { println(".catch caught $it") }
    println(flow.first())
}
outputs
Was about to create a new issue for this but came across this one, is it the same bug? |
@qwwdfsad do you have any insight into what is going on here? |
@nbransby this is a leaking behaviour of multiple aspects: concurrency imposed by It's a bit more complicated than just an omission or a trivial bug and I'm yet to make my mind about this behaviour, its validity and whether we can fix it without breaking even more stuff in the meantime |
Yes, it looks like it is the same bug. |
Have you made up your mind yet? I'm finding the catch operator effectively useless now, as it can't be trusted to actually catch upstream exceptions. It seems that the closer I move it to the throwing upstream operator, the greater the likelihood that it will catch, but obviously this fuzzy behavior is not viable. Another example:
suspend fun main() {
    val flow = flow { emit("a"); delay(1000); emit("b") }
        .flatMapLatest {
            flow<String> {
                emit("yo")
                throw UnsupportedOperationException()
            }
        }
        .catch { println(".catch caught $it") }
    println(flow.take(SOME).toList())
}
when SOME = 2 we get
but when SOME = 1:
Perhaps the documentation for catch can be amended to reflect the true behavior whilst this issue remains open? https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/catch.html |
Also if you can shed any light on this stacktrace that would be super useful, I tried to create a minimal reproducer but didn't have any luck. It's the same issue: an upstream exception is escaping a
I don't understand the circular reference here but guessing something in
// Return exception from upstream or null
@Suppress("NAME_SHADOWING")
internal suspend fun <T> Flow<T>.catchImpl(
    collector: FlowCollector<T>
): Throwable? {
    var fromDownstream: Throwable? = null
    try {
        collect {
            try {
                collector.emit(it)
            } catch (e: Throwable) {
                fromDownstream = e
                throw e
            }
        }
    } catch (e: Throwable) {
        // Otherwise, smartcast is impossible
        val fromDownstream = fromDownstream
        /*
         * First check ensures that we catch an original exception, not one rethrown by an operator.
         * Seconds check ignores cancellation causes, they cannot be caught.
         */
        if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) {
            throw e // Rethrow exceptions from downstream and cancellation causes
        } else {
            /*
             * The exception came from the upstream [semi-] independently.
             * For pure failures, when the downstream functions normally, we handle the exception as intended.
             * But if the downstream has failed prior to or concurrently
             * with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring that it's not lost.
             */
            if (fromDownstream == null) {
                return e
            }
            /*
             * We consider the upstream exception as the superseding one when both upstream and downstream
             * fail, suppressing the downstream exception, and operating similarly to `finally` block with
             * the useful addition of adding the original downstream exception to suppressed ones.
             *
             * That's important for the following scenarios:
             * ```
             * flow {
             *     val resource = ...
             *     try {
             *         ... emit as well ...
             *     } finally {
             *          resource.close() // Throws in the shutdown sequence when 'collect' already has thrown an exception
             *     }
             * }.catch { } // or retry
             *  .collect { ... }
             * ```
             * when *the downstream* throws.
             */
            if (e is CancellationException) {
                fromDownstream.addSuppressed(e)
                throw fromDownstream
            } else {
                e.addSuppressed(fromDownstream)
                throw e
            }
        }
    }
    return null
} |
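To make the branching in the quoted catchImpl easier to follow, here is a minimal plain-Kotlin sketch of its decision logic. It is not library code: `Outcome`, `PassedToCatch`, `Rethrown` and `classify` are hypothetical names, reference equality stands in for the internal `isSameExceptionAs` check, and the two boolean parameters stand in for `isCancellationCause(coroutineContext)` and `e is CancellationException`.

```kotlin
// Simplified, hypothetical model of the branches in catchImpl; not library code.
sealed class Outcome
data class PassedToCatch(val cause: Throwable) : Outcome() // the catch block would see it
data class Rethrown(val cause: Throwable) : Outcome()      // the catch block is bypassed

fun classify(
    e: Throwable,                          // caught by the outer try/catch around collect
    fromDownstream: Throwable?,            // recorded earlier if collector.emit threw
    isCancellationCause: Boolean = false,  // assumption: replaces e.isCancellationCause(coroutineContext)
    isCancellation: Boolean = false        // assumption: replaces `e is CancellationException`
): Outcome {
    // Assumption: reference equality replaces the internal isSameExceptionAs check.
    if (e === fromDownstream || isCancellationCause) return Rethrown(e)
    // Pure upstream failure: downstream was healthy, so the operator handles it.
    if (fromDownstream == null) return PassedToCatch(e)
    // Both sides failed: one exception supersedes, the other is attached as suppressed.
    return if (isCancellation) {
        fromDownstream.addSuppressed(e)
        Rethrown(fromDownstream)
    } else {
        e.addSuppressed(fromDownstream)
        Rethrown(e)
    }
}

fun main() {
    val upstream = UnsupportedOperationException("upstream")
    val downstream = IllegalStateException("downstream")
    println(classify(upstream, null))        // upstream-only failure
    println(classify(downstream, downstream)) // downstream-only failure
    println(classify(upstream, downstream))   // both failed
}
```

In this model the last case is the one discussed in this thread: once a downstream failure has been recorded, a different upstream exception is rethrown instead of being handed to the catch block.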
Note: there's some difference between |
As for circular references, here's a compact reproducer that demonstrates them: https://pl.kotl.in/5sUH_6v9F |
Thanks, so in my case it's probably the leaky catch missing the upstream exception in the producer, which goes on to crash the collector's coroutine scope, which in turn leads to the suppressed JobCancellationException |
My workaround for now is to restrict If I need the exception to propagate across a coroutine boundary introduced by operators such as |
I have this problem too. Now I can't use Also it might be not, or not only, the
if |
Are there any updates regarding this issue? It looks like a very critical problem to me, yet there doesn't seem to be a timeline for fixing it. |
It looks like, under some race conditions, the Flow.catch operator skips upstream exceptions. For example:
We are using the fairly old version 1.6.1, but the catch operator implementation has not changed since then.
From the source code of the catch operator, it looks like this behaviour is possible when both downstream and upstream exceptions are caught, which can happen under race conditions.
We believe it is not at all clear from the documentation, and especially from the semantics of the 'catch' operator, that an upstream exception can skip the operator. For now we had to switch to the channelFlow builder (instead of flow) and use try-catch.
Could it be that the code here should be replaced with "return e"?
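For readers wondering what the channelFlow plus try-catch workaround might look like, here is a minimal sketch. It assumes kotlinx.coroutines is on the classpath; `source` is a hypothetical name and the thrown exception is borrowed from the reproducers in the comments, not from our production code. The idea is that the failure is handled inside the builder, so it never has to travel through a downstream catch operator.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical source: the failure is caught where it is thrown,
// instead of relying on a downstream .catch { } operator.
fun source(): Flow<String> = channelFlow {
    try {
        send("yo")
        throw UnsupportedOperationException()
    } catch (e: UnsupportedOperationException) {
        // Only the specific failure type is caught here, so a
        // CancellationException is not swallowed by this handler.
        println("handled inside builder: $e")
    }
}

fun main(): Unit = runBlocking {
    println(source().first())
}
```

Catching a narrow exception type rather than Throwable is a deliberate choice in this sketch: cancellation should still be able to propagate out of the builder.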